In [None]:
!pip install pandas
!pip install matplotlib
!pip install seaborn
!pip install tensorflow
!pip install tensorboard

In [None]:
import pandas as pd

In [30]:
# Folder of the corpus; the features CSV is read from "<data_path>/features.csv".
# Windows alternative:
# data_path = r"D:\datasets\cv-corpus-19.0-2024-09-13-fr\cv-corpus-19.0-2024-09-13\fr"
data_path = "/mnt/d/datasets/cv-corpus-19.0-2024-09-13-fr/cv-corpus-19.0-2024-09-13/fr"

# Experiment switches
train_age_only = True        # True: the model only has the age output (no gender output)
use_ordinal_age = False      # False: one-hot age classes; True: a single ordinal age value
use_early_stopping = False   # True: add an EarlyStopping callback during training
features_length = 40         # number of mfcc_<i> columns per sample

# Hyperparameters
loss_age_weight = 1
loss_genre_weight = 0.1      # weight applied to the gender loss
learning_rate = 5e-5
epochs = 100
batch_size = 128
num_age_classes = 7
train_ratio = 0.9

In [None]:
from sklearn.model_selection import train_test_split


def preprocess_label_data(label_data, keep_age=False):
    """
    Turn the raw feature table into model-ready columns.

    The ``mfcc_0`` ... ``mfcc_{features_length - 1}`` columns are collapsed into a
    single ``mfcc_features`` column holding one 1-D array per row, ``gender`` is
    mapped to 0/1 and ``age`` is either mapped to its class index (when the global
    ``use_ordinal_age`` is true) or expanded into one indicator column per age class.

    Args:
        label_data: DataFrame containing ``age``, ``gender`` and the ``mfcc_<i>``
            columns. It is left untouched; a new frame is returned.
        keep_age: In one-hot mode, keep the original ``age`` column next to the
            indicator columns instead of dropping it. Has no effect in ordinal mode.

    Returns:
        A new DataFrame with the ``mfcc_<i>`` columns replaced by ``mfcc_features``
        (appended after the remaining original columns) and, in one-hot mode, the
        seven age indicator columns appended last, in the order of ``age_dict``.
        Genders or ages that are not listed in the dictionaries become NaN
        (``gender``, ordinal ``age``) or an all-zero indicator row (one-hot mode).
    """
    gender_dict = {
        'male_masculine': 0,
        'female_feminine': 1
    }
    age_dict = {
        'teens': 0,
        'twenties': 1,
        'thirties': 2,
        'fourties': 3,
        'fifties': 4,
        'sixties': 5,
        'seventies': 6
    }
    mfcc_columns = [f'mfcc_{i}' for i in range(features_length)]

    # Take the MFCC block out as one (n_samples, features_length) matrix and store
    # each row as a single cell; `drop` returns a new frame, so the caller's
    # DataFrame is never modified.
    mfcc_matrix = label_data[mfcc_columns].to_numpy()
    label_data = label_data.drop(columns=mfcc_columns)
    label_data['mfcc_features'] = list(mfcc_matrix)

    if use_ordinal_age:
        label_data['age'] = label_data['age'].map(age_dict)
    else:
        # Encoding through a categorical dtype guarantees one column per known age
        # class, in class order, even when a class never occurs in the data
        # (plain get_dummies + column selection raises KeyError in that case).
        age_categories = pd.CategoricalDtype(categories=list(age_dict))
        dummies = pd.get_dummies(label_data['age'].astype(age_categories))
        dummies.columns = list(age_dict)
        label_data = pd.concat([label_data, dummies], axis=1)
        if not keep_age:
            label_data = label_data.drop(columns='age')

    label_data['gender'] = label_data['gender'].map(gender_dict)
    return label_data

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf


def load_csv_data(csv_path):
    """Read the CSV file at ``csv_path`` and return its contents as a DataFrame."""
    return pd.read_csv(csv_path)


def create_dataset(csv_path, batch_size, num_age_classes, train_ratio=0.8, random_state=0):
    """
    Build batched training and validation ``tf.data.Dataset`` objects from a CSV file.

    The CSV is loaded, run through ``preprocess_label_data``, split into a training
    and a validation part (stratified on ``gender``) and each part is turned into a
    dataset whose elements are the ``(features, labels)`` pairs returned by
    ``tf_parse_row``. The sizes of both splits are printed.

    Args:
        csv_path: Path of the features CSV.
        batch_size: Number of samples per batch in both datasets.
        num_age_classes: Number of age classes, forwarded to ``tf_parse_row``.
        train_ratio: Fraction of the rows used for training.
        random_state: Seed forwarded to ``train_test_split``.

    Returns:
        ``(train_dataset, val_dataset)``, both batched and prefetched.
    """
    # One-hot age columns, in class order. They are selected by name rather than by
    # position so that the number or order of the other CSV columns cannot shift
    # the label slice. Must match the indicator columns created by
    # preprocess_label_data.
    age_columns = ['teens', 'twenties', 'thirties', 'fourties', 'fifties', 'sixties', 'seventies']

    # Load CSV and preprocess
    df = preprocess_label_data(load_csv_data(csv_path))

    # Split the data
    train_data, val_data = train_test_split(df, train_size=train_ratio, random_state=random_state,
                                            stratify=df['gender'])
    print(f"Train data: {len(train_data)} samples")
    print(f"Validation data: {len(val_data)} samples")

    def convert_to_tensors(data):
        """Return (features, ages) or (features, genders, ages) tensors for one split."""
        features = tf.convert_to_tensor(np.stack(data['mfcc_features']), dtype=tf.float32)
        age_source = data['age'] if use_ordinal_age else data[age_columns]
        # Cast in numpy first so integer or boolean indicator columns are accepted.
        ages = tf.convert_to_tensor(age_source.to_numpy(dtype=np.float32), dtype=tf.float32)
        if train_age_only:
            return features, ages
        genders = tf.convert_to_tensor(data['gender'].values, dtype=tf.int32)
        return features, genders, ages

    # Row parser matching the tuple layout produced by convert_to_tensors
    if train_age_only:
        def parse_row(features, age):
            return tf_parse_row(features, None, age, num_age_classes)
    else:
        def parse_row(features, gender, age):
            return tf_parse_row(features, gender, age, num_age_classes)

    def build_pipeline(data):
        """Slice one split into rows, parse them, then batch and prefetch."""
        return (
            tf.data.Dataset.from_tensor_slices(convert_to_tensors(data))
            .map(parse_row, num_parallel_calls=tf.data.AUTOTUNE)
            .batch(batch_size)
            .prefetch(tf.data.AUTOTUNE)  # Prefetch for efficient data loading
        )

    return build_pipeline(train_data), build_pipeline(val_data)


def tf_parse_row(features, gender_label, age_label, num_age_classes):
    """
    Pin the static shapes of one dataset row and pack its labels into a dict.

    Returns ``(features, labels)`` where ``labels`` always has an ``"age"`` entry
    and additionally a ``"gender"`` entry unless ``train_age_only`` is set.
    ``gender_label`` is not touched when ``train_age_only`` is set.
    """
    # Ordinal ages are scalars, one-hot ages are vectors with one slot per class
    age_shape = [num_age_classes]
    if use_ordinal_age:
        age_shape = []

    # Adjust this shape to match your data
    features.set_shape([features_length])
    age_label.set_shape(age_shape)

    if train_age_only:
        return features, {"age": age_label}

    gender_label.set_shape([])
    return features, {"age": age_label, "gender": gender_label}




In [None]:
# Build the train/validation datasets from a features CSV in the working directory.
# NOTE(review): this path is relative, while a later cell rebuilds the datasets
# from f"{data_path}/features.csv" - confirm which file is intended.
csv_path = "features.csv"
train_dataset, val_dataset = create_dataset(
    csv_path,
    batch_size,
    num_age_classes,
    train_ratio,
)

In [31]:
import tensorflow as tf
from tensorflow.keras import layers, models, initializers


class ConvBlock(layers.Layer):
    """
    Two stacked 1-D convolutions followed by optional batch normalisation, an
    activation, optional max pooling (pool size 2, stride 2) and optional dropout.

    Args:
        filters: Number of filters of both convolutions.
        kernel_size: Kernel size of both convolutions.
        strides: Stride of both convolutions.
        padding: Padding mode of both convolutions.
        activation: Activation applied once, after the second convolution
            (and after batch normalisation when enabled).
        kernel_initializer: Kernel initializer of both convolutions.
        batch_norm: Whether to apply batch normalisation before the activation.
        max_pool: Whether to apply max pooling after the activation.
        dropout_rate: Dropout rate; dropout is only added when it is > 0.
        **kwargs: Forwarded to the base ``Layer`` (e.g. ``name``, ``dtype``).
    """

    def __init__(self, filters, kernel_size, strides=1, padding='same', activation='relu',
                 kernel_initializer='he_normal', batch_norm=True, max_pool=True, dropout_rate=0.0,
                 **kwargs):
        super().__init__(**kwargs)
        self.conv1 = layers.Conv1D(
            filters=filters,
            kernel_size=kernel_size,
            strides=strides,
            padding=padding,
            kernel_initializer=kernel_initializer
        )
        self.conv2 = layers.Conv1D(
            filters=filters,
            kernel_size=kernel_size,
            strides=strides,
            padding=padding,
            kernel_initializer=kernel_initializer
        )
        # Optional stages are stored as None when disabled
        self.batch_norm = layers.BatchNormalization() if batch_norm else None
        self.activation = layers.Activation(activation)
        self.max_pool = layers.MaxPooling1D(pool_size=2, strides=2, padding='same') if max_pool else None
        self.dropout = layers.Dropout(dropout_rate) if dropout_rate > 0 else None

    def call(self, inputs, training=False):
        """Apply the block; ``training`` toggles batch normalisation and dropout."""
        # Convolutions behave the same in training and inference, so they do not
        # need the `training` flag.
        x = self.conv1(inputs)
        x = self.conv2(x)
        if self.batch_norm is not None:
            x = self.batch_norm(x, training=training)
        x = self.activation(x)
        if self.max_pool is not None:
            x = self.max_pool(x)
        if self.dropout is not None:
            x = self.dropout(x, training=training)
        return x


class AudioAgeAndGenderClassifier(tf.keras.Model):
    """
    1-D convolutional classifier over a feature vector.

    The input gets a trailing channel axis, runs through five ``ConvBlock``s
    (32, 64, 128, 256 and 512 filters), is flattened, passed through a 128-unit
    dense layer and finally through the age head. A gender head is added unless
    the global ``train_age_only`` is set. The model returns a dict keyed by
    ``"age"`` (and ``"gender"``).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # (filters, batch_norm, dropout_rate) of each block, in order
        block_specs = (
            (32, False, 0.0),
            (64, False, 0.1),
            (128, False, 0.1),
            (256, True, 0.2),
            (512, True, 0.2),
        )
        self.blocks = [
            ConvBlock(filters=n_filters, kernel_size=3, strides=1, padding='same', activation='relu',
                      kernel_initializer='he_normal', batch_norm=with_batch_norm, max_pool=True,
                      dropout_rate=rate)
            for n_filters, with_batch_norm, rate in block_specs
        ]

        self.reduction_layer = layers.Flatten()
        self.dense = layers.Dense(128, activation='relu')

        # Ordinal mode: one non-negative value; otherwise a distribution over classes
        if use_ordinal_age:
            age_units, age_activation = 1, "relu"
        else:
            age_units, age_activation = num_age_classes, 'softmax'
        self.age_output = layers.Dense(age_units, activation=age_activation)

        if not train_age_only:
            self.gender_output = layers.Dense(1, activation='sigmoid')

    def call(self, inputs, training=False):
        """Run the forward pass and return the predictions as a dict."""
        # Conv1D needs a channel axis: add one at the end
        hidden = tf.expand_dims(inputs, axis=-1)
        for conv_block in self.blocks:
            hidden = conv_block(hidden, training=training)

        hidden = self.reduction_layer(hidden)
        hidden = self.dense(hidden, training=training)

        outputs = {"age": self.age_output(hidden)}
        if not train_age_only:
            outputs["gender"] = self.gender_output(hidden)
        return outputs


# Report whether TensorFlow found a GPU to run on
device_name = tf.test.gpu_device_name()
print(f"Using GPU: {device_name}" if device_name else "Using CPU")

Using GPU: /device:GPU:0


I0000 00:00:1733759324.781949  176862 gpu_device.cc:2022] Created device /device:GPU:0 with 5563 MB memory:  -> device: 0, name: NVIDIA GeForce RTX 4060 Laptop GPU, pci bus id: 0000:01:00.0, compute capability: 8.9


In [32]:
import sklearn
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay


def age_loss_function(y_true, y_pred):
    """Mean of the squared differences between ``y_true`` and ``y_pred``."""
    residual = y_true - y_pred
    squared_error = tf.square(residual)
    return tf.reduce_mean(squared_error)

# Compile the model
def compile_model(model, learning_rate=0.001):
    """
    Compile ``model`` with Adam and per-output losses, loss weights and metrics.

    The ``"age"`` output uses ``age_loss_function`` with a mean-squared-error
    metric in ordinal mode, and categorical cross-entropy with categorical
    accuracy otherwise. Unless ``train_age_only`` is set, the ``"gender"`` output
    is added with binary cross-entropy and binary accuracy.

    Args:
        model: Keras model whose outputs are a dict keyed by ``"age"``
            (and ``"gender"``).
        learning_rate: Learning rate of the Adam optimizer.

    Returns:
        The same model instance, compiled.
    """
    # Losses, loss weights and metrics are all keyed by output name so that they
    # cannot be paired with the wrong output.
    losses = {
        "age": age_loss_function if use_ordinal_age else tf.keras.losses.CategoricalCrossentropy(),
    }
    loss_weights = {"age": loss_age_weight}
    metrics = {
        "age": [
            # Metrics are passed as instances (CategoricalAccuracy was previously
            # passed as a bare class).
            tf.keras.metrics.MeanSquaredError() if use_ordinal_age
            else tf.keras.metrics.CategoricalAccuracy(),
        ],
    }
    if not train_age_only:
        losses["gender"] = tf.keras.losses.BinaryCrossentropy()
        loss_weights["gender"] = loss_genre_weight
        metrics["gender"] = [tf.keras.metrics.BinaryAccuracy()]

    # Compile the model
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=losses,
        loss_weights=loss_weights,
        metrics=metrics,
    )
    return model


def plot_confusion_matrix(y_test_classes, y_pred_classes, display_labels, title="Confusion Matrix"):
    """
    Compute and display the confusion matrix of the given class labels.

    Args:
        y_test_classes: True class of each sample.
        y_pred_classes: Predicted class of each sample.
        display_labels: Tick labels, one per class of the confusion matrix.
        title: Title of the figure.
    """
    # Compute the confusion matrix
    cm = confusion_matrix(y_test_classes, y_pred_classes)

    # Draw on an explicitly created axes: without `ax`, `disp.plot` opens its own
    # figure, which left the separately created figure empty.
    fig, ax = plt.subplots()
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=display_labels)
    disp.plot(cmap=plt.cm.Blues, ax=ax)
    ax.set_title(title)

    plt.show()
    # This function is called repeatedly (e.g. once per epoch); release the figure.
    plt.close(fig)


def retrieve_labels(dataset):
    """
    Collect the labels of every batch of ``dataset`` into numpy arrays.

    Args:
        dataset: Iterable of ``(features, labels)`` batches. ``labels`` is either
            array-like or a dict of array-likes keyed by output name.

    Returns:
        The labels concatenated along the first axis: a single array for
        array-like labels, or a dict with one concatenated array per key for dict
        labels. An empty dataset yields an empty array.
    """
    label_batches = [labels for _, labels in dataset]
    if not label_batches:
        return np.empty((0,))

    # Batches are concatenated directly instead of being stacked first: the last
    # batch may be smaller than the others, which makes stacking fail.
    if isinstance(label_batches[0], dict):
        return {
            key: np.concatenate([np.asarray(batch[key]) for batch in label_batches], axis=0)
            for key in label_batches[0]
        }
    return np.concatenate([np.asarray(batch) for batch in label_batches], axis=0)


def print_example_output(model, dataset):
    """
    Run ``model`` over ``dataset`` and plot the confusion matrix of each output.

    Args:
        model: Callable returning a dict of predictions keyed like the labels.
        dataset: Iterable of ``(features, labels)`` batches where ``labels`` is a
            dict with an ``"age"`` entry (and a ``"gender"`` entry when gender is
            trained).
    """
    # Get the model predictions
    predictions = {"age": [], "gender": []}
    labels = {"age": [], "gender": []}
    for features, batch_labels in dataset:
        # One forward pass per batch, shared by every output key
        batch_predictions = model(features)
        for key in batch_labels.keys():
            predictions[key].extend(batch_predictions[key].numpy())
            labels[key].extend(batch_labels[key].numpy())

    if not labels["age"]:
        # Nothing to plot for an empty dataset
        return

    if use_ordinal_age:
        # Ordinal predictions are continuous: snap them to the nearest valid class
        # index so they can be compared with the integer labels.
        age_pred = np.clip(np.rint(np.ravel(predictions["age"])), 0, num_age_classes - 1).astype(int)
        age_true = np.rint(np.ravel(labels["age"])).astype(int)
    else:
        age_pred = np.argmax(predictions["age"], axis=1)
        age_true = np.argmax(labels["age"], axis=1)

    labels_names = ['teens', 'twenties',
                    'thirties', 'fourties',
                    'fifties', 'sixties',
                    'seventies']
    plot_confusion_matrix(age_true, age_pred, labels_names,
                          title="Age Confusion Matrix")

    if not train_age_only:
        # Predictions are collected in a Python list; convert to an array before
        # thresholding (comparing a list with a float raises TypeError).
        gender_pred = (np.ravel(predictions["gender"]) > 0.5).astype(int)
        gender_true = np.ravel(labels["gender"]).astype(int)
        plot_confusion_matrix(gender_true, gender_pred, ['male', 'female'],
                              title="Gender Confusion Matrix")


# Train the model
def train_model(model, dataset, validation_dataset, epochs, batch_size, verbose=0, print_confusion_matrix=True):
    """
    Fit ``model`` on ``dataset`` and validate on ``validation_dataset``.

    Callbacks: the weights with the best ``val_loss`` are saved to
    ``train/best_model.weights.h5``, TensorBoard logs go to ``./logs``, optionally
    the validation confusion matrices are plotted after every epoch, and early
    stopping on ``val_loss`` is added when the global ``use_early_stopping`` is set.

    Args:
        model: Compiled Keras model.
        dataset: Batched training dataset.
        validation_dataset: Batched validation dataset; it provides the
            ``val_loss`` monitored by the callbacks.
        epochs: Number of epochs to train.
        batch_size: Unused; kept for backward compatibility. The datasets are
            already batched, and ``fit`` must not be given a batch size for
            dataset inputs.
        verbose: Verbosity forwarded to ``model.fit``.
        print_confusion_matrix: Plot the validation confusion matrices after
            each epoch.

    Returns:
        The ``History`` object returned by ``model.fit``.
    """
    # Define callbacks
    callbacks = [
        tf.keras.callbacks.ModelCheckpoint(
            "train/best_model.weights.h5", save_best_only=True, monitor="val_loss", save_weights_only=True
        ),
        tf.keras.callbacks.TensorBoard(log_dir="./logs"),
    ]
    if print_confusion_matrix:
        callbacks.append(
            tf.keras.callbacks.LambdaCallback(
                # Use the dataset passed to this function, not a notebook global
                on_epoch_end=lambda epoch, logs: print_example_output(model, validation_dataset)
            )
        )
    if use_early_stopping:
        callbacks.append(
            tf.keras.callbacks.EarlyStopping(
                monitor="val_loss", patience=3
            )
        )

    # Train the model. Validation data is required: both ModelCheckpoint and
    # EarlyStopping monitor "val_loss", which only exists when validation runs.
    history = model.fit(
        dataset,
        validation_data=validation_dataset,
        epochs=epochs,
        callbacks=callbacks,
        verbose=verbose,
    )
    return history


# Build the train/validation datasets from the features CSV stored in the corpus folder
csv_path = data_path + "/features.csv"

train_dataset, val_dataset = create_dataset(
    csv_path,
    batch_size,
    num_age_classes,
    train_ratio,
)

Train data: 131031 samples
Validation data: 14559 samples


In [None]:
# Build and compile a fresh model, then train it
model = compile_model(AudioAgeAndGenderClassifier(), learning_rate)
history = train_model(
    model,
    train_dataset,
    val_dataset,
    epochs,
    batch_size,
    verbose=2,
)

Epoch 1/100


In [None]:
# Start again from a new, freshly compiled model (its weights are untrained)
model = compile_model(AudioAgeAndGenderClassifier(), learning_rate)
# Optionally restore weights saved in the training folder:
# model.load_weights("train/model_epoch_31.weights.h5")

In [None]:
# Continue training in short rounds, plotting the validation confusion matrix
# after each round and appending the new metrics to the existing `history`.
training_rounds = 20
epochs_per_round = 2
for _ in range(training_rounds):
    new_history = train_model(model, train_dataset, val_dataset, epochs_per_round, batch_size,
                              verbose=2, print_confusion_matrix=False)
    # Merge by the keys of the new run: a metric missing from either history no
    # longer raises KeyError or gets silently dropped.
    for key, values in new_history.history.items():
        history.history.setdefault(key, []).extend(values)
    print_example_output(model, val_dataset)


In [None]:
# Plot the confusion matrices of the current model on the validation dataset
print_example_output(model, val_dataset)

In [None]:
# Visualise one sample (index 5) of the first training batch
import matplotlib.pyplot as plt

sample_index = 5
features, labels = next(train_dataset.take(1).as_numpy_iterator())
plt.figure(figsize=(12, 6))
# Slicing keeps this a no-op when the batch has fewer than six samples
for feature in features[sample_index:sample_index + 1]:
    plt.plot(feature)
plt.show()