In [0]:
# Mount Google Drive so the '/content/drive/...' paths used throughout this notebook resolve.
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


# Processing new data into the training and testing sets. Do once, then save splits in .npz files

In [0]:
# Only needed if loading the pickled feature files raises an error.
# NOTE(review): presumably a workaround for a numpy release that changed pickle-loading defaults — confirm before relying on it.
!pip install numpy==1.16.2  #need for loading pickled file
import numpy as np

In [0]:
# Directory holding the pre-computed feature files (one .npy file per row of the split CSVs).
datapath = '/content/drive/My Drive/DCASE19 ASC/LogTFSqueezedDoubleEmp/'  #set path to files

# DCASE 2019 fold-1 train / evaluate split definitions (tab-separated).
trainpath = '/content/drive/My Drive/DCASE19 ASC/TAU-urban-acoustic-scenes-2019-development/evaluation_setup/fold1_train.csv'
testpath = '/content/drive/My Drive/DCASE19 ASC/TAU-urban-acoustic-scenes-2019-development/evaluation_setup/fold1_evaluate.csv'

import csv
import os
import numpy as np

# Walk the training split: column 0 is the audio path (only its base name is used
# to locate the matching .npy file), column 1 is the text label.
training_set, training_labels = [], []
with open(trainpath,'r') as t:
  reader = csv.reader(t, delimiter='\t')
  next(reader)  # the first row is skipped
  for i, row in enumerate(reader):
    filename = os.path.splitext(os.path.basename(row[0]))[0]
    print(i, filename)
    training_set.append(np.load(datapath + filename + '.npy'))
    training_labels.append(row[1])
i = len(training_set)  # final counter value = number of rows loaded

# If loading hangs on the very first file, restart the runtime and run again.

In [0]:
# Walk the evaluation split exactly like the training split: column 0 gives the
# file whose base name selects the .npy feature file, column 1 gives the label.
testing_set, testing_labels = [], []
with open(testpath,'r') as t:
  reader = csv.reader(t, delimiter='\t')
  next(reader)  # the first row is skipped
  for i, row in enumerate(reader):
    filename = os.path.splitext(os.path.basename(row[0]))[0]
    print(i, filename)
    testing_set.append(np.load(datapath + filename + '.npy'))
    testing_labels.append(row[1])
i = len(testing_set)  # final counter value = number of rows loaded

In [0]:
# Spot-check the shape of one loaded test item (displayed as the cell output).
testing_set[7].shape  # 1.(128,501), 

(128, 501)

In [0]:
###### used only for mel data as some are of different dimensions (128,501) vs (128,500)
# process the data into array format. reshape to fit NN input shape required

def even_out_list2array(training_set, n_freq=128, n_frames=500):
  """Crop every 2-D array to a common size and stack them into one 4-D array.

  Intended for mel-spectrogram data, where some items carry one extra frame
  ((128, 501) instead of (128, 500)); the surplus rows/columns are dropped.

  Args:
    training_set: list of 2-D numpy arrays. NOTE: the list is modified in
      place -- every element is replaced by its cropped version.
    n_freq: number of leading rows to keep (default 128).
    n_frames: number of leading columns to keep (default 500).

  Returns:
    numpy array with a trailing singleton axis appended; for items that are at
    least (n_freq, n_frames) large this is (len(training_set), n_freq, n_frames, 1).
  """
  for i, item in enumerate(training_set):
    training_set[i] = item[0:n_freq, 0:n_frames]
  X = np.asarray(training_set)
  print(type(X))
  # add the trailing channel axis
  X = np.reshape(X, X.shape + (1,))
  print(X.shape)
  return X

# Crop and stack both splits. Note that even_out_list2array also overwrites the
# elements of training_set / testing_set with their cropped versions.
X = even_out_list2array(training_set)  
X_test = even_out_list2array(testing_set)  




In [0]:
# Stack each split into one ndarray and append a trailing channel axis, giving
# (n_items, freq, time, 1) as required for the Conv2D input.
X = np.expand_dims(np.asarray(training_set), axis=-1)
X_test = np.expand_dims(np.array(testing_set), axis=-1)
print(X_test.shape)

(4185, 117, 234, 1)


In [0]:
# Save the train and test splits to Drive so the slow per-file loading above only has to run once.
# The archive names are hard-coded to the 'LogTFSqueezedDoubleEmp' features; keep them in sync with `datapath`.
# (np.savez appends the '.npz' extension to the given file name.)
savepath = '/content/drive/My Drive/ICASSP_proj_data/'
np.savez(savepath+'training_set_LogTFSqueezedDoubleEmp', X=X, training_labels=training_labels)
np.savez(savepath+'testing_set_LogTFSqueezedDoubleEmp', X_test=X_test, testing_labels=testing_labels)

In [0]:
# Sanity check: plot the first training example of every class.
import matplotlib.pyplot as plt  # imported here so this cell does not depend on a later cell

# Integer class id per sample, derived directly from the text labels (np.unique
# sorts the class names), so the cell does not rely on `Y` from a later cell.
class_names, class_ids = np.unique(training_labels, return_inverse=True)

tmppsd = []
sampleaddrs = []
for i in range(len(class_names)):
  first = np.argmax(class_ids == i)  # index of the first sample belonging to class i
  tmppsd.append(X[first])
  sampleaddrs.append(training_labels[first])
  print(training_labels[first])

n_rows = (len(tmppsd) + 1) // 2  # two plots per row (5 rows for the 10 scene classes)
f = plt.figure(figsize=(50,10))
for i, sample in enumerate(tmppsd):
  # drop the trailing channel axis using the sample's own size instead of a
  # hard-coded (128,500), so both the (128,500) and (117,234) features work
  psd = np.reshape(sample, sample.shape[:2])
  ax = f.add_subplot(n_rows, 2, i+1)
  ax.imshow(psd[0:78], cmap='gray_r', vmin=0, vmax=255, origin='lower')
  ax.set_title(sampleaddrs[i])

plt.show()


# Begin

In [0]:
# Install Keras quietly (-q); no version is pinned here.
!pip install -q keras

In [0]:
import matplotlib.pyplot as plt 
def confmat(cm, classes, normalize=False, title=None, cmap=plt.cm.Blues):
  """Draw a confusion matrix as an annotated heat map and return its Axes.

  Args:
    cm: 2-D array of counts, indexed as cm[true, predicted].
    classes: tick labels, used for both axes.
    normalize: when True, every row is divided by its sum before plotting and
      the cells are annotated with two decimals instead of integers.
    title: optional plot title.
    cmap: matplotlib colormap for the heat map.

  Returns:
    The matplotlib Axes that was drawn on.
  """
  if not normalize:
    print('Confusion matrix, without normalization')
  else:
    row_totals = cm.sum(axis=1)[:, np.newaxis]
    cm = cm.astype('float') / row_totals
    print("Normalized confusion matrix")

  fig, ax = plt.subplots()
  im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
  ax.figure.colorbar(im, ax=ax)

  # One tick per row/column, labelled with the class names.
  ax.set(xticks=np.arange(cm.shape[1]),
         yticks=np.arange(cm.shape[0]),
         xticklabels=classes, yticklabels=classes,
         title=title,
         ylabel='True label',
         xlabel='Predicted label')

  # Slant the x tick labels so long class names do not overlap.
  plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
           rotation_mode="anchor")

  # Write each cell's value on top of it; use white text on dark cells.
  if normalize:
    fmt = '.2f'
  else:
    fmt = 'd'
  thresh = cm.max() / 2.
  for i, j in np.ndindex(cm.shape[0], cm.shape[1]):
    value = cm[i, j]
    ax.text(j, i, format(value, fmt),
            ha="center", va="center",
            color="white" if value > thresh else "black")
  fig.tight_layout()
  return ax

def plot_loss(history):
  """Plot training vs. validation curves for loss (left) and accuracy (right).

  Args:
    history: object whose `.history` dict holds the per-epoch series
      'loss', 'val_loss', 'acc' and 'val_acc' (as returned by `model.fit`
      when the model is compiled with metrics=['acc']).
  """
  # (metric key, panel title, legend position) for each of the two panels
  panels = (
      ('loss', 'model loss', 'upper right'),
      ('acc', 'model acc', 'lower right'),
  )
  for position, (metric, heading, legend_loc) in enumerate(panels, start=1):
    plt.subplot(1, 2, position)
    plt.plot(history.history[metric])
    plt.plot(history.history['val_' + metric])
    plt.title(heading)
    plt.ylabel(metric)
    plt.xlabel('epoch')
    plt.legend(['train','validation'], loc=legend_loc)
  plt.tight_layout()
  plt.show()

In [0]:
def stats(mdl,mdlname,X_test,Y_true=None):
  """Evaluate a model on a test set and return summary metrics.

  Args:
    mdl: model exposing `predict` (and `load_weights` when `mdlname` is given).
    mdlname: checkpoint name; when truthy, weights are loaded from
      '/content/drive/My Drive/DCASE19 ASC/checkpoints/<mdlname>.hdf5' before
      predicting. Pass None to evaluate the model as it is.
    X_test: input features handed to `mdl.predict`.
    Y_true: integer class labels aligned with `X_test`. Defaults to the
      notebook-level `Y_test`, which is what the three-argument calls rely on.

  Returns:
    Tuple (accuracy in percent, weighted recall, weighted precision, weighted F1).
  """
  from sklearn.metrics import accuracy_score, recall_score, precision_score, f1_score

  if mdlname:
    mdl.load_weights('/content/drive/My Drive/DCASE19 ASC/checkpoints/'+str(mdlname)+'.hdf5')
  if Y_true is None:
    Y_true = Y_test  # fall back to the global label vector

  yhat_d = mdl.predict(X_test)
  pred_d = yhat_d.argmax(axis=1)  # most probable class per sample
  acc = accuracy_score(Y_true,pred_d)*100
  rec = recall_score(Y_true,pred_d,average='weighted')
  pre = precision_score(Y_true,pred_d,average='weighted')
  f = f1_score(Y_true,pred_d,average='weighted')

  # The confusion matrix and classification report are produced in separate
  # cells; the unreachable code that used to follow this return was removed.
  return acc, rec, pre, f


In [0]:
def ci(data):
  """Print the mean of `data` and return its 95% confidence-interval half-width.

  The half-width is 1.96 * std / sqrt(n), where std is numpy's default
  (population, ddof=0) standard deviation.

  Args:
    data: sequence of numbers.

  Returns:
    The half-width of the interval (the mean itself is only printed).
  """
  from math import sqrt
  n = len(data)
  mean, spread = np.mean(data), np.std(data)
  half_width = 1.96 * (spread / sqrt(n))
  print('average', mean, 'with confidence interval', half_width)
  return half_width

In [0]:
# Load the saved train / test arrays and labels from the .npz archives.

import numpy as np

# Feature variant to load:
#   1 -> TFSqueezedSingleEmp, 2 -> TFSqueezedDoubleEmp, 3 -> LogTFSqueezedDoubleEmp,
#   anything else -> NormalPSD (an earlier alternative for this slot was PSDdB)
emp = 3
_variant = {
    1: 'TFSqueezedSingleEmp',
    2: 'TFSqueezedDoubleEmp',
    3: 'LogTFSqueezedDoubleEmp',
}.get(emp, 'NormalPSD')
_split_dir = '/content/drive/My Drive/ICASSP_proj_data/'
tr = _split_dir + 'training_set_' + _variant + '.npz'
te = _split_dir + 'testing_set_' + _variant + '.npz'

# The context managers close the archive file handles as soon as the arrays
# have been read into memory (they were previously left open).
with np.load(tr) as npzfile1:
  X = npzfile1['X']  #training features
  training_labels = npzfile1['training_labels']

with np.load(te) as npzfile2:
  X_test = npzfile2['X_test']
  testing_labels = npzfile2['testing_labels']

print(X.shape)
print(tr)

(9185, 117, 234, 1)
/content/drive/My Drive/ICASSP_proj_data/training_set_LogTFSqueezedDoubleEmp.npz


In [0]:
# Check data formatting: print the type of X before and after np.asarray.
print(type(X))
X = np.asarray(X)
X_test = np.asarray(X_test)
print(type(X))

<class 'numpy.ndarray'>
<class 'numpy.ndarray'>


In [0]:
# Process the data labels: text -> integer id -> one-hot vector.

from sklearn.preprocessing import LabelEncoder

Y=np.array(training_labels)
label_encoder = LabelEncoder()  #text to numeric label. alphabetical order
Y = label_encoder.fit_transform(Y)

# Encode the test labels with the encoder fitted on the TRAINING labels.
# Re-fitting on the test labels (as was done before) could give a different
# label -> id mapping if the two splits do not contain the same set of classes;
# transform() instead raises on a label that was never seen in training.
Y_test=np.array(testing_labels)
Y_test = label_encoder.transform(Y_test)

# Class names in id order, taken from the encoder so they always match Y / Y_test.
labels=label_encoder.classes_

# checking the text label to integer to categorical worked
print(labels)
from keras.utils import to_categorical
# Fix the one-hot width to the number of classes so both splits get the same
# number of columns even if a split lacks the highest-numbered class.
Y_cat = to_categorical(Y, num_classes=len(labels))
Y_test_cat = to_categorical(Y_test, num_classes=len(labels))
print(Y_cat[3])
print(Y[3])
print(training_labels[3])

['airport' 'bus' 'metro' 'metro_station' 'park' 'public_square'
 'shopping_mall' 'street_pedestrian' 'street_traffic' 'tram']


Using TensorFlow backend.


[0. 0. 0. 0. 0. 0. 0. 1. 0. 0.]
7
street_pedestrian


In [0]:
# Input geometry and class count used by every model below.
freq, tm = X[0].shape[:2]  # rows, columns of one sample
numClasses = len(labels)

import tensorflow as tf
from keras.models import Sequential, Model
from keras.layers import LSTM, Dense, Flatten, Bidirectional, Permute, Reshape, Activation, add
from keras.layers import Dropout, Conv2D, BatchNormalization, MaxPooling2D, TimeDistributed, AveragePooling2D, Input
from keras.callbacks import *
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report

# Training parameters.
# NOTE(review): the fit() calls visible in this notebook pass literal epoch counts
# (100 or 25) rather than `ep`, and the first model passes batch_size=32 directly.
ep = 200  # number of epochs
bs = 32  # batch size

#callbacks settings
def caller(name, use_checkpoint=False):
  """Build the list of Keras callbacks for one training run.

  Args:
    name: run name; only used for the checkpoint file
      '/content/drive/My Drive/DCASE19 ASC/checkpoints/<name>.hdf5'.
    use_checkpoint: when True, also save the best model (by validation
      accuracy) to that file. Defaults to False, which matches the previous
      behaviour where the checkpoint callback was built but never returned.

  Returns:
    List of callbacks: early stopping on 'val_acc' (patience 10, best weights
    restored), optionally followed by the checkpoint callback.
  """
  es = EarlyStopping(monitor='val_acc',mode='max',patience=10,verbose=1,restore_best_weights=True)
  callbacks_list = [es]
  if use_checkpoint:
    filepath= '/content/drive/My Drive/DCASE19 ASC/checkpoints/'+name+'.hdf5'
    callbacks_list.append(
        ModelCheckpoint(filepath, monitor='val_acc', mode='max',verbose=1, save_best_only=True))
  return callbacks_list


In [0]:
# Display the shape of the training array as the cell output.
X.shape

(9185, 117, 234, 1)

# Model 1 - baseline


In [0]:
# Per-run score histories; each baseline training run appends one value to every
# list: accuracy (percent), precision, recall and F1, as returned by stats().
acc, pre, rec, f = [], [], [], []

In [0]:
# Baseline model: three conv/pool stages followed by a bidirectional LSTM.
# Variants tried earlier and left out of the active model: transposed
# (234 x 117) input, a wider first conv (32 filters, 7x7 kernel), and
# BatchNormalization after each pooling stage.

drate = 0.5  # dropout rate applied before and after the recurrent layer

model = Sequential([
    # conv/pool stages, 8 filters of 3x3 each
    Conv2D(filters=8, kernel_size=(3,3), activation='relu', input_shape=(freq, tm, 1)),
    MaxPooling2D(pool_size=(2,2)),
    Conv2D(filters=8, kernel_size=(3,3), activation='relu'),
    MaxPooling2D(pool_size=(2,2)),
    Conv2D(filters=8, kernel_size=(3,3), activation='relu'),
    MaxPooling2D(pool_size=(2,2)),
    # flatten everything behind the first non-batch axis: the LSTM needs 3-D input
    TimeDistributed(Flatten()),
    Dropout(drate),
    Bidirectional(LSTM(40, return_sequences=True)),
    Dropout(drate),
    Flatten(),
    Dense(numClasses, activation='softmax'),
])

callbacks_list = caller('model_')  # built, but not passed to fit() below

model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['acc'])  #original default
model.summary()

# Train, then record this run's scores in the result lists.
history = model.fit(X, Y_cat, batch_size=32, epochs=100, validation_data=(X_test, Y_test_cat))
a, b, c, d = stats(model, None, X_test)
for scores, value in zip((acc, rec, pre, f), (a, b, c, d)):
  scores.append(value)



In [0]:
# Show the collected scores, one list per line: accuracy, recall, precision, F1.
for scores in (acc, rec, pre, f):
  print(scores)

In [0]:
# Mean and 95% confidence-interval half-width for each collected metric
# (ci() prints the mean; the value returned by the last call is shown as the cell output).
ci(acc)
ci(pre)
ci(rec)
ci(f)

In [0]:
# Loss / accuracy curves of the most recent baseline training run.
plot_loss(history)  

In [0]:
# Confusion matrix of the baseline model on the test set.
yhat_d = model.predict(X_test)  # per-class scores for every test sample
pred = yhat_d.argmax(axis=1)  # index of the highest score = predicted class id
cm_d = confusion_matrix(Y_test,pred)
confmat(cm_d, labels)

# Model 1 with skip-connection

In [0]:
# Baseline variant with one skip connection, built with the Keras functional API.
input_this = Input(shape=(freq,tm,1))
#input_this = Input(shape=(X_.shape[1],X_.shape[2],1))

# Stem: one conv + pooling stage; its output `ins` is also the skip branch.
ins = Conv2D(filters=8, kernel_size=(3,3), activation='relu')(input_this)
ins = MaxPooling2D(pool_size=(2,2))(ins)
#ins = BatchNormalization()(ins)

# Residual block 1
x = Conv2D(filters=8, kernel_size=(3,3), activation='relu', padding='same')(ins)  # res block, first conv.
x = MaxPooling2D(pool_size=(2,2), strides=1, padding='same')(x)  # strides=1 with 'same' padding keeps the output shape equal to the input shape
#x = BatchNormalization()(x)

x = Conv2D(filters=8, kernel_size=(3,3), activation='relu', padding='same')(x) #res block, 2nd conv
x = MaxPooling2D(pool_size=(2,2), strides=1, padding='same')(x)
#x = BatchNormalization()(x)

#x = Conv2D(filters=8, kernel_size=(3,3), activation='relu', padding='same')(x) # block, 3rd conv
#x = BatchNormalization()(x)

x1 = add([ins,x])  # residual block output: skip branch + conv branch (shapes must match)

x1 = TimeDistributed(Flatten())(x1)  # flatten behind the first non-batch axis so the LSTM gets 3-D input

x1 = Dropout(0.5)(x1)

x1 = Bidirectional(LSTM(40, return_sequences=True))(x1)

x1 = Dropout(0.5)(x1)

y = Flatten()(x1)

out = Dense(numClasses, activation='softmax')(y)


model_minires = Model(inputs = input_this, outputs = out)

model_minires.compile(optimizer='Adam', loss = 'categorical_crossentropy', metrics = ['acc'])
model_minires.summary()
# Train with the callback list returned by caller().
callbacks_list = caller('model_minires')
history_minires = model_minires.fit(X, Y_cat, batch_size=bs, epochs=100, validation_data=(X_test,Y_test_cat), callbacks=callbacks_list)

In [0]:
# Loss / accuracy curves of the skip-connection model.
plot_loss(history_minires)

In [0]:
# Evaluate the skip-connection model on the test set.
# Optionally load saved checkpoint weights first:
#model_minires.load_weights('/content/drive/My Drive/DCASE19 ASC/checkpoints/model_minires.hdf5')

yhat_minires = model_minires.predict(X_test)  # per-class scores
pred_minires = yhat_minires.argmax(axis=1)  # predicted class id per sample
cm_minires = confusion_matrix(Y_test,pred_minires)
confmat(cm_minires, labels)
print(accuracy_score(Y_test,pred_minires)*100)  # accuracy in percent
print(classification_report(Y_test,pred_minires,target_names=labels))

# Resnet

In [0]:
# ResNet-like model (Keras functional API): conv stem, one residual block, dense classifier.

input_this = Input(shape=(freq,tm,1))

# Stem; its normalized output `ins` is also the skip branch of the residual block.
ins = Conv2D(filters=8, kernel_size=(3,3), activation='relu')(input_this)
#ins = MaxPooling2D(pool_size=(2,2))(ins)
ins = BatchNormalization()(ins)

# Residual block 1 ('same' padding keeps the shape so it can be added to `ins`)
x = Conv2D(filters=8, kernel_size=(3,3), padding='same', activation='relu')(ins)  #first res block, first conv
#x = MaxPooling2D(pool_size=(2,2),strides=1, padding='same')(x)
x = BatchNormalization()(x)
x = Conv2D(filters=8, kernel_size=(3,3), padding='same', activation='relu')(x) #first block, 2nd conv
#x = MaxPooling2D(pool_size=(2,2),strides=1, padding='same')(x)
x = BatchNormalization()(x)


x1 = add([ins,x])  # first residual block output: skip branch + conv branch

x1 = Dropout(0.5)(x1)

y = Flatten()(x1)

out = Dense(numClasses, activation='softmax')(y)

model_res = Model(inputs = input_this, outputs = out)

model_res.compile(optimizer='Adam', loss = 'categorical_crossentropy', metrics = ['acc'])
model_res.summary()
callbacks_list = caller('model_res')
history_res = model_res.fit(X, Y_cat, batch_size=bs, epochs=100, validation_data=(X_test,Y_test_cat), callbacks=callbacks_list)
# Scores are unpacked into a, b, c, d but not appended to the acc/rec/pre/f lists here.
a,b,c,d = stats(model_res, None, X_test)

In [0]:
# Loss / accuracy curves of the ResNet-like model.
plot_loss(history_res)

In [0]:
# Evaluate the ResNet-like model on the test set.
# Optionally load saved checkpoint weights first:
#model_res.load_weights('/content/drive/My Drive/DCASE19 ASC/checkpoints/model_res.hdf5')

yhat_res = model_res.predict(X_test)  # per-class scores
pred_res = yhat_res.argmax(axis=1)  # predicted class id per sample
cm_res = confusion_matrix(Y_test,pred_res)
confmat(cm_res, labels)
print(accuracy_score(Y_test,pred_res)*100)  # accuracy in percent
print(classification_report(Y_test,pred_res,target_names=labels))

# CNN


In [0]:
# Plain CNN: the baseline's three conv/pool stages followed directly by a dense classifier.
# NOTE(review): an earlier comment on the last pooling layer mentioned pool sizes
# (4,100) and (1,25); the code uses (2,2).
modeld = Sequential([
    Conv2D(filters=8, kernel_size=(3,3), activation='relu', input_shape=(freq,tm,1)),
    MaxPooling2D(pool_size=(2,2)),
    Conv2D(filters=8, kernel_size=(3,3), activation='relu'),
    MaxPooling2D(pool_size=(2,2)),
    Conv2D(filters=8, kernel_size=(3,3), activation='relu'),
    MaxPooling2D(pool_size=(2,2)),
    Dropout(0.5),
    Flatten(),
    Dense(numClasses, activation='softmax'),
])

modeld.compile(optimizer='Adam', loss = 'categorical_crossentropy', metrics = ['acc'])
modeld.summary()

# Train with the callback list returned by caller(), then compute the summary scores.
callbacks_list = caller('modelcnn')
history_modeld = modeld.fit(X, Y_cat, batch_size=bs, epochs=100,
                            validation_data=(X_test,Y_test_cat), callbacks=callbacks_list)
ac, bc, cc, dc = stats(modeld, None, X_test)



In [0]:
# Loss / accuracy curves of the plain CNN.
plot_loss(history_modeld)

In [0]:
# Evaluate the plain CNN on the test set.
# Optionally load saved checkpoint weights first.
# NOTE(review): the training cell passes the name 'modelcnn' to caller(), not 'modeld' — confirm the file name before enabling.
#modeld.load_weights('/content/drive/My Drive/DCASE19 ASC/checkpoints/modeld.hdf5')

yhat_d = modeld.predict(X_test)  # per-class scores (reuses the variable names of the baseline evaluation)
pred_d = yhat_d.argmax(axis=1)  # predicted class id per sample
cm_d = confusion_matrix(Y_test,pred_d)
confmat(cm_d, labels)
print(accuracy_score(Y_test,pred_d)*100)  # accuracy in percent
print(classification_report(Y_test,pred_d,target_names=labels))


# RNN

In [0]:
# Prepare the LSTM input: drop the trailing channel axis, then swap the last two
# axes so time comes first -> (batch, timesteps, input dim).
Xr = np.transpose(np.reshape(X, (len(X), freq, tm)), (0, 2, 1))
X_testr = np.transpose(np.reshape(X_test, (len(X_test), freq, tm)), (0, 2, 1))
print(Xr.shape[1])  # number of timesteps

234


In [0]:
# Recurrent-only model: one bidirectional LSTM over the (timesteps, features)
# input, flattened into a dense softmax classifier.
model_rnn = Sequential([
    Bidirectional(LSTM(40, return_sequences=True), input_shape=(Xr.shape[1], Xr.shape[2])),  #lstm 1
    Flatten(),
    Dense(numClasses, activation='softmax'),
])

model_rnn.compile(optimizer='Adam', loss = 'categorical_crossentropy', metrics = ['acc'])
model_rnn.summary()

# Train for 25 epochs with the callback list from caller(), then compute the
# summary scores on the reshaped test inputs.
callbacks_list = caller('model_rnn')
history_rnn = model_rnn.fit(Xr, Y_cat, batch_size=bs, epochs=25,
                            validation_data=(X_testr,Y_test_cat), callbacks=callbacks_list)
ac, bc, cc, dc = stats(model_rnn, None, X_testr)



In [0]:
# Loss / accuracy curves of the recurrent-only model.
plot_loss(history_rnn)