<a href="https://colab.research.google.com/github/phumipatc/CU_Submissions/blob/master/AI/Sound_to_Dementia.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Dataset: DementiaBank**
https://dementia.talkbank.org/


English Pitt Corpus: Cookie theft task
* https://dementia.talkbank.org/access/English/Pitt.html
* Dementia vs control



Preparing environment

In [None]:
# Install OpenL3, which the embedding cell imports to compute audio embeddings.
# NOTE(review): no version is pinned here, so reruns may pick up a different release.
%pip install openl3

In [None]:
# Pin TensorFlow to the 2.12 series together with a fixed cuDNN build for CUDA 11.
%pip install nvidia-cudnn-cu11==8.6.0.163 tensorflow==2.12.*

In [None]:
# Directory whose files are listed and loaded as audio recordings.
# Paths are built by plain string concatenation, so the trailing '/' is required.
address_sample_path = '/kaggle/input/dementia-adress-m-train/train/'
# Directory containing "training-groundtruth.csv" (the 'dx' labels and the 'cate' split column).
address_sample_csv_path = '/kaggle/input/dementia-train-groundtruth/'

# **Audio Embedding**
# OpenL3
* https://openl3.readthedocs.io/en/latest/tutorial.html
* http://www.justinsalamon.com/uploads/4/3/9/4/4394963/cramer_looklistenlearnmore_icassp_2019.pdf

# AudioSet
* https://github.com/tensorflow/models/tree/master/research/audioset
* Use vggish
* Or, https://tfhub.dev/google/vggish/1

# Other embedding models
* https://tfhub.dev/s?module-type=audio-embedding


In [None]:
# Show the working directory; the '.pkl' embedding files are written to and read from it.
%pwd

In [None]:
# need to be run

import os

# Names of the regular files located directly inside address_sample_path,
# sorted so that list positions are stable between runs.
address_sample_name = sorted(
	entry for entry in os.listdir(address_sample_path)
	if os.path.isfile(os.path.join(address_sample_path, entry))
)
# Full paths for the same files (prefix + name), sorted the same way.
address_sample_list = sorted(address_sample_path + entry for entry in address_sample_name)
print(len(address_sample_list))
print(address_sample_name)

Before computing the embeddings, every audio clip must have the same length, so each clip is zero-padded or center-cropped to a common length.

In [None]:
import numpy as np
import random

# padding the audio file
def pad_audio(audio, sr, duration):
	"""Zero-pad `audio` at the end until it holds `duration` samples.

	Args:
		audio: 1-D sequence of samples.
		sr: Sampling rate; accepted for a uniform signature but not used.
		duration: Target length expressed in samples (not seconds).

	Returns:
		`audio` itself when it already has at least `duration` samples,
		otherwise a new array with trailing zeros appended.
	"""
	missing = duration - len(audio)
	if missing > 0:
		return np.pad(audio, (0, missing), 'constant')
	return audio

def cut_audio(audio, sr, duration):
	"""Force `audio` to exactly `duration` samples.

	Longer signals are cropped around their centre; signals that are not
	longer than `duration` are handed to `pad_audio` for zero-padding.

	Args:
		audio: 1-D sequence of samples.
		sr: Sampling rate; only forwarded to `pad_audio`.
		duration: Target length expressed in samples (not seconds).

	Returns:
		A slice of `audio` with `duration` samples, or the padded signal.
	"""
	total = len(audio)
	if total > duration:
		# Drop the same amount (rounded down) from the front as from the back.
		start = int((total - duration) * 0.5)
		return audio[start:start + duration]
	return pad_audio(audio, sr, duration)

In [None]:
import librosa
import gc

# Length, in samples, of every recording after resampling to 8 kHz.
lenList = []
for sample in address_sample_list:
	audio, sr = librosa.load(sample, sr=8000)
	lenList.append(len(audio))

# Summary statistics; the seeds 1e9 and 0 are the starting values for min and max.
min_duration = min([1e9] + lenList)
max_duration = max([0] + lenList)
mean_duration = sum(lenList) / len(address_sample_list)
# The median is what the embedding cell uses as the common clip length.
median_duration = np.median(lenList)
print(min_duration, max_duration, mean_duration, median_duration)

del lenList
gc.collect()

In [None]:
# Print the working directory; the next cell writes the embedding pickles into it.
!pwd

In [None]:
import pickle
import openl3
import gc

# Compute an OpenL3 embedding for every recording and cache it as '<name>.pkl'
# in the current working directory. Processing stops at the first failure.
for i in range(len(address_sample_name)):
	print(str(i) + ': processing ' + address_sample_name[i])
	sample = address_sample_list[i]
	audio, sr = librosa.load(sample, sr=8000)
	# librosa.load returns mono audio by default. If a multi-channel array ever
	# arrives here it is laid out as (channels, samples), so the down-mix must
	# average over axis 0; axis 1 would collapse the time axis instead.
	if audio.ndim > 1:
		audio = audio.mean(axis=0)
	# Bring every clip to the median length so all embeddings share one shape.
	audio = cut_audio(audio, sr, int(median_duration))

	try:
		embedding, timestamps = openl3.get_audio_embedding(audio, sr)
	except Exception as error:
		print('error getting audio embedding from' + address_sample_name[i])
		print(error)
		break

	try:
		# The text before the first '.' of the file name is used as the pickle
		# name; the loading cells rebuild the name the same way.
		with open(address_sample_name[i].split('.')[0] + '.pkl', 'wb') as f:
			pickle.dump((embedding, timestamps), f)
	except Exception as error:
		print('error saving pickle file for ' + address_sample_name[i])
		print(error)
		break

	# Free the per-file buffers before the next iteration to limit peak memory.
	del embedding, timestamps
	del sample, audio, sr
	gc.collect()

# **Classification**
# Classics
* https://scikit-learn.org/stable/supervised_learning.html
* Logistic regression, Support Vector Classification, Decision Tree, Random Forest, Neural Net, AdaBoost, Naïve Bayes
* https://scikit-learn.org/stable/auto_examples/classification/plot_classifier_comparison.html

# Classification heads
* https://www.isca-speech.org/archive/pdfs/interspeech_2021/gauder21_interspeech.pdf
* Neural networks - Conv1D (k=1), Conv1D (k=3), Global Average
* https://www.isca-speech.org/archive/pdfs/interspeech_2021/wang21ca_interspeech.pdf
* Neural networks - Conv - Conv1D - Softmax
* Others
* https://www.tensorflow.org/tutorials/images/transfer_learning#add_a_classification_head

In [None]:
import gc

# Force a garbage-collection pass; the returned count of unreachable objects is shown as the cell output.
gc.collect()

In [None]:
# need to be run

import pandas as pd

# Get dataFrame
# Load the ground-truth table, discard the demographic/score columns that are
# not used below, and encode the diagnosis: "Control" -> 0, every other value
# (e.g. "ProbableAD") -> 1.
address_sample_clean_df = (
	pd.read_csv(address_sample_csv_path + "training-groundtruth.csv")
	.drop(columns=['age', 'gender', 'educ', 'mmse'])
	.assign(dx=lambda frame: frame['dx'].map(lambda label: int(label != "Control")))
)

address_sample_clean_df.head()


In [None]:
# need to be run

import os

# Sorted names of the regular files directly inside address_sample_path.
address_sample_name = sorted(
	fName for fName in os.listdir(address_sample_path)
	if os.path.isfile(os.path.join(address_sample_path, fName))
)
print(len(address_sample_name))

In [None]:
import librosa
from sklearn.preprocessing import normalize
import matplotlib.pyplot as plt

def extract_features(embedding, sr, show_plots=True):
	"""Turn one signal into a 3-plane feature stack using librosa.

	Plane 0 is the MFCC matrix, plane 1 is a block made of spectral bandwidth,
	spectral centroid, chromagram and zero-crossing rate (twice) repeated
	eight times, and plane 2 is the STFT magnitude.

	Args:
		embedding: Signal handed to librosa as `y`.
			NOTE(review): the three planes only share a shape (128 rows each)
			if this is a 1-D time series and librosa's default of 12 chroma
			bins applies -- confirm against the caller.
		sr: Sampling rate of `embedding`, forwarded to librosa.
		show_plots: When True (the default, matching the previous behaviour)
			each plane is drawn with `librosa.display.specshow`. Pass False
			to skip plotting, e.g. when processing many files in a loop.

	Returns:
		`np.dstack((mfcc, image, stft))`: the three planes stacked along a
		new last axis.
	"""
	# Imported explicitly because older librosa releases do not expose the
	# `display` submodule through a bare `import librosa`.
	import librosa.display

	# 1. MFCC (Mel-Frequency Cepstral Coefficients); n_mfcc sets the row count.
	mfcc = librosa.feature.mfcc(y=embedding, sr=sr, n_mfcc=128)

	# 2. Chromagram, spectral bandwidth and spectral centroid.
	chroma_stft = librosa.feature.chroma_stft(y=embedding, sr=sr)
	spectral_bandwidth = librosa.feature.spectral_bandwidth(y=embedding, sr=sr)
	spectral_centroid = librosa.feature.spectral_centroid(y=embedding, sr=sr)

	# 3. Short-Time Fourier Transform magnitude; n_fft=255 yields
	#    1 + 255 // 2 = 128 frequency rows.
	stft = np.abs(librosa.stft(y=embedding, n_fft=255, hop_length=512))

	# 4. Zero Crossing Rate.
	zero_crossing_rate = librosa.feature.zero_crossing_rate(y=embedding)

	# One 16-row block (1 + 1 + 12 + 1 + 1 with default chroma), repeated
	# eight times so the plane reaches the same 128 rows as the other two.
	feature_block = np.concatenate(
		(spectral_bandwidth, spectral_centroid, chroma_stft, zero_crossing_rate, zero_crossing_rate),
		axis=0,
	)
	image = np.concatenate([feature_block] * 8, axis=0)

	# Stack all the features together.
	final = np.dstack((mfcc, image, stft))

	if show_plots:
		plane_titles = ('MFCC', 'Spectral Bandwidth', 'Short-Time Fourier Transform')
		for plane, title in enumerate(plane_titles):
			# Only the first plot gets an explicitly sized figure, as before;
			# the later ones use the default figure created after plt.show().
			if plane == 0:
				plt.figure(figsize=(10, 4))
			librosa.display.specshow(librosa.power_to_db(final[:, :, plane], ref=np.max), y_axis='mel', fmax=8000, x_axis='time')
			plt.colorbar(format='%+2.0f dB')
			plt.title(title)
			plt.tight_layout()
			plt.show()

	return final


In [None]:
#need to be run for training model

import pickle
import gc
import numpy as np

result = []
y_train = []

# Only the first 22 entries are visited; switch to len(address_sample_name)
# to process every file.
address_sample_number = 22

# Read each cached pickle and collect features and labels for non-test rows.
# NOTE(review): row i of the ground-truth table is paired with the i-th sorted
# file name -- presumably the CSV is ordered the same way; verify.
for i in range(address_sample_number):
	if address_sample_clean_df['cate'][i] == 'test':
		continue
	pkl_name = address_sample_name[i].split('.')[0] + '.pkl'
	print(str(i) + ': reading ' + pkl_name)
	with open(pkl_name, 'rb') as f:
		embedding, timestamps = pickle.load(f)
		# NOTE(review): `timestamps` is passed where extract_features expects
		# the sampling rate `sr` -- confirm this is intended.
		result.append(extract_features(embedding, timestamps))
		y_train.append(address_sample_clean_df['dx'][i])
		del embedding
		del timestamps
		gc.collect()

In [None]:
# shuffle the result
from sklearn.utils import shuffle
# Shuffling both lists in one call keeps every feature block paired with its label;
# random_state fixes the permutation so reruns give the same order.
result, y_train = shuffle(result, y_train, random_state=42)

**Classic - Logistic Regression**

In [None]:
# Flatten each sample's feature block into a single row: (n_samples, n_features).
X_train = np.asarray(result).reshape(len(result), -1)

In [None]:
# Display the shape of the flattened training matrix.
np.shape(X_train)

In [None]:
# Drop the feature list now that X_train holds a flattened array built from it.
# NOTE(review): the CNN section below reads `result` again, so the feature-loading
# cell has to be re-run before it.
del result

In [None]:
# Run the garbage collector after deleting `result`.
gc.collect()

In [None]:
from sklearn.linear_model import LogisticRegression

# Logistic-regression baseline on the flattened features; random_state makes
# the fit repeatable.
clf = LogisticRegression(max_iter=150, random_state=0)
clf = clf.fit(X_train, y_train)

In [None]:
# save model 
import pickle
# Persist the fitted classifier. The context manager guarantees the file is
# flushed and closed, which the bare open(...) call did not.
with open('logistic_regression_model.sav', 'wb') as model_file:
	pickle.dump(clf, model_file)

**CNN**

In [None]:
import numpy as np

# Convert the feature list and the labels to NumPy arrays for Keras.
# NOTE(review): `result` is deleted in the logistic-regression section above, so on a
# top-to-bottom run the feature-loading cell must be re-run first -- confirm intended flow.
X_train = np.array(result)
y_train = np.array(y_train)

In [None]:
# Display the array shape; axes 1-3 become the CNN's input_shape below.
X_train.shape

In [None]:
# Down-cast to half precision to shrink the array before training.
X_train = X_train.astype(np.float16)

# Display the converted array as the cell output.
X_train

In [None]:
# Drop the Python list; X_train now holds its own copy of the data.
del result

In [None]:
# Run the garbage collector after deleting `result`.
gc.collect()

In [None]:
import tensorflow as tf
from tensorflow.keras import layers

# Let TensorFlow grow GPU memory on demand instead of reserving it all up front.
gpu_devices = tf.config.experimental.list_physical_devices('GPU')
if gpu_devices:
	# Memory growth has to be configured identically on every visible GPU.
	for gpu_device in gpu_devices:
		tf.config.experimental.set_memory_growth(gpu_device, True)
else:
	# Indexing gpu_devices[0] would raise IndexError on a CPU-only runtime.
	print('No GPU found; TensorFlow will run on the CPU')

In [None]:
# Input shape is taken from axes 1-3 of the training array.
cnn_input_shape = (X_train.shape[1], X_train.shape[2], X_train.shape[3])

# Four Conv2D + MaxPooling2D stages with doubling filter counts, followed by
# dropout, a dense layer and a single sigmoid unit for the binary decision.
CNNmodel = tf.keras.models.Sequential([
	layers.Conv2D(64, (3, 3), activation='relu', input_shape=cnn_input_shape),
	layers.MaxPooling2D((2, 2)),
	layers.Conv2D(128, (3, 3), activation='relu'),
	layers.MaxPooling2D((2, 2)),
	layers.Conv2D(256, (3, 3), activation='relu'),
	layers.MaxPooling2D((2, 2)),
	layers.Conv2D(512, (3, 3), activation='relu'),
	layers.MaxPooling2D((2, 2)),
	layers.Flatten(),
	layers.Dropout(0.7),
	layers.Dense(1024, activation='relu'),
	layers.Dense(1, activation='sigmoid'),
])

CNNmodel.compile(
	optimizer='adam',
	loss='binary_crossentropy',
	metrics=['accuracy'],
)

CNNmodel.summary()


In [None]:
# Discard the current model object.
# NOTE(review): the training cell below uses CNNmodel, so the model-definition
# cell must be re-run after this one -- confirm intended flow.
del CNNmodel

In [None]:
# Run the garbage collector after deleting the model.
gc.collect()

In [None]:
# Report the GPU TensorFlow will use. The "found" message is printed only when
# a GPU device name was actually returned.
device_name = tf.test.gpu_device_name()

if "GPU" in device_name:
    print('Found GPU at: {}'.format(device_name))
else:
    print("GPU device not found")

In [None]:
from keras.callbacks import LambdaCallback

# train model
# validation_split=0.2 holds out 20% of the samples for validation.
# NOTE(review): an earlier cell deletes CNNmodel; make sure the model-definition
# cell was the last of the two to run.
history = CNNmodel.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2, shuffle=True)

# save model
# The testing section reloads this file with tf.keras.models.load_model.
CNNmodel.save('cnn_model.h5')

In [None]:
# plot training history
import matplotlib.pyplot as plt

# One figure per metric: the training curve and its validation counterpart.
history_plots = (
	('accuracy', 'Accuracy', [0.5, 1]),
	('loss', 'Loss', [0, 5]),
)
for metric, y_label, y_range in history_plots:
	plt.plot(history.history[metric], label=metric)
	plt.plot(history.history['val_' + metric], label='val_' + metric)
	plt.xlabel('Epoch')
	plt.ylabel(y_label)
	plt.ylim(y_range)
	plt.legend(loc='lower right')
	plt.show()


# **Testing model**

In [None]:
#optional

# Release the training arrays before the test data is loaded.
del X_train
del y_train

In [None]:
# Drop the in-memory logistic-regression model.
# NOTE(review): the logistic-regression test cell needs `clf`; run the optional
# model-loading cell before it.
del clf

In [None]:
#optional

# Run the garbage collector after the deletions above.
gc.collect()

**Load Testing Data**

In [None]:
# need to be run

import pandas as pd

# Get dataFrame
# Re-load the same ground-truth CSV used for training; its 'cate' column is
# what separates training rows from test rows in the loader below.
address_sample_clean_df = pd.read_csv(
	address_sample_csv_path + "training-groundtruth.csv"
).drop(columns=['age', 'gender', 'educ', 'mmse'])

# Encode the diagnosis: "Control" -> 0, every other value (e.g. "ProbableAD") -> 1.
address_sample_clean_df['dx'] = address_sample_clean_df['dx'].map(
	lambda label: int(label != "Control")
)

address_sample_clean_df.head()


In [None]:
# need to be run

import os

# Sorted names of the regular files directly inside address_sample_path.
address_sample_name = sorted(
	fName for fName in os.listdir(address_sample_path)
	if os.path.isfile(os.path.join(address_sample_path, fName))
)
print(len(address_sample_name))

In [None]:
# need to be run for testing the model

import pickle
import gc

X_test = []
y_test = []

address_sample_number = len(address_sample_name)

# Collect the cached embedding and label of every row not marked 'train'.
# Files that cannot be opened or unpickled are reported and skipped.
# NOTE(review): the raw embedding is stored here, whereas the training loader
# stores the output of extract_features -- confirm both models expect this.
for i in range(address_sample_number):
	if address_sample_clean_df['cate'][i] == 'train':
		continue
	pkl_name = address_sample_name[i].split('.')[0] + '.pkl'
	print(str(i) + ': reading ' + pkl_name)
	try:
		with open(pkl_name, 'rb') as f:
			try:
				embedding, timestamps = pickle.load(f)
				X_test.append(embedding)
				y_test.append(address_sample_clean_df['dx'][i])
				del embedding
				del timestamps
				gc.collect()
			except Exception as error:
				print('error loading pickle file for ' + pkl_name)
				print(error)
	except Exception as error:
		print('error reading pickle file for ' + pkl_name)
		print(error)

**Testing Logistic Regression**

In [None]:
# Flatten each test sample into a single row: (n_samples, n_features).
X_test = np.asarray(X_test).reshape(len(X_test), -1)

In [None]:
# Run the garbage collector before loading the model.
gc.collect()

In [None]:
# optional IF model is trained and need to be loaded

import pickle

# NOTE(review): the training section saves 'logistic_regression_model.sav';
# confirm that this '...Half.sav' file is the intended model and that it exists
# in the working directory.
# Only unpickle files you created yourself: pickle can execute arbitrary code.
# The context manager closes the file, which the bare open(...) call did not.
with open('logistic_regression_modelHalf.sav', 'rb') as model_file:
	clf = pickle.load(model_file)

In [None]:
# Test Logistic Regression

# Predict class labels (0 = Control, 1 = otherwise) for the flattened test features.
y_pred = clf.predict(X_test)

**Testing CNN**

In [None]:
import numpy as np

# Convert the test features and labels to NumPy arrays.
# NOTE(review): if the logistic-regression test cell ran first, X_test has already
# been flattened to 2-D -- confirm the CNN receives the shape it was trained on.
X_test = np.array(X_test)
y_test = np.array(y_test)

In [None]:
import gc

# Run the garbage collector before loading the CNN.
gc.collect()

In [None]:
# Load CNN model

import tensorflow as tf

# Restore the CNN from the file written by the training section.
model = tf.keras.models.load_model('cnn_model.h5')

In [None]:
# Test CNN model

# Presumably sigmoid outputs in [0, 1], given the network defined in the CNN section;
# this overwrites any y_pred produced by the logistic-regression cell.
y_pred = model.predict(X_test)

**Show Accuracy Score**

In [None]:
# Inspect the raw predictions of whichever model was run last.
y_pred

In [None]:
# Round to hard 0/1 labels (leaves integer class labels unchanged).
np.round(y_pred)

In [None]:
from sklearn.metrics import accuracy_score

# Fraction of test samples whose rounded prediction equals the true label.
accuracy_score(y_test, y_pred.round())

**Showing Confusion Matrix below**

In [None]:
# confusion matrix
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# Rows are the true classes, columns the rounded predictions.
cm = confusion_matrix(y_test, y_pred.round())
heatmap_ax = sns.heatmap(cm, annot=True, fmt='g')
heatmap_ax.set_xlabel('Predicted')
heatmap_ax.set_ylabel('Truth')
plt.show()