In [None]:
import pandas as pd
from pathlib import Path
import os
import librosa
import librosa.display
import librosa.feature as feat
import matplotlib.pyplot as plt
import numpy as np
import os
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
from pathlib import Path
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from tqdm import tqdm
from scipy import signal
import seaborn as sns
import numpy as np
import IPython.display as ipd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import layers, models
from time import time

In [None]:
def load_and_preprocess_data(file_paths, target_length):
    """Load audio files as fixed-length, band-pass filtered waveforms.

    Each file is read at its native sampling rate, zero-padded at the end or
    truncated to exactly ``target_length`` samples, and then passed through
    ``bandpass_filter``.

    Args:
        file_paths: Iterable of audio file paths readable by ``librosa.load``.
        target_length: Number of samples every returned clip must have.

    Returns:
        A NumPy array built from the list of processed clips (one per file).
    """
    clips = []
    progress = tqdm(file_paths, desc="Loading and preprocessing data", unit="file")
    for path in progress:
        # sr=None keeps the file's own sampling rate instead of resampling.
        samples, rate = librosa.load(path, sr=None)

        missing = target_length - len(samples)
        if missing > 0:
            # Too short: append zeros at the end only.
            samples = np.pad(samples, (0, missing))
        else:
            # Long enough: keep the leading target_length samples.
            samples = samples[:target_length]

        clips.append(bandpass_filter(samples, rate))

    print("Done")
    return np.array(clips)

def load_and_preprocess_data_with_spectrogram(file_paths, target_length, n_fft=1024, hop_length=512, n_mels=128):
    """Load audio files and convert each one to a normalised log-mel spectrogram.

    Each file is read at its native sampling rate, zero-padded at the end or
    truncated to ``target_length`` samples, band-pass filtered with
    ``bandpass_filter``, turned into a mel spectrogram in dB (relative to its
    own maximum) and min-max scaled to the range [0, 1].

    Args:
        file_paths: Iterable of audio file paths readable by ``librosa.load``.
        target_length: Number of samples each clip is padded/truncated to.
        n_fft: FFT window size passed to ``librosa.feature.melspectrogram``.
        hop_length: Hop between successive frames, in samples.
        n_mels: Number of mel bands.

    Returns:
        A NumPy array built from the list of per-file 2-D spectrograms.
    """
    data = []
    for file_path in tqdm(file_paths, desc="Loading and preprocessing data", unit="file"):
        # sr=None keeps the file's own sampling rate instead of resampling.
        audio, sr = librosa.load(file_path, sr=None)
        if len(audio) < target_length:
            audio = np.pad(audio, (0, target_length - len(audio)))
        else:
            audio = audio[:target_length]

        # Reuse the shared helper so both loaders apply the exact same filter.
        audio = bandpass_filter(audio, sr)

        spectrogram = librosa.feature.melspectrogram(
            y=audio, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels
        )
        spectrogram = librosa.power_to_db(spectrogram, ref=np.max)

        # Min-max normalise the spectrogram to [0, 1].
        lowest = spectrogram.min()
        value_range = spectrogram.max() - lowest
        if value_range == 0:
            # A constant spectrogram (e.g. an all-zero clip) would give 0/0 = NaN
            # and poison training; map it to all zeros instead.
            spectrogram = np.zeros_like(spectrogram)
        else:
            spectrogram = (spectrogram - lowest) / value_range
        data.append(spectrogram)

    print("Done")
    return np.array(data)

def bandpass_filter(audio, sr, lowcut=5000, highcut=100000, order=6):
    """Apply a zero-phase Butterworth band-pass filter to a signal.

    The filter is designed as second-order sections and applied forward and
    backward with ``scipy.signal.sosfiltfilt``, so the output has the same
    length as the input.

    Args:
        audio: 1-D array of samples to filter.
        sr: Sampling rate of ``audio`` in Hz.
        lowcut: Lower edge of the pass band in Hz (default 5000).
        highcut: Upper edge of the pass band in Hz (default 100000).
        order: Order passed to ``scipy.signal.butter`` (default 6).

    Returns:
        The filtered signal as a NumPy array.

    Raises:
        ValueError: If the band does not satisfy ``0 < lowcut < highcut < sr / 2``.
    """
    # Fail early with an actionable message instead of SciPy's generic one:
    # the default 100 kHz upper edge needs a sampling rate above 200 kHz.
    if not 0 < lowcut < highcut < sr / 2:
        raise ValueError(
            f"Band [{lowcut}, {highcut}] Hz must satisfy 0 < lowcut < highcut < sr/2 "
            f"(sr={sr} Hz, Nyquist={sr / 2} Hz)"
        )
    sos = signal.butter(order, [lowcut, highcut], 'bandpass', fs=sr, output='sos')
    return signal.sosfiltfilt(sos, audio)


def build_model(target_length, input_shape=None, n_mels=128, hop_length=512):
    """Build and compile the 1-D CNN binary classifier for mel spectrograms.

    The network stacks three Conv1D/MaxPooling1D stages (with dropout after
    the first and third), two L2-regularised dense layers, a final dropout and
    a single sigmoid output. It is compiled with Adam, binary cross-entropy
    and the accuracy metric. The model summary is printed as a side effect.

    Args:
        target_length: Number of audio samples per clip. Used to derive the
            input shape when ``input_shape`` is not given.
        input_shape: Optional explicit ``(steps, channels)`` shape of one
            sample. When ``None`` it is ``(n_mels, 1 + target_length // hop_length)``,
            which is the frame count of a centred STFT and evaluates to
            ``(128, 101)`` for 51200-sample clips.
        n_mels: Number of mel bands in the input spectrograms.
        hop_length: Hop length (in samples) used to compute the spectrograms.

    Returns:
        The compiled ``tf.keras`` Sequential model.
    """
    if input_shape is None:
        # Previously hard-coded to (128, 101), which silently ignored
        # target_length and only matched 51200-sample clips.
        input_shape = (n_mels, 1 + target_length // hop_length)

    print("\nCreating model")
    model = models.Sequential()
    # First convolutional layer
    model.add(layers.Conv1D(32, kernel_size=9, activation='relu', input_shape=input_shape))
    model.add(layers.MaxPooling1D(pool_size=2))
    model.add(layers.Dropout(0.5))
    # Second convolutional layer
    model.add(layers.Conv1D(32, kernel_size=9, activation='relu'))
    model.add(layers.MaxPooling1D(pool_size=2))
    # Third convolutional layer
    model.add(layers.Conv1D(32, kernel_size=5, activation='relu'))
    model.add(layers.MaxPooling1D(pool_size=2))
    model.add(layers.Dropout(0.5))
    # Flatten the output for the fully connected layers
    model.add(layers.Flatten())
    # First fully connected layer
    model.add(layers.Dense(128, activation='relu', kernel_regularizer='l2')) #! L2 regularization to remove after
    # Second fully connected layer
    model.add(layers.Dense(64, activation='relu', kernel_regularizer='l2')) #! L2 regularization to remove after
    # Dropout regularization to avoid overfitting
    model.add(layers.Dropout(0.5))
    # Binary classification output layer
    model.add(layers.Dense(1, activation='sigmoid'))
    # Compile the model
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    # Display the model summary
    model.summary()
    return model

def plot_accuracy(history):
    """Plot training (and, when available, validation) accuracy per epoch.

    Args:
        history: Object returned by ``model.fit``; its ``history`` dict must
            contain ``'accuracy'`` and may contain ``'val_accuracy'``.

    Returns:
        None. The figure is rendered with ``plt.show()``.
    """
    acc = history.history['accuracy']
    # Absent when the model was fitted without validation data.
    val_acc = history.history.get('val_accuracy')
    epochs = range(1, len(acc) + 1)

    ax = plt.gca()
    ax.plot(epochs, acc, '-', label='Training Accuracy')
    if val_acc is not None:
        ax.plot(epochs, val_acc, ':', label='Validation Accuracy')
    ax.set_title('Training and Validation Accuracy')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Accuracy')
    ax.legend(loc='lower right')
    # The original ended with plt.plot(), which draws nothing; show the figure.
    plt.show()


In [None]:
# Display the directory two levels above the current working directory; the
# parameters cell below uses the same expression as the root holding ".dataset".
Path.cwd().parents[1]

In [None]:
#! ===== Set parameters ======
# Directory layout, all derived from the notebook's working directory.
conv1D_directory = Path.cwd()
grandparent_dir = conv1D_directory.parents[1]

# Path to the downloaded data, and the folders inside/next to it
download_path = grandparent_dir / ".dataset"
test_directory = download_path / "X_test"
models_directory = conv1D_directory / "models"

# Audio parameters: assumed sampling rate (Hz) and clip duration (s); their
# product gives the number of samples each clip is padded/truncated to.
sample_rate = 256000
audio_duration_seconds = 0.2

In [None]:
#! ====== Load and preprocess data ======
# Read labels file
labels_file = download_path / "Y_train_ofTdMHi.csv"
train_audio_dir = Path(download_path) / "X_train"

# Build the frame in one chain (no in-place mutation, so the cell is idempotent):
#   - relative_path: full path of each training file, built from its "id"
#   - pos_label is renamed to label
#   - only relative_path and label are kept, in that order
df = (
    pd.read_csv(labels_file)
    .assign(relative_path=lambda frame: frame["id"].map(lambda file_name: train_audio_dir / file_name))
    .rename(columns={"pos_label": "label"})
    .loc[:, ["relative_path", "label"]]
)
print(f"### There are {len(df)} audio files in the dataset.")

# Count each class once; reindex so a class with no samples reports 0
# instead of raising KeyError.
label_counts = df["label"].value_counts().reindex([0, 1], fill_value=0)

table = f"""
Here is the split into good and bad signals:
| Label   | Count   |
|:-------:|:-------:|
| 0       | {label_counts[0]:7} |
| 1       | {label_counts[1]:7} |"""
print(table, end="\n\n")

In [None]:
print("Loading and preprocessing data")
# Samples per clip = assumed sampling rate (Hz) * clip duration (s).
# NOTE(review): files are loaded at their native rate (sr=None in the loader),
# so this length only equals audio_duration_seconds if files really are at sample_rate — confirm.
target_length = int(sample_rate * audio_duration_seconds)
# X: one normalised mel spectrogram per training file, in the row order of df.
X = load_and_preprocess_data_with_spectrogram(df["relative_path"], target_length)
# y: integer class labels aligned with X.
y = df["label"].values.astype(int)

In [None]:
# Sanity check: report the shape of the stacked feature array before splitting.
print(f"X shape: {X.shape}")

In [None]:
print("\nSplitting data into train and validation sets")
# Hold out 20% of the samples for validation; random_state fixes the shuffle
# so the split is reproducible.
# NOTE(review): the split is not stratified (no stratify=y), so class
# proportions may differ between the two sets — confirm this is intended.
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=64) 

In [None]:
# Stop training once val_loss has not improved for 3 consecutive epochs and
# roll the model back to the weights of its best epoch.
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

## Building a model

In [None]:
# Build and compile the CNN; build_model also prints the layer summary.
model = build_model(target_length) # Build model

In [None]:
print("\n------------------ Training model ------------------", end="\n\n")
# Train for at most 20 epochs in mini-batches of 32; the early-stopping
# callback may end training sooner based on the validation set.
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_val, y_val),
    epochs=20,
    batch_size=32,
    callbacks=[early_stopping],
)


In [None]:
# Print the final model accuracy on the validation set
print("\n------------------ Model accuracy ------------------")
# evaluate() returns the loss followed by the compiled metrics (accuracy);
# the loss is discarded here.
_, accuracy = model.evaluate(X_val, y_val)
# Shown as a percentage value with two decimals (no "%" sign is printed).
print("Accuracy: %.2f" % (accuracy*100))

In [None]:
print("\n------------------ Saving model ------------------", end="\n\n")
model_name = "1d_cnn_l2_spectro.keras"

# Create the output folder if needed. mkdir(exist_ok=True) replaces the
# check-then-create conditional expression (which was racy and used an
# expression purely for its side effect); parents=True also covers a missing parent.
Path(models_directory).mkdir(parents=True, exist_ok=True)
model.save(Path(models_directory) / model_name)
print(f"Model {model_name} saved at {models_directory}")

In [None]:
print("\n------------------ Plotting accuracy ------------------", end="\n\n")
# Plot per-epoch training and validation accuracy from the fit history.
plot_accuracy(history)

## Testing


In [None]:
def load_test_data(folder_path, target_length):
    """Load every ``.wav`` file under ``folder_path`` as filtered waveforms.

    The folder is searched recursively and the files are processed by
    ``load_and_preprocess_data`` in the order ``rglob`` yields them.

    Args:
        folder_path: Root folder containing the audio files (assumed WAV).
        target_length: Number of samples each clip is padded/truncated to.

    Returns:
        The array returned by ``load_and_preprocess_data``.
    """
    root = Path(folder_path)
    wav_files = [wav_file for wav_file in root.rglob('*.wav')]
    return load_and_preprocess_data(wav_files, target_length)

def load_test_data_with_spectrogram(folder_path, target_length):
    """Load every ``.wav`` file under ``folder_path`` as mel spectrograms.

    The folder is searched recursively and the files are processed by
    ``load_and_preprocess_data_with_spectrogram`` in the order ``rglob``
    yields them.

    Args:
        folder_path: Root folder containing the audio files (assumed WAV).
        target_length: Number of samples each clip is padded/truncated to.

    Returns:
        The array returned by ``load_and_preprocess_data_with_spectrogram``.
    """
    root = Path(folder_path)
    wav_files = [wav_file for wav_file in root.rglob('*.wav')]
    return load_and_preprocess_data_with_spectrogram(wav_files, target_length)

In [None]:
# Build test features with the same preprocessing as the training data.
X_test = load_test_data_with_spectrogram(test_directory, target_length)

# Reload the saved model from disk; this rebinds `model`, replacing the
# in-memory model trained above.
model_to_test = "1d_cnn_l2_spectro.keras"
model = models.load_model(models_directory / model_to_test)

In [None]:
# File names for the submission, collected with a second rglob over the test folder.
# NOTE(review): X_test was built from a separate rglob call; rows only line up
# with file_names if both traversals return files in the same order — confirm.
file_names = [file_path.name for file_path in Path(test_directory).rglob('*.wav')]
# One sigmoid output per test clip (the model's final layer is Dense(1, sigmoid)).
predictions = model.predict(X_test)

In [None]:
# Visual sanity check: the first dimension of predictions should equal the
# number of collected file names.
print("----------------------------------")
print(predictions.shape)
print(len(file_names))
print("----------------------------------")

In [None]:
# Save data as csv for submission file
submission_filename = "submission.csv"

# Use a dedicated name instead of `df`: rebinding `df` here would clobber the
# training-labels frame and break any re-run of the data-loading cell.
# predictions has one column; [:, 0] flattens it to one score per file.
submission_df = pd.DataFrame({'id': file_names, 'pos_label': predictions[:, 0]})
submission_df.to_csv(Path(conv1D_directory) / submission_filename, index=False)