# Parsing

Once we've "detected" bits, we need to find packets and parse them. This notebook walks through parsing a sample bluetooth packet and verifying that our parser works as intended.

But first... imports!

In [2]:
%matplotlib inline
import numpy as np
from scipy import signal
from numpy.fft import fft, fftfreq
import matplotlib.pyplot as plt
from pprint import pprint

# Put the parent of this notebook's directory at the front of sys.path so that
# packages living one level up can be imported by the cells below.
# NOTE(review): presumably this is what makes `onebitbt` importable — confirm.
import os,sys,inspect
currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
parentdir = os.path.dirname(currentdir)
sys.path.insert(0,parentdir)

from nmigen import Module, Memory

from alldigitalradio.parsing import Chunk, chunk, flip, num, Format, o, lsb_num
from alldigitalradio.io.numpy import make_callable
from onebitbt.parser import PacketParser
from serialcommander.printer import TextMemoryPrinter
from serialcommander.uart import UART

First, let's start with a sample packet. I _think_ I pulled it from https://github.com/JiaoXianjun/BTLE, but I can't find it there anymore, so it may have come from somewhere else.

In [3]:
# Demodulated bits of one captured packet. The '...' tokens are only visual
# continuation markers; they are dropped when the text is turned into integers.
samples = """
...
0 1 0 1 0 1 0 1 0 1 1 0 1 0 1 1 0 1 1 1 1 1 0 1 ...
1 0 0 1 0 0 0 1 0 1 1 1 0 0 0 1 1 0 1 1 0 0 0 1 ...
1 1 0 0 0 0 1 1 0 1 1 1 0 0 1 1 1 1 0 0 1 1 0 0 ...
0 0 1 1 0 0 0 1 0 0 1 1 0 0 1 0 1 0 0 0 1 1 0 1 ...
0 0 0 0 0 1 0 0 1 1 1 0 1 1 1 0 0 0 0 0 1 1 0 0 ...
0 0 1 0 1 0 0 0 1 1 1 1 0 0 1 0 0 0 1 0 1 0 0 1 ...
0 0 1 0 1 1 1 0 0 0 0 0 0 1 1 1 1 0 0 0 0 1 1 1 ...
0 1 1 1 1 0 1 0 1 0 1 1 0 1 1 1 1 1 0 0 1 1 0 1 ...
0 0 0 1 1 1 1 0 1 1 0 0 0 0 1 1 0 0 1 1 1 1 0 1 
"""
bits = [int(token) for token in samples.split() if token != '...']

These bits are demodulated but scrambled (i.e. pseudorandomly flipped, a.k.a. "whitened").

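In Bluetooth Low Energy, dewhitening is done by XORing the packet with a sequence generated by a Linear Feedback Shift Register (LFSR) that is initialized from the channel number. In the code below, the first 40 bits (the preamble and access address) are left untouched.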

The way to tell whether we're doing this correctly is to check that the resulting bits make sense as a packet (which is explored below).

In [4]:
# Set up the Linear Feedback Shift Register to generate the (de)whitening string
def lfsr(init, length):
    """Generate `length` bits of the (de)whitening sequence.

    The register starts as a leading 1 followed by the bits returned by
    ``num(init, bits=6, lsb=True)``.
    NOTE(review): presumably that is the 6-bit value of `init`, least
    significant bit first, giving a 7-element register — confirm against
    `alldigitalradio.parsing.num`.

    On every step the last register element is emitted, then rotated to the
    front of the register and XORed into the element that lands at index 4.

    Returns a list of `length` bits (empty when `length` is not positive).
    """
    def step(reg):
        # Rotate the last element to the front and fold it into index 4.
        feedback = reg[-1]
        return [feedback] + reg[0:3] + [reg[3] ^ feedback] + reg[4:6]

    register = [1] + num(init, bits=6, lsb=True)

    sequence = []
    for _ in range(length):
        sequence.append(register[-1])
        register = step(register)
    return sequence

def whiten(bits, channel=37):
    """XOR everything after the first 40 bits with the LFSR sequence for `channel`.

    The first 40 bits are passed through unchanged. Because the operation is a
    plain XOR, applying it to already-whitened bits removes the whitening.

    Returns a new list; `bits` itself is not modified.
    """
    header, body = bits[:40], bits[40:]
    mask = lfsr(channel, len(bits) - 40)
    return header + [bit ^ flip_bit for bit, flip_bit in zip(body, mask)]

# XOR whitening is its own inverse, so running whiten() over the captured bits
# (using the default channel, 37) yields the de-whitened packet.
dwbits = whiten(bits)

Next we'll use the packet decoder utilities from `alldigitalradio` to pull out different parts of a test packet. 

Note that I've iterated back and forth between the format and the output to figure out the appropriate number of payloads (and their size) in this specific packet. In other words, this is parsed "by hand" by looking at the payload lengths as they're decoded.

In [5]:
# All field widths below are in bits. The layout is built from the innermost
# pieces outwards; the two inner entries were sized by hand for this capture.

# First inner entry: the flags payload.
flags_field = Format(
    length=8,
    kind=8,
    payload=8,
)

# Second inner entry: the partial list of 16 bit service UUIDs.
services_field = Format(
    length=8,
    kind=8,
    payload=6*8,
)

advertising_payload = Format(
    device_addr=48,  # The sending device
    payload_1=flags_field,
    payload_2=services_field,
)

pdu = Format(
    pdu_type=4,
    rfu_0=2,
    tx_add=1,
    rx_add=1,
    length=8,
    payload=advertising_payload,
)

format = Format(
    preamble=8,  # Alternating bits for synchronization and detection
    access_address=32,  # In an advertising packet this is always set to a specific address
    ble_packet=pdu,
    crc=24,
)

# Parse the de-whitened bits and show the decoded field tree.
read, parsed = format.parse(dwbits)
pprint(parsed.json(), compact=True)

OrderedDict([('preamble', [0, 1, 0, 1, 0, 1, 0, 1]),
             ('access_address',
              [0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0,
               0, 1, 0, 1, 1, 1, 0, 0, 0, 1]),
             ('ble_packet',
              OrderedDict([('pdu_type', [0, 0, 0, 0]), ('rfu_0', [0, 0]),
                           ('tx_add', [0]), ('rx_add', [0]),
                           ('length', [1, 0, 0, 0, 1, 0, 0, 0]),
                           ('payload',
                            OrderedDict([('device_addr',
                                          [1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0,
                                           1, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 1,
                                           1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 0,
                                           1, 0, 1, 1, 0, 0, 0, 0, 1, 0, 0,
                                           1]),
                                         ('payload_1',
                                          Order

Some quick notes on this packet:

1. The access address is the reserved "Advertising Address" that devices advertise to (but everyone listens on). In other BLE communication, this address is the intended recipient of a packet.
2. The pdu_type is 0, which stands for ADV_IND, a.k.a. general advertisement
3. The payload has a device addr, and then two payloads.
    - The first is a flags payload
    - The second is a partial list of 16 bit service UUIDs indicating what this device can do.

Note that advertising packets are pretty diverse. Sometimes they have names, sometimes they don't. Sometimes they list services, sometimes they don't.

Now let's check and verify the CRC

In [6]:
def crc(data):
    """Compute the 24-bit CRC of `data`, a sequence of 0/1 integers.

    A 24-bit register is preset to 0x555555. For each input bit, the bit
    shifted out of the top of the register is XORed with the input bit; that
    feedback value is shifted into bit 0 and, when it is 1, also XORed into
    the register through the tap mask 0b11001011010 (bits 10, 9, 6, 4, 3, 1).

    Returns ``num(register, 24, lsb=True)``.
    NOTE(review): presumably the 24 register bits as a list, least
    significant bit first — confirm against `alldigitalradio.parsing.num`.
    """
    register = 0x555555
    for bit in data:
        feedback = ((register >> 23) & 0x1) ^ bit
        shifted = (register << 1) | feedback
        register = (shifted ^ (feedback * 0b11001011010)) & 0xFFFFFF
    return num(register, 24, lsb=True)

# Recompute the CRC over the parsed PDU bits and print it next to the CRC field
# that was carried in the packet; the two printed bit lists should be identical.
print(crc(parsed.ble_packet.bits()))
print(parsed.crc)

[0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1]
[0, 0, 0, 1, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 1, 1, 1]


They match!

# Validating the Hardware

Let's first validate that our parser gets so far as to match a CRC against this sample packet from above.

In [7]:
# Wrap the hardware parser so it can be driven like a function: each call takes
# (bitstream, sample) and returns the value of crc_matches.
packet_parser = PacketParser()
parser = make_callable(
    packet_parser,
    inputs=[packet_parser.bitstream, packet_parser.sample],
    outputs=[packet_parser.crc_matches],
)

# NOTE: `bits` must still be the captured (whitened) sample from the top of the
# notebook here; a later cell rebinds `bits`, so run the cells in order.
#
# The parser assumes we've already found the preamble and (unwhitened) access address
# Also add an extra bit at the end because CRC matching is not instant, so give it a chance to match
for i, b in enumerate(bits[40:] + [0]):
    crc_matches = parser(b, 1)
    if crc_matches:
        print("CRC Matched after {} bits".format(i))
        break
    # The parser state machine expects that there is at least one clock cycle between samples
    parser(0, 0)
else:
    # Every bit was fed in without a match: fail loudly instead of finishing
    # silently, otherwise a broken parser looks identical to "nothing to report".
    raise AssertionError("CRC never matched for the sample packet")

CRC Matched after 176 bits


Ok, now let's test a packet that advertises a name. I don't have a copy of the bits for this handy, so let's just make one...

In [8]:
# The name to advertise; used for both the entry's length byte and its data.
name_text = "I LOVE MINDY"

# Advertising-data entries, each laid out as length / kind / data.
flags_ad = chunk(
    length=flip(o(0x02)),
    kind=flip(o(0x01)),  # Flag
    data=flip(o(0x05)),  # Limited discoverable, BR/EDR not supported
)
name_ad = chunk(
    length=flip(o(1 + len(name_text))),  # one kind byte plus the name bytes
    kind=flip(o(0x08)),  # Short name
    name=o(*[ord(c) for c in name_text], lsb=True),
)

packet = Chunk(
    preamble=[0, 1, 0, 1, 0, 1, 0, 1],
    access_address=flip(o(0x8e, 0x89, 0xbe, 0xd6)),

    ble_packet=Chunk(
        # Originally annotated as "connectable undirected advertising event".
        # NOTE(review): the captured packet earlier in this notebook decodes
        # ADV_IND as pdu_type 0, while these bits are not all zero — confirm
        # which PDU type is actually intended here.
        pdu_type=flip([0, 0, 1, 0]),
        rfu_0=[0]*2,  # Reserved for future use
        tx_add=[0],
        rx_add=[0],
        length=[0]*8,  # Placeholder, filled in below

        payload=Chunk(
            device_addr=flip(o(0x90, 0xd7, 0xeb, 0xb1, 0x92, 0x99)),
            payload_1=flags_ad,
            payload_2=name_ad,
        ),
    ),

    crc=[0]*24,  # Placeholder, filled in below
)

# The length field holds the payload size in bytes.
payload_bytes = len(packet.ble_packet.payload.bits())//8
packet.ble_packet.length = num(payload_bytes, 8, msb=True)

# The CRC is computed over everything after the 8-bit preamble and 32-bit
# access address, excluding the 24-bit CRC placeholder at the end.
packet.crc = crc(packet.bits()[8+32:-24])

# From here on `bits` refers to this generated, whitened packet.
bits = whiten(packet.bits())

Now let's hook up a printer to the parser, which prints out the name of any device found (if it advertises a name)

In [11]:
# Build a design containing both the printer and a parser that writes to it.
m = Module()
m.submodules.printer = printer = TextMemoryPrinter(Memory(width=8, depth=32), 32)
m.submodules.parser = packet_parser = PacketParser(printer=printer)

# Each call to `parser` takes (bitstream, sample, tx_ack) and returns
# (crc_matches, tx_data, tx_rdy).
parser = make_callable(
    m,
    inputs=[packet_parser.bitstream, packet_parser.sample, printer.tx_ack],
    outputs=[packet_parser.crc_matches, printer.tx_data, printer.tx_rdy],
)

# Run things a couple times, to validate that the parser properly resets between packets
for _ in range(2):
    for i, b in enumerate(bits[40:] + [0]*20):
        crc_matches, tx_data, tx_rdy = parser(b, 1, 0)
        if crc_matches:
            print("CRC Matched after {} bits".format(i))
            break
        # The parser state machine expects that there is at least one clock cycle between samples.
        # Pass all three inputs explicitly, like every other call in this cell.
        parser(0, 0, 0)
    else:
        # Without a CRC match the read-out loop below would wait for a newline
        # that may never arrive, so stop here with a clear error instead.
        raise AssertionError("CRC never matched for the generated packet")

    # Read out from the printer until we hit a newline.
    # This relies on the printer eventually emitting '\n'.
    while True:
        crc_matches, tx_data, tx_rdy = parser(0, 0, 0)
        if tx_rdy:
            char = chr(tx_data)
            print(char, end='')
            # Acknowledge the character so the printer can move on to the next one
            parser(0, 0, 1)
            if char == '\n':
                break

    # Let the FSM state machine reset itself
    for _idle in range(2):
        parser(0, 0, 0)

<ipython-input-11-bc77d793018f>:2: DriverConflict: Memory '$memory' is accessed from multiple fragments: top.parser, top.printer; hierarchy will be flattened
  m.submodules.printer = printer = TextMemoryPrinter(Memory(width=8, depth=32), 32)


CRC Matched after 224 bits
I LOVE MINDY
CRC Matched after 224 bits
I LOVE MINDY
