<a href="https://colab.research.google.com/github/usef-kh/SpeechRecognition/blob/master/CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Load Kaggle API Token
from google.colab import files
!pip install -q kaggle > /dev/null
uploaded = files.upload()
!mkdir ~/.kaggle
!cp /content/kaggle.json ~/.kaggle/kaggle.json
!chmod 600 /root/.kaggle/kaggle.json

# Load Data
!kaggle competitions download -c tensorflow-speech-recognition-challenge > /dev/null

# Unzip Data
!apt-get install p7zip-full > /dev/null
!p7zip -d train.7z


mkdir: cannot create directory ‘/root/.kaggle’: File exists
100% 1.04G/1.04G [00:08<00:00, 127MB/s]

7-Zip (a) [64] 16.02 : Copyright (c) 1999-2016 Igor Pavlov : 2016-05-21
p7zip Version 16.02 (locale=en_US.UTF-8,Utf16=on,HugeFiles=on,64 bits,2 CPUs Intel(R) Xeon(R) CPU @ 2.30GHz (306F0),ASM,AES-NI)

Scanning the drive for archives:
  0M Scan         1 file, 1121103842 bytes (1070 MiB)

Extracting archive: train.7z
--
Path = train.7z
Type = 7z
Physical Size = 1121103842
Headers Size = 389133
Method = Delta LZMA2:24
Solid = +
Blocks = 2

  0%    
Would you like to replace the existing file:
  Path:     ./train/LICENSE
  Size:     18651 bytes (19 KiB)
  Modified: 2017-11-13 21:13:13
with the file from archive:
  Path:     train/LICENSE
  Size:     18651 bytes (19 KiB)
  Modified: 2017-11-13 21:13:13
? (Y)es / (N)o / (A)lways / (S)kip all / A(u)to rename all / (Q)uit? S

  0% 33 . train/LICENSE                        

<!-- ## Useful Imports and Functions -->

In [None]:
import librosa
import librosa.display
from scipy import signal
import numpy as np
from keras import backend as K
from sklearn.model_selection import train_test_split
from scipy.io import wavfile
import time
from IPython.display import Audio
from os import walk
from matplotlib import pyplot as plt
import csv
import keras

Using TensorFlow backend.


### Loading Data Helper Functions

In [None]:
def generateFiles(train_audio_path='/content/train/audio/'):
  """Index the training audio directory by label.

  Walks `train_audio_path` and maps every sub-directory (relative to that
  root) to the list of file names it contains. The root directory itself is
  not included, and 'README.md' is dropped from the '_background_noise_'
  entry when present.

  Args:
    train_audio_path: Root directory holding one sub-directory per label.
      A trailing '/' is optional.

  Returns:
    dict mapping label (relative directory path) -> list of file names.
  """
  from os.path import relpath

  # Load all filenames into a dictionary so we can call on them easily
  files = {}
  for (dirpath, dirnames, filenames) in walk(train_audio_path):
    # Derive the label from the path itself instead of slicing at a fixed
    # character offset that only matches one specific root string.
    label = relpath(dirpath, train_audio_path)
    if label == '.':
      # The root directory is not a label.
      continue
    files[label] = filenames

  # The README shipped next to the noise recordings is not audio.
  noise_files = files.get('_background_noise_', [])
  if 'README.md' in noise_files:
    noise_files.remove('README.md')

  return files

In [None]:
def get_wav(file_name, nsamples=16000):
  """Read a WAV file and return a clip of `nsamples` samples.

  Clips shorter than `nsamples` are zero-padded at the front; longer clips
  are cut down to their first `nsamples` samples.
  """
  _, samples = wavfile.read(file_name)

  missing = nsamples - samples.size
  if missing > 0:
    # Pad on the left only, so the recorded audio ends the clip.
    return np.pad(samples, (missing, 0), mode='constant')
  return samples[:nsamples]

def get_noise(filename, nsamples=16000, stepSize = 1000):
    """Cut a recording into overlapping fixed-length windows.

    Windows of `nsamples` samples are taken every `stepSize` samples from the
    start of the file.

    Args:
        filename: Path of the WAV file to read.
        nsamples: Length of every window, in samples.
        stepSize: Offset between the starts of consecutive windows.

    Returns:
        A list of `(len(wav) - nsamples) // stepSize` arrays, each a slice of
        the recording of length `nsamples`. The list is empty when the
        recording is not longer than one window.
    """
    wav = wavfile.read(filename)[1]

    # The last start offset is at most len(wav) - nsamples - stepSize, so every
    # slice below is full length and no padding is ever required.
    n_windows = (len(wav) - nsamples) // stepSize

    return [
        wav[i * stepSize: i * stepSize + nsamples]
        for i in range(n_windows)
    ]

In [None]:
def loadData(files, withNoise = True, train_audio_path = '/content/train/audio/'):
  """Load every listed clip and its label into arrays.

  Args:
    files: dict mapping label (sub-directory name) -> list of file names.
    withNoise: When True, each '_background_noise_' recording is cut into
      windows with get_noise and every window is labelled 'silence'. When
      False, the background-noise recordings are left out entirely.
    train_audio_path: Root directory the labels are relative to; must end
      with '/'.

  Returns:
    (samples, labels): a float32 array of clips and an array of label strings
    of the same length.
  """
  xtrain, ytrain = [], []
  for label, filenames in files.items():
    if label == '_background_noise_':
      if not withNoise:
        # These recordings are not spoken words, so they must not be loaded
        # as ordinary clips when noise is disabled.
        continue

      for filename in filenames:
        noise = get_noise(train_audio_path + label + '/' + filename)

        xtrain.extend(noise)
        ytrain.extend(['silence']*len(noise))

    else:
      for filename in filenames:
        audio = get_wav(train_audio_path + label + '/' + filename)
        xtrain.append(audio)
        ytrain.append(label)

  return np.array(xtrain).astype(np.float32), np.array(ytrain)

### Preprocessing

In [None]:
def onehot(A, mapping=None):
  """One-hot encode labels over the 12 target classes.

  Any label outside the 12 target classes is encoded as 'unknown'.

  Args:
    A: Iterable of label strings.
    mapping: Optional dict label -> one-hot list to reuse. When omitted, a
      mapping is built with class indices following the fixed order of the
      label list below.

  Returns:
    (encoded, mapping, maptolable): the array of one-hot rows, the
    label -> one-hot mapping used, and the inverse class index -> label dict.
  """
  # An ordered sequence, not a set: iterating a set of strings can yield a
  # different order in every interpreter session, which would silently change
  # which index means which class.
  labels = ['yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go', 'silence', 'unknown']

  if mapping is None:
    mapping = {}
    for i, label in enumerate(labels):
      temp = [0] * len(labels)
      temp[i] = 1
      mapping[label] = temp

  # Built from the mapping itself so it also exists when a mapping is passed in.
  maptolable = {int(np.argmax(vector)): label for label, vector in mapping.items()}

  known = set(labels)
  res = [mapping[label] if label in known else mapping['unknown'] for label in A]

  return np.array(res), mapping, maptolable

### Other Functions

In [None]:
def f1(y_true, y_pred):
  """Batch-wise F1 score computed from pooled counts.

  Targets and predictions are clipped to [0, 1] and rounded, then true
  positives, predicted positives and actual positives are each summed over
  every element of the batch (all samples and all classes together).
  Precision and recall are formed from those totals, so this is a
  micro-averaged score for the current batch, not an average of per-class
  F1 scores.

  Args:
    y_true: Tensor of target values.
    y_pred: Tensor of predicted values, same shape as `y_true`.

  Returns:
    Scalar tensor holding the harmonic mean of precision and recall.
  """
  # Entries where the rounded product is 1: predicted positive and truly positive.
  true_positives = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
  possible_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
  predicted_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))

  # K.epsilon() keeps every division defined when a count is zero.
  precision = true_positives / (predicted_positives + K.epsilon())
  recall = true_positives / (possible_positives + K.epsilon())
  return 2*((precision * recall)/(precision + recall + K.epsilon()))

In [None]:
def swish(x, beta = 1):
    """Swish activation: `x` multiplied by the sigmoid of `beta * x`."""
    gate = K.sigmoid(beta * x)
    return x * gate

# Fetch Keras' global custom-object registry and add the activation to it
from keras.utils.generic_utils import get_custom_objects 
from keras.layers import Activation 

# 'swish' is just the lookup key; any other name could be used instead
get_custom_objects()['swish'] = Activation(swish)

In [None]:
def performance_curves(history, metrics):
  """Plot training and validation curves for the loss and each given metric.

  One figure is drawn per entry of `['loss'] + metrics`. After each figure the
  best validation value (minimum for the loss, maximum otherwise) is printed
  together with the 1-based epoch at which it first occurred.

  Args:
    history: Object whose `.history` dict holds, for every metric name, the
      per-epoch training values and, under 'val_' + name, the validation values.
    metrics: List of metric names to plot in addition to 'loss'.
  """
  def generate_plot(metric):
    train_values = history.history[metric]
    val_values = history.history['val_' + metric]
    epochs = range(1, len(train_values) + 1)

    plt.figure()
    plt.plot(epochs, train_values, label="Training " + metric)
    plt.plot(epochs, val_values, label="Validation " + metric)

    plt.ylabel(metric)
    plt.xlabel('Epochs')
    plt.title('Epochs vs ' + metric)
    plt.legend()
    plt.show()

    # Lower is better for the loss, higher is better for everything else.
    if metric == 'loss':
      best = min(val_values)
      print('Minimum Validation Loss is:', str(best))
    else:
      best = max(val_values)
      print('Maximum', metric, 'is:', str(best))
    print('Epoch: ', str(val_values.index(best) + 1))

  for metric in ['loss'] + metrics:
    generate_plot(metric)

## Implementing CNN

### Generating Dataset

In [None]:
# Index the audio files by label, then load the clips and their labels
files = generateFiles()
X, Y = loadData(files)

# Flip to True to print the dataset size and the number of files per label
debug = False
if debug:
  print(len(X), len(Y))
  count = 0
  for label in files:
    samples = files[label]
    print(label, '\t', len(samples))
    count = count + len(samples)

  print("Number of Audio signals:", count)

  


64727 64727
yes 	 2377
house 	 1750
five 	 2357
cat 	 1733
four 	 2372
six 	 2369
two 	 2373
wow 	 1745
nine 	 2364
happy 	 1742
eight 	 2352
off 	 2357
dog 	 1746
_background_noise_ 	 6
up 	 2375
go 	 2372
right 	 2367
three 	 2356
bird 	 1731
marvin 	 1746
on 	 2367
tree 	 1733
left 	 2353
zero 	 2376
seven 	 2377
one 	 2370
bed 	 1713
sheila 	 1734
down 	 2359
stop 	 2380
no 	 2375
Number of Audio signals: 64727


In [None]:
# Add a trailing channel axis to the clips
Xfinal = X.reshape(-1, 16000, 1)

# Encode the labels, then give each target a singleton middle axis
Yonehot, mapping, maptolable = onehot(Y)
Yonehot = Yonehot.reshape(-1, 1, 12)

# Hold out 20% of the data for validation (fixed seed)
xtrain, xval, ytrain, yval = train_test_split(
    Xfinal,
    Yonehot,
    test_size=0.2,
    random_state=127,
)

print('Number of Dimensions:', X.ndim)
for title, inputs, targets in (
    ('Dataset', Xfinal, Yonehot),
    ('\nTraining Data', xtrain, ytrain),
    ('\nValidation Data', xval, yval),
):
  print(title)
  print('\t', inputs.shape, '\t', targets.shape)

### Building & Training Model

In [None]:
# Builds and compiles the 1-D convolutional classifier that is trained on the
# clips in xtrain.
# NOTE(review): LSTM, Flatten, TensorBoard and l2 are imported here but not used
# in this cell, and the second keras.layers import repeats names from the first.
from keras.models import Sequential
from keras.layers import LSTM, Dense, Activation, Flatten, Dropout, BatchNormalization, Conv1D, MaxPooling1D
from keras.layers import Dense, Activation, Flatten, Dropout, BatchNormalization, Conv1D, MaxPooling1D
from tensorflow.keras.callbacks import TensorBoard
from keras.regularizers import l2

model = Sequential()
# Stage 1: strided convolution over the input clip, then pooling
model.add(Conv1D(32, kernel_size=10, strides = 4, activation = 'relu', input_shape = xtrain.shape[1:]))
model.add(BatchNormalization())
model.add(MaxPooling1D(2))
model.add(Dropout(0.2))

# Kernel size 1: mixes channels (32 -> 128) without looking across time steps
model.add(Conv1D(128, kernel_size=1, strides = 1, activation = 'relu'))
model.add(BatchNormalization())
model.add(Dropout(0.2))

# Stage 2: strided convolution + pooling
model.add(Conv1D(32, kernel_size=10, strides = 4, activation = 'relu'))
model.add(BatchNormalization())
model.add(MaxPooling1D(3))
model.add(Dropout(0.2))

# Second kernel-size-1 channel-mixing layer
model.add(Conv1D(128, kernel_size=1, strides = 1, activation = 'relu'))
model.add(BatchNormalization())
model.add(Dropout(0.2))

# Stage 3: strided convolution + pooling
model.add(Conv1D(64, kernel_size=10, strides = 4, activation = 'relu'))
model.add(BatchNormalization())
model.add(MaxPooling1D(2))
model.add(Dropout(0.2))

# Stage 4: strided convolution + pooling
model.add(Conv1D(128, kernel_size=10, strides = 4, activation = 'relu'))
model.add(BatchNormalization())
model.add(MaxPooling1D(2))
model.add(Dropout(0.2))

# No Flatten layer is added before the Dense layers.
# NOTE(review): this presumably relies on the convolution/pooling stack leaving
# a time axis of length 1, so the output lines up with targets reshaped to
# (-1, 1, 12) — confirm against the model.summary() output below.
model.add(Dense(32, activation = 'relu'))
model.add(Dropout(0.2))

# One softmax output per entry of `mapping`
model.add(Dense(len(mapping), activation = 'softmax'))

model.summary()

# Track the custom f1 metric alongside accuracy during training
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=[f1, 'accuracy'])

In [None]:
# Fit on the training split, evaluating on the validation split after each epoch
history = model.fit(
    xtrain,
    ytrain,
    validation_data=(xval, yval),
    epochs=85,
    batch_size=128,
    shuffle=True,
    verbose=1,
)

# Plot loss, accuracy and f1 for both splits
performance_curves(history, ['accuracy', 'f1'])

In [None]:
# Save the trained model under the relative path 'CNN'
model.save('CNN')

## Generating Test Data Predictions

In [None]:
# !p7zip -d test.7z > /dev/null

In [None]:
# mappings = {}
# maptoLabel = {}
# with open('mappings.csv', mode='r') as infile:
#     reader = csv.reader(infile)

#     for i, row in enumerate(reader):
#         temp = [0]*12
#         temp[i] = 1
#         mappings[row[0]] = temp
#         maptoLabel[i] = row[0]

# test = []
# with open('sample_submission.csv',mode='r') as infile:
#     reader = csv.reader(infile)

#     for row in reader:
#         test.append(row[0])

# test.pop(0)
# print(len(test), test[0])


In [None]:
# model = keras.models.load_model('CNN', custom_objects={'f1':f1})

# test_audio_path = '/content/test/audio'

# ypred = []
# for filename in test:
#   audio = get_wav(test_audio_path + '/' + filename)
#   audio = audio.reshape(-1, 16000, 1)
#   pred = model.predict(audio)
#   ypred.append(np.argmax(pred[0][0], axis=0))

In [None]:
# import pandas as pd
# final = []

# for pred in ypred:
#   final.append(maptoLabel[pred])


# df = pd.DataFrame(final)


# df.to_csv ('final_df.csv', index = False)

## Data Augmentation

In [None]:
# Build a noisy copy of every clip in X by mixing in background noise.
import random

import IPython.display as ipd

train_audio_path = '/content/train/audio/'

# One list of fixed-length noise windows per background-noise recording
noise = [
  get_noise(train_audio_path + '_background_noise_/' + filename)
  for filename in files['_background_noise_']
]

# Only indices valid for every recording are sampled
lennoise = min(len(windows) for windows in noise)

noisyX = []
noisyY = []
for x, y in zip(X, Y):
  # Start from silence for each clip so noise never carries over between clips
  noise_sample = np.zeros_like(x, dtype=np.float32)
  for windows in noise:
    # randrange excludes the upper bound, so idx is always a valid index and
    # window 0 can be picked too
    idx = random.randrange(lennoise)
    noise_sample += windows[idx]
  # Scale the summed noise by a random factor in [0, 1/60) before mixing it in
  noisyX.append(x + noise_sample * random.random()/60)
  noisyY.append(y)

print(len(X), len(Y))
print(len(noisyX), len(noisyY))

# Only a cell's last expression is rendered automatically, so both players are
# displayed explicitly.
ipd.display(ipd.Audio(noisyX[1], rate=16000))
ipd.display(ipd.Audio(X[0], rate=16000))