# Proof of concept - Final project
## Evaluation of AI model prediction of respiratory diseases through breathing sound processing

The idea of this proof of concept is to do a small initial preprocessing of the .wav files that are going to be used in the project, train an existing AI model with them, and then show some metrics, in order to check that the project can be completed within one year and to get a rough idea of what is going to be done.

To build this proof of concept, the following tutorial has been followed, which performs a very similar task (training an AI model on processed .wav files to predict some specific English keywords): https://www.tensorflow.org/tutorials/audio/simple_audio?hl=es-419

## Configuration

### Constants

The proof of concept uses the following constants in the code.

In [None]:
# Constants

# Seed for randomness
SEED = 42

# Paths
DATASET_LABELS_PATH = 'dataset/patient_diagnosis.csv'
DATASET_AUDIO_PATH = 'dataset/audio_and_txt_files'

# AI model: number of training epochs and the fraction of the files
# assigned to each of the training / validation / testing subsets.
EPOCHS = 10
TRAINING_PERCENTAGE = 0.8
VALIDATION_PERCENTAGE = 0.1
TESTING_PERCENTAGE = 0.1


### Libraries

For this proof of concept to work, the following libraries must be imported.

In [None]:
import os
import pathlib

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf
import soundfile
import pywav

from tensorflow.keras import layers
from tensorflow.keras import models
from IPython import display

from data import transform_audio_filenames, preprocess_dataset, get_waveform_and_label, generate_diagnosis_data, get_spectrogram, get_spectrogram_and_label_id, get_diagnosis_list, get_random_file_of_diagnosis
from graphs import plot_spectrogram

# Seed both NumPy and TensorFlow with the same value so that experiments
# can be reproduced.
seed = SEED
np.random.seed(seed)
tf.random.set_seed(seed)

## Preprocessing of audio files

### Reading .wav files and their corresponding labels (diagnosis)

In [None]:
# Load the diagnosis data first, then fetch the list of diagnosis labels.
generate_diagnosis_data(DATASET_LABELS_PATH, False)
diagnosis_list = get_diagnosis_list()

# Glob pattern matching every .wav file of the dataset.
wav_pattern = str(DATASET_AUDIO_PATH) + '/*.wav'

# The directory is listed again after transform_audio_filenames has run
# (presumably because that helper changes the files on disk - confirm in
# data.py); the second listing is the one that gets shuffled and used.
transform_audio_filenames(tf.io.gfile.glob(wav_pattern))
audio_filenames = tf.random.shuffle(tf.io.gfile.glob(wav_pattern))

num_samples = len(audio_filenames)
print('Number of total examples:', num_samples)
print('Example file tensor:', audio_filenames[0])

In [None]:
# Show the diagnosis labels that get_diagnosis_list() returned earlier.
print('Diagnosis list :', diagnosis_list)

We define the different sets of data for training the AI model (train, validation and test), with the proportions set in the constants section.

In [None]:
# Number of files in each subset. int() truncates, so a few files may stay
# unassigned when the percentages do not divide num_samples evenly.
training_num_samples = int(num_samples * TRAINING_PERCENTAGE)
validation_num_samples = int(num_samples * VALIDATION_PERCENTAGE)
testing_num_samples = int(num_samples * TESTING_PERCENTAGE)

# The (already shuffled) file list is cut into consecutive slices.
validation_end = training_num_samples + validation_num_samples

train_files = audio_filenames[:training_num_samples]
val_files = audio_filenames[training_num_samples:validation_end]
# Slice from an explicit start index rather than [-testing_num_samples:]:
# when testing_num_samples is 0 the negative form degenerates to [0:] and
# would select every file, leaking the training data into the test set.
test_files = audio_filenames[num_samples - testing_num_samples:]

print('Training set size', len(train_files))
print('Validation set size', len(val_files))
print('Test set size', len(test_files))


We generate a TensorFlow dataset for training, in which every element is a tuple of the form (waveform, label).

In [None]:
AUTOTUNE = tf.data.AUTOTUNE

# One dataset element per training file, then each file is mapped through
# get_waveform_and_label (parallelism left to tf.data's autotuning).
files_ds = tf.data.Dataset.from_tensor_slices(train_files)
waveform_ds = files_ds.map(get_waveform_and_label, num_parallel_calls=AUTOTUNE)

In [None]:
# Draw the first rows * cols training waveforms on a grid, one per subplot,
# each titled with its label.
rows, cols = 3, 3
n = rows * cols
fig, axes = plt.subplots(rows, cols, figsize=(20, 20))
y_ticks = np.arange(-1.2, 1.2, 0.2)

# axes.flat walks the grid row by row, matching the (i // cols, i % cols)
# position of the i-th example.
for ax, (audio, label) in zip(axes.flat, waveform_ds.take(n)):
  ax.plot(audio.numpy())
  ax.set_yticks(y_ticks)
  ax.set_title(label.numpy().decode('utf-8'))

plt.show()

## Convert waveforms to spectrograms

In [None]:
# Take a single (waveform, label) example and compute its spectrogram.
for waveform, label in waveform_ds.take(1):
  spectrogram = get_spectrogram(waveform)
  label = label.numpy().decode('utf-8')

print('Label:', label)
print('Waveform shape:', waveform.shape)
print('Spectrogram shape:', spectrogram.shape)
print('Audio playback')
# NOTE(review): playback assumes a 16 kHz sample rate - confirm that the
# waveforms produced by get_waveform_and_label really use that rate.
playback = display.Audio(waveform, rate=16000)
display.display(playback)

In [None]:
# Two stacked panels for the example above: waveform on top, spectrogram below.
fig, axes = plt.subplots(2, figsize=(12, 8))
waveform_ax, spectrogram_ax = axes

# Waveform plotted against its sample index; the x axis is limited to the
# first 16000 samples.
sample_index = np.arange(waveform.shape[0])
waveform_ax.plot(sample_index, waveform.numpy())
waveform_ax.set_title('Waveform')
waveform_ax.set_xlim([0, 16000])

plot_spectrogram(spectrogram.numpy(), spectrogram_ax)
spectrogram_ax.set_title('Spectrogram')
plt.show()

In [None]:
# Map every (waveform, label) element through get_spectrogram_and_label_id.
spectrogram_ds = waveform_ds.map(
  get_spectrogram_and_label_id, num_parallel_calls=AUTOTUNE)

In [None]:
# Draw the first rows * cols training spectrograms on a grid, one per
# subplot, each titled with the diagnosis name for its label id.
rows = 3
cols = 3
n = rows*cols
fig, axes = plt.subplots(rows, cols, figsize=(20, 20))

# axes.flat walks the grid row by row, matching the (i // cols, i % cols)
# position of the i-th example.
for ax, (spectrogram, label_id) in zip(axes.flat, spectrogram_ds.take(n)):
  plot_spectrogram(spectrogram.numpy(), ax)
  # Reuse the diagnosis_list loaded earlier instead of calling
  # get_diagnosis_list() again on every iteration, as the other cells do.
  ax.set_title(diagnosis_list[label_id.numpy()])

plt.show()

## Build and train the model

In [None]:
# The training spectrogram dataset already exists; the validation and test
# file lists go through preprocess_dataset to obtain their datasets.
train_ds = spectrogram_ds
val_ds, test_ds = map(preprocess_dataset, (val_files, test_files))

In [None]:
# Group the training and validation examples into batches of batch_size.
batch_size = 64
train_ds, val_ds = train_ds.batch(batch_size), val_ds.batch(batch_size)

In [None]:
# Apply cache() followed by prefetch(AUTOTUNE) to both datasets.
train_ds, val_ds = (
    ds.cache().prefetch(AUTOTUNE) for ds in (train_ds, val_ds)
)

In [None]:
# Read the model input shape off one training spectrogram.
for spectrogram, _ in spectrogram_ds.take(1):
  input_shape = spectrogram.shape
print('Input shape:', input_shape)

# One output unit per diagnosis.
num_labels = len(diagnosis_list)

# `tf.keras.layers.Normalization` layer whose state is fitted with
# `Normalization.adapt` on the training spectrograms (labels dropped).
spectrograms_only = spectrogram_ds.map(map_func=lambda spec, label: spec)
norm_layer = layers.Normalization()
norm_layer.adapt(data=spectrograms_only)

# Layer stack: resize -> normalize -> two convolutions -> pooling ->
# dense classifier head that outputs one raw score per diagnosis.
model_layers = [
    layers.Input(shape=input_shape),
    # Downsample the input.
    layers.Resizing(32, 32),
    # Normalize.
    norm_layer,
    layers.Conv2D(32, 3, activation='relu'),
    layers.Conv2D(64, 3, activation='relu'),
    layers.MaxPooling2D(),
    layers.Dropout(0.25),
    layers.Flatten(),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.5),
    layers.Dense(num_labels),
]
model = models.Sequential(model_layers)

model.summary()

In [None]:
# The last Dense layer has no activation, so the loss is told to expect
# raw logits; labels are integer class ids (sparse categorical).
adam = tf.keras.optimizers.Adam()
sparse_ce_from_logits = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer=adam, loss=sparse_ce_from_logits, metrics=['accuracy'])

In [None]:
# Train for at most EPOCHS epochs, with early stopping after 2 epochs
# without improvement (patience=2).
early_stopping = tf.keras.callbacks.EarlyStopping(verbose=1, patience=2)

history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    # `callbacks` is documented as a list of Callback instances, so the
    # single callback is wrapped in a list instead of being passed bare.
    callbacks=[early_stopping],
)

In [None]:
# Plot the training and validation loss recorded for each epoch.
metrics = history.history
# Give the x values explicitly for both curves. With
# plt.plot(x, y1, y2) the third array starts a new (y-only) group and is
# drawn against an implicit 0..N-1 index, which only matches history.epoch
# when the epochs start at 0.
plt.plot(history.epoch, metrics['loss'],
         history.epoch, metrics['val_loss'])
plt.legend(['loss', 'val_loss'])
plt.show()

## Test the model performance

In [None]:
# Walk the (unbatched) test dataset once, converting each element to
# NumPy, then stack the inputs and the labels into two arrays.
test_pairs = [(audio.numpy(), label.numpy()) for audio, label in test_ds]

test_audio = np.array([audio for audio, _ in test_pairs])
test_labels = np.array([label for _, label in test_pairs])

In [None]:
# Predicted class = index of the highest score per example; accuracy is
# the share of predictions that match the true labels.
y_true = test_labels
y_pred = np.argmax(model.predict(test_audio), axis=1)

num_correct = np.count_nonzero(y_pred == y_true)
test_acc = num_correct / len(y_true)
print(f'Test set accuracy: {test_acc:.0%}')

### Show a confusion matrix

In [None]:
# Confusion matrix of true labels (rows) against predictions (columns).
# num_classes is given explicitly: otherwise the matrix is sized from the
# largest label/prediction present, so it can come out smaller than
# diagnosis_list when some diagnoses are absent from the test set, and the
# tick labels below would no longer line up with the matrix.
confusion_mtx = tf.math.confusion_matrix(
    y_true, y_pred, num_classes=len(diagnosis_list))
plt.figure(figsize=(10, 8))
sns.heatmap(confusion_mtx,
            xticklabels=diagnosis_list,
            yticklabels=diagnosis_list,
            annot=True, fmt='g')
plt.xlabel('Prediction')
plt.ylabel('Label')
plt.show()

In [None]:
# Pick a random recording of one diagnosis and plot the model's softmax
# scores for it.
test_diagnosis = 'Bronchiolitis'
sample_file = get_random_file_of_diagnosis(test_diagnosis, DATASET_AUDIO_PATH)

# Nothing is done when get_random_file_of_diagnosis returns None.
if sample_file is not None:
    print(f"The sample file chosen is : {sample_file}")

    sample_ds = preprocess_dataset([str(sample_file)])

    for spectrogram, label in sample_ds.batch(1):
        prediction = model(spectrogram)
        # The model outputs logits; softmax turns them into probabilities.
        probabilities = tf.nn.softmax(prediction[0])
        plt.figure(figsize=(10, 5))
        plt.bar(diagnosis_list, probabilities)
        plt.title(f'Predictions for "{diagnosis_list[label[0]]}"')
        plt.show()