# Data Preprocessing for Digit Recordings

Volunteers were shown digits in a LibreOffice Impress presentation and instructed to write each digit anywhere on a sheet of paper using a pencil. Each digit was shown for 2 seconds before the next digit was displayed, and the digits were not displayed in numerical order, so that volunteers could not anticipate the next digit.

In [None]:
%matplotlib inline
from pathlib import Path
import librosa
from librosa import display
import IPython.display
import numpy as np
import pandas as pd
from tqdm import tqdm_notebook as tqdm
import sox

In [None]:
# Root data directory (relative to the notebook's working directory) and the
# sub-directory inside it that holds the raw audio recordings.
DATA = Path("..", "..", "data")
RECORDINGS = DATA.joinpath("audio-recordings")

In [None]:
# The order of the digits in the recordings:
# a 50-character string, one character per digit prompt. The labelling loop
# appends this whole string once per successfully processed recording, so
# character k is the label of the k-th segment cut from a recording.
DIGITS_ORDER = "70345120789641253610648928573961203754901285394867"

The `writers.csv` file contains the filenames of all the recordings, along with the author ID (and names at present, though these will be removed for privacy). The author ID identifies the writer of each recording.

In [None]:
# Load the recording index. Later cells read its "filename" column and unpack
# each row as (filename, writer_name, writer_id).
writers = pd.read_csv(DATA/"writers.csv")
# Last expression of the cell: displays the first rows for a quick visual check.
writers.head()

In [None]:
def play_file(filename):
    """Return an ``IPython.display.Audio`` widget for the given audio file.

    ``filename`` is passed through ``str`` first, so ``pathlib.Path`` objects
    are accepted as well as plain strings.
    """
    return IPython.display.Audio(str(filename))

In [None]:
# Sanity check: play the recording whose filename is stored at index label 0.
play_file(RECORDINGS/writers["filename"][0])

In [None]:
def load_wav(filename):
    """Load an audio file with librosa as a mono signal.

    ``sr=None`` is passed so that librosa keeps the file's own sampling rate
    instead of resampling.

    Returns:
        The ``(samples, sampling_rate)`` pair returned by ``librosa.core.load``.
    """
    samples, sample_rate = librosa.core.load(filename, mono=True, sr=None)
    return samples, sample_rate

In [None]:
# Load the recording at index label 1 into memory (samples + sampling rate)...
data, sr = load_wav(RECORDINGS/writers["filename"][1])
# ...and play it back from the in-memory samples rather than from the file.
IPython.display.Audio(data, rate=sr)

## Automatically detecting beeps

1. Apply a band-pass filter at the frequency of the beep. NOTE: in the original implementation this did not actually take effect, because `process_recording` ran onset detection on the unfiltered audio.
2. Assuming that the first and last beeps happen in the first and last 5 seconds respectively, use librosa's onset detector to get the times of the beeps.


In [None]:
# The period is just from looking at the waveform in audacity.
# Presumably these are the times (in seconds) of two points one cycle apart
# on the beep's waveform -- TODO confirm.
later_mark = 0.00134
earlier_mark = 0.00075
period = later_mark - earlier_mark
print(f"Frequency = {1 / period}Hz")

In [None]:
# Shared sox transformer used to produce the "*.beepdetect.wav" files.
tfm = sox.Transformer()
# Band-pass around 1695 Hz, i.e. roughly 1 / period from the previous cell.
tfm.bandpass(1695)

In [None]:
def get_beep_times(data, sr, begin_window=5, end_window=5, delta=0.8):  # for delta, so far 0.6 to 0.8 seem to work.
    """Locate the first and last beep of a recording with librosa's onset detector.

    Only the first ``begin_window`` seconds and the last ``end_window`` seconds
    of the signal are searched.

    Args:
        data: 1-D array of audio samples.
        sr: Sampling rate of ``data`` in Hz.
        begin_window: Length in seconds of the leading search window.
        end_window: Length in seconds of the trailing search window.
        delta: ``delta`` threshold forwarded to ``librosa.onset.onset_detect``.

    Returns:
        ``(first_beep_time, last_beep_time)`` in seconds from the start of the
        recording, or ``-1`` if no onset was found in either window.
    """
    recording_duration = len(data) / sr
    # int() keeps the slice bounds valid when a window is given as a float.
    recording_beginning = data[:int(begin_window * sr)]
    recording_end = data[-int(end_window * sr):]
    # If the recording is shorter than end_window the slice above is the whole
    # signal, so the offset of that slice must not go below zero.
    end_offset = max(recording_duration - end_window, 0)
    # y/sr are passed by keyword: newer librosa releases no longer accept them
    # positionally, and older releases accept the keyword form too.
    begin_onset_events = librosa.onset.onset_detect(
        y=recording_beginning, sr=sr, delta=delta, units="time")
    end_onset_events = librosa.onset.onset_detect(
        y=recording_end, sr=sr, delta=delta, units="time")
    if len(begin_onset_events) >= 1 and len(end_onset_events) >= 1:
        # If there are multiple detected onsets, we just take the first one and hope for the best
        return begin_onset_events[0], end_onset_events[0] + end_offset
    return -1

In [None]:
# Try the beep detection on a single recording.
path = RECORDINGS/"Record-080.wav"
path_str = str(path)
# The filtered copy is written next to the original as "<name>.beepdetect.wav".
filtered_path = path_str.replace(".wav", ".beepdetect.wav")
# Run the sox transformer (band-pass) from the original file to the filtered file.
tfm.build(path_str, filtered_path)
# Reload the filtered audio and run the onset-based beep detection on it.
data, sr = load_wav(filtered_path)
get_beep_times(data, sr)

## Splitting into individual digits and assigning labels

In [None]:
def process_recording(path, processed_path, n_digits=50):
    """Split one recording into per-digit WAV files.

    The recording is band-pass filtered with the shared ``tfm`` transformer
    (the filtered copy is written next to the input as ``*.beepdetect.wav``),
    the first and last beep are located on the filtered audio, and the
    unfiltered audio between the two beeps is cut into ``n_digits`` equal
    segments. Each segment is written to ``processed_path`` under a
    zero-padded six-digit name that continues from the highest-named
    ``*.wav`` file already in that directory.

    Args:
        path: Path of the recording to process.
        processed_path: ``pathlib.Path`` of the directory receiving the segments.
        n_digits: Number of equal segments between the two beeps.

    Returns:
        The list of written filenames (names only, no directory), or ``-1`` if
        the beeps could not be found or the per-digit duration is implausible.
    """
    path_str = str(path)
    filtered_path = path_str.replace(".wav", ".beepdetect.wav")
    tfm.build(path_str, filtered_path)
    data, sr = load_wav(path)
    filtered_data, filtered_sr = load_wav(filtered_path)

    # Detect the beeps on the band-passed signal; that is the whole point of
    # producing the filtered file. The times are in seconds, so they apply
    # unchanged to the unfiltered samples that are sliced below.
    beeps = get_beep_times(filtered_data, filtered_sr)
    if beeps == -1:
        print("Couldn't get times of beeps")
        return -1

    first_beep, last_beep = beeps
    digit_duration = (last_beep - first_beep) / n_digits
    # check the digit duration seems about right
    if not 1.9 < digit_duration < 2.2:
        print(f"Calculated digit duration outside expected range: {digit_duration}")
        return -1
    print(f"Digit duration: {digit_duration}")

    # Work out the next free file number once, instead of re-listing the
    # output directory for every digit.
    existing_files = list(processed_path.glob("*.wav"))
    if existing_files:
        next_number = int(max(existing_files).name.replace(".wav", "")) + 1
    else:
        next_number = 0

    # n_digits + 1 boundaries (in samples) delimit n_digits equal segments.
    digit_breaks_samples = np.linspace(
        first_beep * sr, last_beep * sr, n_digits + 1, dtype=np.int32)
    filenames = []
    for i in range(n_digits):
        digit_samples = data[digit_breaks_samples[i]:digit_breaks_samples[i + 1]]
        digit_filename = str(next_number + i).zfill(6) + ".wav"
        librosa.output.write_wav(processed_path/digit_filename, digit_samples, sr=sr)
        filenames.append(digit_filename)
    return filenames

In [None]:
# Delete the previous filtered and processed files (otherwise annoying warnings).
# The directories are derived from DATA / RECORDINGS so that exactly the files
# written by process_recording are removed, whatever the working directory is.
processed_dir = DATA/"processed"
processed_dir.mkdir(parents=True, exist_ok=True)
for stale_file in list(RECORDINGS.glob("*.beepdetect.wav")):
    stale_file.unlink()
for stale_file in list(processed_dir.glob("*.wav")):
    stale_file.unlink()

# Parallel lists: one entry per extracted digit segment.
filenames = []
writer_ids = []
labels = []

# Each row of `writers` is expected to unpack as (filename, writer_name, writer_id).
for filename, writer_name, writer_id in tqdm(writers.itertuples(index=False),
                                             total=len(writers)):
    rec_filenames = process_recording(RECORDINGS/filename, processed_dir)
    # process_recording returns -1 when the recording could not be split.
    if rec_filenames != -1:
        filenames += rec_filenames
        writer_ids += [writer_id] * len(rec_filenames)
        # Extending a list with a string appends one character (digit label) at a time.
        labels += DIGITS_ORDER

In [None]:
# Randomly flag a fixed fraction of the segments as validation data.
valid_pct = 0.2
n_segments = len(filenames)
valid_n = int(n_segments * valid_pct)

# valid_n ones followed by zeros for the rest; shuffled below.
split_flags = [1] * valid_n
split_flags.extend([0] * (n_segments - valid_n))

# Fixed seed so the split is reproducible between runs.
np.random.seed(42)
is_valid = np.random.permutation(split_flags)

In [None]:
 processed_df = pd.DataFrame(np.array([filenames,
                                       writer_ids,
                                       labels,
                                       is_valid]).transpose(),
                             columns=["filename",
                                      "writer_id",
                                      "label",
                                      "is_valid"])
processed_df.to_csv(DATA/"labels.csv", index=False)