This Jupyter notebook loads the raw amplitude and Mel spectrogram data files as numpy arrays.

Download the data files [here](https://console.cloud.google.com/storage/browser/cs181_practical_data).  This notebook assumes that the data files as located in the same directory.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import pickle
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
from sklearn.metrics import balanced_accuracy_score # takes into account class sizes

### Load raw amplitude data.

In [None]:
# Load train data

X_amp_train = np.load("Xtrain_amp.npy")
y_amp_train = np.load("ytrain_amp.npy")

In [None]:
X_amp_train.shape

In [None]:
# Load test data

X_amp_test = np.load("Xtest_amp.npy")
y_amp_test = np.load("ytest_amp.npy")

In [None]:
X_amp_test.shape

### Load Mel spectrogram data.

In [None]:
# Load train data

X_mel_train = np.load("Xtrain_mel.npy")
y_mel_train = np.load("ytrain_mel.npy")

In [None]:
X_mel_train.shape

In [None]:
# Flatten X_mel_train's spectrogram features
X_mel_train_flat = X_mel_train.reshape(X_mel_train.shape[0], -1)
X_mel_train_flat.shape

In [None]:
# Load test data

X_mel_test = np.load("Xtest_mel.npy")
y_mel_test = np.load("ytest_mel.npy")

In [None]:
X_mel_test.shape

## Part A

#### Perform PCA on raw amplitude features
Compute a dimensionality reduction on the Xtrain_amp dataset and on the Xtest_amp dataset. This will allow us to not overload our model when training a logistic regression model with noisy data and only keep the 500 most significant components, which capture most of the variation.

### Make some exploratory plots of the data

In [None]:
# Make a plot of some of the raw frequencies so that we can explore the data
plt.figure(dpi=150, figsize=(12,4))
for i in range(3):
    
    # create our subplot
    plt.subplot(1,3,i+1)
    
    # plot our image
    plt.imshow(Xtrain_amp[i])
    
    # put our word label based on the 0 or 1
    if ytrain_amp[i] == 0:
        plt.xlabel("air_conditioner")
    elif ytrain_amp[i] == 1:
        plt.xlabel("car_horn")
    elif ytrain_amp[i] == 2:
        plt.xlabel("children_playing")
    elif ytrain_amp[i] == 3:
        plt.xlabel("dog_bark")
    elif ytrain_amp[i] == 4:
        plt.xlabel("drilling")
    elif ytrain_amp[i] == 5:
        plt.xlabel("engine_idling")
    elif ytrain_amp[i] == 6:
        plt.xlabel("gun_shot")
    elif ytrain_amp[i] == 7:
        plt.xlabel("jackhammer")
    elif ytrain_amp[i] == 8:
        plt.xlabel("siren")
    else:
        plt.xlabel("street_music")
        
    # beautify
    plt.xticks([])
    plt.yticks([])
    
# more beautifying
plt.suptitle("Select Raw Aplitude Images")
plt.tight_layout()
plt.show()

### Train a logistic regression model on the 500 most significant PCA components.

In [None]:
# create our PCA object that will calculate the first two components.
pca = PCA(n_components=500)

# fit our PCA to the training data, and transform our data into its 500-dimensional representation
# only want to fit to training data, we should not be getting any extra information from our test set
Xtrain_amp_pca = pca.fit_transform(Xtrain_amp)

# let's check the shape of our lower dim representation
print(f"Lower-Dim PCA Representation Train Data Shape: {Xtrain_amp_pca.shape}")


#### Visualize the first two components of the PCA to see what is being captured in the lower dimension

In [None]:
# scatter plot the two components just to see if the data is separable by eye
plt.figure(dpi=150, figsize=(8,6))

# iterate through all the digits
for sound_class in range(10):
    
    # get the samples that correspond to this digit
    samples = Xtrain_amp_pca[ytrain_amp == sound_class]
    
    # scatter plot the first two PCA component
    plt.scatter(samples[:,0], samples[:,1], label=str(sound_class))

# beautify
plt.xlabel("Principal Component 1")
plt.ylabel("Principal Component 2")
plt.legend(loc="upper right", ncol=2)
plt.title("2-Component PCA Visualization of Sound Classes")
plt.tight_layout()
plt.show()

In [None]:
# create our LogisticRegression classifier
lr_pca = LogisticRegression()

# fit our SVM model
model.fit(Xtrain_amp_pca, ytrain_amp)

# scale our testing X into a lower dimension
Xtest_amp_pca = pca.fit_transform(Xtest_amp)

# make our predictions          
lr_preds = model.predict(Xtest_amp_pca)
          
# calculate + display our accuracy
lr_accuracy = np.mean(ytest_amp == lr_preds)
print("Accuracy: ", lr_accuracy)


# calculate and report: precision, recall, f1, support
target_names = ['air_conditioner', 'car_horn', 'children_playing', 'dog_bark', 'drilling', 'engine_idling', 'gun_shot', 'jackhammer', 'siren', 'street_music']
print(classification_report(ytest_amp, lr_preds, target_names=target_names))

# calculate balanced accuracy, taking into account class sizes
print("Balanced Accuracy: ", round(balanced_accuracy_score(ytest_amp, lr_preds), 3))

### Perform PCA on the Mel spectogram features
Train a logistic regression model on the 500 most significant PCA components

In [None]:
# flatten the dataset, since SVM cannot take into account matrices
Xtrain_mel_flattened = Xtrain_mel.reshape(Xtrain_mel.shape[0], -1)
Xtest_mel_flattened = Xtest_mel.reshape(Xtest_mel.shape[0], -1)

# transform the dataset using PCA
Xtrain_mel_pca = pca.fit_transform(Xtrain_mel_flattened)

# fit the logistic regression model on the lower dimensional training data
lr_pca.fit(Xtrain_mel_pca, ytrain_mel)

# transform our test dataset into a lower dimension
Xtest_mel_pca = pca.fit_transform(Xtest_mel_flattened)

# make predictions for our test dataset from our Logistic regression fit on our train data
preds_mel = lr_pca.predict(Xtest_mel_pca)

# print the accuracy
print(f"Accuracy: {np.mean(preds_mel == ytest_mel)}")

# calculate and report: precision, recall, f1, support
target_names = ['air_conditioner', 'car_horn', 'children_playing', 'dog_bark', 'drilling', 'engine_idling', 'gun_shot', 'jackhammer', 'siren', 'street_music']
print(classification_report(ytest_mel, preds_mel, target_names=target_names))

# calculate balanced accuracy, taking into account class sizes
print("Balanced Accuracy: ", round(balanced_accuracy_score(ytest_mel, preds_mel), 3))

### Train nonlinear models on spectogram data

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

In [None]:
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

### First Model: Neural Network

In [None]:
# create a model
# select the hyperparameters we think appropriate
model = nn.Sequential(
    nn.Linear(in_features=44100, out_features=441),
    nn.ReLU(),
    nn.Linear(in_features=441, out_features=10),
)

In [None]:
# tell PyTorch to not track gradients
with torch.no_grad():
    
    # convert our input data to a float tensor
    # cannot pass numpy array into pytorch, have to BOTH introduce with torch.tensor but ALSO
    # have to convert to float
    inputs = torch.tensor(Xtest_amp).float()
    
    # pass our inputs straight into the model
    preds = model(inputs)
    
    # determine what shape the predictions are so we know what to do with them
    print(f"Shape of preds: {preds.shape}")


    # no need to softmax the output since we will just take the argmax
    # argmax and softmax would have the same highest value
    preds = preds.argmax(1)
    
    # check test accuracy
    print(f"test accuracy: {torch.sum(preds == torch.tensor(ytest_amp)) / ytest_amp.shape[0]}")
    


In [None]:
# moving the model to right device
model.to(device)

# specify loss function
loss_func = nn.CrossEntropyLoss()

# define our optimizer 
optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)

In [None]:
# metrics we want to collect
train_accuracy_list = []
train_loss_list = []
test_accuracy_list = []
test_loss_list = []


# train for a given number of epochs
for epoch in tqdm(range(20), desc="Epoch"):
        
    # get our input imgs and labels. We have to convert from NumPy (original) to torch.tensors()
    # PyTorch expects inputs as floats, and labels as longs (i.e., high-memory integers)
    inputs = torch.tensor(Xtrain_amp).float()
    labels = torch.tensor(ytrain_amp).long()
    
    # move our inputs and labels to the right device.
    inputs, labels = inputs.to(device), labels.to(device)

    # reset the gradient 
    optimizer.zero_grad()
    
    # compute our forward propogation
    # outputs are not softmaxed
    outputs = model(inputs) 
    
    # calculate the cross-entropy loss
    # this automatically applies softmax
    loss = loss_func(outputs, labels) 
    
    # calculate backpropogation
    # takes the gradient of the loss function w.r.t. all the parameters
    loss.backward() 
    
    # make a small step update to our parameters
    optimizer.step() 
    

    # update our train_loss
    # only inclue the .item() to extract the pure value from the tensor
     train_loss_list.append(loss.item()) 

    # calculate + record our train + test accuracy. 
    # we have finished training here, only taking eval metrics
    with torch.no_grad():

        # get our predictions with the current weights
        # we only want the indices, not the values
        _, predicted = torch.max(outputs.data, 1)
        
        # get our train accuracy
        train_accuracy = torch.sum(predicted == labels) / labels.size(0)
        
        # add to our list of accuracies
        train_accuracy_list.append(train_accuracy)
            
        # get our test inputs and labels same process as earlier
        test_inputs = torch.tensor(Xtest_amp).float()
        test_labels = torch.tensor(ytest_amp).long()

        # move to the right device
        test_inputs, test_labels = test_inputs.to(device), test_labels.to(device)

        # run our test set inputs through the network
        test_outputs = model(test_inputs)

        # get our test_loss
        test_loss = loss_func(test_outputs, test_labels)

        # record our test_loss
        # .item() tells pytorch we only want the value
        test_loss_list.append(test_loss.item())

        # make our predictions based on max
        _, test_predicted = torch.max(test_outputs.data, 1)

        # get our train accuracy
        test_accuracy = torch.sum(test_predicted == test_labels) / test_labels.size(0)
        
        # add to our list
        test_accuracy_list.append(test_accuracy)

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(16, 5), dpi=200)

ax[0].plot(train_accuracy_list, label="Train Accuracy")
ax[0].plot(test_accuracy_list, label="Test Accuracy")
ax[0].set_title("Accuracy Over Epochs")
ax[0].legend()

ax[1].plot(train_loss_list, label="Train Loss")
ax[1].plot(test_loss_list, label="Test Loss")
ax[1].set_title("Loss Over Epochs")
ax[1].legend()

plt.suptitle("Simple nn.Sequential Fully-Connected Neural Network")
plt.tight_layout()
plt.show()

### Second Model: Random Forest

In [None]:
from sklearn.ensemble import RandomForestClassifier
# Try out a random forest model for classification
# RandomForest
model = RandomForestClassifier(n_estimators=500, max_depth=None, n_jobs=-1)
model.fit(Xtrain_amp, ytrain_amp)

# make our predictions
rf_preds = model.predict(Xtest_amp)
print(np.mean(rf_preds == ytest_amp))

### Third Model: SVM

In [None]:
from sklearn.svm import SVC

# create our SVM classifier
# C is proxy of soft or hard margin
model = SVC(C=2.0, kernel="rbf")

# SVM requires that each datapoint be a vector, and not a picture / matrix of pixels
# use Xtrain_mel_flattened from above
# fit our SVM model
model.fit(Xtrain_mel_flattened, ytrain_mel)

# make our predictions
# must also use the Xtest_mel_flattened
SVM_preds = model.predict(Xtest_mel_flattened)
          
# calculate + print our accuracy
SVM_accuracy = np.mean(ytest_bc == SVM_preds)
print(SVM_accuracy)

### Hyperparameter Tuning and Validation

Perform a hyperparameter search to maximize predictive accuracy for two model classes of your choice. 

### GridSearch for SVM

In [None]:
# these are the settings that we will tune: 'C', 'kernel'
param_grid = {'C' : [0.01, 0.05, 0.1, 0.5, 1.0], 
              'kernel' : ['rbf', 'sigmoid'],}

SVM = SVC()

# instantiate our gridsearch estimator
# cv=None defaults to the 5-fold cross validation
SVM_CV = GridSearchCV(estimator=SVM, param_grid=param_grid, n_jobs=-1, cv=None, verbose=1)
SVM_CV.fit(Xtrain_mel_flattened, ytrain_mel)

# for each of possible combinations, trains on 9/10, test on 1/10

# convert our results to a pd.DataFrame
SVM_results = pd.DataFrame(SVM_CV.cv_results_).sort_values(by=['rank_test_score'])
SVM_results.head(5)

### GridSearch for RandomForest

In [None]:
param_grid = {'n_estimators' : [50, 100, 500, 1000], 
              'max_depth' : ['None', ],}

# instantiate the model
RF= RandomForestClassifier()

# instantiate our gridsearch estimator
RF_CV = GridSearchCV(estimator=RF, param_grid=param_grid, n_jobs=-1, cv=None, verbose=1)
RF_CV.fit(Xtrain_mel_flattened, ytrain_mel)

# convert results to pd.DataFrame
RF_results = pd.DataFrame(RF_CV.cv_results_).sort_values(by=['rank_test_score'])
RF_results.head(5)

## CNN on Spectogram Data

In [None]:
# inherits the nn.Module class
class CNN(nn.Module):
    # only need the constructor for what layers you want 
    # and forward for how you want to feed forward
    # everything else for backprop, etc is inherited from nn.Module
    
    # constructor
    def __init__(self):
        
        # call the parent constructor (for nn.Module)
        super().__init__()
        
        ## first set of convolution + pooling: see documentation about specifics
        # conv2d = built in CNN
        # out_channels = we will find 16 filters
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3)
        
        ## to increase robustness ... again, this is optional reading
        self.pool1 = nn.MaxPool2d(2)
        
        # second set of convolution + pooling
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3)
        self.pool2 = nn.MaxPool2d(2)
        
        # dropout layer to randomly drop neurons: for regularization
        # randomly drops out parameters so that we don't overfit
        self.dropout = nn.Dropout()
        
        ## end with three linear layers, this is general practice
        self.fc1 = nn.Linear(800, 500)
        self.fc2 = nn.Linear(500, 100)
        self.fc3 = nn.Linear(100, 10)
    
    # define how the neural network processes ONE data point (which generalizes instantly into a whole dataset)
    # aka: given x, how do we get the output? Just pass it through the layers.
    def forward(self, x):
        
        # first set of convolution + pooling
        x = self.conv1(x)
        x = self.pool1(x)
        x = torch.relu(x)
        
        # second set of convolution + pooling
        x = self.conv2(x)
        x = self.pool2(x)
        x = torch.relu(x)
        
        # the drop out layer
        x = self.dropout(x)
        
        # flatten + linear layers
        x = torch.flatten(x, start_dim=1)
        x = self.fc1(x)
        x = torch.relu(x)
        x = self.fc2(x)
        x = torch.relu(x)
        x = self.fc3(x)
        
        return x

In [None]:
# instantiate our model
model_cnn = CNN()

# moving the model to right device
model_cnn.to(device)

# specify our loss function
loss_func = nn.CrossEntropyLoss()

# define our optimizer - could also do Adam, RMSprop, SGD
optimizer = optim.Adam(model_cnn.parameters(), lr=1e-3, weight_decay=1e-5)

In [None]:
# metrics we want to collect
train_accuracy_list = []
train_loss_list = []
test_accuracy_list = []
test_loss_list = []


# train for a given number of epochs
for epoch in tqdm(range(20), desc="Epoch"):
        
    # query data for inputs (images) + labels: [inputs, labels]
    inputs = torch.tensor(Xtrain_mel).float()
    labels = torch.tensor(ytrain_mel).long()
    
    # move to the right device.
    inputs, labels = inputs.to(device), labels.to(device)

    # reset the gradient
    optimizer.zero_grad()

    # forward prop, backward prop, make incremental step
    outputs = model_cnn(inputs) # implicitly calls the .forward() function
    loss = loss_func(outputs, labels) # calculate the loss
    loss.backward() # calculate the gradient
    optimizer.step() # take our incremental update of the parameters

    # update our train_loss
    train_loss_list.append(loss.item()) 

    # calculate + record our train + test accuracy (TRAINING!)
    with torch.no_grad():

        # get our predictions with the current weights: torch.max returns values, indices
        _, predicted = torch.max(outputs.data, 1)
        
        # get our train accuracy
        train_accuracy = torch.sum(predicted == labels) / labels.size(0)
        
        # add to our list
        train_accuracy_list.append(train_accuracy)
            
        # get our test inputs and labels
        test_inputs = torch.tensor(Xtest_digits_nn).float()
        test_labels = torch.tensor(ytest_digits).long()

        # move to the right device
        test_inputs, test_labels = test_inputs.to(device), test_labels.to(device)

        # run our test set inputs through the network
        test_outputs = model_cnn(test_inputs)

        # get our test_loss
        test_loss = loss_func(test_outputs, test_labels)

        # record our test_loss
        test_loss_list.append(test_loss.item())

        # make our predictions based on max.
        _, test_predicted = torch.max(test_outputs.data, 1)

        # get our train accuracy
        test_accuracy = torch.sum(test_predicted == test_labels) / test_labels.size(0)
        
        # add to our list
        test_accuracy_list.append(test_accuracy)

In [None]:
fig, ax = plt.subplots(1, 2, figsize=(16, 5), dpi=200)

ax[0].plot(train_accuracy_list, label="Train Accuracy")
ax[0].plot(test_accuracy_list, label="Test Accuracy")
ax[0].set_title("Accuracy Over Epochs")
ax[0].legend()

ax[1].plot(train_loss_list, label="Train Loss")
ax[1].plot(test_loss_list, label="Test Loss")
ax[1].set_title("Loss Over Epochs")
ax[1].legend()

plt.suptitle("Custom Convolutional Neural Network")
plt.tight_layout()
plt.show()

In [None]:
# make our test predictions with NO GRADIENT!
with torch.no_grad():
    
    # set our model to evaluation mode
    # don't do anything but make predictions from here on
    model_cnn.eval()
    
    # get our test inputs and labels
    test_inputs = torch.tensor(Xtest_mel).float()
    test_labels = torch.tensor(ytest_mel).long()
    
    # make our predictions and get the integer classes using argmax this time
    # find the biggest value for that class
    ypreds_test = model_cnn(test_inputs).argmax(dim=1)
    
    # convert our predictions into numpy
    # detach function removes what we just calcualted and jut gives us the value
    # need to convert back to numpy for everything to play nicely together bc sklearn is where 
    # a lot of our metrix are
    ypreds_test = ypreds_test.detach().numpy()
    
# calculate our test accuracy
print(f"Accuracy: {np.mean(ypreds_test == ytest_mel)}")

In [None]:
# compute + display our confusion matrix
plt.figure()
cfm = confusion_matrix(ytest_digits, ypreds_test)
sns.heatmap(cfm, annot=True, fmt='g')
plt.title("Confusion Matrix of NY Sounds CNN")
plt.xlabel("Predicted Class")
plt.ylabel("True Class")
plt.show()