# Unit Testing for IPARO Evaluation System

## Add Necessary Imports
Python's built-in `unittest` module provides the test framework used throughout this notebook to verify that the IPARO code behaves as expected.

In [14]:
%run iparo.ipynb

example.com
Hello IPARO
2000-01-01 00:00:00
Number of operations IPNS performed:
{'get': 100, 'update': 100}
Number of operations IPFS performed:
{'hash': 100, 'store': 100, 'retrieve': 100}
Looking for node with sequence number: 56
Node found with contents: Node 56
Number of operations IPNS performed:
{'get': 1, 'update': 0}
Number of operations IPFS performed:
{'hash': 0, 'store': 0, 'retrieve': 44}
Number of operations IPNS performed:
{'get': 99, 'update': 100}
Number of operations IPFS performed:
{'hash': 100, 'store': 100, 'retrieve': 5049}
Looking for node with sequence number: 27
Node found with contents: Node 27
Number of operations IPNS performed:
{'get': 1, 'update': 0}
Number of operations IPFS performed:
{'hash': 0, 'store': 0, 'retrieve': 2}
Number of operations IPNS performed:
{'get': 100, 'update': 100}
Number of operations IPFS performed:
{'hash': 100, 'store': 100, 'retrieve': 199}
Looking for node with sequence number: 10
Node found with contents: Node 10
Number of op

In [15]:
import unittest
from datetime import timedelta
from math import ceil, log2, exp2

## Some Useful Test Constants and Helper Methods

In [16]:
# Two reference timestamps exactly one second apart; every fixture below is derived from time1.
time1 = datetime.now()
time2 = time1 + timedelta(seconds=1)

# URLs under which the test IPAROs are published.
URL, URL1, URL2 = (
    "https://www.example.com",
    "https://www.example1.com",
    "https://www.example2.com",
)

# Hand-written CIDs used by the IPNS tests, which never hash real content.
CID1, CID2 = "abcdefg", "bcdefgh"

# Two IPAROs for the same URL that differ in both content and timestamp.
iparo1 = IPARO(url=URL, timestamp=time1, content=b"123456")
iparo2 = IPARO(url=URL, timestamp=time2, content=b"1234567")

In [17]:
def add_nodes(num_nodes: int) -> list[IPARO]:
    """Store ``num_nodes`` random IPAROs for ``URL`` and return them in insertion order.

    Node ``i`` is timestamped ``time1 + i`` seconds. Each node is stored in ``ipfs``,
    linked with ``SingleStrategy`` and then published through ``ipns`` as the latest
    CID for ``URL``.
    """
    stored = []
    for offset in range(num_nodes):
        node = IPARO(
            content=generate_random_content_string(),
            timestamp=time1 + timedelta(seconds=offset),
            url=URL,
        )
        stored.append(node)
        node_cid = ipfs.store(node)
        ipfs.link(node_cid, SingleStrategy(), LinkStrategyParams(URL))
        ipns.update(URL, node_cid)
    return stored

def generate_random_content_string() -> bytes:
    """Return 100 random bytes, each one a printable ASCII code (32 through 126 inclusive)."""
    return bytes(random.randint(32, 126) for _ in range(100))


def test_strategy(strategy, num_nodes: int = 100) -> list[int]:
    """Build a chain of random IPAROs with ``strategy`` and report the link count per node.

    Node ``i`` is timestamped ``time1 + i`` seconds, stored in ``ipfs``, linked using
    ``strategy`` and then published through ``ipns`` as the latest CID for ``URL``.

    Args:
        strategy: The linking strategy handed to ``ipfs.link`` for every node.
        num_nodes: How many nodes to insert. Defaults to 100, the size the tests
            in this notebook were written against.

    Returns:
        A list whose ``i``-th entry is the number of links created for node ``i``.
    """
    lengths = []
    for i in range(num_nodes):
        content = generate_random_content_string()
        iparo = IPARO(content=content, timestamp=time1 + timedelta(seconds=i), url=URL)
        cid = ipfs.store(iparo)
        # Link before publishing: at this point ipns still names the previous node.
        link_col = ipfs.link(cid, strategy, LinkStrategyParams(URL))
        ipns.update(URL, cid)
        lengths.append(len(link_col.links))
    return lengths


def test_strategy_verbose(strategy, num_nodes: int = 100) -> tuple[list[int], list[str], list[IPARO]]:
    """Build a chain of random IPAROs with ``strategy`` and return everything that was created.

    Node ``i`` is timestamped ``time1 + i`` seconds, stored in ``ipfs``, linked using
    ``strategy`` and then published through ``ipns`` as the latest CID for ``URL``.

    Args:
        strategy: The linking strategy handed to ``ipfs.link`` for every node.
        num_nodes: How many nodes to insert. Defaults to 100, the size the tests
            in this notebook were written against.

    Returns:
        A ``(lengths, cids, iparos)`` tuple of parallel lists indexed by insertion
        order: the number of links created for each node, the CID returned by
        ``ipfs.store``, and the IPARO itself.
    """
    lengths = []
    cids = []
    iparos = []
    for i in range(num_nodes):
        content = generate_random_content_string()
        iparo = IPARO(content=content, timestamp=time1 + timedelta(seconds=i), url=URL)
        cid = ipfs.store(iparo)
        # Link before publishing: at this point ipns still names the previous node.
        link_col = ipfs.link(cid, strategy, LinkStrategyParams(URL))
        ipns.update(URL, cid)
        iparos.append(iparo)
        cids.append(cid)
        lengths.append(len(link_col.links))
    return lengths, cids, iparos

## IPARO

In [18]:
class IPAROTest(unittest.TestCase):
    """Checks the attributes of the shared ``iparo1`` fixture."""

    def test_has_content(self):
        # iparo1 was built with content b"123456"; it must come back unchanged as bytes.
        stored_content = iparo1.content
        self.assertIsInstance(stored_content, bytes)
        self.assertEqual(stored_content, b"123456")

    def test_has_timestamp(self):
        # The timestamp passed at construction (time1) must be exposed as a datetime.
        stored_timestamp = iparo1.timestamp
        self.assertIsInstance(stored_timestamp, datetime)
        self.assertEqual(stored_timestamp, time1)

    def test_has_url(self):
        # The URL passed at construction must be exposed as a str.
        stored_url = iparo1.url
        self.assertIsInstance(stored_url, str)
        self.assertEqual(stored_url, URL)

    def test_can_be_serialized(self):
        # str() on an IPARO must yield a string.
        serialized = str(iparo1)
        self.assertIsInstance(serialized, str)

    def test_is_immutable(self):
        # Assigning to an attribute of an existing IPARO must raise.
        with self.assertRaises(Exception):
            iparo1.timestamp = datetime.now()


## IPNS

In [19]:
class IPNSTest(unittest.TestCase):
    """Tests for the shared ``ipns`` object: its URL -> CID data and its operation counters."""

    iparo1 = IPARO(content=b"123456", timestamp=time1, url="https://www.example.com")

    # TODO: a test for storing versions was stubbed out here and never written.

    def setUp(self):
        # Every test starts with no URLs registered and all counters at zero.
        ipns.reset_data()
        ipns.reset_counts()

    def test_has_no_urls_initially(self):
        # After a reset both the URL map and the per-URL version counts are empty.
        for mapping in (ipns.data, ipns.version_counts):
            self.assertDictEqual(mapping, {})

    def test_returns_none_accessing_invalid_url(self):
        # Looking up a URL that was never updated yields None.
        ipns.update(URL, CID1)
        missing = ipns.get_cid(URL1)
        self.assertIsNone(missing)

    def test_counts_are_zero_initially(self):
        self.assertEqual(ipns.update_count, 0)
        self.assertEqual(ipns.get_count, 0)
        self.assertNotIn(URL, ipns.version_counts)

    def test_can_update_url(self):
        # One update registers the URL and bumps both the global and per-URL counters.
        ipns.update(URL, CID1)
        self.assertDictEqual(ipns.data, {URL: CID1})
        self.assertEqual(ipns.update_count, 1)
        self.assertEqual(ipns.version_counts[URL], 1)

    def test_can_update_url_twice(self):
        # The second update overwrites the CID; counters reflect both updates.
        for new_cid in (CID1, CID2):
            ipns.update(URL, new_cid)
        self.assertDictEqual(ipns.data, {URL: CID2})
        self.assertEqual(ipns.update_count, 2)
        self.assertEqual(ipns.version_counts[URL], 2)

    def test_can_update_two_different_urls(self):
        # Updates to distinct URLs are tracked independently.
        assignments = {URL1: CID2, URL2: CID1}
        for url, new_cid in assignments.items():
            ipns.update(url, new_cid)
        self.assertDictEqual(ipns.data, assignments)
        self.assertEqual(ipns.update_count, 2)
        for url in assignments:
            self.assertEqual(ipns.version_counts[url], 1)

    def test_can_retrieve_correct_cid(self):
        # A lookup returns the CID registered for that URL and counts as one get.
        for url, new_cid in ((URL1, CID1), (URL2, CID2)):
            ipns.update(url, new_cid)
        self.assertEqual(ipns.get_cid(URL1), CID1)
        self.assertEqual(ipns.get_count, 1)

    def test_can_retrieve_correct_cid_twice(self):
        # Two lookups, in the opposite order of registration, each return their own CID.
        for url, new_cid in ((URL1, CID1), (URL2, CID2)):
            ipns.update(url, new_cid)
        found_for_url2 = ipns.get_cid(URL2)
        found_for_url1 = ipns.get_cid(URL1)
        self.assertEqual(found_for_url2, CID2)
        self.assertEqual(found_for_url1, CID1)
        self.assertEqual(ipns.get_count, 2)

## IPFS

In [20]:
class IPFSTest(unittest.TestCase):
    """Tests for the shared ``ipfs`` object: hashing, storing, retrieving, linking and lookups.

    All tests operate on the module-level ``ipfs`` and ``ipns`` objects. Both are reset
    before and after every test so that no test depends on execution order or on state
    left behind by earlier notebook cells.
    """

    @staticmethod
    def _reset_stores():
        # Clear stored data and operation counters on both shared objects.
        ipns.reset_data()
        ipns.reset_counts()
        ipfs.reset_data()
        ipfs.reset_counts()

    def setUp(self):
        # Without this, the first test to run would see whatever the preceding cells stored.
        self._reset_stores()

    def tearDown(self):
        self._reset_stores()

    def test_ipfs_counters_should_be_zero_initially(self):
        self.assertEqual(ipfs.hash_count, 0)
        self.assertEqual(ipfs.retrieve_count, 0)
        self.assertEqual(ipfs.store_count, 0)

    def test_ipfs_should_hash(self):
        # Hashing counts as one hash operation and yields a CID beginning with "Qm".
        cid = ipfs.hash(iparo1)
        self.assertEqual(ipfs.hash_count, 1)
        self.assertRegex(cid, "Qm.*")

    def test_ipfs_should_hash_twice(self):
        ipfs.hash(iparo1)
        ipfs.hash(iparo2)
        self.assertEqual(ipfs.hash_count, 2)

    def test_ipfs_should_initially_have_no_iparos(self):
        self.assertDictEqual(ipfs.data, {})

    def test_ipfs_should_store_one_iparo(self):
        cid = ipfs.store(iparo1)
        self.assertIn(cid, ipfs.data)

    def test_ipfs_hash_counts_should_update(self):
        # Storing an IPARO is expected to hash it exactly once.
        ipfs.store(iparo1)
        self.assertEqual(ipfs.hash_count, 1)

    def test_ipfs_should_retrieve_iparos(self):
        cid = ipfs.store(iparo1)
        iparo = ipfs.retrieve(cid)
        self.assertEqual(iparo, iparo1)

    def test_ipfs_should_count_retrievals(self):
        cid = ipfs.store(iparo1)
        ipfs.retrieve(cid)
        self.assertEqual(ipfs.retrieve_count, 1)

    def test_ipfs_should_retrieve_iparos_twice(self):
        cid = ipfs.store(iparo1)
        cid2 = ipfs.store(iparo2)
        node1 = ipfs.retrieve(cid)
        node2 = ipfs.retrieve(cid2)
        self.assertEqual(node1, iparo1)
        self.assertEqual(node2, iparo2)

    # Note: The converse of the proposition implied by the test is not necessarily true.
    # That is, different IPAROs can have the same CIDs, but the same IPARO cannot have
    # two different CIDs. We can see this in more detail when we get there.
    def test_different_cids_should_yield_different_iparos(self):
        cid = ipfs.store(iparo1)
        cid2 = ipfs.store(iparo2)
        node1 = ipfs.retrieve(cid)
        node2 = ipfs.retrieve(cid2)
        self.assertNotEqual(node1, node2)

    def test_ipfs_should_update_retrieval_counts_twice(self):
        cid = ipfs.store(iparo1)
        cid2 = ipfs.store(iparo2)
        ipfs.retrieve(cid)
        ipfs.retrieve(cid2)
        self.assertEqual(ipfs.retrieve_count, 2)

    def test_ipfs_should_store_iparo_cids_with_iparo(self):
        # The CID returned by store() must equal the hash of the same IPARO.
        cid = ipfs.store(iparo1)
        self.assertEqual(cid, ipfs.hash(iparo1))

    def test_ipfs_should_store_two_iparos(self):
        cid = ipfs.store(iparo1)
        cid2 = ipfs.store(iparo2)
        self.assertIn(cid, ipfs.data)
        self.assertIn(cid2, ipfs.data)

    def test_ipfs_hash_counts_should_update_twice(self):
        ipfs.store(iparo1)
        ipfs.store(iparo2)
        self.assertEqual(ipfs.hash_count, 2)

    def test_ipfs_should_store_two_iparos_correctly(self):
        cid = ipfs.store(iparo1)
        cid2 = ipfs.store(iparo2)
        self.assertEqual(cid, ipfs.hash(iparo1))
        self.assertEqual(cid2, ipfs.hash(iparo2))

    def test_ipfs_should_add_link_to_iparos(self):
        cid = ipfs.store(iparo1)
        ipns.update(URL, cid)
        cid2 = ipfs.store(iparo2)
        ipfs.link(cid2, SingleStrategy(), LinkStrategyParams(URL))
        # Check membership first so a missing entry is a test failure, not a KeyError.
        self.assertIn(cid2, ipfs.links)
        cid2_links = ipfs.links[cid2].links
        self.assertEqual(len(cid2_links), 1)

    def test_ipfs_should_add_correct_link_to_iparos(self):
        # The single link of the second node must describe the first node.
        cid = ipfs.store(iparo1)
        ipns.update(URL, cid)
        cid2 = ipfs.store(iparo2)
        cid2_links = ipfs.link(cid2, SingleStrategy(), LinkStrategyParams(URL)).links
        cid2_link = cid2_links[0]
        self.assertEqual(cid2_link.cid, cid)
        self.assertEqual(cid2_link.seq_num, 0)
        self.assertEqual(cid2_link.timestamp, iparo1.timestamp)

    def test_ipfs_versions_should_initially_be_empty(self):
        cids = ipfs.get_all_cids(URL)
        self.assertEqual(len(cids), 0)

    def test_ipfs_versions_should_update_on_one_insert(self):
        cid = ipfs.store(iparo1)
        ipns.update(URL, cid)
        cids = ipfs.get_all_cids(URL)
        self.assertEqual(len(cids), 1)

    def test_ipfs_versions_should_update_on_two_inserts(self):
        cid = ipfs.store(iparo1)
        ipns.update(URL, cid)
        cid2 = ipfs.store(iparo2)
        ipfs.link(cid2, SingleStrategy(), LinkStrategyParams(URL))
        ipns.update(URL, cid2)
        cids = ipfs.get_all_cids(URL)
        self.assertSetEqual(set(cids), {cid, cid2})

    def test_ipfs_should_retrieve_by_number(self):
        # Sequence number 0 must resolve to the first node that was inserted.
        iparos = add_nodes(3)

        cid = ipfs.retrieve_by_number(URL, 0)
        iparo = ipfs.retrieve(cid)
        self.assertEqual(iparo, iparos[0])

    def test_ipfs_should_retrieve_latest_time_before_target_timestamp_by_default(self):
        # Nodes are one second apart; 54.999 s lies just before node 55, so node 54 is expected.
        iparos = add_nodes(100)

        cid = ipfs.retrieve_by_timestamp(URL, time1 + timedelta(seconds=54, milliseconds=999))
        iparo = ipfs.retrieve(cid)
        self.assertEqual(iparo, iparos[54])

    def test_ipfs_should_retrieve_no_iparo_if_no_iparo_is_inserted_into_ipfs(self):
        # With nothing stored, every lookup mode must return None.
        cids = {ipfs.retrieve_by_timestamp(URL, time1, Mode.CLOSEST),
                ipfs.retrieve_by_timestamp(URL, time1, Mode.EARLIEST_AFTER),
                ipfs.retrieve_by_timestamp(URL, time1)}

        self.assertSetEqual(cids, {None})

    def test_ipfs_should_retrieve_earliest_time_after_target_timestamp(self):
        # 54.001 s lies just after node 54, so the earliest node after it is node 55.
        iparos = add_nodes(100)

        cid = ipfs.retrieve_by_timestamp(URL, time1 + timedelta(seconds=54, milliseconds=1), Mode.EARLIEST_AFTER)
        iparo = ipfs.retrieve(cid)
        self.assertEqual(iparo, iparos[55])

    def test_ipfs_should_retrieve_target_timestamp_if_there_exists_a_cid_with_the_target_timestamp(self):
        # An exact timestamp match must be returned by all three lookup modes.
        iparos = add_nodes(100)

        cid = ipfs.retrieve_by_timestamp(URL, time1 + timedelta(seconds=54), Mode.EARLIEST_AFTER)
        cid2 = ipfs.retrieve_by_timestamp(URL, time1 + timedelta(seconds=54), Mode.CLOSEST)
        cid3 = ipfs.retrieve_by_timestamp(URL, time1 + timedelta(seconds=54))

        cids = {cid, cid2, cid3}
        iparo = ipfs.retrieve(cid)

        self.assertEqual(iparo, iparos[54])
        self.assertSetEqual(cids, {cid})

    def test_ipfs_should_retrieve_earlier_time_if_two_closest_timestamp_are_equally_distant(self):
        # 54.500 s is equidistant from nodes 54 and 55; the earlier one is expected.
        iparos = add_nodes(100)

        cid = ipfs.retrieve_by_timestamp(URL, time1 + timedelta(seconds=54, milliseconds=500), Mode.CLOSEST)
        iparo = ipfs.retrieve(cid)
        self.assertEqual(iparo, iparos[54])

    def test_ipfs_should_retrieve_earlier_time_if_closest_timestamp_is_earlier(self):
        # 54.499 s is closer to node 54 than to node 55.
        iparos = add_nodes(100)

        cid = ipfs.retrieve_by_timestamp(URL, time1 + timedelta(seconds=54, milliseconds=499), Mode.CLOSEST)
        iparo = ipfs.retrieve(cid)
        self.assertEqual(iparo, iparos[54])

    def test_ipfs_should_retrieve_earlier_time_if_closest_timestamp_is_later(self):
        # Despite the method name, this case expects the LATER node:
        # 54.501 s is closer to node 55 than to node 54.
        iparos = add_nodes(100)

        cid = ipfs.retrieve_by_timestamp(URL, time1 + timedelta(seconds=54, milliseconds=501), Mode.CLOSEST)
        iparo = ipfs.retrieve(cid)
        self.assertEqual(iparo, iparos[55])

    def test_ipfs_should_retrieve_earliest_time_after_timestamp(self):
        # Uses EARLIEST_AFTER, as the name says; previously this duplicated the
        # CLOSEST test directly above. The first node after 54.501 s is node 55.
        iparos = add_nodes(100)

        cid = ipfs.retrieve_by_timestamp(URL, time1 + timedelta(seconds=54, milliseconds=501), Mode.EARLIEST_AFTER)
        iparo = ipfs.retrieve(cid)
        self.assertEqual(iparo, iparos[55])

### IPARO Strategies

In [49]:
class IPAROStrategyTest(unittest.TestCase):
    """Tests for the linking strategies, ``LinkStrategyParams`` caching and number-based retrieval.

    Every test starts from freshly reset ``ipfs`` and ``ipns`` objects (see ``setUp``).
    """

    def setUp(self):
        ipns.reset_data()
        ipns.reset_counts()
        ipfs.reset_data()
        ipfs.reset_counts()

    def test_retrieval_strategy_must_work_for_first_index(self):
        # Sequence number 0 must resolve to the CID of the first inserted node.
        _, cids, _ = test_strategy_verbose(SingleStrategy())
        cid = NumberRetrievalStrategy(URL, 0).retrieve()
        self.assertEqual(cid, cids[0])

    def test_retrieval_strategy_must_work_for_last_index(self):
        # Sequence number 99 is the last of the 100 nodes the helper inserts.
        _, cids, _ = test_strategy_verbose(SingleStrategy())
        cid = NumberRetrievalStrategy(URL, 99).retrieve()
        self.assertEqual(cid, cids[99])

    def test_strategy_params_should_not_interfere_with_counts_early(self):
        # Merely constructing the params object must not touch IPFS or IPNS.
        LinkStrategyParams(URL)
        self.assertEqual(ipfs.retrieve_count, 0)
        self.assertEqual(ipns.get_count, 0)

    def test_strategy_params_should_cache_the_latest_cid_once_computed(self):
        # Two accesses, but only one IPNS lookup is expected.
        params = LinkStrategyParams(URL)
        _ = params.latest_cid
        _ = params.latest_cid
        self.assertEqual(ipfs.retrieve_count, 0)
        self.assertEqual(ipns.get_count, 1)

    def test_strategy_params_should_cache_the_latest_node_once_computed(self):
        # Two accesses, but only one IPNS lookup and one IPFS retrieval are expected.
        params = LinkStrategyParams(URL)
        _ = params.latest_node
        _ = params.latest_node
        self.assertEqual(ipfs.retrieve_count, 1)
        self.assertEqual(ipns.get_count, 1)

    def test_strategy_params_should_cache_the_latest_node_links_once_computed(self):
        params = LinkStrategyParams(URL)
        _ = params.latest_node_links
        _ = params.latest_node_links
        self.assertEqual(ipfs.retrieve_count, 1)
        self.assertEqual(ipns.get_count, 1)

    def test_strategy_params_should_cache_the_link_to_latest_node_once_computed(self):
        params = LinkStrategyParams(URL)
        _ = params.link
        _ = params.link
        self.assertEqual(ipfs.retrieve_count, 1)
        self.assertEqual(ipns.get_count, 1)

    def test_links_should_be_of_type_iparo_link_collection(self):
        content = generate_random_content_string()
        iparo = IPARO(content=content, timestamp=time1, url=URL)
        cid = ipfs.store(iparo)
        links = ipfs.link(cid, ComprehensiveStrategy(), LinkStrategyParams(URL))
        self.assertIsInstance(links, IPAROLinkCollection)

    def test_single_strategy_should_link_to_only_one_node(self):
        # The first node has nothing to link to; every later node has exactly one link.
        lengths = test_strategy(SingleStrategy())
        expected_lengths = [min(i, 1) for i in range(100)]
        self.assertListEqual(lengths, expected_lengths)

    def test_comprehensive_strategy_should_link_to_all_previous_nodes(self):
        # Node i is expected to carry i links, one per earlier node.
        lengths = test_strategy(ComprehensiveStrategy())
        expected_lengths = list(range(100))

        self.assertListEqual(lengths, expected_lengths)

    def test_comprehensive_strategy_should_link_to_the_right_nodes(self):
        # The last node's links must be exactly the CIDs of the 99 nodes before it.
        _, cids, _ = test_strategy_verbose(ComprehensiveStrategy())
        linked_cids = {link.cid for link in ipfs.retrieve_links(cids[99]).links}
        self.assertSetEqual(linked_cids, set(cids[:99]))

    def test_previous_should_link_to_at_most_two_nodes(self):
        lengths = test_strategy(PreviousStrategy())

        expected_lengths = [min(i, 2) for i in range(100)]
        self.assertListEqual(lengths, expected_lengths)

    def test_five_previous_should_link_to_at_most_six_nodes(self):
        lengths = test_strategy(KPreviousStrategy(k=5))
        expected_lengths = [min(i, 6) for i in range(100)]

        self.assertListEqual(lengths, expected_lengths)

    def test_random_strategy_should_respect_limits(self):
        lengths = test_strategy(KRandomStrategy(k_min=5, k_max=10))

        # Upper limit: never more than 11 links on any node.
        self.assertLessEqual(max(lengths), 11)
        # Lower limit: every node after the first must have at least one link.
        # (The previous assertLessEqual here also passed when a node had 0 links.)
        self.assertGreaterEqual(min(lengths[1:]), 1)

    def test_exponential_strategy_should_have_logarithmic_number_of_links(self):
        strategy = SequentialExponentialStrategy(k=2)
        lengths = test_strategy(strategy)
        # Node 0 has no links; node i >= 1 is expected to have 1 + ceil(log2(i)).
        expected_lengths = [0]
        expected_lengths.extend([1 + ceil(log2(i)) for i in range(1, 100)])

        self.assertListEqual(lengths, expected_lengths)

    def test_exponential_strategy_should_have_the_right_nodes(self):
        strategy = SequentialExponentialStrategy(k=2)

        def get_sequence_numbers(k: int):
            # Expected link targets for node k: node 0 plus k - 1, k - 2, k - 4, ...
            # for as long as the result is non-negative.
            if k == 0:
                return []
            numbers = {0}
            i = 0
            s = k - 1
            while s >= 0:
                numbers.add(int(s))
                i += 1
                s = k - exp2(i)

            return sorted(numbers)

        sequence_numbers = []
        expected_sequence_numbers = [get_sequence_numbers(i) for i in range(100)]
        for i in range(100):
            content = generate_random_content_string()
            iparo = IPARO(content=content, timestamp=time1 + timedelta(seconds=i), url=URL)
            cid = ipfs.store(iparo)
            links = ipfs.link(cid, strategy, LinkStrategyParams(URL))
            sequence_numbers.append(sorted(link.seq_num for link in links.links))

            ipns.update(URL, cid)

        self.assertListEqual(sequence_numbers, expected_sequence_numbers)

    def test_sequential_uniform_strategy_should_have_at_most_n_links(self):
        strategy = SequentialUniformNPriorStrategy(n=10)
        lengths = test_strategy(strategy)
        expected_lengths = [min(i, 11) for i in range(100)]

        self.assertListEqual(lengths, expected_lengths)

    def test_temporal_uniform_strategy_should_split_into_five_roughly_equal_time_intervals(self):
        expected_cids = []

        # Build an unevenly dense chain: second i holds i + 1 nodes one millisecond apart.
        # Only the first node of each second (j == 0) is expected to be linked.
        for i in range(5):
            for j in range(i + 1):
                content = generate_random_content_string()
                iparo = IPARO(content=content, timestamp=time1 + timedelta(seconds=i, milliseconds=j), url=URL)
                cid = ipfs.store(iparo)
                if j == 0:
                    expected_cids.append(cid)
                ipfs.link(cid, SingleStrategy(), LinkStrategyParams(URL))
                ipns.update(URL, cid)

        # Add the latest node with 5 seconds time.
        # (The original line read ``IPARO(content=, ...)``, which is a syntax error.)
        content = generate_random_content_string()
        iparo = IPARO(content=content, timestamp=time1 + timedelta(seconds=5), url=URL)
        cid = ipfs.store(iparo)
        ipfs.link(cid, SingleStrategy(), LinkStrategyParams(URL))
        ipns.update(URL, cid)
        expected_cids.append(cid)

        expected_cids = sorted(expected_cids)

        # Add the newest IPARO using the temporally uniform linking strategy.
        # NOTE(review): the original comment said k=5 while the constructor receives 4;
        # confirm what the argument means.
        content = generate_random_content_string()
        iparo = IPARO(content=content, timestamp=time1 + timedelta(seconds=6), url=URL)
        cid = ipfs.store(iparo)
        link_col = ipfs.link(cid, TemporallyUniformStrategy(4), LinkStrategyParams(URL))
        ipns.update(URL, cid)
        cids = sorted(link.cid for link in link_col.links)

        self.assertListEqual(cids, expected_cids)

## Master Test

In [50]:
# Collect every test case class into one flat suite, in the order the sections appear above.
loader = unittest.TestLoader()
suite = unittest.TestSuite()
suite.addTests(
    test
    for case in (IPAROTest, IPNSTest, IPFSTest, IPAROStrategyTest)
    for test in loader.loadTestsFromTestCase(case)
)

# verbosity=3 prints one line per test with its outcome.
runner = unittest.TextTestRunner(verbosity=3)
result = runner.run(suite)

test_can_be_serialized (__main__.IPAROTest.test_can_be_serialized) ... ok
test_has_content (__main__.IPAROTest.test_has_content) ... ok
test_has_timestamp (__main__.IPAROTest.test_has_timestamp) ... ok
test_has_url (__main__.IPAROTest.test_has_url) ... ok
test_is_immutable (__main__.IPAROTest.test_is_immutable) ... ok
test_can_retrieve_correct_cid (__main__.IPNSTest.test_can_retrieve_correct_cid) ... ok
test_can_retrieve_correct_cid_twice (__main__.IPNSTest.test_can_retrieve_correct_cid_twice) ... ok
test_can_update_two_different_urls (__main__.IPNSTest.test_can_update_two_different_urls) ... ok
test_can_update_url (__main__.IPNSTest.test_can_update_url) ... ok
test_can_update_url_twice (__main__.IPNSTest.test_can_update_url_twice) ... ok
test_counts_are_zero_initially (__main__.IPNSTest.test_counts_are_zero_initially) ... ok
test_has_no_urls_initially (__main__.IPNSTest.test_has_no_urls_initially) ... ok
test_returns_none_accessing_invalid_url (__main__.IPNSTest.test_returns_none_acce