In [None]:
# !pip install datasets
# !pip install transformers
# !pip install transformers[torch]
# !pip install accelerate -U

## Import Libraries


In [None]:
# Import the necessary libraries for data processing, model training, and evaluation.
%matplotlib inline
import json, sys, os
import numpy as np, pandas as pd
import math
import torch, torchaudio
import random

# Import the necessary typing modules for type hints.
from typing import List, Dict, Union, Any

# Import specific classes from the 'datasets' library to load and handle audio datasets.
from datasets import load_dataset, Audio, Dataset

# Import specific classes from the 'transformers' library for audio classification.
from transformers import (
    TrainingArguments,
    Trainer,
    AutoFeatureExtractor,
    AutoModelForAudioClassification,
    AutoConfig,
    set_seed
)

MY_PATH = 'gdsc' #the folder path with the data

# Add the path f'{MY_PATH}/src' to the system path to access the 'gdsc_eval' module for evaluation purposes.
sys.path.append(f'{MY_PATH}/src')
from gdsc_eval import compute_metrics


# Functions

## Preprocessing functions

In [None]:
random.seed(42)  # seed Python's `random` module so the random file/chunk choices made below are reproducible

# Maximum clip length, in seconds, kept from each audio file before it is passed to the
# feature extractor (shorter inputs speed up the pre-processing step).
MAX_DURATION = 11

def preprocess_function(examples: Dict[str, Any], path: bool = 1) -> Dict[str, Any]:
    """
    Turn a batch of decoded audio examples into feature-extractor outputs.

    Parameters:
    -----------
        examples: dict
                  Batch whose 'audio' entry is a list of dicts, each carrying an 'array'
                  (the waveform) and a 'path' (the source file location).
        path: bool / int flag (optional)
                  Truthy (default) -> add a 'file_name' entry to the result.
                  Falsy            -> leave 'file_name' out.

    Returns:
    --------
        dict: Whatever the global `feature_extractor` returns for the clipped waveforms
              (with AST this holds 'input_values'), plus, when `path` is truthy,
              'file_name': the last '/'-separated component of every example's 'path'.

    Relies on the notebook-level globals `feature_extractor`, `MODEL_SAMPLING_RATE`
    and `MAX_DURATION`.
    """
    # Keep at most MAX_DURATION seconds (expressed in samples) of every waveform.
    max_samples = MODEL_SAMPLING_RATE * MAX_DURATION
    clipped = []
    for audio in examples['audio']:
        clipped.append(audio["array"][:max_samples])

    # Hand the clipped waveforms to the feature extractor.
    inputs = feature_extractor(
        clipped,
        sampling_rate=feature_extractor.sampling_rate,
        truncation=True,
        return_attention_mask=False,
    )

    # Without the flag there is nothing left to add.
    if not path:
        return inputs

    inputs['file_name'] = [audio['path'].split('/')[-1] for audio in examples['audio']]
    return inputs



def chunk_aug(example: np.ndarray) -> np.ndarray:
    """
    Randomly crop a whole-second chunk out of an audio signal (training-time augmentation).

    The chunk length is drawn uniformly from 2..MAX_DURATION whole seconds (capped by the
    number of whole seconds available) and the start offset is drawn uniformly from the
    whole-second positions at which such a chunk still fits.

    Parameters:
    -----------
    example: numpy.ndarray
          The input audio signal represented as a 1-D numpy array sampled at
          MODEL_SAMPLING_RATE.

    Returns:
    --------
        numpy.ndarray: A 1-D slice of 'example' lasting at most MAX_DURATION seconds.
        Signals shorter than one second are returned unchanged (no crop is possible).

    Relies on the notebook-level globals `MODEL_SAMPLING_RATE` and `MAX_DURATION` and on
    the state of Python's `random` module.
    """
    max_samples = MODEL_SAMPLING_RATE * MAX_DURATION
    e_len = example.shape[0] // MODEL_SAMPLING_RATE  # whole seconds of audio available

    # A sub-second clip has no whole-second chunk to draw: the arithmetic below would
    # pick a 0-second chunk and hand an empty array to the feature extractor.
    if e_len < 1:
        return example[:max_samples]

    min_len = min(2, e_len)             # shortest chunk, in seconds
    max_len = min(MAX_DURATION, e_len)  # longest chunk, in seconds
    chunk_len_ran = random.choice(range(min_len, max_len + 1))  # random chunk length (s)

    # Random whole-second start offset such that the chunk fits inside the signal.
    e_len_range_ran = random.choice(range(0, e_len - chunk_len_ran + 1))

    start = e_len_range_ran * MODEL_SAMPLING_RATE
    stop = (e_len_range_ran + chunk_len_ran) * MODEL_SAMPLING_RATE
    return example[start:stop][:max_samples]


def call_files(x: Dict[str, Union[List[int], torch.Tensor]]) -> Dict[str, Any]:
    """
    On-the-fly transform for the balanced training set: for every requested label, pick a
    random audio file of that label, crop a random chunk from it and extract features.

    Parameters:
    -----------
    x : dict
        A dictionary containing the input data.
        Required keys:
            - 'label': A list of integers representing the labels.

    Returns:
    --------
    dict
        The feature extractor output with:
            - 'input_values': A torch.Tensor holding the extracted features of every chunk.
            - 'label': The labels from 'x', unchanged.

    Relies on the notebook-level globals `balanced_df_list` (label -> list of file paths),
    `feature_extractor`, `MODEL_SAMPLING_RATE` and the sibling function `chunk_aug`.
    """
    # Select a random file path for each label in 'x' from 'balanced_df_list'.
    path_files = [random.choice(balanced_df_list[l]) for l in x['label']]

    wv = []
    for p in path_files:
        waveform, file_sampling_rate = torchaudio.load(p)
        mono = waveform[0]  # first channel only
        # The validation/test sets are resampled to MODEL_SAMPLING_RATE when their 'audio'
        # column is cast; do the same here so `chunk_aug`'s seconds->samples arithmetic and
        # the extracted features stay consistent for files stored at another rate.
        if file_sampling_rate != MODEL_SAMPLING_RATE:
            mono = torchaudio.functional.resample(
                mono, orig_freq=file_sampling_rate, new_freq=MODEL_SAMPLING_RATE)
        # Apply chunk augmentation to the audio signal.
        wv.append(chunk_aug(mono.numpy()))

    # Extract features from the augmented audio signals using the feature extractor.
    inputs = feature_extractor(
        wv,
        sampling_rate=feature_extractor.sampling_rate,
        truncation=True,
        return_attention_mask=False)

    # Stack the per-chunk features into a single float tensor.
    inputs['input_values'] = torch.Tensor(np.array(inputs['input_values']))

    # Pass the labels through so the Trainer can compute the loss.
    inputs['label'] = x['label']

    return inputs


## Predicting functions

This function divides the input audio signal 'example' into multiple chunks of 11 seconds each for prediction (the standard AST model takes ~11 seconds of a waveform and transforms it into a mel spectrogram of 1024 frames), ensuring that each chunk contains a maximum of 11 seconds of audio. If the length of 'example' does not exceed 11 seconds, the function returns the original 'example' as a single chunk.

The function performs the following steps:
1. Calculate the length of the audio signal 'example' in seconds ('e_len') based on the 'MODEL_SAMPLING_RATE'.
2. If 'e_len' is greater than 11 seconds, calculate the minimum possible length ('min_len') for the chunks, considering the remaining audio after the last 11-second chunk.
3. Create a list of chunk lengths ('chunk_len') ranging from 0 to 'min_len' (exclusive) with a step of 2 seconds.
4. Split the input audio signal 'example' into multiple chunks, each starting at a successive 2-second offset and lasting at most 11 seconds.
5. Return the list of chunks.

Example:
--------
Given an audio signal 'example' with a length of 15 seconds, the function will return 3 chunks:
- Chunk 1: Audio from 0 to 11 seconds (~11 seconds)
- Chunk 2: Audio from 2 to 13 seconds (~11 seconds)
- Chunk 3: Audio from 4 to 15 seconds (~11 seconds)

In [None]:
def chunk_pred(example: np.ndarray, hop_seconds: int = 2, max_offset_seconds: int = 360) -> List[np.ndarray]:
    """
    Divide the input audio signal 'example' into overlapping MAX_DURATION-second windows
    and return them as a list for prediction.

    Parameters:
    -----------
    example : numpy.ndarray
        The input audio signal represented as a 1-D numpy array sampled at
        MODEL_SAMPLING_RATE.
    hop_seconds : int (optional)
        Distance, in seconds, between the starts of consecutive windows. Default 2.
    max_offset_seconds : int (optional)
        Exclusive upper bound, in seconds, on a window's start offset; it caps the number
        of windows produced for very long recordings. Default 360.

    Returns:
    --------
    List[numpy.ndarray]
        - If the signal has at most MAX_DURATION whole seconds: a single element holding
          the first MAX_DURATION seconds of 'example'.
        - Otherwise: one full MAX_DURATION-second window per start offset
          0, hop, 2*hop, ... for as long as the window fits inside the whole-second
          length of the signal (e.g. 15 s -> windows starting at 0 s, 2 s and 4 s).

    Relies on the notebook-level globals `MODEL_SAMPLING_RATE` and `MAX_DURATION`.
    """
    window = MODEL_SAMPLING_RATE * MAX_DURATION
    e_len = example.shape[0] // MODEL_SAMPLING_RATE  # whole seconds of audio available

    if e_len <= MAX_DURATION:
        return [example[:window]]

    # The last offset at which a full window still fits is e_len - MAX_DURATION, so the
    # exclusive range bound is one past it. (Using e_len - MAX_DURATION as the bound
    # silently dropped the final window and left the end of the clip unpredicted.)
    stop = min(max_offset_seconds, e_len - MAX_DURATION + 1)
    return [
        example[MODEL_SAMPLING_RATE * r:MODEL_SAMPLING_RATE * r + window]
        for r in range(0, stop, hop_seconds)
    ]

This next function preprocesses the audio data from the 'examples' dictionary in chunks using the 'chunk_pred' function.
It then extracts features from the chunks using the provided feature extractor.
It relies on the notebook-level globals 'MAX_DURATION', 'feature_extractor', and 'MODEL_SAMPLING_RATE'.

The function performs the following steps:
1. Calls the 'chunk_pred' function to chunk the audio data from the 'examples' dictionary.
2. Calls the feature extractor with the chunked audio data to extract features.
3. Moves the 'model' to the 'cuda:0' device for GPU acceleration.
4. Converts the input values to a torch.Tensor and moves it to the 'cuda:0' device.
5. Performs predictions for each chunk using the 'model' and stores the logits.
6. Extracts the predicted class IDs and their corresponding prediction scores.
7. Finds the class ID with the highest prediction score for the entire example.
8. Updates the 'examples' dictionary with the prediction information and the file name of the audio example.
9. Returns the updated 'examples' dictionary with prediction information.

In [None]:
def preprocess_function_pred_chunks(examples: Dict[str, Any], model: torch.nn.Module) -> Dict[str, Any]:
    """
    Predict the class of ONE audio example by scoring overlapping chunks of it.

    The waveform is split with `chunk_pred`, every chunk is scored by `model`, and the
    class of the single most confident chunk (highest raw logit) becomes the prediction
    for the whole example.

    Parameters:
    -----------
    examples : dict
        A single (non-batched) example.
        The 'audio' key holds a dictionary with two keys: 'array' (numpy.ndarray) and 'path' (str).
    model : torch.nn.Module
        The audio classification model used for prediction. Its output must expose
        `.logits`. The model is moved to the GPU when one is available and switched to
        evaluation mode.

    Returns:
    --------
    dict
        The same 'examples' dictionary, updated in place with:
            - 'file_name': The last '/'-separated component of the example's audio path.
            - 'pred_id': A list with one (class ID, max logit) tuple per chunk.
            - 'predicted_class_id': The class ID of the chunk with the highest max logit.

    Relies on the notebook-level global `feature_extractor` and the sibling `chunk_pred`.
    """
    # Chunk the audio data using the 'chunk_pred' function
    wv = chunk_pred(examples['audio']['array'])

    # Extract features from the chunked audio data using the feature extractor
    inputs = feature_extractor(
        wv,
        sampling_rate=feature_extractor.sampling_rate,
        truncation=True,
        return_attention_mask=False,
    )

    # Use the first GPU when present, otherwise fall back to the CPU instead of crashing.
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    model.eval()  # make sure dropout etc. are disabled, e.g. when reusing a just-trained model

    input_values = torch.Tensor(np.array(inputs['input_values'])).to(device)

    # Score one chunk at a time to keep peak memory independent of the clip length.
    with torch.no_grad():
        logits = torch.cat([model(chunk.unsqueeze(0)).logits.cpu() for chunk in input_values])

    # Per chunk: winning class and its logit.
    chunk_classes = logits.argmax(dim=1).tolist()
    chunk_scores = logits.max(dim=1).values
    predicted_class_id = [(int(c), float(s)) for c, s in zip(chunk_classes, chunk_scores.tolist())]

    # The chunk with the highest score decides the prediction for the whole example.
    pred_id_max = int(chunk_scores.argmax().item())

    # Update the 'examples' dictionary with prediction information and file name
    examples['pred_id'] = predicted_class_id
    examples['predicted_class_id'] = predicted_class_id[pred_id_max][0]
    examples['file_name'] = examples['audio']['path'].split('/')[-1]

    return examples

# Preprocessing the data

## Mapping Class Labels to Numerical IDs for Audio Dataset

(*NOTE: the train dataset is prepared differently and it will be explained in the next paragraph*)

1.   Defining file paths for the validation and test datasets
2.   Loading class labels from a JSON file and creating two dictionaries that map class labels to their corresponding numerical IDs and vice versa.

The resulting dictionaries enable convenient encoding and decoding of class labels in the audio dataset, making it suitable for further processing and model training.

In [None]:
# Locations of the validation and test audio folders.
val_path = f'{MY_PATH}/data/val'
test_path = f'{MY_PATH}/data/test'

# labels.json maps every class label to its numerical ID.
with open(f'{MY_PATH}/data/labels.json', 'r') as f:
    labels = json.load(f)

# Two lookup tables in the form the model config expects: IDs are stored as strings.
label2id = {name: str(idx) for name, idx in labels.items()}   # class label -> ID
id2label = {str(idx): name for name, idx in labels.items()}   # ID -> class label

# Number of distinct classes.
num_labels = len(label2id)


3. Creating an automatic Feature Extractor Using a Pre-trained Model

***Note:*** We are using `AutoFeatureExtractor` (instead of ASTFeatureExtractor) which allows us to be flexible in terms of choosing different type of feature extractors for different type of model architectures.

In [None]:
# Hub identifier of the pre-trained checkpoint; the feature extractor here and the model
# config/weights later in the notebook are all loaded from it.
THE_MODEL = "MIT/ast-finetuned-audioset-10-10-0.4593"

# Sampling rate (Hz) used throughout the notebook: the datasets' 'audio' column is cast to
# it, and the chunking helpers use it to convert seconds into sample counts.
# NOTE(review): this overrides the sampling rate stored with the checkpoint's feature
# extractor (presumably 16 kHz for AST) - confirm this is intended.
MODEL_SAMPLING_RATE = 44100

# Create the feature extractor from the checkpoint's pre-processing configuration,
# overriding a few of its settings. It converts raw waveforms into model inputs.
feature_extractor = AutoFeatureExtractor.from_pretrained(
    THE_MODEL,
    do_normalize=True,                    # enable normalization of the extracted features
    return_attention_mask=False,          # do not return attention masks
    sampling_rate=MODEL_SAMPLING_RATE,    # sampling rate the extractor should assume for its inputs
    num_mel_bins=128                      # number of Mel bins of the Mel spectrogram feature representation
)


## Creating Hugging Face Dataset Objects for Audio Classification



In [None]:
# Validation set: read the "audiofolder" directory at 'val_path' (the loader files everything
# under the 'train' split) and decode its 'audio' column at MODEL_SAMPLING_RATE.
val_dataset = (
    load_dataset("audiofolder", data_dir=val_path)
    .get('train')
    .cast_column("audio", Audio(sampling_rate=MODEL_SAMPLING_RATE))
)

# Test set: same treatment for the directory at 'test_path'.
test_dataset = (
    load_dataset("audiofolder", data_dir=test_path)
    .get('train')
    .cast_column("audio", Audio(sampling_rate=MODEL_SAMPLING_RATE))
)


Resolving data files:   0%|          | 0/580 [00:00<?, ?it/s]



  0%|          | 0/1 [00:00<?, ?it/s]

Resolving data files:   0%|          | 0/557 [00:00<?, ?it/s]



  0%|          | 0/1 [00:00<?, ?it/s]

## Encode the validation and test dataset by applying the `preprocess_function` to each example.

In [None]:
# Validation set: 'preprocess_function' is called with 'path=0', so no 'file_name' column is
# produced. The raw 'audio' column is dropped once the features have been extracted.
# Examples are processed in batches of 2 ('batch_size=2').
val_dataset_encoded = val_dataset.map(
    lambda x: preprocess_function(x, path=0), remove_columns=["audio"], batched=True, batch_size=2)

# Return 'input_values' as Torch tensors; 'output_all_columns=True' keeps the remaining
# columns in the output as well (un-formatted).
val_dataset_encoded.set_format(type='torch', columns=['input_values'], output_all_columns=True)



# Test set: 'preprocess_function' is passed directly, so 'path' keeps its default of 1 (True)
# and a 'file_name' column is added - it is needed later to build the submission file.
# Examples are processed in batches of 2 ('batch_size=2').
test_dataset_encoded = test_dataset.map(
    preprocess_function, remove_columns=["audio"], batched=True, batch_size=2)

# Same output format as for the validation set.
test_dataset_encoded.set_format(type='torch', columns=['input_values'], output_all_columns=True)

# Display both encoded datasets (columns and row counts) as the cell output.
val_dataset_encoded, test_dataset_encoded



(Dataset({
     features: ['label', 'input_values'],
     num_rows: 579
 }),
 Dataset({
     features: ['input_values', 'file_name'],
     num_rows: 556
 }))

## Creating a Balanced Dataset

*Note: Because the provided validation set contains additional characteristics/patterns, we decided to train the model on all of the provided data (train + validation).*

1. The dataset is constructed with 1320 examples, ensuring each of the 66 labels appears 20 times.
2. The 'audio' column in the dataset is initialized with empty strings, and the 'label' column is populated with integers from 0 to 65, repeating each label 20 times.
3. To enhance the empty dataset, metadata is loaded from a CSV file and concatenated with the corresponding file paths, forming the full paths to the audio files. This facilitates easy access to the data during model training.
4. The file paths are then grouped by label, creating a pandas Series named 'balanced_df_list' that is indexed by 'label' and whose values are the lists of file paths ('path') belonging to each label.
5. The 'balanced_dataset' object is created using the 'balanced_dataset_dict', representing the balanced dataset, and a transform function, 'call_files', is set to process the audio data which applies chunk augmentation, extracts features using a feature extractor for each chunk and returns the processed inputs.

The 'output_all_columns=True' parameter ensures that all transformed columns are included in the output, making it ready for model training or evaluation.

In [None]:
random.seed(42) #for model reproducibility

# Number of examples every label contributes to one pass over the balanced dataset.
EXAMPLES_PER_LABEL = 20

# Placeholder dataset in which each label ID 0..num_labels-1 appears EXAMPLES_PER_LABEL times
# (20 * 66 = 1320 rows for the 66-class task). 'num_labels' comes from labels.json, so the
# dataset stays in sync with the label set instead of repeating the class count by hand.
# The 'audio' column is only a dummy: the actual audio is drawn at access time by 'call_files'.
balanced_dataset_dict = {
    'audio': [''] * (EXAMPLES_PER_LABEL * num_labels),
    'label': list(range(num_labels)) * EXAMPLES_PER_LABEL,
}

# Create a Dataset object 'balanced_dataset' from the 'balanced_dataset_dict'.
balanced_dataset = Dataset.from_dict(balanced_dataset_dict)

# Read the metadata from the CSV file located at f'{MY_PATH}/data/metadata.csv' into a pandas DataFrame 'all_data'.
all_data = pd.read_csv(f'{MY_PATH}/data/metadata.csv')

# Prefix every relative path with f'{MY_PATH}/' so the files can be opened from the notebook's working directory.
all_data['path'] = f'{MY_PATH}/' + all_data['path']

# pandas Series indexed by 'label'; each value is the list of file paths of that label.
balanced_df_list = all_data[['label','path']].groupby('label')['path'].apply(list)

# Apply 'call_files' on the fly whenever rows are accessed: it picks a random file per label,
# crops a random chunk and extracts the features.
balanced_dataset.set_transform(call_files, output_all_columns=True)

# Create a model configuration and an audio classification model

In [None]:
set_seed(42) #for model reproducibility

# Create a model configuration 'model_config' using the AutoConfig class, which is initialized with pretrained settings from 'THE_MODEL'.
model_config = AutoConfig.from_pretrained(
    THE_MODEL,
    num_labels=num_labels,       #the number of output labels for the audio classification task.
    label2id=label2id,           #class label -> ID mapping stored in the config
    id2label=id2label,           #ID -> class label mapping stored in the config
    num_hidden_layers=8,         #number of encoder layers to build; the checkpoint's layers 8-11 are then left unused (see the loading message below)
    ignore_mismatched_sizes=True #NOTE(review): this is a weight-loading option of the model's from_pretrained; passed to AutoConfig it appears to have no loading effect - confirm whether it can be dropped.
    )


# Create an audio classification model 'model' using the AutoModelForAudioClassification class.
model = AutoModelForAudioClassification.from_pretrained(
    THE_MODEL,
    config=model_config,
    ignore_mismatched_sizes=True #do not raise when a checkpoint weight has a different shape than the model's (here the classification head, which is re-sized to 'num_labels'); such weights are newly initialized instead.
    )


Some weights of the model checkpoint at MIT/ast-finetuned-audioset-10-10-0.4593 were not used when initializing ASTForAudioClassification: ['audio_spectrogram_transformer.encoder.layer.8.attention.attention.key.weight', 'audio_spectrogram_transformer.encoder.layer.10.attention.attention.query.weight', 'audio_spectrogram_transformer.encoder.layer.10.intermediate.dense.bias', 'audio_spectrogram_transformer.encoder.layer.9.layernorm_after.weight', 'audio_spectrogram_transformer.encoder.layer.8.attention.attention.query.bias', 'audio_spectrogram_transformer.encoder.layer.11.output.dense.bias', 'audio_spectrogram_transformer.encoder.layer.8.layernorm_before.bias', 'audio_spectrogram_transformer.encoder.layer.9.intermediate.dense.weight', 'audio_spectrogram_transformer.encoder.layer.11.attention.attention.value.bias', 'audio_spectrogram_transformer.encoder.layer.9.layernorm_before.weight', 'audio_spectrogram_transformer.encoder.layer.10.layernorm_after.weight', 'audio_spectrogram_transformer

## Training and Evaluation Configuration for Audio Classification Model

In [None]:
# NOTE(review): 'TrainingArguments' is already imported in the imports cell at the top; this
# repeated import is redundant.
from transformers import TrainingArguments

NUM_TRAIN_EPOCHS = 12                                                   # number of training epochs

training_args = TrainingArguments(
    output_dir=f'{MY_PATH}/models/final',       # directory for saving model checkpoints and logs
    num_train_epochs=NUM_TRAIN_EPOCHS,                                  # number of epochs
    per_device_train_batch_size=4,                                      # number of examples in batch for training
    per_device_eval_batch_size=4,                                       # number of examples in batch for evaluation
    evaluation_strategy="epoch",                                        # run evaluation at the end of each epoch
    learning_rate=float(3e-5),                                          # learning rate
    optim="adamw_torch",                                                # optimizer
    logging_steps=1,                                                    # log the training process every step
    load_best_model_at_end=True,                                        # reload the best checkpoint once training has finished
    metric_for_best_model="eval_loss",                                  # the best model is the one with the lowest loss on the validation set
    save_strategy='epoch',                                              # save a checkpoint at the end of each epoch (matches the evaluation strategy)
    gradient_accumulation_steps=8,                                      # accumulate gradients over 8 batches -> effective train batch of 4 * 8 = 32 per device
    remove_unused_columns=False,                                        # keep all dataset columns; presumably required because the training set builds its inputs on the fly via 'set_transform' - TODO confirm
)


trainer = Trainer(
    model=model,                                                        # the model created above
    args=training_args,                                                 # the training arguments created above
    train_dataset=balanced_dataset,                                     # the balanced training set (transformed on the fly by 'call_files')
    eval_dataset=val_dataset_encoded,                                   # the encoded validation set
    tokenizer=feature_extractor,                                        # the feature extractor is passed in the 'tokenizer' slot
    compute_metrics=compute_metrics,                                    # the compute_metrics function imported from the gdsc_eval module
)


In [None]:
# Run the fine-tuning loop with the configuration defined above:
# num_hidden_layers=8, gradient_accumulation_steps=8, learning_rate=3e-5, batch_size=4, num_mel_bins=128
trainer.train()

# Prediction of test set

In [None]:
# Construct the path to the checkpoint directory based on the checkpoint number 'N' of your preference.
# The directory lives under the Trainer's 'output_dir' (f'{MY_PATH}/models/final').
N = 495 #our best model
checkpoint = f'{MY_PATH}/models/final/checkpoint-{N}'

# Create a model configuration 'model_config_pred' using the 'AutoConfig' class and load it from the specified checkpoint directory.
# NOTE(review): 'ignore_mismatched_sizes' is a weight-loading option; on AutoConfig it appears to have no loading effect - confirm.
model_config_pred = AutoConfig.from_pretrained(checkpoint, ignore_mismatched_sizes=True)

# Create an audio classification model 'model_pred' using the 'AutoModelForAudioClassification' class and load its weights from the specified checkpoint directory.
model_pred = AutoModelForAudioClassification.from_pretrained(checkpoint, config=model_config_pred, ignore_mismatched_sizes=True)


In [None]:
# Predict the test set: 'preprocess_function_pred_chunks' is applied to one example at a time
# ('batched=False'; 'batch_size' is only kept for clarity and has no effect in that mode).
# The raw 'audio' column is dropped from the result.
test_dataset_encoded_pred_chunks = test_dataset.map(lambda x: preprocess_function_pred_chunks(x, model_pred),
                                                              remove_columns=["audio"],
                                                              batched=False,
                                                              batch_size=1)

# Convert the predictions to a pandas DataFrame and keep the two submission columns:
# 'file_name' (name of the audio file) and 'predicted_class_id' (predicted class ID).
pred_pandas_df = test_dataset_encoded_pred_chunks.to_pandas()[['file_name', 'predicted_class_id']]

# Save the predictions as f'{MY_PATH}/Kamran/models/final/final_{N}.csv'.
# The target directory is created first: it differs from the Trainer's output directory and
# 'to_csv' does not create missing folders.
predictions_path = f'{MY_PATH}/Kamran/models/final/final_{N}.csv'
os.makedirs(os.path.dirname(predictions_path), exist_ok=True)
pred_pandas_df.to_csv(predictions_path, index=False)
