In [None]:
import os
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, Dropout, Concatenate
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint
from sklearn.model_selection import train_test_split
from datetime import datetime

: 

In [7]:
# Pin the NumPy and TensorFlow global seeds so repeated runs are reproducible.
np.random.seed(42)
tf.random.set_seed(42)

In [8]:
def _stack_same_shape(arrays, column):
    """Combine per-row feature arrays into one batch array, rejecting ragged input."""
    shapes = {np.shape(a) for a in arrays}
    if len(shapes) > 1:
        # Fail early with a readable message instead of letting NumPy build an
        # object array (or raise a generic "inhomogeneous shape" error).
        raise ValueError(
            f"Feature files listed in column '{column}' have inconsistent shapes: "
            f"{sorted(shapes)}"
        )
    return np.array(arrays)


def load_data_from_csv(csv_path, feature_dir, verbose=False):
    """Load GCC features, mel-spectrogram features and labels listed in a CSV.

    The CSV must contain the columns ``fea_gcc`` and ``fea_spec`` (file names
    relative to ``feature_dir``, each loaded with ``np.load``) and
    ``subject_label``.

    Args:
        csv_path: Path of the CSV index file.
        feature_dir: Directory the feature file names are joined onto.
        verbose: When True, show the first rows of the CSV and print every
            feature path as it is loaded. Off by default so that large
            datasets do not flood the cell output.

    Returns:
        Tuple ``(gcc_features, spec_features, labels)`` of NumPy arrays, each
        with one entry per CSV row, in CSV row order.

    Raises:
        KeyError: If a required column is missing from the CSV.
        ValueError: If the feature files of one column differ in shape.
    """
    df = pd.read_csv(csv_path)

    required = ('fea_gcc', 'fea_spec', 'subject_label')
    missing = [column for column in required if column not in df.columns]
    if missing:
        raise KeyError(f"{csv_path} is missing required column(s): {missing}")

    if verbose:
        # `display` only exists when running under IPython/Jupyter; fall back
        # to print so the function also works from a plain Python script.
        try:
            show = display
        except NameError:
            show = print
        show(df.head())

    gcc_features = []
    spec_features = []

    # Iterate the two path columns directly; iterrows() would build a Series
    # for every row just to read these fields.
    for gcc_name, spec_name in zip(df['fea_gcc'], df['fea_spec']):
        # loading gcc features
        gcc_path = os.path.join(feature_dir, gcc_name)
        if verbose:
            print(gcc_path)
        gcc_features.append(np.load(gcc_path))

        # loading mel spec features
        spec_path = os.path.join(feature_dir, spec_name)
        if verbose:
            print(spec_path)
        spec_features.append(np.load(spec_path))

    # tolist() yields plain Python scalars, so the resulting array dtype is
    # inferred the same way as when appending labels one by one.
    labels = np.array(df['subject_label'].tolist())

    return (
        _stack_same_shape(gcc_features, 'fea_gcc'),
        _stack_same_shape(spec_features, 'fea_spec'),
        labels,
    )


In [9]:

def encode_labels(labels):
    """Turn arbitrary labels into consecutive integer class indices.

    The distinct labels are taken from ``np.unique`` (i.e. in sorted order)
    and numbered from 0.

    Returns:
        Tuple ``(encoded_labels, label_to_index)``: the integer-encoded labels
        as a NumPy array, and the dict mapping each distinct label to its index.
    """
    classes = np.unique(labels)
    label_to_index = dict(zip(classes, range(len(classes))))
    encoded_labels = np.array(list(map(label_to_index.__getitem__, labels)))
    return encoded_labels, label_to_index

In [10]:
def build_lstm_model(input_shape_gcc, input_shape_spec, hidden_units, output_units, dropout_rate=0.2):
    """Build a two-input LSTM classifier over GCC and mel-spectrogram sequences.

    Each input goes through its own sequence-returning LSTM plus dropout; the
    two branch outputs are concatenated, summarised by a second LSTM with
    ``hidden_units // 2`` units, and classified by a softmax Dense layer.

    Args:
        input_shape_gcc: Shape (without batch dimension) of the GCC input.
        input_shape_spec: Shape (without batch dimension) of the mel-spectrogram input.
        hidden_units: Units of each first-stage LSTM.
        output_units: Number of softmax outputs.
        dropout_rate: Rate used by every Dropout layer.

    Returns:
        An uncompiled ``Model`` taking ``[gcc_input, spec_input]``.
    """
    def sequence_branch(shape):
        # Input -> LSTM (full sequence) -> Dropout; shared recipe for both inputs.
        branch_input = Input(shape=shape)
        sequence = LSTM(hidden_units, return_sequences=True)(branch_input)
        return branch_input, Dropout(dropout_rate)(sequence)

    # Build the GCC branch first, then the spectrogram branch, so layers are
    # created in the same order as before.
    gcc_input, gcc_branch = sequence_branch(input_shape_gcc)
    spec_input, spec_branch = sequence_branch(input_shape_spec)

    # Fuse the two branches.
    # NOTE(review): presumably both inputs must share the same number of
    # timesteps for the default Concatenate to work; confirm against the data.
    fused = Concatenate()([gcc_branch, spec_branch])

    # Second LSTM layer collapses the fused sequence to a single vector.
    summary_vector = LSTM(hidden_units // 2, return_sequences=False)(fused)
    summary_vector = Dropout(dropout_rate)(summary_vector)

    # Output layer.
    class_probabilities = Dense(output_units, activation='softmax')(summary_vector)

    return Model(inputs=[gcc_input, spec_input], outputs=class_probabilities)

In [11]:
def main(feature_dir, csv_train_path, csv_test_path, batch_size=10, epochs=1, lr=0.001,
         hidden_units=128, dropout_rate=0.2, val_size=0.2, seed=42):
    """Train the two-branch LSTM on the training CSV and evaluate it on the test CSV.

    Args:
        feature_dir: Directory containing the feature files referenced by the CSVs.
        csv_train_path: CSV describing the training samples.
        csv_test_path: CSV describing the test samples.
        batch_size: Mini-batch size passed to ``model.fit``.
        epochs: Number of training epochs.
        lr: Adam learning rate.
        hidden_units: Units of the first-stage LSTMs.
        dropout_rate: Dropout rate used throughout the model.
        val_size: Fraction of the training data held out for validation.
        seed: ``random_state`` of the train/validation split.

    Returns:
        Tuple ``(model, history)``: the trained model (final-epoch weights) and
        the ``History`` object returned by ``model.fit``.

    Raises:
        KeyError: If the test set contains a label that never occurs in the
            training set.
    """
    # logging: one timestamped directory per run for TensorBoard and checkpoints
    log_dir = os.path.join("logs", datetime.now().strftime("%Y%m%d-%H%M%S"))
    os.makedirs(log_dir, exist_ok=True)

    # loading training data
    train_gcc, train_spec, train_labels = load_data_from_csv(csv_train_path, feature_dir)
    train_labels, label_to_index = encode_labels(train_labels)

    # loading test data
    test_gcc, test_spec, test_labels = load_data_from_csv(csv_test_path, feature_dir)

    # The test labels must reuse the training mapping; report unseen labels
    # explicitly rather than failing on the first bad lookup.
    unknown = [
        label for label in np.unique(test_labels).tolist()
        if label not in label_to_index
    ]
    if unknown:
        raise KeyError(f"Test set contains labels never seen in training: {unknown}")
    test_labels = np.array([label_to_index[label] for label in test_labels])

    # split training data and validation
    X_train_gcc, X_val_gcc, X_train_spec, X_val_spec, y_train, y_val = train_test_split(
        train_gcc, train_spec, train_labels, test_size=val_size, random_state=seed
    )

    # building lstm model
    input_shape_gcc = (X_train_gcc.shape[1], X_train_gcc.shape[2])  # (timesteps, features) for GCC
    input_shape_spec = (X_train_spec.shape[1], X_train_spec.shape[2])  # (timesteps, features) for Mel-spectrogram
    model = build_lstm_model(
        input_shape_gcc=input_shape_gcc,
        input_shape_spec=input_shape_spec,
        hidden_units=hidden_units,
        output_units=len(label_to_index),  # Number of unique subjects
        dropout_rate=dropout_rate
    )

    model.summary()

    model.compile(
        optimizer=Adam(learning_rate=lr),
        loss='sparse_categorical_crossentropy',  # labels are integer class indices
        metrics=['accuracy']
    )

    tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)
    checkpoint_callback = ModelCheckpoint(
        filepath=os.path.join(log_dir, 'best_model.h5'),
        monitor='val_accuracy',
        save_best_only=True,
        mode='max'
    )

    history = model.fit(
        [X_train_gcc, X_train_spec], y_train,
        validation_data=([X_val_gcc, X_val_spec], y_val),
        batch_size=batch_size,
        epochs=epochs,
        callbacks=[tensorboard_callback, checkpoint_callback]
    )

    # testing
    # NOTE: this evaluates the in-memory model as left by the last epoch; the
    # best checkpoint is only written to disk and is not reloaded here.
    test_loss, test_accuracy = model.evaluate([test_gcc, test_spec], test_labels, verbose=2)
    print(f"Test Accuracy: {test_accuracy:.4f}")

    return model, history


In [None]:
if __name__ == "__main__":
    # Input locations: the feature directory plus the train/test index CSVs.
    feature_dir = "/content/drive/MyDrive/FYP/data/audio_feature"  # holds the .npy feature files
    csv_train_path = "/content/drive/MyDrive/FYP/data/audio_feature/AFPILD_FE1_rd_train.csv"  # training index
    csv_test_path = "/content/drive/MyDrive/FYP/data/audio_feature/AFPILD_FE1_rd_test.csv"  # test index

    # Kick off training and evaluation.
    main(
        feature_dir=feature_dir,
        csv_train_path=csv_train_path,
        csv_test_path=csv_test_path,
        batch_size=10,
        epochs=1,
        lr=0.001,
    )