# Test for GPU

In [None]:
# Run NVIDIA's nvidia-smi tool to check whether a GPU is attached to this runtime
!nvidia-smi

# Get Training Data

In [None]:
def get_file(filename, url):
    """
    Download `url` to `filename` with wget unless `filename` already exists.

    wget is started directly (no shell), so file names and URLs that contain
    spaces or quote characters are passed through verbatim. The download is
    always written to `filename`, so the existence check and the downloaded
    file refer to the same path.

    Args:
        filename: path of the local file to create.
        url: address to download from.

    Raises:
        FileNotFoundError: if the wget executable cannot be found.
        subprocess.CalledProcessError: if wget exits with a non-zero status.
    """
    # Imported locally so the helper works before the notebook's import cell runs.
    import os
    import subprocess

    if os.path.isfile(filename):
        print(f"{filename} already exists.")
        return

    try:
        subprocess.run(["wget", "-O", filename, url], check=True)
    except BaseException:
        # wget -O creates the output file up front; remove the partial
        # download so the next call does not mistake it for a complete file.
        if os.path.exists(filename):
            os.remove(filename)
        raise

# Base URL of the GitHub release that hosts the dataset files
url_prefix = "https://github.com/ntueecamp/22-software-workshop/releases/download/dataset/"

# Files to fetch; the training archive is published as two parts (.aa and .ab)
file_names = [
    "training_spectrogram.npz.aa",
    "training_spectrogram.npz.ab",
    "validation_spectrogram.npz",
    "test_spectrogram.npz",
]

# Download every file that is not already present in the working directory
for file_name in file_names:
    get_file(file_name, f"{url_prefix}{file_name}")

! if [ -f training_spectrogram.npz ]; \
then \
    echo "training_spectrogram.npz already exists."; \
else \
    cat training_spectrogram.npz.a* > training_spectrogram.npz; \
    echo "done merging files"; \
fi

# Training

This section treats the spectrogram of each spoken word as an image and trains a small convolutional network on it.

In [None]:
# Import all the things we will need
import datetime
import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import regularizers
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D, BatchNormalization
from tensorflow.data import Dataset
import matplotlib.pyplot as plt
import gc

In [None]:
# Clear out any old logs: delete the ./logs/ directory left by an earlier run
# (-f keeps rm quiet when the directory does not exist).
!rm -rf ./logs/ 

In [None]:
# Ask TensorFlow which physical GPU devices it can see and report how many
gpu_devices = tf.config.experimental.list_physical_devices('GPU')
print("Num GPUs Available: ", len(gpu_devices))

In [None]:
# Command vocabulary in categorical order: a word's position in this list is
# the integer label used for it in the spectrogram files.
command_words = [
    "forward",   # 0
    "backward",  # 1
    "left",      # 2
    "right",     # 3
    "_invalid",  # 4
]

In [None]:
# Open the spectrogram/label archives for the training, validation and test splits
training_spectrogram, validation_spectrogram, test_spectrogram = map(
    np.load,
    ('training_spectrogram.npz', 'validation_spectrogram.npz', 'test_spectrogram.npz'),
)

In [None]:
# Plot how many training samples there are for each command word.
# One bin per label value; align='left' centres each bar on its label index,
# so the tick at position i can carry the name command_words[i].
plt.hist(training_spectrogram['Y'], bins=range(0, len(command_words) + 1), align='left')
plt.xticks(range(len(command_words)), command_words)
plt.xlabel("Command word")
plt.ylabel("Number of training samples")
plt.title("Training set label distribution")
plt.show()

In [None]:
# Count how many training samples carry each label value
unique, counts = np.unique(training_spectrogram['Y'], return_counts=True)
print(unique, counts)

# The same counts keyed by command word (shown as the cell output)
{command_words[label]: count for label, count in zip(unique, counts)}

In [None]:
# Pull the spectrogram arrays out of the archives and one-hot encode the
# integer labels, using one class per entry of command_words.
X_train, X_validate = training_spectrogram['X'], validation_spectrogram['X']
Y_train = tf.one_hot(training_spectrogram['Y'], depth=len(command_words))
Y_validate = tf.one_hot(validation_spectrogram['Y'], depth=len(command_words))

# Width and height of the spectrogram "image": the first two axes of one sample
IMG_WIDTH, IMG_HEIGHT = X_train[0].shape[:2]

# Number of training samples
train_size = len(X_train)

In [None]:
# create the datasets for training
batch_size = 32

# Shuffle BEFORE repeating. With a shuffle buffer as large as the training set,
# each pass over the data then contains every sample exactly once and is
# reshuffled for the next pass. Repeating first would let samples from
# neighbouring passes mix inside the buffer, so some samples could show up
# twice in an epoch while others are skipped.
train_dataset = (
    Dataset.from_tensor_slices((X_train, Y_train))
    .shuffle(train_size)
    .repeat()  # no count: repeat indefinitely
    .batch(batch_size)
)

# Validation batches each hold one tenth of the validation set (integer
# division); model.fit below is given validation_steps=10 to match.
validation_dataset = Dataset.from_tensor_slices((X_validate, Y_validate)).batch(X_validate.shape[0]//10)

In [None]:
# The datasets have been built, so drop the notebook's own references to the
# raw arrays and ask the garbage collector to run.
# Note: re-running this cell on its own raises NameError, since the names are gone.
del X_train, X_validate, Y_train, Y_validate

gc.collect()

In [None]:
# Hyper-parameters shared by the layers below
L2_WEIGHT = 0.001    # L2 penalty applied to every convolution/dense kernel
DROPOUT_RATE = 0.1   # rate used by both dropout layers

# Small CNN: two (3x3 convolution -> 2x2 max-pool) stages, then a dense
# hidden layer of 80 units and a softmax output with one unit per command word.
model = Sequential([
    Conv2D(
        filters=4,
        kernel_size=3,
        padding='same',
        activation='relu',
        kernel_regularizer=regularizers.l2(L2_WEIGHT),
        name='conv_layer1',
        # single-channel spectrogram "image"
        input_shape=(IMG_WIDTH, IMG_HEIGHT, 1),
    ),
    MaxPooling2D(pool_size=(2, 2), name='max_pooling1'),
    Conv2D(
        filters=4,
        kernel_size=3,
        padding='same',
        activation='relu',
        kernel_regularizer=regularizers.l2(L2_WEIGHT),
        name='conv_layer2',
    ),
    MaxPooling2D(pool_size=(2, 2), name='max_pooling3'),
    Flatten(),
    Dropout(DROPOUT_RATE),
    Dense(
        units=80,
        activation='relu',
        kernel_regularizer=regularizers.l2(L2_WEIGHT),
        name='hidden_layer1',
    ),
    Dropout(DROPOUT_RATE),
    Dense(
        units=len(command_words),
        activation='softmax',
        kernel_regularizer=regularizers.l2(L2_WEIGHT),
        name='output',
    ),
])
model.summary()

In [None]:
# Number of epochs handed to model.fit
epochs = 3

# The labels are one-hot encoded, so the loss is categorical cross-entropy
model.compile(
    loss=keras.losses.CategoricalCrossentropy(),
    optimizer='adam',
    metrics=['accuracy'],
)

# Logging to tensorboard
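We log the training stats along with the confusion matrix of the test data. Open question: should we be using the validation data for this instead? (Note: no TensorBoard callback is currently passed to `model.fit` below, so nothing is logged yet.)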

# Train model

In [None]:
# The training dataset repeats indefinitely, so Keras must be told how many
# batches make up one epoch: as many full batches as fit in the training set.
steps_per_epoch = train_size // batch_size

model.fit(
    x=train_dataset,
    epochs=epochs,
    steps_per_epoch=steps_per_epoch,
    validation_data=validation_dataset,
    validation_steps=10,
)

In [None]:
# Persist the trained model under "trained.model"; the testing section reloads it from there
model.save("trained.model")

# Testing the Model

In [None]:
# Test spectrograms and their one-hot encoded labels
X_test = test_spectrogram['X']
Y_test = tf.one_hot(indices=test_spectrogram['Y'], depth=len(command_words))

# The whole test set as a single batch.
# NOTE(review): the evaluate/predict cells below use X_test and Y_test
# directly, so test_dataset is currently unused.
test_dataset = Dataset.from_tensor_slices((X_test, Y_test)).batch(len(X_test))

In [None]:
# Reload the model from disk so the tests exercise the saved artefact
model2 = tf.keras.models.load_model("trained.model")

In [None]:
# Evaluate the reloaded model on the test set in batches of 128
results = model2.evaluate(
    x=X_test,
    y=tf.cast(Y_test, tf.float32),
    batch_size=128,
)

In [None]:
# Model outputs for every test sample, computed in batches of 128
predictions = model2.predict(X_test, batch_size=128)

In [None]:
import itertools


def plot_confusion_matrix(cm, class_names):
    """
    Plot a row-normalized confusion matrix as an annotated heat map.

    Every row is divided by its sum, so a cell shows the fraction of samples of
    the true class (row) that were predicted as the column class, rounded to
    two decimals. Rows without any samples are shown as zeros. The figure is
    displayed with ``plt.show()``; nothing is returned.

    Args:
      cm (array-like, shape = [n, n]): a confusion matrix of integer counts.
        Either an object with a ``.numpy()`` method (such as the tensor
        produced by ``tf.math.confusion_matrix``) or anything ``np.asarray``
        accepts.
      class_names (array, shape = [n]): String names of the integer classes
    """
    # Accept tensors as before, but also plain NumPy arrays / nested lists.
    if hasattr(cm, "numpy"):
        cm = cm.numpy()
    counts = np.asarray(cm, dtype="float")

    # Normalize each row; leave rows with a zero total at 0 instead of
    # dividing by zero and filling the plot with NaN.
    row_totals = counts.sum(axis=1, keepdims=True)
    fractions = np.divide(
        counts, row_totals, out=np.zeros_like(counts), where=row_totals != 0
    )
    cm = np.around(fractions, decimals=2)

    plt.figure(figsize=(8, 8))
    plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
    plt.title("Confusion matrix")
    plt.colorbar()
    tick_marks = np.arange(len(class_names))
    plt.xticks(tick_marks, class_names, rotation=45)
    plt.yticks(tick_marks, class_names)

    # Use white text if squares are dark; otherwise black.
    threshold = cm.max() / 2.0
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        color = "white" if cm[i, j] > threshold else "black"
        plt.text(j, i, cm[i, j], horizontalalignment="center", color=color)

    plt.ylabel("True label")
    plt.xlabel("Predicted label")
    # Lay out after the axis labels exist so they are included in the layout.
    plt.tight_layout()
    plt.show()

In [None]:
# Convert the one-hot labels and the model outputs to class indices
true_classes = tf.argmax(Y_test, axis=1)
predicted_classes = tf.argmax(predictions, axis=1)

# Tally true class vs. predicted class over the test set and plot the result
cm = tf.math.confusion_matrix(labels=true_classes, predictions=predicted_classes)

plot_confusion_matrix(cm, command_words)