In [1]:
import uhd
import numpy as np
from commpy import filters
import matplotlib.pyplot as plt

# Open a USRP with no explicit device arguments. The handle is kept at module
# level because the transmit cell further down calls `usrp.send_waveform(...)`.
usrp = uhd.usrp.MultiUSRP()

[INFO] [UHD] linux; GNU C++ version 11.2.0; Boost_107400; UHD_4.1.0.5-3
[INFO] [B200] Detected Device: B210
[INFO] [B200] Operating over USB 3.
[INFO] [B200] Initialize CODEC control...
[INFO] [B200] Initialize Radio control...
[INFO] [B200] Performing register loopback test... 
[INFO] [B200] Register loopback test passed
[INFO] [B200] Performing register loopback test... 
[INFO] [B200] Register loopback test passed
[INFO] [B200] Setting master clock rate selection to 'automatic'.
[INFO] [B200] Asking for clock rate 16.000000 MHz... 
[INFO] [B200] Actually got clock rate 16.000000 MHz.


In [33]:
# Uncomment and run this before closing the notebook
# del usrp

In [6]:
# source: https://stackoverflow.com/a/55850496
def crc16(data : bytearray, poly=0x8005):
    """Compute a 16-bit CRC over `data`, most-significant bit first.

    The register starts at 0xFFFF, each input byte is XORed into the upper
    byte of the register, and no final XOR or bit reflection is applied.

    Args:
        data: Any iterable of integer byte values (``bytes``, ``bytearray``,
            a list of ints, ...). Only the low 8 bits of each item are used.
        poly: Generator polynomial without the implicit x**16 term. Only the
            low 16 bits contribute to the result.

    Returns:
        int: The CRC value in the range 0..0xFFFF (0xFFFF for empty input).
    """
    crc = 0xFFFF
    for byte in data:
        crc ^= (byte & 0xFF) << 8
        for _ in range(8):
            # Mask on every step so the register stays 16 bits wide; without
            # this the Python int grows by 8 bits per input byte.
            if crc & 0x8000:
                crc = ((crc << 1) ^ poly) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc

In [7]:
class ModulatorDemodulator:
    """Shared state and sample-processing helpers for modulators.

    Holds the sample rate, the symbol timing (`Fsym`, `Tsym`, `sps`), the
    symbol data set via `set_data` and the complex baseband samples that
    subclasses generate and that the helper methods here modify in place.
    """

    def __init__(self, Fsamp: float, Fsym: float = None, sps: float = None,
                 Tsym: float = None) -> None:
        """Initialize the modulator.

        One of `Fsym`, `sps` or `Tsym` must be given; the other two are
        derived from it. If several are given, the first one in that order
        is used and the rest are ignored.

        Args:
            Fsamp (float): Sample rate.
            Fsym (float): Symbol rate.
            sps (float): Samples per symbol.
            Tsym (float): Symbol period (``1/Fsym``).

        Raises:
            ValueError: If none of `Fsym`, `sps` and `Tsym` is given.
        """
        self.Fsamp = Fsamp

        if Fsym is not None:
            self.Fsym = Fsym
            self.sps = Fsamp/Fsym
            self.Tsym = 1/Fsym
        elif sps is not None:
            self.sps = sps
            self.Fsym = Fsamp/sps
            self.Tsym = 1/self.Fsym
        elif Tsym is not None:
            self.Tsym = Tsym
            self.Fsym = 1/Tsym
            # sps = Fsamp / Fsym = Fsamp * Tsym
            self.sps = Fsamp*Tsym
        else:
            raise ValueError("One of Fsym, sps or Tsym is required")

        if np.abs(self.sps - round(self.sps)) > 1e-5:
            print(f"Warning: sps={self.sps} is not an integer!")
        else:
            # Round rather than truncate: a ratio such as 0.3/0.1 evaluates to
            # 2.9999999999999996 and must become 3, not 2.
            self.sps = int(round(self.sps))

        self.mod_order = 0
        self.mod_index = None

        # Filter time axis and taps, created on the first apply_filter() call.
        self.filter_t = None
        self.filter_h = None

        # Symbol data and generated samples; populated by set_data(),
        # set_samples() or a subclass's modulate().
        self.data = None
        self.samples = None
        self.N = 0

    def apply_filter(self, Nsamp: int = None, Nsps: int = 17, alpha: float = 0.35
                     ) -> None:
        """Applies a root-raised-cosine filter to the samples.

        This filter is useful for matched filtering. The taps are built on the
        first call and reused afterwards, so the arguments of later calls on
        the same object are ignored. The samples are replaced by the full
        convolution (longer than the input) cast to ``complex64``.

        Args:
            Nsamp (int): The length of the filter, in samples. By default,
                this is set to `int(sps * Nsps + 1)`.
            Nsps (int): The length of the filter, in symbols. Set to 17 by
                default. If `Nsamp` is specified, this parameter is ignored.
            alpha (float): The roll-off factor of the filter. Set to 0.35 by
                default.
        """
        if self.filter_t is None or self.filter_h is None:
            if Nsamp is None:
                Nsamp = int(self.sps*Nsps + 1)
            # When a modulation index is set, the filter symbol period is
            # scaled by 1/(2*mod_index); otherwise the plain symbol period
            # is used.
            if self.mod_index is not None:
                Ts = self.Tsym/(2*self.mod_index)
            else:
                Ts = self.Tsym
            self.filter_t, self.filter_h = filters.rrcosfilter(
                N=Nsamp, alpha=alpha, Ts=Ts, Fs=self.Fsamp)

        self.samples = np.convolve(
            self.samples, self.filter_h).astype(np.complex64)
        self.N = len(self.samples)

    def normalize(self, factor=1.0):
        """Normalize the samples, and multiply them by the given factor.

        Divides the samples by the value of the one with the largest magnitude.
        For example, this sets the samples to be between -2**14 and 2**14, with
        a total range of 2**15:

        `modulator.normalize(factor=2**14)`

        All-zero samples are left at zero instead of being divided by zero.

        Args:
            factor (float): The factor to multiply the samples by. By default,
                this is set to 1.0.
        """
        peak = np.max(np.abs(self.samples))
        if peak > 0:
            self.samples /= peak
        self.samples *= factor

    def shift_frequency(self, freq_offset):
        """Shift the samples in frequency by `freq_offset`, in place.

        The magnitude of the offset is limited to ``Fsamp / (2 * mod_order)``,
        or to ``Fsamp / 2`` when no modulation order has been set
        (`mod_order` is 0). Offsets beyond the limit, in either direction, are
        clipped and a warning is printed.

        Args:
            freq_offset (float): Frequency shift in the same unit as `Fsamp`.
        """
        if self.mod_order > 0:
            max_shift = self.Fsamp / (2 * self.mod_order)
        else:
            max_shift = self.Fsamp / 2.0

        if abs(freq_offset) > max_shift:
            print("Warning: Frequency offset is too high and will be clipped.")
            freq_offset = max_shift if freq_offset > 0 else -max_shift

        t = np.arange(len(self.samples)) / self.Fsamp
        self.samples *= np.exp(2.0j * np.pi * freq_offset * t)

    def get_data(self):
        """Return the symbol data last passed to `set_data`."""
        return self.data

    def set_data(self, data):
        """Set data containing values for each symbol"""
        self.data = data

    def get_samples(self):
        """Return the current sample array."""
        return self.samples

    def set_samples(self, samples):
        """Replace the sample array and update the stored length `N`."""
        self.samples = samples
        self.N = len(self.samples)

    def show_psd(self, unit='MHz'):
        """Plot the spectrum of the peak-normalized samples in dB.

        The magnitude spectrum is converted with ``20*log10`` so the plotted
        values are power relative to a full-scale (peak magnitude 1) tone.

        Args:
            unit (str): Frequency axis unit: "Hz", "kHz", "MHz" or "GHz".
        """
        units = ["Hz", "kHz", "MHz", "GHz"]
        assert unit in units

        n = len(self.samples)
        spectrum = np.fft.fft(self.samples/np.max(np.abs(self.samples))) / n
        spectrum = np.fft.fftshift(spectrum)
        # Empty bins give -inf, which matplotlib skips; silence the warning.
        with np.errstate(divide="ignore"):
            psd = 20 * np.log10(np.abs(spectrum))
        # Exact FFT bin frequencies, shifted to match the fftshift above.
        f = np.fft.fftshift(np.fft.fftfreq(n, d=1.0/self.Fsamp))
        f /= 10**(units.index(unit) * 3)

        plt.plot(f, psd)
        plt.ylabel("Power (dBFS)")
        plt.xlabel(f"Frequency ({unit})")


class Modulator(ModulatorDemodulator):
    """Creates a modulator."""

    def __init__(self, Fsamp, Fsym=None, sps=None, Tsym=None, order=2) -> None:
        """Initialize the modulator.

        Args:
            Fsamp: Sample rate.
            Fsym: Symbol rate (optional, see the base class).
            sps: Samples per symbol (optional, see the base class).
            Tsym: Symbol period (optional, see the base class).
            order: Modulation order stored in `mod_order`.
        """
        super().__init__(Fsamp, Fsym=Fsym, sps=sps, Tsym=Tsym)

        self.mod_order = order

    def modulate(self, symbols=(0.0, 1.0), premable=()):
        """Map the stored symbol data to rectangular-pulse samples.

        Each value of the preamble followed by the data set via `set_data` is
        looked up in `symbols`, and the resulting level is repeated
        `int(sps)` times. The result is stored as ``complex64`` samples.

        Args:
            symbols: Lookup table of sample levels. For a sequence, each data
                value is converted with ``int()`` before indexing, so the data
                may be ints, a bit string such as ``"0101"`` or a list of
                digit characters. A dict is indexed with the raw data value.
            premable: Optional values sent before the data (spelling kept for
                compatibility with existing callers).
        """
        bin_data = list(premable)
        bin_data.extend(self.data)

        if isinstance(symbols, dict):
            levels = [symbols[b] for b in bin_data]
        else:
            # int() lets string bits ("0"/"1") index the table, matching the
            # way FSKModulator.modulate reads its data.
            levels = [symbols[int(b)] for b in bin_data]

        self.samples = np.repeat(np.array(levels), int(self.sps))
        self.samples = self.samples.astype(np.complex64)
        self.N = len(self.samples)


class FSKModulator(Modulator):
    """Binary FSK modulator producing complex baseband samples."""

    def __init__(self, Fsamp, Fsym=None, sps=None, Tsym=None, h=None, order=2) -> None:
        """Initialize the FSK modulator.

        Args:
            Fsamp: Sample rate.
            Fsym: Symbol rate (optional, see the base class).
            sps: Samples per symbol (optional, see the base class).
            Tsym: Symbol period (optional, see the base class).
            h: Modulation index (required). Stored as `mod_index`; the tone
                offset used by `modulate` is ``Fdev = Fsym * h``.
            order: Modulation order stored in `mod_order`.

        Raises:
            ValueError: If `h` is not given.
        """
        # Check the required argument before doing any other setup.
        if h is None:
            raise ValueError("FSK Mod Index Required")

        super().__init__(Fsamp, Fsym, sps, Tsym, order)

        self.mod_index = h
        self.Fdev = self.Fsym * h

    def modulate(self):
        """Generate continuous-phase FSK samples from the stored bit data.

        Each data value is converted with ``int()``; 0 selects a tone at
        ``-Fdev`` and 1 a tone at ``+Fdev``. Every bit lasts `int(sps)`
        samples. The phase is accumulated sample by sample so it never jumps
        at a bit boundary, whatever the modulation index. When ``h`` is a
        multiple of 0.5 and `sps` is an integer, this equals conjugating a
        free-running ``+Fdev`` tone for the 0 bits.
        """
        q_offsets_vals = [-1, 1]
        q_offsets = [q_offsets_vals[int(b)] for b in self.data]

        # int() mirrors Modulator.modulate; np.repeat rejects a float count.
        sample_offsets = np.repeat(
            np.asarray(q_offsets, dtype=np.float64), int(self.sps))

        # Phase of sample n is the sum of the per-sample phase steps of all
        # earlier samples (exclusive prefix sum), so the first sample has
        # phase 0.
        phase_step = 2*np.pi*self.Fdev/self.Fsamp
        phase = phase_step * (np.cumsum(sample_offsets) - sample_offsets)

        self.samples = np.exp(1j*phase)
        self.N = len(self.samples)

In [8]:
class PN9Generator:
    """Endless byte generator driven by a 9-bit linear-feedback shift register.

    The register starts at 0x1FF. Each `next()` returns the low 8 bits of the
    current state and then advances the register by 8 single-bit steps.
    """

    def __init__(self):
        self.val = 0x1FF

    def __iter__(self):
        # An iterator must return itself so that iter(), for-loops and zip()
        # work on the generator.
        return self

    def __next__(self):
        """Return the current output byte and advance the register 8 steps."""
        old_val = self.val

        for _ in range(8):
            new_msb = (self.val >> 0 & 1) ^ (
                self.val >> 5 & 1)  # XOR of bits 0 and 5
            self.val = (self.val >> 1) | (new_msb << 8)  # Shift and append

        return old_val & 0xFF

def whiten(data: bytes):
    """XOR every item of `data` with the next byte of a fresh PN9 sequence.

    A new `PN9Generator` is created on each call, so the sequence always
    starts from its initial state. Returns the result as a list of ints.
    """
    keystream = PN9Generator()
    whitened = []

    for value in data:
        whitened.append(next(keystream) ^ value)

    return whitened

In [32]:
# Build one packet, FSK-modulate it and transmit it through the USRP handle
# (`usrp`) created in the first cell.

# Symbol rate.
Fsym = 7415

# Sample rate: 16 samples per symbol.
Fsamp = Fsym * 16
# NOTE: `sps` and `Tsym` are not read again in this cell; FSKModulator derives
# its own values from Fsamp and Fsym.
sps = int(Fsamp / Fsym)
Tsym = 1/Fsym
# FSK modulation index passed to the modulator.
h = 0.5

mod = FSKModulator(Fsamp, Fsym, h=h)

# command header, message
data = [0xAB, 0xCD, 0x01, 0x11] + list(b"hello there")
uart_len = len(data) + 2 # data + HWID
# NOTE(review): the meaning of the 3 extra bytes is not shown here - confirm
# against the packet format of the receiving radio.
rf_len = uart_len + 3

# 32 bit preamble, 32 bit sync word
header = [0x55, 0x55, 0x55, 0x55, 0xD3, 0x91, 0xD3, 0x91]

# length byte, 0x40 (flags), command + message, then two zero bytes
# (presumably the HWID counted in uart_len - TODO confirm)
packet = [rf_len, 0x40] + data + [0x00, 0x00]

# Add CRC (computed over everything in `packet` so far, appended low byte first)
crc_data = crc16(packet)
packet.extend([crc_data & 0xFF, crc_data >> 8])

# Whitening (covers length byte through CRC; the header below is not whitened)
packet = whiten(packet)

# FEC
# TODO

# Combine header and packet
packet = header + packet

# Convert to bits (string of '0'/'1' characters, most significant bit first)
bits = "".join([f'{x:08b}' for x in packet])

# Modulate, filter, then scale so the largest sample magnitude is 1.0
mod.set_data(bits)
mod.modulate()
mod.apply_filter()
mod.normalize()

samples = mod.get_samples()

# Transmit
# Arguments are presumably (waveform, duration in s, center frequency in Hz,
# sample rate, channel list, gain) - confirm against the UHD Python API.
usrp.send_waveform(samples, 1, 437e6, Fsamp, [0], 60)

[INFO] [B200] Asking for clock rate 60.743680 MHz... 
[INFO] [B200] OK
[INFO] [B200] Asking for clock rate 60.743680 MHz... 
[INFO] [B200] OK
[INFO] [B200] Asking for clock rate 60.743680 MHz... 
[INFO] [B200] OK
analog frontend filter bandwidth (0.2 MHz).


[[ 4.9125642e-06+0.0000000e+00j  5.5497119e-05-9.5839380e-07j
   1.4926451e-04-1.1766930e-05j ... -3.8743791e-01-7.2511649e-01j
  -2.8789085e-01-6.9162136e-01j -1.7728028e-01-6.4817643e-01j]]


118640