In [10]:
import struct
import bitstruct
from pcapng import FileScanner
from pcapng.blocks import EnhancedPacket

In [3]:
# Byte layout of a captured frame.
# NOTE(review): 42 equals Ethernet (14) + IPv4 (20) + UDP (8) header bytes, and
# offset 26 / length 4 is where the IPv4 source address sits in such a frame.
# Presumably that is what these describe -- confirm against the capture.
PACKET_HEADER_LENGTH = 42
SOURCE_OFFSET = 26
SOURCE_LENGTH = 4

# Byte layout of the message decoded by translate_msg_to_photon_list.
ETHER_HEADER_LENGTH = 4  # leading bytes, unpacked as two little-endian uint16s
ETHER_BODY_OFFSET = 2  # bytes skipped between that header and the first event
EVENT_STRIDE_BYTES = 12  # bytes per event: two halves of three 16-bit words

# Windows of values that the third ("check") word of an event half is compared
# against to tell the x/y half from the q/t half.
CHECK_XY_RANGE = range(85, 95)
CHECK_QT_RANGE = range(125, 135)

In [4]:
def translate_msg_to_photon_list(
    msg: bytes,
) -> tuple[int, list[tuple[int, int, int]], int]:
    """Decode one detector message into a list of photon events.

    The message begins with ``ETHER_HEADER_LENGTH`` bytes holding two
    little-endian uint16 values: a count (twice the number of events) and the
    packet id. After a further ``ETHER_BODY_OFFSET`` bytes come the events,
    ``EVENT_STRIDE_BYTES`` bytes each, made of two halves of three 16-bit
    words.

    Args:
        msg: Raw message bytes, starting at the count/packet-id header.

    Returns:
        ``(packet_id, events, n_failed)``. ``events`` is a list of
        ``(x, y, t)`` tuples. ``n_failed`` is the number of events announced by
        the header that could not be decoded because the payload ended early;
        it is 0 for a complete message.

    Raises:
        struct.error: If ``msg`` is shorter than ``ETHER_HEADER_LENGTH`` bytes.
    """
    header_bytes = msg[:ETHER_HEADER_LENGTH]
    body = msg[ETHER_HEADER_LENGTH + ETHER_BODY_OFFSET :]
    raw_count, packet_id = struct.unpack("<HH", header_bytes)
    n_events = raw_count // 2

    # A truncated message can announce more events than the body actually
    # holds. Decode only the complete ones and report the shortfall instead of
    # letting the unpack call fail part-way through the message.
    n_complete = min(n_events, len(body) // EVENT_STRIDE_BYTES)
    n_failed = n_events - n_complete

    half_stride = EVENT_STRIDE_BYTES // 2
    events = []
    for i in range(n_complete):
        start = EVENT_STRIDE_BYTES * i
        middle = start + half_stride
        xy_data = body[start:middle]
        qt_data = body[middle : start + EVENT_STRIDE_BYTES]
        x, y, _check_xy = bitstruct.unpack("u16u16u16<", xy_data)
        q, t, check_qt = bitstruct.unpack("u16u16u16<", qt_data)

        # Sometimes the x/y and q/t halves arrive swapped; the check word of
        # the second half tells which order this event is in.
        # TODO: find a robust validation. Requiring check_xy in CHECK_XY_RANGE
        # as well (and counting a mismatch in n_failed) stopped working because
        # the check values seem to have changed after a long downtime.
        if check_qt in CHECK_QT_RANGE:
            events.append((x, y, t))
        else:
            # Halves are swapped: the names above hold the other half's words.
            events.append((q, t, y))

    return packet_id, events, n_failed

In [16]:
# NOTE(review): machine-specific absolute path -- point this at a local copy of
# the capture before running on another machine.
packet_path = r"C:\Users\SPEEM\Documents\speem\speem\scripts\correct packets.pcapng"

with open(packet_path, "rb") as f:
    for block in FileScanner(f):
        if not isinstance(block, EnhancedPacket):
            continue

        # packet_data is the whole captured frame, but the decoder expects the
        # message to start at its own count/packet-id header, so drop the
        # leading PACKET_HEADER_LENGTH bytes first.
        # NOTE(review): assumes every captured frame carries exactly
        # PACKET_HEADER_LENGTH bytes of link/network/transport headers -- confirm.
        msg = block.packet_data[PACKET_HEADER_LENGTH:]
        if len(msg) < ETHER_HEADER_LENGTH:
            # Too short to contain a message header; not a detector packet.
            continue

        packet_id, events, n_failed = translate_msg_to_photon_list(msg)
        print(packet_id, n_failed)

: 