In [244]:
# pip install rich

import rich, math
from rich.console import Console
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, ClassVar
from functools import reduce

In [400]:
# Link bit rate used for all timing computations: 2,000,000 bits per second.
BLE_BANDWIDTH = 2 * 1000 * 1000  # b/s
# Same rate in bytes per millisecond: bits -> bytes (/ 8), seconds -> milliseconds (/ 1000).
BLE_BANDWIDTH_B_MS = BLE_BANDWIDTH / 8 / 1000  # B/ms
# Every prime number below 200, in ascending order (trial division is cheap at this size).
FIRST_PRIMES_BEFORE_200 = [
    candidate
    for candidate in range(2, 200)
    if all(candidate % divisor for divisor in range(2, candidate))
]

@dataclass
class Packet:
    """A packet described only by its size; `mtu` is that size in bytes."""

    mtu: int = 256  # B

    @property
    def toa(self) -> float:
        """Transmission time in ms: packet size (B) divided by the link rate (B/ms).

        NOTE(review): "toa" presumably stands for "time on air" -- confirm.
        """
        bytes_per_ms = BLE_BANDWIDTH_B_MS
        return self.mtu / bytes_per_ms


# Module-wide packet instance; Slot.duration and Slot.__str__ read it.
PACKET = Packet()
# Extra time added to every slot on top of the packet's transmission time.
TIME_PADDING = 1  # ms

# Static description of every slot type, keyed by type name.
# Each entry records the traffic direction, whether the slot is shared, the
# one-letter abbreviation used in schedule strings, and the rich color used
# when the slot is rendered.
SLOT_INFO = {
    slot_type: {
        "direction": direction,
        "is_shared": is_shared,
        "abbreviation": abbreviation,
        "color": color,
    }
    for slot_type, direction, is_shared, abbreviation, color in (
        # (type,         direction, shared, abbr, color)
        ("empty",         "n/a",    False,  "E",  "white"),
        ("data-down",     "down",   True,   "D",  "green"),
        ("data-up",       "up",     False,  "U",  "yellow"),
        ("beacon",        "down",   True,   "B",  "red"),
        ("join-request",  "up",     True,   "J",  "purple"),
        ("join-response", "down",   True,   "R",  "deep_pink3"),
    )
}

@dataclass
class Slot:
    """One time slot of a slotframe.

    Attributes:
        type: Key into SLOT_INFO (e.g. "beacon", "data-up").
        start: Start time of the slot in ms, or None while the slot has not
            been placed in a schedule (Slotframe.build assigns it).
    """

    type: str
    start: Optional[float] = None

    def __post_init__(self):
        assert self.type in SLOT_INFO, f"Unknown slot type: {self.type}"

    @staticmethod
    def from_abbreviation(abbrev):
        """Return a new unscheduled Slot whose type has the given abbreviation.

        Raises:
            ValueError: if no slot type in SLOT_INFO uses `abbrev`.
        """
        for slot_type, info in SLOT_INFO.items():
            if info["abbreviation"] == abbrev:
                return Slot(slot_type)
        # repr() keeps the message readable for whitespace/control characters and
        # cannot itself fail on empty or multi-character input (ord() could).
        raise ValueError(f"Unknown slot abbreviation: {abbrev!r}")

    @property
    def direction(self):
        """Traffic direction of this slot type, as recorded in SLOT_INFO."""
        return SLOT_INFO[self.type]["direction"]

    @property
    def is_shared(self):
        """Whether this slot type is marked as shared in SLOT_INFO."""
        return SLOT_INFO[self.type]["is_shared"]

    @property
    def duration(self):
        """Slot length in ms: packet transmission time plus the fixed padding."""
        return PACKET.toa + TIME_PADDING

    @property
    def end(self):
        """End time of the slot in ms, or None if the slot has no start time yet."""
        if self.start is None:
            return None
        return self.start + self.duration

    def __str__(self):
        # An unscheduled slot has no times to format; ':.2f' on None would raise.
        if self.start is None:
            return f"{self.type} slot, unscheduled ({PACKET.mtu} B Packet)"
        return f"{self.type} slot, from {self.start:.2f} to {self.end:.2f} ({PACKET.mtu} B Packet)"

    def __repr__(self):
        return self.__str__()

    def repr_nice(self):
        """
        Return the slot's abbreviation as a rich Text with a background color (based on type).
        """
        # Imported explicitly: a bare `import rich` does not guarantee that the
        # `rich.text` submodule is loaded.
        from rich.text import Text

        info = SLOT_INFO[self.type]
        abbrev = info["abbreviation"]
        color = info["color"]
        # add invisible []'s because space doesn't work well with newlines
        hidden_left = Text("[", style=f'bold {color} on {color}')
        hidden_right = Text("]", style=f'bold {color} on {color}')
        label = Text(abbrev, style=f'bold white on {color}')
        return hidden_left + label + hidden_right

class SlotFactory:
    """Namespace of helpers that create lists of unscheduled slots of one type.

    The helpers are static methods, so they behave the same whether they are
    called on the class or on an instance.
    """

    @staticmethod
    def beacon(n=3):
        """Return a list of `n` new beacon slots (3 by default)."""
        return [Slot("beacon") for _ in range(n)]

    @staticmethod
    def join_request(n):
        """Return a list of `n` new join-request slots."""
        return [Slot("join-request") for _ in range(n)]

    @staticmethod
    def data_down(n):
        """Return a list of `n` new data-down slots."""
        return [Slot("data-down") for _ in range(n)]

    @staticmethod
    def data_up(n):
        """Return a list of `n` new data-up slots."""
        return [Slot("data-up") for _ in range(n)]

@dataclass
class Gateway:
    """Gateway settings; `max_nodes` defaults to 80."""

    max_nodes: int = 80


# Module-wide gateway instance built with the default settings.
GATEWAY = Gateway()

@dataclass
class Slotframe:
    """An ordered list of slots, with helpers to build, inspect and display it."""

    slots: list[Slot]

    @staticmethod
    def from_nested(nested_slots):
        """Create a Slotframe from a Slot, a list of Slots, or a list mixing Slots and lists of Slots.

        Only one level of nesting is flattened. Start times are not assigned
        here, so slots created without a start stay unscheduled.
        """
        items = nested_slots if isinstance(nested_slots, list) else [nested_slots]
        slots = []
        for item in items:
            # extend/append keeps flattening linear instead of re-copying the
            # accumulated list for every item.
            if isinstance(item, list):
                slots.extend(item)
            else:
                slots.append(item)
        return Slotframe(slots)

    @staticmethod
    def build(abbreviations):
        """Create a scheduled Slotframe from a string of abbreviations, e.g., 'BDDUJ'.

        Whitespace characters are ignored, so the string may be split over
        several lines. Slots are laid out back to back starting at time 0.

        Raises:
            ValueError: if a non-whitespace character is not a known abbreviation.
        """
        slots = [Slot.from_abbreviation(abbrev) for abbrev in abbreviations if not abbrev.isspace()]
        start = 0
        for slot in slots:
            slot.start = start
            start += slot.duration
        return Slotframe(slots)

    @staticmethod
    def build_blocks(assoc_slots, data_slots, repetitions=1):
        """Create a Slotframe by repeating an association block followed by a data block.

        Args:
            assoc_slots: abbreviation string for the association slots, e.g. 'BJR'.
            data_slots: abbreviation string for the data slots, e.g. 'DUUU'.
            repetitions: how many times the combined pattern is repeated.
        """
        abbreviations = (assoc_slots + data_slots) * repetitions
        return Slotframe.build(abbreviations)

    def __str__(self):
        return f"Slotframe: {len(self.slots)} slots"

    @property
    def slot_duration(self):
        """Duration of one slot in ms (taken from the first slot); 0.0 for an empty slotframe."""
        if not self.slots:
            return 0.0
        return self.slots[0].duration

    @property
    def duration(self):
        """Total duration in ms: number of slots times the per-slot duration."""
        return len(self.slots) * self.slot_duration

    def count_slots_per_type(self, abbreviations=False):
        """Count the number of slots per type.

        Returns a dict keyed by slot type (or by abbreviation when
        `abbreviations` is true), in order of first appearance. Types that do
        not occur in the slotframe are absent from the result.
        """
        counts = {}
        for slot in self.slots:
            counts[slot.type] = counts.get(slot.type, 0) + 1
        if abbreviations:
            return {SLOT_INFO[slot_type]["abbreviation"]: count for slot_type, count in counts.items()}
        return counts

    def max_nodes(self):
        """Return the maximum number of nodes that can be scheduled in the slotframe."""
        return self.count_slots_per_type().get("data-up", 0)

    def closest_prime_number(self):
        """Find the closest prime number to the number of slots."""
        slot_count = len(self.slots)
        return min(FIRST_PRIMES_BEFORE_200, key=lambda prime: abs(prime - slot_count))

    def find_first_slot(self, type):
        """Find the first slot of a given type; returns (index, slot) or (None, None)."""
        for index, slot in enumerate(self.slots):
            if slot.type == type:
                return index, slot
        return None, None

    def find_last_slot(self, type):
        """Find the last slot of a given type; returns (index, slot) or (None, None)."""
        for index in range(len(self.slots) - 1, -1, -1):
            slot = self.slots[index]
            if slot.type == type:
                return index, slot
        return None, None

    def show(self, otap_size=10_000):
        """Print the schedule and summary statistics of the slotframe, then return self.

        Each slot is rendered with Slot.repr_nice. `otap_size` is currently
        unused and is kept only so existing calls keep working.
        """
        console = Console()
        counts = self.count_slots_per_type()
        data_up = counts.get("data-up", 0)
        # The ratio is undefined without data-up slots; report that instead of failing.
        ratio = round(counts.get("data-down", 0) / data_up, 2) if data_up else "n/a"
        schedule = [slot.repr_nice() for slot in self.slots]
        console.print(f"Slotframe with {len(self.slots)} slots of {self.slot_duration:.2f} ms each (total {self.duration:.2f} ms):")
        console.print(*schedule, end="\n")
        console.print(f"""\
- Max nodes: {self.max_nodes()} (number of data-up slots)

- Number of slots per type: {counts}
- Ratio of data-down to data-up slots: {ratio}
- Closest prime number: {self.closest_prime_number()}
""")
        return self


# Alternative way to build a slotframe, from an explicit abbreviation string
# (spaces and newlines are skipped by Slotframe.build). Kept as an example only:
# Slotframe.build(f"""\
# BJD
# {"DUUU" * 5}
# """).show()

# Demo: the association block "BJR" followed by the data block "DUUU", repeated 3 times.
# show() returns the Slotframe, so it is also the value of this expression.
Slotframe.build_blocks(assoc_slots="BJR", data_slots="DUUU", repetitions=3).show()

Slotframe(slots=[beacon slot, from 0.00 to 2.02 (256 B Packet), join-request slot, from 2.02 to 4.05 (256 B Packet), join-response slot, from 4.05 to 6.07 (256 B Packet), data-down slot, from 6.07 to 8.10 (256 B Packet), data-up slot, from 8.10 to 10.12 (256 B Packet), data-up slot, from 10.12 to 12.14 (256 B Packet), data-up slot, from 12.14 to 14.17 (256 B Packet), beacon slot, from 14.17 to 16.19 (256 B Packet), join-request slot, from 16.19 to 18.22 (256 B Packet), join-response slot, from 18.22 to 20.24 (256 B Packet), data-down slot, from 20.24 to 22.26 (256 B Packet), data-up slot, from 22.26 to 24.29 (256 B Packet), data-up slot, from 24.29 to 26.31 (256 B Packet), data-up slot, from 26.31 to 28.34 (256 B Packet), beacon slot, from 28.34 to 30.36 (256 B Packet), join-request slot, from 30.36 to 32.38 (256 B Packet), join-response slot, from 32.38 to 34.41 (256 B Packet), data-down slot, from 34.41 to 36.43 (256 B Packet), data-up slot, from 36.43 to 38.46 (256 B Packet), data-u

In [396]:
def otap_duration(sf, size=10_000, chunk_size=128):
    """Print an estimate of the duration of an OTAP session over a slotframe.

    The image is split into chunks of `chunk_size` bytes, one chunk per packet,
    and one packet is sent per data-down slot. The estimate is the number of
    whole slotframes needed times the slotframe duration.

    Args:
        sf: object exposing `count_slots_per_type()` and `duration` (ms), e.g. a Slotframe.
        size: image size in bytes.
        chunk_size: payload bytes carried per packet (defaults to 128).

    Returns:
        None; the summary is printed with rich.

    Raises:
        ValueError: if `chunk_size` is not positive, or the slotframe has no
            data-down slot to carry the image.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    # Slot types that never occur are absent from the counts dict.
    data_down_slots = sf.count_slots_per_type().get("data-down", 0)
    if data_down_slots == 0:
        raise ValueError("Slotframe has no data-down slots; cannot estimate OTAP duration")

    packets = math.ceil(size / chunk_size)
    needed_slotframes = math.ceil(packets / data_down_slots)
    duration = round(needed_slotframes * sf.duration, 2)  # ms

    console = Console()
    console.print(f"""\
OTAP update:

- Image size: {math.ceil(size / 1024)} kB
- Duration: {round(duration / 1000, 2)} s

- Packets: {packets}
- swarmit_chunk_size: {chunk_size} B
- Needed slotframes: {needed_slotframes}
""")

In [360]:
def latency(sf):
    """Print the worst-case downlink and uplink latency of a slotframe, in ms.

    - Downlink: from the start of the first data-down slot to the end of the
      last data-up slot.
    - Uplink: from the start of the first data-up slot to the end of the last
      data-down slot.

    Args:
        sf: object exposing `find_first_slot(type)` and `find_last_slot(type)`,
            e.g. a Slotframe whose slots have start times.

    Raises:
        ValueError: if the slotframe lacks a data-down or a data-up slot.
    """
    def span(first_type, last_type):
        # Time from the start of the first `first_type` slot to the end of the
        # last `last_type` slot.
        _, first = sf.find_first_slot(first_type)
        _, last = sf.find_last_slot(last_type)
        if first is None or last is None:
            raise ValueError(
                f"Slotframe needs at least one {first_type} slot and one {last_type} slot to compute latency"
            )
        return round(last.end - first.start, 2)

    worst_case_downlink = span("data-down", "data-up")
    worst_case_uplink = span("data-up", "data-down")

    console = Console()
    console.print(f"""\
Latency:
                  
- Worst case downlink: {worst_case_downlink} ms
- Worst case uplink: {worst_case_uplink} ms
""")

In [401]:
# Scenario: association block + data block, repeated 3 times within the slotframe.
# - assoc_slots: beacon, join-request, join-response
# - data_slots: one data-down followed by eight data-up, four times over
# - repetitions: each Beacon slot uses a different BLE advertising channel
sf = Slotframe.build_blocks(
    assoc_slots="BJR",
    data_slots="DUUUUUUUU" * 4,
    repetitions=3,
).show()

latency(sf)
otap_duration(sf, size=30_000)

In [409]:
# Scenario: a single association block followed by one long data block (no repetition).
# - assoc_slots: beacon, join-request, join-response
# - data_slots: one data-down followed by four data-up, 24 times over
sf = Slotframe.build_blocks(
    assoc_slots="BJR",
    data_slots="DUUUUDUUUU" * 12,
    repetitions=1,
).show()

latency(sf)
otap_duration(sf, size=30_000)