In [1]:
import os
import wave

import matplotlib
import numpy as np

from bokeh.io import push_notebook, show, output_notebook
from bokeh.plotting import figure
from bokeh.models import BoxAnnotation, Range1d, Label

output_notebook()

import scipy.fftpack as fft
from scipy.signal import butter, lfilter, freqz

from functools import reduce
import itertools

import matplotlib.pyplot as py


%matplotlib notebook

## Reading the file

First, we need to read in the data file. The `wave` package from the standard library makes this pretty easy, but it only provides a raw byte stream, instead of a list of integers. We need to convert this stream of bytes into a stream of sample values.

The `get_samples` function will handle reading all of the data from an open wav file, correctly decoding the raw byte stream into integers. It works on both 16-bit and 24-bit wav files.

`read_file` is a simple wrapper around `get_samples`, which takes the path to a wav file and returns both the samples and the sample rate (in Hertz).

In [2]:
def sign_extend(bits, value):
    """ Sign-extend arbitrary length samples """
    sign_bit = 1 << (bits - 1)
    return (value & (sign_bit - 1)) - (value & sign_bit)

def get_samples(w):
    w.rewind()
    num_samples = w.getnframes()
    sample_depth = w.getsampwidth()
    raw_data = w.readframes(num_samples)
    for i in range(0, num_samples * sample_depth, sample_depth):
        yield sign_extend(
            sample_depth * 8,
            reduce(
                lambda x, y: x<<8 | y,
                reversed(raw_data[i:i+sample_depth])
            ))

def read_file(file_name):
    w = wave.open(file_name, "rb")
    samples = np.array(list(get_samples(w)))
    fs = w.getframerate()
    w.close()
    return samples, fs

# Read the file and store both the samples and samplerate
s, fs = read_file("DashButton.wav")
nyq = fs / 2

# Limit the file to the first data packet
s = s[:35000]

## Filtering

I had played around with high-pass filtering the raw data to isolate the carrier from low-frequency noise. In reality, the proper approach is probably to bandpass filter the data, but our data is clean enough that it doesn't really matter.

In the code below, we don't actually filter the data.

In [4]:
def butter_highpass(cutoff, fs, order=5):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='high', analog=False)
    return b, a

def butter_highpass_filter(data, cutoff, fs, order=5):
    b, a = butter_highpass(cutoff, fs, order=order)
    y = lfilter(b, a, data)
    return y


hpf_cutoff = 16000
#filtered = butter_highpass_filter(s, hpf_cutoff, fs, order=2)

# Bypass the HPF to see if it affects anything
filtered = s

## Spectrum Analysis

After reading the data, it can be nice to do a quick spectral analysis. Here we do a very large (8192-point) FFT, and we can see that the carrier is betwee 17khz and 20khz.

It's also interesting to point out the effects of the antialiasing filter in the audio interface used to record this data, visible as a drop off in frequency content above 20 KHz.

In [5]:
fftsize = 2**14

spec = np.abs(fft.rfft(filtered, fftsize))
bin_centers = fft.rfftfreq(fftsize, 1/fs) / 1000

# Find the peak bin index and frequency
peak_bin = np.argmax(spec)
peak_freq = bin_centers[peak_bin]
peak_freq_ampl = spec[peak_bin]

# Convert output to dB relative to peak bin and clip to -40 dB
spec_db = 10 * np.log10(spec / peak_freq_ampl)
spec_db = np.clip(spec_db, -40, 0.0)

# Pretty plot!
plt = figure(
    title="Spectral Content (%d point FFT)" % fftsize,
    x_axis_label="Frequency (KHz)",
    y_axis_label="dB",
    y_range=Range1d(-40, 2, bounds="auto"),
    x_range=Range1d(0, nyq/1000, bounds="auto")
)
plt.line(bin_centers, spec_db)
plt.circle(peak_freq, 0.0, fill_color="red")
plt.add_layout(Label(x=peak_freq, y=0, text='%.2f KHz' % (peak_freq)))

show(plt)

## Envelope Recovery

The data is modulated using amplitude-shift keying, with 4 discrete amplitude levels used. To pick out the levels, we'll first run a simple envelope detector on the samples. This one is composed of a full-wave rectifier (the absolute value function) and a simple moving average, which low-pass filters the rectified wave.

We then plot both the raw samples and the post-detector output with each other.

In [6]:
def gen_kernel(initial_width, order):
    k = np.ones(initial_width) / initial_width
    for i in range(order):
        k = np.convolve(k, k)
    return k

def full_wave_envelope(f, window_size=10, lpf_order=0, output_gain=1.0):
    """ Simple envelope detector
    
    This full-wave rectifies the signal and low-pass filters it
    with a multipass moving average filter.
    """
    # Full-wave rectify the signal (absolute value)
    return output_gain * np.convolve(np.abs(f), gen_kernel(window_size, lpf_order), mode='same')
    

# Demodulate to baseband
# Output gain is only used here for the sake of plotting
baseband = full_wave_envelope(
    s,
    window_size=20,
    lpf_order=5,
    output_gain=1.8,
)

plt = figure(
    title="Raw signal and detected envelope",
    x_axis_label="Sample Number",
    y_axis_label="Raw Value",
    y_range=Range1d(1.1 * np.min(s), 1.1 * np.max(s)),
    x_range=Range1d(0, 5000, bounds=(0, len(baseband))),
    tools="xpan,xwheel_zoom,reset",
    active_scroll="xwheel_zoom"
)

x = range(0, len(baseband))
plt.line(x, s)
plt.line(x, baseband, line_color="red")

show(plt)

## Peak Detection

The output of the envelope detector is well-suited for running a simple peak detection algorithm on. Instead of doing clock recovery, we use an asynchronous peak detector. This one loops through the samples, and if the sample is larger than all of the samples around it within `search_range`, then it is considered a peak.

In [7]:
def find_peaks(s, search_range=30):
    """ Find peak locations and values in a signal s """
    for i in range(search_range, len(s) - search_range):
        window = s[i-search_range:i+search_range]
        if np.max(window) == s[i]:
            yield (i, s[i])

# Normalize the data to the range [0, 1.0]
norm = baseband / np.max(baseband)

# Threshold out noise
threshold = 0.04
peaks = [p for p in find_peaks(norm, 30) if p[1] > threshold]

# Plot the envelope and detected peaks
plt = figure(
    title="Baseband and detected peaks",
    x_axis_label="Sample Number",
    y_axis_label="Normalized Value",
    y_range=Range1d(0, 1.1),
    x_range=Range1d(0, 15000, bounds=(0, len(norm))),
    tools="xpan,xwheel_zoom,crosshair,reset",
    active_scroll="xwheel_zoom"
)
plt.line(range(len(norm)), norm)
plt.circle([p[0] for p in peaks], [p[1] for p in peaks], fill_color="red")

# TODO: Programatically determine bin thresholds based on first 10 symbols
bin_thresholds = np.array([0.165, 0.3, 0.7, 2.0])
bin_no = lambda x: np.argmax(bin_thresholds >= x)

# Add some cool range colors to the graph
prev_box_top = threshold
range_colors = itertools.cycle(['green', 'yellow'])
for box_top in bin_thresholds:
    plt.add_layout(BoxAnnotation(
        bottom=prev_box_top,
        top=box_top,
        fill_alpha=0.05,
        fill_color=next(range_colors)
    ))
    prev_box_top = box_top

show(plt)

## Converting the stream of symbols into bytes

The following decodes the stream of symbols into a stream of bytes, essentially using a shift register to read in two bits at a time until it has a complete byte. It then outputs that byte using the `yield` keyword and resets its shift register.

In [8]:
def decode_stream(symbols):
    """ Decodes the stream of symbols into a stream of bytes """
    
    # Mapping of symbol number to binary value
    symbol_map = [0b11, 0b10, 0b01, 0b00]
    
    # Decoding state
    bits = 0
    cur_byte = 0
    
    # Loop through symbols
    for s in symbols:
        cur_byte = cur_byte | symbol_map[s] << bits
        bits += 2
        if bits == 8:
            yield(cur_byte)
            bits = 0
            cur_byte = 0

preamble_symbols = 10
symbols = [bin_no(p[1]) for p in peaks[preamble_symbols:]]
payload = bytes(decode_stream(symbols))

payload

b'"jj{d\'Y\x07Network\x08Password\x08USAmazonUS'