In [1]:
import os
import wave
from scipy.io import wavfile
import sys
import librosa
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models, datasets
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_recall_fscore_support
import nlpaug.augmenter.audio as naa

In [2]:
def get_files_in_folder(folder_path):
    """Return the paths of every entry directly inside ``folder_path``.

    Args:
        folder_path: Directory to list.

    Returns:
        list[str]: ``folder_path`` joined with each entry name, ordered by
        entry name. Sub-directories are not filtered out.

    Raises:
        OSError: Propagated from ``os.listdir`` when the folder cannot be read.
    """
    # os.listdir gives no ordering guarantee; sorting makes the result (and
    # any "first n entries" slice taken by a caller) identical on every run.
    return [
        os.path.join(folder_path, entry)
        for entry in sorted(os.listdir(folder_path))
    ]
    
def create_training(labels, n, folder_path_ini):
    """Collect up to ``n`` file paths per class together with their labels.

    Args:
        labels: Iterable of class names. Each name is also the name of the
            sub-folder that holds the files of that class.
        n: Maximum number of files taken from each class folder.
        folder_path_ini: Path prefix the class name is appended to by plain
            string concatenation, so it must already end with a separator.

    Returns:
        tuple[list[str], list[str]]: The file paths and, aligned index by
        index, the class name of each file.
    """
    X_file_paths = []
    Y = []
    # Honour the caller-supplied class names instead of a fixed list.
    for label in labels:
        class_files = get_files_in_folder(folder_path_ini + label)[:n]
        X_file_paths.extend(class_files)
        Y.extend([label] * len(class_files))
    return X_file_paths, Y

def create_log_mel_spectrogram(X_file_paths, n_mels, target_len, threshold):
    """Convert audio files into one stacked array of processed spectrograms.

    Every file is loaded with ``librosa.load``, run through nlpaug's
    ``VtlpAug`` augmenter, padded or cut to ``target_len`` samples, turned
    into a mel spectrogram, converted to dB relative to its maximum (absolute
    value taken), standardized, and finally passed to ``trim_low_power``.

    Args:
        X_file_paths: Iterable of audio file paths.
        n_mels: Number of mel bands requested from librosa; also forwarded to
            ``trim_low_power``.
        target_len: Length in samples every clip is fixed to.
        threshold: Forwarded to ``trim_low_power``.

    Returns:
        np.ndarray: ``np.stack`` of the per-file results, in input order.
    """

    def _process(file_path):
        # Load, augment, then force a fixed number of samples.
        signal, sample_rate = librosa.load(file_path)
        augmenter = naa.VtlpAug(sampling_rate=sample_rate, factor=(0.9, 1.1))
        augmented = augmenter.augment(signal)
        # The augmenter output is indexed with [0] to get the waveform.
        fixed = librosa.util.fix_length(augmented[0], size=target_len)

        # Mel spectrogram -> |dB| -> standardize -> trim.
        mel = librosa.feature.melspectrogram(y=fixed, sr=sample_rate, n_mels=n_mels)
        log_mel = np.abs(librosa.power_to_db(mel, ref=np.max))
        standardized = standardize_spectrogram(log_mel)
        return trim_low_power(standardized, threshold, n_mels)

    return np.stack([_process(path) for path in X_file_paths])

def trim_low_power(stand_spectogram, threshold, n_mels):
    """Zero the columns whose summed value reaches the trimming limit.

    Values are summed over axis 0 to get one total per column. Every column
    whose total is not strictly below ``threshold * max(column totals)`` is
    set to 0 in its first ``n_mels`` rows.

    In this file the input is built from ``np.abs`` of dB values measured
    relative to the maximum, so larger values correspond to quieter frames.

    Args:
        stand_spectogram: 2-D NumPy array; modified in place.
        threshold: Fraction of the largest column total used as the limit.
        n_mels: Number of leading rows to clear in a trimmed column.

    Returns:
        The same array object that was passed in, after trimming.
    """
    column_totals = np.sum(stand_spectogram, axis=0)
    if column_totals.size == 0:
        # No columns to inspect; np.max would raise on an empty array.
        return stand_spectogram

    # Select columns with an explicit mask. Using 0 as an in-band marker
    # would also wipe kept columns whose total happens to be exactly 0.
    trim_mask = ~(column_totals < threshold * np.max(column_totals))
    stand_spectogram[:n_mels, trim_mask] = 0
    return stand_spectogram

def standardize_spectrogram(spectrogram):
    """Scale a spectrogram to zero mean and unit standard deviation.

    The mean and standard deviation are computed over all elements.

    Args:
        spectrogram: Numeric NumPy array.

    Returns:
        np.ndarray: ``(spectrogram - mean) / std``. When the standard
        deviation is exactly 0 (constant input) an all-zero array of the same
        shape is returned instead of dividing by zero.
    """
    mean = np.mean(spectrogram)
    std = np.std(spectrogram)
    centered = spectrogram - mean
    if std == 0:
        # Constant input: dividing would fill the result with NaN/inf.
        return np.zeros_like(centered)
    return centered / std

def one_h_t(Y):
    """One-hot encode a sequence of labels.

    Args:
        Y: Sequence of labels, one per sample.

    Returns:
        Dense array produced by fitting a fresh ``OneHotEncoder`` on the
        labels arranged as a single column; one row per sample.
    """
    # The encoder expects a 2-D input, so present the labels as one column.
    label_column = np.array(Y).reshape(-1, 1)
    encoder = OneHotEncoder()
    encoded = encoder.fit_transform(label_column)
    return encoded.toarray()

def shuffle_data(X, Y):
    """Shuffle ``X`` and ``Y`` with one shared random permutation.

    Args:
        X: Array indexable by an integer index array along its first axis.
        Y: Array of the same length, indexable the same way.

    Returns:
        tuple: ``(X[order], Y[order])`` where ``order`` is a permutation of
        ``range(len(Y))`` drawn from NumPy's global random state.
    """
    order = np.random.permutation(len(Y))
    return X[order], Y[order]

def split_data(X, Y, test_size):
    """Split features and labels into train and test subsets.

    Args:
        X: Feature array.
        Y: Label array aligned with ``X``.
        test_size: Forwarded to ``train_test_split`` as its ``test_size``.

    Returns:
        tuple: ``(X_train, X_test, Y_train, Y_test)``.
    """
    # Forward the requested size instead of a fixed 0.2.
    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=test_size)
    return X_train, X_test, Y_train, Y_test

def CNN(input_shape, output_shape):
    """Build an uncompiled Keras ``Sequential`` image classifier.

    The network is three 3x3 ReLU convolutions (32, 64, 64 filters) with 2x2
    max pooling after the first two, followed by a flatten, a 64-unit ReLU
    dense layer and a softmax output layer.

    Args:
        input_shape: Shape of one input sample, passed to the first layer.
        output_shape: Number of units in the softmax output layer.

    Returns:
        The assembled ``Sequential`` model.
    """
    convolutional_layers = [
        layers.Conv2D(32, 3, activation='relu', input_shape=input_shape),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
        layers.MaxPooling2D((2, 2)),
        layers.Conv2D(64, (3, 3), activation='relu'),
    ]
    dense_layers = [
        layers.Flatten(),
        layers.Dense(64, activation='relu'),
        layers.Dense(output_shape, activation='softmax'),
    ]
    return models.Sequential(convolutional_layers + dense_layers)

def test_results(Y_pred, Y_test):
    """Score predictions against one-hot targets with micro averaging.

    Both inputs are reduced to class indices with an argmax over axis 1
    before scoring.

    Args:
        Y_pred: Per-class scores, one row per sample.
        Y_test: One-hot targets, one row per sample.

    Returns:
        The value returned by ``precision_recall_fscore_support`` with
        ``average='micro'``.
    """
    predicted_classes = tf.argmax(Y_pred, axis=1).numpy()
    true_classes = tf.argmax(Y_test, axis=1).numpy()
    return precision_recall_fscore_support(
        true_classes, predicted_classes, average='micro'
    )

In [3]:
# --- Configuration ---------------------------------------------------------
n = 300              # maximum number of files taken per class
n_mels = 128         # number of mel bands
target_len = 22050   # samples per clip after padding/trimming
test_size = 0.2      # fraction of the data held out for testing
threshold = 0.9      # fraction of the largest column total used for trimming
seed = 42            # seed for the stochastic steps below

# Seed NumPy and TensorFlow so that a Restart & Run All repeats the same
# NumPy-driven shuffling and TensorFlow weight initialisation.
np.random.seed(seed)
tf.random.set_seed(seed)

# NOTE: the prefix must end with a path separator because create_training
# appends the class name by string concatenation.
folder_path_ini = "D:/mini_speech_commands/mini_speech_commands\\"
labels = ["go","down","left","right"]

# --- Build the dataset -----------------------------------------------------
X_file_paths, Y = create_training(labels, n, folder_path_ini)
X = create_log_mel_spectrogram(X_file_paths, n_mels, target_len, threshold)
Y = one_h_t(Y)
X, Y  = shuffle_data(X, Y)
# Use the configured split size rather than a separate literal.
X_train, X_test, Y_train, Y_test = split_data(X, Y, test_size=test_size)



In [4]:
# Build the network: single-channel input sized like one spectrogram, one
# output unit per one-hot column.
input_shape = (X.shape[1], X.shape[2], 1)
num_classes = Y.shape[-1]
model = CNN(input_shape, num_classes)
model.compile(
    loss=tf.keras.losses.categorical_crossentropy,
    optimizer=tf.keras.optimizers.Adam(),
    metrics=['accuracy'],
)
model.summary()

# Fit on the training split.
history = model.fit(X_train, Y_train, epochs=15)

# Score the held-out split.
Y_pred = model.predict(X_test)
results = test_results(Y_pred, Y_test)
print(results)

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 126, 42, 32)       320       
                                                                 
 max_pooling2d (MaxPooling2D  (None, 63, 21, 32)       0         
 )                                                               
                                                                 
 conv2d_1 (Conv2D)           (None, 61, 19, 64)        18496     
                                                                 
 max_pooling2d_1 (MaxPooling  (None, 30, 9, 64)        0         
 2D)                                                             
                                                                 
 conv2d_2 (Conv2D)           (None, 28, 7, 64)         36928     
                                                                 
 flatten (Flatten)           (None, 12544)             0