<a href="https://colab.research.google.com/github/memelordmaddy/DevCom/blob/main/DevComPS2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Importing necessary libraries

In [1]:
import random
import datetime
import uuid

Device class definition, I've added additional documentation for my convenience

In [2]:
# List of keys for generating dummy data
_DATA_KEYS = ["a", "b", "c"]

class Device:
    def __init__(self, id):
        """
        Initializes a device object with the provided ID.

        Args:
            id (str): The unique identifier for the device.
        """
        self._id = id
        self.records = []  # List to store received data
        self.sent = []     # List to store data sent by this device

    def obtainData(self) -> dict:
        """
        Returns a single new data point (record) from the device.

        Identified by type `record`. `timestamp` records when the record was sent, `dev_id` is the device id.
        `data` is the data collected by the device, consisting of dummy values for keys defined in _DATA_KEYS.

        Returns:
            dict: A dictionary representing a data point.
        """
        if random.random() < 0.4:
           # Sometimes there's no new data
            return {}

        # Generate dummy data for each key
        rec = {
            'type': 'record',
            'timestamp': datetime.datetime.now().isoformat(),
            'dev_id': self._id,
            'data': {kee: str(uuid.uuid4()) for kee in _DATA_KEYS}
        }
        # Append the generated record to the list of data sent by this device
        self.sent.append(rec)
        return rec

    def probe(self) -> dict:
        """
        Returns a probe request to be sent to the SyncService.

        Identified by type `probe`. `from` is the index number from which the device is asking for the data.

        Returns:
            dict: A dictionary representing a probe request.
        """
        if random.random() < 0.5:
            # Sometimes the device forgets to probe the SyncService
            return {}

        # Generate and return a probe request with the device ID and the index from which data is requested
        return {'type': 'probe', 'dev_id': self._id, 'from': len(self.records)}

    def onMessage(self, data: dict):
        """
        Receives updates from the server and updates the device records.

        Args:
            data (dict): A dictionary containing the update message received from the server.
        """
        if random.random() < 0.6:
            # Sometimes devices make mistakes. Let's hope the SyncService handles such failures.
            return

        if data['type'] == 'update':
            # Extract the index from which data is received
            _from = data['from']
            if _from > len(self.records):
                # If the index is greater than the current number of records, eat five star and do nothing
                return
            self.records = self.records[:_from] + data['data']

Modified the syncserive and on message function, i've also added two extra functions. Incase it take time to show results change the number of iterations to 100.

In [5]:
class SyncService:
    def __init__(self):
        # Initialize the necessary attributes and data structures here
        self.aggregated_data = []

    def onMessage(self, data: dict):
        """
        Handle messages received from devices.
        Return the desired information in the correct format (type `update`)
        in response to a `probe`. No return value required on handling a `record`.
        """
        message_type = data.get('type')

        if message_type == 'record':
            # aggregate the data
            self.aggregateData(data)
            return {}

        elif message_type == 'probe':
            # Respond to probe requests with aggregated data
            return self.Update(data)

        elif message_type == 'update':
            # Update the data
            return {}

        else:
            # return an emoty dictionary
            return {}

    def aggregateData(self, data: dict): # function to aggregrate data
        self.aggregated_data.append(data)

    def Update(self, probe_request: dict) -> dict: # function to update data
        from_index = probe_request.get('from', 0)
        update_message = {
            'type': 'update',
            'from': from_index,
            'data': self.aggregated_data[from_index:]
        }

        return update_message

def testSyncing():
    devices = [Device(f"dev_{i}") for i in range(10)]
    syn = SyncService()

    _N = int(1e6) # try changing to 100 to get results quicker
    for i in range(_N):
        for _dev in devices:
            data = _dev.obtainData()
            if data:
                syn.onMessage(data)

            probe_request = _dev.probe()
            if probe_request:
                syn.onMessage(probe_request)

    done = False
    while not done:
        for _dev in devices:
            probe_request = _dev.probe()
            if probe_request:
                syn.onMessage(probe_request)
        num_recs = len(devices[0].records)
        done = all([len(_dev.records) == num_recs for _dev in devices])

    # added code to give the results as text
    print("Aggregated data:")
    for data in syn.aggregated_data:
        print(data)

    ver_start = [0] * len(devices)
    for i, rec in enumerate(devices[0].records):
        _dev_idx = int(rec['dev_id'].split("_")[-1])
        assertEquivalent(rec, devices[_dev_idx].sent[ver_start[_dev_idx]])
        for _dev in devices[1:]:
            assertEquivalent(rec, _dev.records[i])
        ver_start[_dev_idx] += 1

    print("\nrealised 10^6 is huge.")

testSyncing()

def assertEquivalent(d1: dict, d2: dict):
    assert d1['dev_id'] == d2['dev_id']
    assert d1['timestamp'] == d2['timestamp']
    for kee in _DATA_KEYS:
        assert d1['data'][kee] == d2['data'][kee]

testSyncing()

Aggregated data:
{'type': 'record', 'timestamp': '2024-01-28T18:16:35.373308', 'dev_id': 'dev_1', 'data': {'a': 'c3f7eaef-9ae5-4346-b256-616e0a2a1937', 'b': 'c8bc6f12-1c22-45e1-9f2c-208495b8997a', 'c': '5514711d-5a08-4684-9fba-f6a7dafa0347'}}
{'type': 'record', 'timestamp': '2024-01-28T18:16:35.373374', 'dev_id': 'dev_2', 'data': {'a': 'c2a25dde-281e-4d41-9503-735b3d297a96', 'b': '88f57b4c-33eb-4628-946f-a4a85e6756f2', 'c': '16e6b799-5255-43ef-9c88-8003a900748d'}}
{'type': 'record', 'timestamp': '2024-01-28T18:16:35.373410', 'dev_id': 'dev_3', 'data': {'a': 'a795bae5-e7b4-4b37-a13f-5316d613f896', 'b': '49599ec8-1c1a-40cd-9690-f6f0bc061226', 'c': '9daf28ef-f372-4d76-a248-404413959d05'}}
{'type': 'record', 'timestamp': '2024-01-28T18:16:35.373746', 'dev_id': 'dev_4', 'data': {'a': 'e6f1d101-aa16-436d-9634-043b56abb55d', 'b': '16d552cd-f45c-4bed-8ec2-68b938714025', 'c': 'e2510fa6-9b3c-475a-a354-e413947e44f3'}}
{'type': 'record', 'timestamp': '2024-01-28T18:16:35.373801', 'dev_id': 'dev_5'