In this notebook, we combine the model training of Exploration 2 with the real-time listening and processing of Exploration 3 (see the `explorations` folder) into a single unified workflow. Within this one self-contained notebook, you can
* specify the noises you want your model to recognize
* record training and testing data for those noises
* train a model on the data, and evaluate its performance
* use the model with a listener to recognize noises in real-time and act on them

At present, the model recognizes only percussive noises, such as short clicks or consonant sounds ('t', 'p', ...). It also assumes that any percussive sound it hears is one of the trained noises: it will not reject noises it has not been trained on.

# Build and run a BeatBot here:

##### These will fail until the rest of the notebook has been run. Scroll down a bit to "Component Functions", and then select "Cell -> Run All Below" in the Jupyter menu. Then come back to rerun these.

First select your microphone.

In [None]:
device = 0 # select the microphone. Use sd.query_devices() to see options
print(sd.query_devices())

Record noise samples and train a model. (~100 samples per noise gives reasonable results.) Save the resulting audio data and model, if you wish:

In [None]:
my_model, my_recordings = build_beatbot(device=device)

Try out your new model:

In [None]:
# This will run for 10 sec, printing out any noise it hears during that time.
run_beatbot(my_model, print_noise, device=device, duration=10)

If you already have audio data or a model saved, you can load them with ...

In [None]:
# my_recordings = load_noise_sample_dict(filename='my_noise_data.npy')
# my_model = load_model(filename='my_model.pth')

You can also build on an old set of recordings with ...

In [None]:
# my_new_model, my_new_recordings = build_beatbot(device=device, starting_noise_data=my_recordings)

Or skip recording entirely (and testing, if you want) with

In [None]:
# my_new_model, my_new_recordings = build_beatbot(device=device, starting_noise_data=my_recordings,
#                                                 skip_recording=True, skip_testing_model=True)

### Keyboard control: up down left right escape

We want to play a videogame, like Crypt of the Necrodancer, with this noise control. Let's introduce some basic keyboard control.

In [None]:
import pyautogui

In [None]:
# basic keyboard control

def press_key(noise_heard):
    keyboard_mapping = {
        't': 'up',
        'k': 'right',
        'p': 'down',
        'tsk': 'left',
        'cluck': 'escape',
    }
    try:
        key = keyboard_mapping[noise_heard]
    except KeyError:
        # only a missing mapping is expected here; other errors should surface
        print('No keyboard mapping for "{}"'.format(noise_heard))
    else:
        pyautogui.press(key)
    print_noise( noise_heard )
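
The mapping logic in `press_key` can be tried without `pyautogui` or a trained model. The sketch below is a hypothetical variant, not part of the notebook: `make_key_presser` and `fake_press` are names introduced here for illustration. It takes the key-press function as an argument, so a stand-in can record what would have been pressed.

```python
def make_key_presser(keyboard_mapping, press):
    """Return a function that presses the key mapped to a noise label.

    `press` is any callable taking a key name (for example pyautogui.press).
    The returned function reports whether a key was pressed.
    """
    def _press_key(noise_heard):
        key = keyboard_mapping.get(noise_heard)
        if key is None:
            print('No keyboard mapping for "{}"'.format(noise_heard))
            return False
        press(key)
        return True
    return _press_key


# A stand-in for pyautogui.press that only records the requested keys.
pressed = []

def fake_press(key):
    pressed.append(key)


press_key_dry_run = make_key_presser({'t': 'up', 'p': 'down'}, fake_press)
for noise in ['t', 'p', 'cluck']:
    press_key_dry_run(noise)

print(pressed)
```

Passing `pyautogui.press` instead of `fake_press` would send real key presses, which is why the dry run is worth doing first.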

And define a model that only recognizes these five noises, even if we recorded more earlier:

In [None]:
my_recordings_subset = { k: v for k, v in my_recordings.items() if k in ('t', 'k', 'p', 'tsk', 'cluck') }
necrodancer_model, _ = build_beatbot(device=device, skip_recording=True, starting_noise_data=my_recordings_subset)

Now we make it run for 20 min, switch over to the game, and play!

In [None]:
run_beatbot(necrodancer_model, press_key, device=device, duration=20*60)

# Component functions

The rest of this notebook contains the pieces that make the above work. Most of them were developed in the Exploration notebooks and adapted here into a more cohesive whole. You may wish to check out those notebooks.

## Real-time listening: general functions

Functions to continuously listen for noises, and pass them to a processing function.

In [None]:
import sounddevice as sd
import numpy as np
import queue
import time

In [None]:
########### PARAMETERS ###########

device = 0 # select the microphone. Use sd.query_devices() to see options
print(sd.query_devices())

BATCH_DURATION = 0.02      # listen for noises BATCH_DURATION (seconds) at a time
THRESHOLD_MULTIPLIER = 5   # detect a spike when the next batch is at least THRESHOLD_MULTIPLIER times bigger
THRESHOLD_ABSOLUTE = 0.005 # ignore any spikes that don't rise above this. Too many false positives without this
BATCHES_PER_NOISE = 3      # collect BATCHES_PER_NOISE batches of audio input per detected noise

samplerate = sd.query_devices(device, 'input')['default_samplerate']
# optional for future: set the FFT window size based on the sample rate

blocksize = int(samplerate * BATCH_DURATION) # get the block (batch) size in frames
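
To get a feel for what these parameters imply, the arithmetic can be worked through for a couple of common sample rates. This is a minimal sketch: the rates 44100 and 48000 Hz are assumptions for illustration (your device's default may differ), and `listening_window` is a hypothetical helper, not a notebook function.

```python
def listening_window(samplerate, batch_duration=0.02, batches_per_noise=3):
    """Return (blocksize, samples_per_noise, seconds_per_noise)."""
    blocksize = int(samplerate * batch_duration)       # frames per batch
    samples_per_noise = blocksize * batches_per_noise  # frames kept per detected noise
    seconds_per_noise = samples_per_noise / samplerate
    return blocksize, samples_per_noise, seconds_per_noise


for rate in (44100.0, 48000.0):
    blocksize, samples, seconds = listening_window(rate)
    print('{:.0f} Hz: blocksize={}, samples per noise={}, duration={:.3f} s'.format(
        rate, blocksize, samples, seconds))
```

With the defaults above, each detected noise covers roughly 0.06 seconds of audio, which is why the listener suits short percussive sounds.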

In [None]:
########### Functions for continuous listening and processing ###########

# bundling these is easier than declaring them 'global' in the below
class listen:
    """ Helper variables for processing continuous audio input """
    
    def reset():
        listen.prev_max = 1.
        listen.batches_to_collect = 0
        listen.batches_collected = 0
        listen.current_noise = None
        listen.start = time.time()

        listen.processing_start = 0 # for timing the total processing time
        listen.processing_end = 0

        listen.q_batches = queue.Queue() # a FIFO queue
        listen.all_audio = []  # could use this to collect all audio (uncomment line in callback)
        listen.all_noises = [] # could use this to collect all noises. Use the processing_function to append
    
# The callback function for the sounddevice input stream
def callback(indata, frames, time_pa, status):
    """ Detect if a noise has been made, and add audio to the queue. """
    if status:
        print('STATUS: ', str(status))
    if indata.any():
        indata_copy = indata.copy()
        new_max = np.absolute(indata_copy).max()
        # listen.all_audio.append(indata_copy)
        
        # Gather audio data if more is required. Make sure to *copy* the input data.
        if listen.batches_to_collect > 0:
            listen.q_batches.put_nowait(indata_copy)
            listen.batches_collected  += 1
            listen.batches_to_collect -= 1
                
        # Otherwise, see if a new noise has been detected
        elif ( new_max > THRESHOLD_ABSOLUTE and
               new_max > THRESHOLD_MULTIPLIER * listen.prev_max ):
            
            listen.processing_start = time.time()
            
            listen.q_batches.put_nowait(indata_copy)
            listen.batches_collected += 1
            listen.batches_to_collect = BATCHES_PER_NOISE - 1 # get more batches
               
        listen.prev_max = new_max
        
    else:
        print('no input')

# Returns True if enough time has elapsed
def time_elapsed(duration):
    def _time_elapsed():
        return time.time() - listen.start > duration
    return _time_elapsed

# A helper to print the time it took to process a single noise recognition
def print_processing_time():
    listen.processing_end = time.time()
    print('Processing took {:.4f} sec\n'.format(
        listen.processing_end - listen.processing_start))

# The main generic real-time listening function
def listen_and_process(processing_function, stop_condition=time_elapsed(3),
                       device=device, print_after_processing=None):
    """ Listen continuously for noises until stop_condition() returns True (default: wait 3 sec).
    As each noise is heard, process it using processing_function. Returns nothing: to keep
    the noises, have processing_function append them to listen.all_noises. """

    listen.reset() # reinitialize helper variables
    
    with sd.InputStream(device=device, channels=1, callback=callback,
                        blocksize=blocksize,
                        samplerate=samplerate):
        print('Listening...')
        while True:
            
            # data collects if it meets the threshold. Process when enough data is in queue:
            if listen.batches_collected >= BATCHES_PER_NOISE:
                data = []
                for _ in range(BATCHES_PER_NOISE):
                    data.append( listen.q_batches.get_nowait() )
                listen.batches_collected -= BATCHES_PER_NOISE
                
                listen.current_noise = np.concatenate( data, axis=None )
                
                processing_function( listen.current_noise )
                
                # print something after processing, if desired
                if print_after_processing:
                    print_after_processing()
                    
            # listen until the condition is met
            if stop_condition():
                break
        print('Done.')
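
The detection rule in `callback` can be explored offline, without a microphone. The sketch below replays that rule over a list of per-batch peak amplitudes. The function name `detect_noise_starts` and the sample values are made up for illustration; the thresholds mirror the parameters defined above.

```python
def detect_noise_starts(batch_peaks, threshold_absolute=0.005,
                        threshold_multiplier=5, batches_per_noise=3):
    """Return the indices of the batches at which a new noise would be detected."""
    starts = []
    prev_max = 1.0   # same initial value as in listen.reset()
    to_collect = 0   # batches still owed to the noise currently being gathered
    for i, new_max in enumerate(batch_peaks):
        if to_collect > 0:
            # still gathering the current noise; no new detection is possible
            to_collect -= 1
        elif (new_max > threshold_absolute and
              new_max > threshold_multiplier * prev_max):
            # a spike: loud enough in absolute terms, and much louder than the last batch
            starts.append(i)
            to_collect = batches_per_noise - 1
        prev_max = new_max
    return starts


# Two clicks separated by near-silence (hypothetical peak amplitudes per batch).
peaks = [0.001, 0.001, 0.2, 0.15, 0.05, 0.002, 0.001, 0.3, 0.1, 0.02]
print(detect_noise_starts(peaks))   # -> [2, 7]

# A steady loud signal never spikes relative to the previous batch.
print(detect_noise_starts([0.2] * 5))   # -> []
```

Because the rule compares each batch with the one before it, sustained sounds such as vowels or humming are ignored, while sudden onsets are picked up.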

## Recording audio data for training/testing

We now apply the above functions to make a listener to record training and testing data for our model:

In [None]:
from IPython.display import clear_output

In [None]:
########## A listener to record training/testing data. ##########

# a helper function to get nonnegative integer input
def get_int_input():
    while True:
        response = input() # response is a string
        try:
            val = int(response)
            if val >= 0:
                break
            print('Integer must be non-negative.')
        except ValueError:
            print('Please enter an integer.')
        
    return val

# Use the generic listening function and add a user interface to record audio data for model training
def record_model_data(device=device, starting_noise_data={}):
    """ Prompts the user to label and record noise samples. Returns a dictionary with labels as keys
    and lists of flattened numpy arrays (one array per noise sample) as values. """
    
    noise_data_dict = starting_noise_data.copy()
    noise_count = 0
    
    # a helper to gather audio samples and increment the progress counter
    def gather_and_progress(label, total):
        def _gather_and_progress(rec):
            nonlocal noise_count
            
            listen.all_noises.append( rec )
            noise_count += 1
            print(noise_count, end=', ')

        return _gather_and_progress
    
    # Ask user how many samples to record
    same_num_for_each = False
    print('Would you like to record the same number of samples for each noise type? (enter y for yes)')
    answer = input()
    if 'y' in answer:
        same_num_for_each = True
        print('How many noise samples would you like to record for each?')
        num = get_int_input()
    else:
        print('We recommend recording similar numbers for each type, to avoid biasing the model.\n')
    
    # a helper to print the overall recording progress
    def print_progress(noise_data_dict):
        print('Noises recorded so far:', { k: len(v) for k, v in noise_data_dict.items() })
    
    # ask user for noise labels, and listen and record the desired number of samples
    while True:    
        print_progress(noise_data_dict)
        print('Enter text label for next noise (leave blank to exit):')
        label = input()
        if not label:
            return noise_data_dict
        if label in noise_data_dict:
            print('You have already recorded {} samples of this noise. You may now record more.'.format(
                    len(noise_data_dict[label]))
                 )
        if not same_num_for_each:
            print('How many noise samples would you like to record?')
            num = get_int_input()
            if num == 0:
                continue
        
        clear_output() # clear jupyter output
        print_progress(noise_data_dict)
        print('Please start recording.\n')
        print('"{}" noises recorded (out of {}): '.format(label, num))
        noise_count = 0
        listen_and_process(processing_function=gather_and_progress(label, num), 
                           stop_condition=lambda: noise_count >= num,
                           device=device,
                           print_after_processing=None)
        print('')

        # save the list of recorded noises (build a new list, so that the lists
        # inside starting_noise_data are not modified in place)
        noise_data_dict[label] = noise_data_dict.get(label, []) + list(listen.all_noises)
        
    return noise_data_dict

In [None]:
# # TESTING
# my_recordings = record_model_data(device=0)
# print(my_recordings)

## Audio processing: preparing spectrograms

A function to create a spectrogram from a noise sample recording.

In [None]:
import torch
import torchaudio.transforms
import matplotlib.pyplot as plt

In [None]:
########### PARAMETERS ###########
N_MELS = 28                # the number of mel filterbanks in each spectrogram

In [None]:
def generate_spectrogram(noise_sample, samplerate=samplerate, n_mels=N_MELS):
    """ Takes a noise_sample as a flattened numpy.array,
    and returns a mel spectrogram as a 2D torch.tensor """
    
    # normalize to have unit mean, and compute the spectrogram
    normed_sample = torch.from_numpy(noise_sample) / noise_sample.mean()
    mel = torchaudio.transforms.MelSpectrogram(
        sample_rate=samplerate, n_mels=n_mels)(normed_sample)
    
    return mel.log2()
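
The size of the resulting spectrogram follows from the length of the recording. The sketch below estimates it, assuming the `torchaudio` defaults for `MelSpectrogram` (`n_fft=400`, a hop of 200 samples, and centered frames); if your version uses different defaults, the frame count will differ. `mel_spectrogram_shape` is a hypothetical helper, and the 2646-sample input assumes a 44100 Hz device.

```python
def mel_spectrogram_shape(num_samples, n_mels=28, hop_length=200):
    """Expected (rows, columns) of a mel spectrogram computed with centered frames."""
    num_frames = 1 + num_samples // hop_length
    return n_mels, num_frames


# 3 batches of 882 samples each (0.02 s at an assumed 44100 Hz)
print(mel_spectrogram_shape(3 * 882))
```

Under these assumptions each noise becomes a small image of 28 rows by 14 columns, which is what the network later receives.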

In [None]:
# # TESTING
# my_spectrogram = generate_spectrogram( noise_sample=my_recordings['t'][0] )
# plt.figure(figsize=(2, 2))
# plt.imshow(my_spectrogram)

## Neural net: preparing datasets

From a dictionary of labeled noise sample recordings, prepare the datasets and data loaders needed to train and test the model. Some of the model construction, training, and evaluation code here has been adapted from a [pytorch tutorial](https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html#sphx-glr-beginner-blitz-cifar10-tutorial-py).

In [None]:
from torch.utils.data import Dataset, DataLoader, Subset, ConcatDataset, random_split

In [None]:
# Prepare the noise data for handing to the convolutional neural network, for training and testing

class NoisesDataset(Dataset):
    """ Noises dataset. Takes a dictionary of recordings and returns spectrograms when data is requested. 
    A channel dimension is added to each spectrogram, as needed for the CNN. """

    def __init__(self, noise_data_dict, samplerate=samplerate, n_mels=N_MELS): 
        """ Initialization: 
        Takes a dictionary of noise samples, with labels as keys and lists of
        flattened numpy arrays (one array per noise sample) as values. 
        Computes spectrograms for each. """
        
        self.noise_data_dict = noise_data_dict
        self.noise_samples = []
        self.labels = []
        
        self.noise_str_to_int = {} # correspondences between integer and string labels
        self.noise_int_to_str = {} 
        i = 0
        
        for label, list_of_arrays in noise_data_dict.items():
            # extract samples and labels from the dictionary
            num_samples = len(noise_data_dict[label])
            self.noise_samples += noise_data_dict[label]
            self.labels += [label] * num_samples
            
            # assign a unique integer to each string label
            if label not in self.noise_str_to_int:
                self.noise_str_to_int[label] = i
                self.noise_int_to_str[i] = label
                i += 1
        
        # compute spectrograms, and convert labels to integers
        self.spectrograms = [ generate_spectrogram(s, samplerate, n_mels) 
                              for s in self.noise_samples ]
        self.labels = [ self.noise_str_to_int[ L ] for L in self.labels ]

    def __len__(self):
        " Return  the total number of samples "
        return len(self.labels)

    def __getitem__(self, sample_index):
        " Return one sample of data "
        # Load data and get (integer) label
        # Note the CNN will expect the first tensor dimension to be the channel, hence the unsqueeze
        X = self.spectrograms[sample_index].unsqueeze(0)
        y = self.labels[sample_index]

        return X, y

# adapted from https://stackoverflow.com/questions/53916594/typeerror-object-of-type-numpy-int64-has-no-len
def prepare_data_loaders(full_dataset, training_fraction=0.8, batch_size=8):
    """ Prepare data loaders for training and testing of the model. """

    # split into training and testing datasets
    train_size = int(training_fraction * len(full_dataset))
    test_size = len(full_dataset) - train_size
    train_dataset, test_dataset = random_split(full_dataset, [train_size, test_size])

    # create data loaders
    train_params = {
        'batch_size': batch_size,
        'shuffle': True,
        'num_workers': 1,
    }
    train_loader = DataLoader(dataset=train_dataset, **train_params)
    test_loader  = DataLoader(dataset=test_dataset)
    
    return train_loader, test_loader, train_dataset, test_dataset

def prepare_even_data_loaders(full_dataset, training_fraction=0.8, batch_size=8):
    """ Prepare data loaders for training and testing of the model, including 
    training_fraction of each type in the training dataset. """
    
    train_dataset = NoisesDataset({})
    test_dataset  = NoisesDataset({})
    
    # iterate through noise labels, adding training_fraction of each
    # to the training dataset/loader
    unique_int_labels = list(set(full_dataset.labels))
    dataset_element_labels = np.array([ d[1] for d in full_dataset ])
    for i in unique_int_labels:
        # get an array of indices, of noise samples with label i
        i_indices = np.nonzero(dataset_element_labels == i)[0]
        num_samples = len(i_indices)
        train_size = int(training_fraction * num_samples)
        test_size = num_samples - train_size

        # make datasets for just that noise label
        i_dataset = Subset(full_dataset, i_indices)
        i_train_dataset, i_test_dataset = random_split(i_dataset, [train_size, test_size])

        # add these datasets to the overall collections
        train_dataset = ConcatDataset([train_dataset, i_train_dataset])
        test_dataset  = ConcatDataset([test_dataset,  i_test_dataset])

    # create the data loaders
    train_params = {
        'batch_size': batch_size,
        'shuffle': True,
        'num_workers': 1,
    }
    train_loader = DataLoader(dataset=train_dataset, **train_params)
    test_loader  = DataLoader(dataset=test_dataset)
    
    return train_loader, test_loader, train_dataset, test_dataset
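
The idea behind `prepare_even_data_loaders`, splitting each noise type separately so that every label keeps the same training fraction, can be shown on plain lists of labels. This is a standard-library stand-in for illustration only: it does not use `Subset` or `random_split`, and `stratified_split` is a hypothetical name.

```python
import random
from collections import defaultdict


def stratified_split(labels, training_fraction=0.8, seed=0):
    """Split sample indices so that each label contributes training_fraction to training."""
    rng = random.Random(seed)

    # group the sample indices by label
    by_label = defaultdict(list)
    for index, label in enumerate(labels):
        by_label[label].append(index)

    train_indices, test_indices = [], []
    for label, indices in by_label.items():
        rng.shuffle(indices)
        train_size = int(training_fraction * len(indices))
        train_indices += indices[:train_size]
        test_indices += indices[train_size:]
    return train_indices, test_indices


labels = ['t'] * 10 + ['k'] * 5
train, test = stratified_split(labels)
print('train:', sorted(labels[i] for i in train))
print('test: ', sorted(labels[i] for i in test))
```

With a plain random split, a rare noise could end up almost entirely in one of the two sets; splitting per label avoids that.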

In [None]:
# # TESTING prepare_data_loaders
# my_dataset = NoisesDataset(my_recordings)
# my_train_loader, my_test_loader, my_train_dataset, my_test_dataset = prepare_data_loaders(my_dataset, batch_size=8)

# # get some random training spectrograms
# my_train_dataiter = iter(my_train_loader)
# my_spectrograms, my_labels = next(my_train_dataiter)
# my_batch_size = 8

# # show spectrograms and print labels
# fig, ax = plt.subplots(1, my_batch_size)
# for i in range(len(my_spectrograms)):
#     ax[i].imshow(my_spectrograms[i][0].numpy()) # the 0 selects the first (only) channel
# print(' '.join('{:>4s}'.format(my_dataset.noise_int_to_str[my_labels[j].item()]) for j in range(my_batch_size)))

In [None]:
# # TESTING prepare_even_data_loaders
# my_dataset = NoisesDataset(my_recordings)
# my_train_loader, my_test_loader, my_train_dataset, my_test_dataset = prepare_even_data_loaders(my_dataset, batch_size=8)

# # get some random training spectrograms
# my_train_dataiter = iter(my_train_loader)
# my_spectrograms, my_labels = next(my_train_dataiter)
# my_batch_size = 8

# # show spectrograms and print labels
# fig, ax = plt.subplots(1, my_batch_size)
# for i in range(len(my_spectrograms)):
#     ax[i].imshow(my_spectrograms[i][0].numpy()) # the 0 selects the first (only) channel
# print(' '.join('{:>4s}'.format(my_dataset.noise_int_to_str[my_labels[j].item()]) for j in range(my_batch_size)))

## Neural net: defining and training

Define the convolutional neural network, and train it.

In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
class Net(nn.Module):
    def __init__(self, image_size, noise_int_to_str):
        super(Net, self).__init__()
        
        # the spectrogram image size is needed to compute layer sizes
        self.image_size = image_size
        
        # the dictionary of noise labels is needed for translating predictions in the final layer
        self.noise_int_to_str = noise_int_to_str
        
        
        # image_size is a 2-tuple, the expected dimensions of each spectrogram
        # .... or a 3-tuple, if the channel has already been added
        if   len(image_size) == 2:
            h, w = image_size
        elif len(image_size) == 3:
            channel, h, w = image_size
        
        # the (square) kernel size for each convolution layer, and the pool size,
        # assuming the stride for pooling is the same as the pool size
        kernels = [3, 3]
        pool = 2
        
        # compute the number of input nodes for the first dense layer
        h_out, w_out = h, w
        for k in kernels:
            # the convolution.
            h_out += -k + 1
            w_out += -k + 1
            
            # the pool. (from help(torch.nn.MaxPool2d))
            h_out = int( (h_out - pool) / pool + 1 )
            w_out = int( (w_out - pool) / pool + 1 )
            
        self.image_out = h_out * w_out
        
        # define the layers. The numbers of nodes chosen do not have deep thought behind them.
        self.conv0 = nn.Conv2d(1, 32, kernels[0])
        self.pool = nn.MaxPool2d(2)
        self.conv1 = nn.Conv2d(32, 10, kernels[1])
        self.fc0 = nn.Linear(10 * self.image_out, 50)
        self.fc1 = nn.Linear(50, 10)
        # number of output nodes for final dense layer: the number of noise types        
        self.fc2 = nn.Linear(10, len(noise_int_to_str))
        
    def forward(self, x):
        x = self.pool(F.relu(self.conv0(x)))
        x = self.pool(F.relu(self.conv1(x)))
        x = x.view(-1, 10 * self.image_out)
        x = F.relu(self.fc0(x))
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x
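
The layer-size bookkeeping in `Net.__init__` can be checked by hand. The sketch below repeats the same arithmetic for an unpadded, stride-1 convolution followed by a max pool whose stride equals its size. The 28 x 14 input is a hypothetical spectrogram size, and `conv_pool_output_size` is a name introduced here.

```python
def conv_pool_output_size(h, w, kernels=(3, 3), pool=2):
    """Height and width remaining after each (convolution, max pool) pair."""
    for k in kernels:
        # unpadded convolution with stride 1 trims k - 1 from each dimension
        h, w = h - k + 1, w - k + 1
        # max pooling with stride equal to the pool size
        h, w = (h - pool) // pool + 1, (w - pool) // pool + 1
    return h, w


h_out, w_out = conv_pool_output_size(28, 14)
print(h_out, w_out)        # -> 5 2
print(10 * h_out * w_out)  # inputs to the first dense layer, with 10 channels -> 100
```

If the recording length or `N_MELS` changes, rerunning this arithmetic shows whether the image is still large enough to survive two rounds of convolution and pooling.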

In [None]:
def train_net(net, epochs, train_loader, batch_progress=50):
    """ Use training data from train_loader to train net for a number of epochs,
    using a cross entropy loss function and Adam as the optimizer. """
    
    # the loss function and optimizing method
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(net.parameters())
    
    batch_num = 0
    for epoch in range(epochs):  # loop over the dataset multiple times
        
        batch_running_loss = 0.0
        
        for i, data in enumerate(train_loader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # accrue loss for printing
            batch_running_loss += loss.item()
            
            # print progress every batch_progress batches
            batch_num += 1
            if batch_num == batch_progress:
                print('[{:d}, {:5d}] loss: {:.3f}'.format(
                  epoch + 1, i + 1, batch_running_loss / batch_progress))
                batch_running_loss = 0.0
                batch_num = 0
        
    print('Finished Training')

In [None]:
# # TESTING
# my_net = Net(my_spectrogram.size(), my_dataset.noise_int_to_str)
# train_net(my_net, 50, my_train_loader, batch_progress=100)

## Evaluate model quality

Check the accuracy of the model against the testing and training sets, and compute the confusion matrix.

In [None]:
import itertools
from sklearn.metrics import confusion_matrix

In [None]:
def accuracy_rating(net, dataloader, dataset_label):
    """ Print the fraction of correct predictions on a data loader. """
    correct = 0
    total = 0
    all_targets = torch.tensor([], dtype=torch.long)
    all_predictions = torch.tensor([], dtype=torch.long)
    with torch.no_grad():
        for data in dataloader:
            spectrograms, labels = data
            outputs = net(spectrograms)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
            all_predictions = torch.cat( 
                (all_predictions, predicted), dim=0 )
            all_targets = torch.cat( 
                (all_targets, labels), dim=0 )
            
    print('Accuracy of the network on the {} {} spectrograms: {:.0f} %'.format(
        total,
        dataset_label,
        100 * correct / total))
    
    return all_predictions, all_targets

In [None]:
# adapted from https://deeplizard.com/learn/video/0LhiS6yu2qQ
def plot_confusion_matrix(predictions, targets, labels_int_to_str, 
                          normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
    """ Compute and display the confusion matrix. """
    
    stacked = torch.stack( [targets, predictions], dim=1 )
    all_int_labels = sorted(list(labels_int_to_str.keys()))
    num_labels = len(all_int_labels)
    confusion_matrix = torch.zeros(num_labels, num_labels, dtype=torch.int64)

    for pair in stacked:
        target_label, prediction_label = pair.tolist()
        confusion_matrix[target_label, prediction_label] += 1

    classes = [ labels_int_to_str[i] for i in all_int_labels ]    
    cm = confusion_matrix.numpy() # as a numpy array, so that the normalization below works
        
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        # print('Confusion matrix, without normalization:')
        pass
        
    size = min(0.7 * (num_labels + 1), 8)
    plt.figure(figsize=(size, size))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar(fraction=0.046, pad=0.04)
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt), horizontalalignment="center", 
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    
    return confusion_matrix
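
For reference, the counting and row normalization behind the confusion matrix can be written out on plain lists. This is a minimal standard-library sketch with made-up targets and predictions; `confusion_counts` and `normalize_rows` are hypothetical names, not notebook functions.

```python
def confusion_counts(targets, predictions, num_labels):
    """matrix[t][p] counts the samples with true label t that were predicted as p."""
    matrix = [[0] * num_labels for _ in range(num_labels)]
    for t, p in zip(targets, predictions):
        matrix[t][p] += 1
    return matrix


def normalize_rows(matrix):
    """Divide each row by its total, so each row shows fractions of a true label."""
    normalized = []
    for row in matrix:
        total = sum(row)
        normalized.append([count / total if total else 0.0 for count in row])
    return normalized


targets     = [0, 0, 0, 1, 1, 2, 2, 2]
predictions = [0, 0, 1, 1, 1, 2, 0, 2]

cm = confusion_counts(targets, predictions, num_labels=3)
accuracy = sum(cm[i][i] for i in range(3)) / len(targets)

print(cm)                  # -> [[2, 1, 0], [0, 2, 0], [1, 0, 2]]
print(accuracy)            # -> 0.75
print(normalize_rows(cm)[1])   # -> [0.0, 1.0, 0.0]
```

Rows correspond to true labels and columns to predictions, matching the axis labels of the plot, so off-diagonal entries show which noises the model confuses.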

In [None]:
# # TESTING
# accuracy_rating(my_net, my_train_loader, 'training')
# preds, targets = accuracy_rating(my_net, my_test_loader, 'test')

# plot_confusion_matrix(preds, targets, my_dataset.noise_int_to_str);

## Real-time listening and recognition

With the trained model in hand, make a listener to recognize noises and act on them.

In [None]:
def get_prediction(model, noise_sample, samplerate=samplerate, n_mels=N_MELS):
    """ Build the spectrogram and use our model to recognize the noise """
    
    mel = generate_spectrogram(noise_sample, samplerate, n_mels)

    # change from torch.Size([A, B]) to torch.Size([1, 1, A, B])
    mel = mel[None, None, :, :]
    
    # run through the model and get prediction
    output = model(mel)
    energy, label = torch.max(output.data, 1)
    
    # return the string label of the noise
    return model.noise_int_to_str[label.item()]

def print_noise(noise_heard):
    """ Print the string label of the noise that has been heard. """
    print(noise_heard, end=", ")

In [None]:
def listen_recognize_and_respond(model, act_on_noise, device=device, duration=5):
    """ Continuously listen for noises for duration (sec), then recognize them
    with the model and respond with the function act_on_noise. """
    
    def processing_function(noise_sample):
        pred = get_prediction(model, noise_sample)
        act_on_noise(pred)
    
    listen_and_process(processing_function=processing_function, 
                           stop_condition=time_elapsed(duration),
                           device=device,
                           print_after_processing=None)

Note this supposes that the default samplerate and n_mels for get_prediction are the same as those used for the training dataset. This could be made more robust by allowing processing_function to accept the sample rate as well as a numpy array, and by recording the n_mels used in the dataset the model was trained on.
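
One possible way to guard against such a mismatch is to record the preprocessing settings at training time and compare them before predicting. The sketch below is only an illustration of that idea: `check_preprocessing` and the settings dictionaries are hypothetical, and nothing like this is implemented in the notebook.

```python
def check_preprocessing(trained_settings, current_settings):
    """Raise ValueError if a setting recorded at training time differs now."""
    mismatches = {
        name: (expected, current_settings.get(name))
        for name, expected in trained_settings.items()
        if current_settings.get(name) != expected
    }
    if mismatches:
        raise ValueError(
            'Preprocessing mismatch (trained, current): {}'.format(mismatches))


# Settings that would be stored alongside the model when it is trained.
trained = {'samplerate': 44100.0, 'n_mels': 28}

check_preprocessing(trained, {'samplerate': 44100.0, 'n_mels': 28})  # passes silently

try:
    check_preprocessing(trained, {'samplerate': 48000.0, 'n_mels': 28})
except ValueError as error:
    print(error)
```

Failing loudly here is preferable to feeding the model spectrograms of a different shape or frequency scale, which would give errors or silently poor predictions.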

In [None]:
# # TESTING
# listen_recognize_and_respond(my_net, print_noise, device=2, duration=20)

## Saving/loading recordings and models

Functions to save (and load) recordings and models, if desired.

In [None]:
import os

Some helper functions to save or load generic files:

In [None]:
def save_file(data, filename, rewrite, basepath, extension, save_function): 
    """ Save data to basepath/filename.extension using save_function """
    
    # Get a nonempty filename. Require it to be unique unless rewrite=True.
    while True:
        if filename is None:
            print('Enter filename to write to (leave blank to cancel):')
            filename = input()

        if filename == '':
            print("No filename provided. Aborting.")
            return

        # Add the extension if it isn't already included
        if not filename.endswith(extension):
            filename += extension

        # Make the base directory if it doesn't already exist
        if not os.path.exists(basepath):
            os.makedirs(basepath)

        # Don't accidentally overwrite an existing file
        valid_filename = True
        if not rewrite:
            with os.scandir(basepath) as entries:
                for entry in entries:
                    if entry.name == filename:
                        print('File {} already exists. Use rewrite=True to overwrite.\n'.format(
                        './' + basepath + filename))
                        filename = None
                        valid_filename = False
        
        if valid_filename:
            break

    # Write the file, if we've made it this far
    path = basepath + filename
    save_function(data, path)
    print('Data saved to', path)
    
    return path
    
def load_file(filename, basepath, load_function):
    """ Load a file from basepath/filename """
    
    # check if the base directory exists
    if not os.path.exists(basepath):
        print("Base directory {} doesn't exist. Aborting.".format(basepath))
        return 

    # Get the filename if one wasn't provided, abort if empty
    if filename is None:
        # display the available files for user convenience
        print('Files in {} include:'.format(basepath))
        with os.scandir(basepath) as entries:
            for entry in entries:
                print(entry.name)

        print('\nEnter filename to load:')
        filename = input()
        if filename == '':
            print('Aborting.')
            return
        
    # Abort if the file doesn't exist
    with os.scandir(basepath) as entries:
        if filename not in [ entry.name for entry in entries ]:
            print('File does not exist. Aborting.')
            return 
   
    # Load the file, if we've made it this far.
    path = basepath + filename
    loaded_data = load_function(path)
    print('File loaded:', path)

    return loaded_data
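
The overwrite protection in `save_file` can be tried safely in a temporary directory. The sketch below is a simplified, non-interactive variant: `resolve_save_path` is a hypothetical helper that returns `None` instead of prompting for a new name, and it builds paths with `os.path.join` rather than string concatenation.

```python
import os
import tempfile


def resolve_save_path(basepath, filename, extension, rewrite=False):
    """Return the full path to write to, or None if that would overwrite a file."""
    if not filename.endswith(extension):
        filename += extension
    os.makedirs(basepath, exist_ok=True)   # create the base directory if needed
    path = os.path.join(basepath, filename)
    if os.path.exists(path) and not rewrite:
        return None
    return path


with tempfile.TemporaryDirectory() as tmp:
    first = resolve_save_path(tmp, 'my_model', '.pth')
    with open(first, 'w') as f:
        f.write('placeholder')   # pretend a model was saved here

    blocked = resolve_save_path(tmp, 'my_model.pth', '.pth')
    allowed = resolve_save_path(tmp, 'my_model.pth', '.pth', rewrite=True)

    print(os.path.basename(first), blocked, os.path.basename(allowed))
```

Using `os.path.join` also means the base path does not need to end in a slash, which the string concatenation in `save_file` and `load_file` relies on.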

Save or load noise sample dictionaries, with audio data:

In [None]:
DICT_BASEPATH = 'saved_noise_sample_dictionaries/'

def save_noise_sample_dict(noise_data_dict, filename=None, rewrite=False, basepath=DICT_BASEPATH): 
    """ Save the dictionary of noise sample recordings """
    
    def save_function(data, path):
        np.save(path, data)
        
    return save_file(data=noise_data_dict, filename=filename, rewrite=rewrite,
                    basepath=basepath, extension=".npy", save_function=save_function)
    
def load_noise_sample_dict(filename=None, basepath=DICT_BASEPATH):
    """ Load a dictionary of noise sample recordings """
    
    def load_function(path):
        return np.load(path, allow_pickle=True).item()
        
    return load_file(filename=filename, basepath=basepath, load_function=load_function)

In [None]:
# # TESTING
# save_noise_sample_dict(my_recordings)
# my_loaded_file = load_noise_sample_dict(filename='my_recordings.npy')
# my_loaded_file

Save or load trained models. This requires saving (or loading) both the trained model parameters and the `image_size` value and `noise_int_to_str` dictionary needed to instantiate the model. The two files are written and read in tandem, so loading a model automatically returns the fully restored model.
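
A minimal sketch of the `.npy` half of that pairing, using only NumPy. It assumes `image_size` behaves like a plain tuple and `noise_int_to_str` is a dictionary from class index to label; the values and the temporary path below are made up for illustration. The pair is packed into an explicit object array, because recent NumPy versions refuse to infer an array from a sequence that mixes a tuple with a dictionary.

```python
import os
import tempfile

import numpy as np

# Hypothetical stand-ins for the two values a model needs at construction time.
image_size = (1, 64, 32)
noise_int_to_str = {0: "t", 1: "p"}

# Pack both objects into a 1-D object array so np.save can pickle them together.
init_data = np.empty(2, dtype=object)
init_data[0] = image_size
init_data[1] = noise_int_to_str

with tempfile.TemporaryDirectory() as tmp_dir:
    init_path = os.path.join(tmp_dir, "my_model.npy")
    np.save(init_path, init_data)

    # allow_pickle=True is required to read object arrays back.
    loaded_size, loaded_labels = np.load(init_path, allow_pickle=True)

print(loaded_size, loaded_labels)
```

The state dictionary saved under the same base name with a `.pth` extension supplies the weights; these two values supply what the constructor needs before the weights can be restored.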

In [None]:
MODEL_BASEPATH = 'trained_models/'

def get_paired_filenames(filename):
    """ Return the (.pth, .npy) filename pair for a model, or (None, None) if no filename is given """
    if not isinstance(filename, str):
        return None, None
    # Strip a trailing .pth or .npy so both files share the same base name
    base = filename
    if len(filename) > 4 and filename.endswith((".pth", ".npy")):
        base = filename[:-4]
    return base + ".pth", base + ".npy"

def save_model(model, filename=None, rewrite=False, basepath=MODEL_BASEPATH): 
    """ Save the trained model parameters, and also the image_size and noise_int_to_str dictionary """
    
    def save_function_parameters(data, path):
        torch.save(data.state_dict(), path)
        
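    def save_function_init(data, path):
        # Pack both values into an explicit object array; newer NumPy versions
        # raise an error when asked to infer an array from a mixed sequence
        init_data = np.empty(2, dtype=object)
        init_data[0], init_data[1] = data.image_size, data.noise_int_to_str
        np.save(path, init_data)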
    
    # save the neural net parameters
    print('Saving the model parameters (.pth) ...')
    parameters_path = save_file(data=model, filename=filename, rewrite=rewrite,
                                basepath=basepath, extension=".pth",
                                save_function=save_function_parameters)
    
    # abort if that save failed
    if parameters_path is None:
        return
    
    # save the image_size and noise_int_to_str dictionary to the same directory
    # and filename, different extension. These are needed to initialize a new Net
    print('Saving the image_size and noise_int_to_str (.npy) ...')
    # strip the base directory actually used (not the default) and the extension
    filename = parameters_path[len(basepath):-4]
    init_path = save_file(data=model, filename=filename, rewrite=rewrite,
                                basepath=basepath, extension=".npy",
                                save_function=save_function_init)
        
    return parameters_path, init_path
    
def load_model(filename=None, basepath=MODEL_BASEPATH):
    """ Load and initialize trained model """
    
    def load_function_params(path):
        state_dict = torch.load(path)
        return state_dict
    
    def load_function_init(path):
        image_size, noise_int_to_str = np.load(path, allow_pickle=True)
        return image_size, noise_int_to_str
    
    # If no filename is given, prompt for filenames. Otherwise, load the two associated files.
    params_filename, init_filename = get_paired_filenames(filename)
    if (params_filename, init_filename) == (None, None):
        print('Load the parameters file, with extension .pth\n')
        state_dict = load_file(filename=None, basepath=basepath, load_function=load_function_params)
        if state_dict is None:
            return
        print('\nLoad the initialization file, with extension .npy\n')
        model_init = load_file(filename=None, basepath=basepath, load_function=load_function_init)
    else:
        print('Loading the parameters file (.pth) ...')
        state_dict = load_file(filename=params_filename, basepath=basepath, load_function=load_function_params)
        print('\nLoading the initialization file (.npy) ...')
        model_init = load_file(filename=init_filename,   basepath=basepath, load_function=load_function_init)
    
    # Abort if either load failed.
    if (state_dict is None) or (model_init is None):
        print('Could not load both files. Aborting.')
        return
    
    # Build the model from the resulting data
    image_size, noise_int_to_str = model_init
    model = Net(image_size, noise_int_to_str)
    model.load_state_dict(state_dict)

    return model

In [None]:
# # TESTING
# save_model(my_net, filename='my_model')

In [None]:
# # TESTING
# new_model = load_model(filename='my_model')
# print(new_model)

# From recording to recognizing: all together

Here we define two functions:
* `build_beatbot` to record training audio + train the model + evaluate the model, and
* `run_beatbot` to continuously listen + recognize noises + act on them.

In [None]:
def build_beatbot(device=device, starting_noise_data=None,
                  skip_recording=False,
                  batch_size=8, epochs=10, batch_progress=100,
                  skip_testing_model=False,
                  save_recordings_filename=None, rewrite_recordings_file=False,
                  save_model_filename=None,      rewrite_model_file=False):
    """ Record audio data, train a model, evaluate it, and optionally save the results.
    If skip_testing_model is True, use all data for training and skip the model testing."""
    
    # Avoid a shared mutable default: start from a fresh dictionary on each call
    if starting_noise_data is None:
        starting_noise_data = {}
    
    # Record training data and construct the dataset
    if skip_recording:
        noise_data_dict = starting_noise_data
    else:
        noise_data_dict = record_model_data(device=device, starting_noise_data=starting_noise_data)
        
    dataset = NoisesDataset(noise_data_dict)
    
    # Prepare the dataloader. Use all data as training data if skip_testing_model is True.
    if skip_testing_model:
        training_fraction = 1
    else:
        training_fraction = 0.8
    train_loader, test_loader, _, _ = prepare_even_data_loaders(dataset, batch_size=batch_size,
                                                                training_fraction=training_fraction)
    
    # Build and train the neural net
    image_size = dataset[0][0].size()
    model = Net(image_size, dataset.noise_int_to_str)
    train_net(model, epochs, train_loader, batch_progress=batch_progress)
    
    # Evaluate the model and show the confusion matrix, or skip testing altogether.
    if not skip_testing_model:
        preds, targets = accuracy_rating(model, test_loader, 'test')
        plot_confusion_matrix(preds, targets, dataset.noise_int_to_str)
    
    # Offer to save the recordings and model
    print('\nWould you like to save your audio data?')
    save_noise_sample_dict( noise_data_dict, filename=save_recordings_filename, rewrite=rewrite_recordings_file )
    print('\nWould you like to save your model?')
    save_model( model, filename=save_model_filename, rewrite=rewrite_model_file )
    
    return model, noise_data_dict

In [None]:
# # TESTING
# my_model, my_recordings = build_beatbot(device=2, starting_noise_data=my_recordings)

Currently, `run_beatbot` is just an alias for `listen_recognize_and_respond`, defined earlier in this notebook.
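
Because the listener takes a response function as its second argument, the reaction to each noise can be swapped out without touching the listening code. The sketch below assumes the response function is called with the recognized noise label as a single string argument, which is a guess based on how `print_noise` is used; `make_noise_dispatcher`, the labels, and the actions are hypothetical names for illustration only.

```python
def make_noise_dispatcher(actions, default=None):
    """Build a callback that runs the action registered for each noise label."""
    def respond(noise):
        action = actions.get(noise, default)
        if action is None:
            return None
        return action(noise)
    return respond

# Record what happens so the dispatcher can be exercised without a microphone.
heard = []
respond = make_noise_dispatcher(
    actions={
        "t": lambda noise: heard.append("hi-hat"),
        "p": lambda noise: heard.append("kick"),
    },
    default=lambda noise: heard.append("unmapped: " + noise),
)

# Simulate three recognized noises in a row.
for label in ["t", "p", "k"]:
    respond(label)

print(heard)  # ['hi-hat', 'kick', 'unmapped: k']

# Assumed usage with the listener (not run here):
# run_beatbot(my_model, respond, device=device, duration=10)
```

Keeping the mapping in a dictionary means new noises can be wired to new actions by adding an entry, and the `default` action makes unmapped labels visible instead of silently dropping them.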

In [None]:
run_beatbot = listen_recognize_and_respond

In [None]:
# # TESTING
# listen_recognize_and_respond(my_model, print_noise, device=0, duration=20)