In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input tree and echo the full path of every file found.
for folder, _subdirs, names in os.walk('/kaggle/input'):
    for full_path in (os.path.join(folder, name) for name in names):
        print(full_path)
        
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

# Exploring the files

In [None]:
# Root of the DEAM dataset; show every folder with a short sample of its files.
base_path = "/kaggle/input/deam-mediaeval-dataset-emotional-analysis-in-music"
preview_count = 5  # number of files listed per folder
for folder, _subdirs, names in os.walk(base_path):
    print(f"\n📂 {folder}")
    for name in names[:preview_count]:
        print(f"    └── {name}")


# Importing libraries

In [None]:
import torch
import pickle
import torchaudio
import matplotlib.pyplot as plt
import seaborn as sns
import transformers
import librosa
import pandas as pd
import numpy as np
import os
import gc
from glob import glob
from torch.utils.data import Dataset,DataLoader
from sklearn.model_selection import train_test_split
from transformers import ASTModel, ASTConfig,ASTFeatureExtractor, Trainer, TrainingArguments
import torch.nn as nn
import torch.optim as optim # used in defining model optimization functions 
import torch.nn.functional as F

# Record the versions of the core libraries used by this notebook.
for lib_name, lib in (
    ("torch", torch),
    ("torchaudio", torchaudio),
    ("transformers", transformers),
    ("librosa", librosa),
):
    print(f"{lib_name}:", lib.__version__)



# Setting up dataframe


## Traversing the data

In [None]:
# Folder holding the MP3 files (one file per song id).
audio_dir = '/kaggle/input/deam-mediaeval-dataset-emotional-analysis-in-music/DEAM_audio/MEMD_audio'

# Song-level annotation CSVs; these hold the target variables.
ann_csv_1 = '/kaggle/input/deam-mediaeval-dataset-emotional-analysis-in-music/DEAM_Annotations/annotations/annotations averaged per song/song_level/static_annotations_averaged_songs_1_2000.csv'
ann_csv_2 = '/kaggle/input/deam-mediaeval-dataset-emotional-analysis-in-music/DEAM_Annotations/annotations/annotations averaged per song/song_level/static_annotations_averaged_songs_2000_2058.csv'

# Only these columns are read from the second CSV so that both frames line up
# when stacked (note the leading space in the raw column names).
shared_columns = ['song_id', ' valence_mean', ' valence_std', ' arousal_mean', ' arousal_std']
df1 = pd.read_csv(ann_csv_1)
df2 = pd.read_csv(ann_csv_2, usecols=shared_columns)
labels_df = pd.concat([df1, df2], ignore_index=True)

# Build "<audio_dir>/<song_id>.mp3" for every annotated song; songs whose file
# is not on disk get None so the row count stays aligned with the labels.
candidate_paths = (
    os.path.join(audio_dir, f"{song_id}.mp3") for song_id in labels_df['song_id']
)
audio_paths = [path if os.path.exists(path) else None for path in candidate_paths]

labels_df['audio_path'] = audio_paths

In [None]:
# Drop the leading space from the annotation column names to avoid confusion later.
stat_columns = ['valence_mean', 'valence_std', 'arousal_mean', 'arousal_std']
labels_df.rename(columns={f" {name}": name for name in stat_columns}, inplace=True)
print(labels_df.head())

In [None]:
# Take a close look at the first entry of the label table.
sample_path = labels_df['audio_path'].iloc[0]

# torchaudio.load returns the waveform together with its sample rate
# (44100 for this file). Averaging over dim 0 merges the left/right channels.
waveform, sr = torchaudio.load(sample_path)
waveform = waveform.mean(dim=0)
print(f"Sample rate: {sr}, Shape: {waveform.shape}")

# Raw waveform
fig, ax = plt.subplots(figsize=(10, 3))
ax.plot(waveform.numpy())
ax.set_title('Waveform')
plt.show()

# Mel spectrogram converted to decibels
to_mel = torchaudio.transforms.MelSpectrogram(sample_rate=sr)
amplitude_to_db = torchaudio.transforms.AmplitudeToDB()
spec = to_mel(waveform)
spec_db = amplitude_to_db(spec)

fig, ax = plt.subplots(figsize=(10, 4))
image = ax.imshow(spec_db.numpy(), aspect='auto', origin='lower')
ax.set(xlabel='Time', ylabel='Frequency(in mels)', title='Mel Spectrogram (dB)')
fig.colorbar(image, ax=ax)
plt.show()

In [None]:
# Naming the transforms so we can easily call them later.
# (The first line of this cell used to be uncommented prose, which made the
# cell fail with a SyntaxError.)
sr = 44100  # observed from the analysis of one of the entries above
mel_extractor = torchaudio.transforms.MelSpectrogram(sample_rate=sr)  # waveform -> mel spectrogram
to_db = torchaudio.transforms.AmplitudeToDB()  # spectrogram intensities -> dB, for human interpretability

In [None]:
# Naming the transforms so we can easily call them later.
sr = 44100  # observed from the analysis of one of the entries above
mel_extractor = torchaudio.transforms.MelSpectrogram(sample_rate=sr)  # waveform -> mel spectrogram
to_db = torchaudio.transforms.AmplitudeToDB()  # spectrogram intensities -> dB, for human interpretability

# Lists that receive one dB mel spectrogram (and its labels) per readable file.
spec_list = []
valence_targets = []
arousal_targets = []

for idx, row in labels_df.iterrows():
    audio_path = row['audio_path']

    # Rows whose MP3 was not found carry None as the path; skip them
    # explicitly instead of letting the loader raise.
    if pd.isna(audio_path):
        print(f"Skipping index {idx} - audio file not found")
        continue

    try:
        # Keep the file's rate in its own name so the global `sr` (which the
        # mel filterbank was built for) is not silently overwritten.
        waveform, file_sr = torchaudio.load(audio_path)
        waveform = waveform.mean(dim=0)  # convert stereo to mono

        # The mel extractor assumes `sr`; bring any other rate in line first.
        if file_sr != sr:
            waveform = torchaudio.functional.resample(waveform, file_sr, sr)

        # Convert to Mel Spectrogram in dB
        spec = mel_extractor(waveform)
        spec_db = to_db(spec)
    except Exception as e:
        print(f"Failed for index {idx} - {e}")
        continue

    spec_list.append(spec_db)
    valence_targets.append(row['valence_mean'])
    arousal_targets.append(row['arousal_mean'])

In [None]:
# EDA: look at the distribution of spectrogram lengths to decide which samples
# can be dropped and at what length the rest should be truncated.

lengths = [spec.shape[1] for spec in spec_list]  # number of time frames per spectrogram
max_length, min_length = max(lengths), min(lengths)

print(f"Max number of time frames: {max_length}")
print(f"Min number of time frames: {min_length}")

lengths_array = np.array(lengths)

# Histogram of the frame counts
counts, bin_edges, _ = plt.hist(lengths_array, bins=800, edgecolor='black')
plt.title('Histogram of Spectrogram Time Frame Lengths')
plt.xlabel('Number of Time Frames')
plt.ylabel('Number of Samples')
plt.grid(True)
plt.show()

# Locate the most populated bin and report its edges and size
max_bin_index = np.argmax(counts)
max_bin_range = tuple(bin_edges[max_bin_index:max_bin_index + 2])
max_bin_count = counts[max_bin_index]

print(f"Most dense bin range: {max_bin_range}")
print(f"Number of samples in this bin: {int(max_bin_count)}")

Since 1718 of the 1802 samples fall in this bin, we preprocess the data as follows:
1) Drop the samples whose spectrograms are shorter than 9830 time frames.
2) Truncate the remaining spectrograms to 9830 time frames.

In [None]:
fixed_length = 9830  # number of time frames kept per spectrogram

# Lists that receive only the samples that are long enough.
clean_spec_list = []
clean_valence_targets = []
clean_arousal_targets = []
clean_paths = []

for idx, row in labels_df.iterrows():
    audio_path = row['audio_path']

    # Rows whose MP3 was not found carry None as the path; skip them
    # explicitly instead of letting the loader raise.
    if pd.isna(audio_path):
        print(f"Skipping index {idx} - audio file not found")
        continue

    try:
        # Keep the file's rate in its own name so the global `sr` (which the
        # mel filterbank was built for) is not silently overwritten.
        waveform, file_sr = torchaudio.load(audio_path)
        waveform = waveform.mean(dim=0)  # stereo → mono

        # The mel extractor assumes `sr`; bring any other rate in line first.
        if file_sr != sr:
            waveform = torchaudio.functional.resample(waveform, file_sr, sr)

        # Mel Spectrogram in dB
        spec = mel_extractor(waveform)
        spec_db = to_db(spec)
    except Exception as e:
        print(f"Failed for index {idx} - {e}")
        continue

    # Only keep if it's long enough
    if spec_db.shape[1] >= fixed_length:
        # Slicing returns a view that keeps the whole spectrogram alive;
        # clone so only the first `fixed_length` frames stay in memory.
        truncated = spec_db[:, :fixed_length].clone()
        clean_spec_list.append(truncated)
        clean_valence_targets.append(row['valence_mean'])
        clean_arousal_targets.append(row['arousal_mean'])
        clean_paths.append(audio_path)


In [None]:
#Uncomment if you want to save these files for later use

#SAVING  the lists formed above:
# os.makedirs("/kaggle/working/clean_stuff", exist_ok=True)

# torch.save(clean_spec_list, "/kaggle/working/clean_stuff/clean_spec_list.pt")

# with open("/kaggle/working/clean_stuff/clean_valence_targets.pkl", "wb") as f:
#     pickle.dump(clean_valence_targets, f)

# with open("/kaggle/working/clean_stuff/clean_arousal_targets.pkl", "wb") as f:
#     pickle.dump(clean_arousal_targets, f)

# with open("/kaggle/working/clean_stuff/clean_paths.pkl", "wb") as f:
#     pickle.dump(clean_paths, f)

In [None]:
#to load later: 
# clean_spec_list = torch.load("/kaggle/working/clean_stuff/clean_spec_list.pt")
# with open("/kaggle/working/clean_stuff/clean_valence_targets.pkl", "rb") as f:
#     clean_valence_targets = pickle.load(f)

# with open("/kaggle/working/clean_stuff/clean_arousal_targets.pkl", "rb") as f:
#     clean_arousal_targets = pickle.load(f)

# with open("/kaggle/working/clean_stuff/clean_paths.pkl", "rb") as f:
#     clean_paths = pickle.load(f)

In [None]:
# Assemble the retained samples (path plus both targets) into a fresh DataFrame.
filtered_labels_df = pd.DataFrame(
    dict(
        audio_path=clean_paths,
        valence_mean=clean_valence_targets,
        arousal_mean=clean_arousal_targets,
    )
)

In [None]:
# to save the above created df
# filtered_labels_df.to_csv("filtered_labels_df.csv",index=False)

In [None]:
# to load it
# filtered_labels_df=pd.read_csv("/kaggle/input/filtered-labels-df/filtered_labels_df.csv")

In [None]:
# Ratings live in [1, 9]; shift by the minimum and divide by the span to map them to [0, 1].
rating_min, rating_span = 1, 8
normalized_valence_targets = (filtered_labels_df['valence_mean'].to_numpy() - rating_min) / rating_span
normalized_arousal_targets = (filtered_labels_df['arousal_mean'].to_numpy() - rating_min) / rating_span
# To read predictions on the original scale later: valence_pred_original = valence_pred * 8 + 1

In [None]:
# #Saving normalized labels:

# with open("/kaggle/working/normalized_valence_targets.pkl", "wb") as f:
#     pickle.dump(normalized_valence_targets, f)

# with open("/kaggle/working/normalized_arousal_targets.pkl", "wb") as f:
#     pickle.dump(normalized_arousal_targets, f)


In [None]:
# Z-score normalization (preferred for transformers over min-max normalization)
# Step 1: accumulate the global sum and sum of squares one spectrogram at a
# time. Concatenating every flattened spectrogram into a single tensor would
# hold a second full copy of the dataset in memory just to get two scalars.
value_count = 0
value_sum = 0.0
value_sq_sum = 0.0
for clean_spec in clean_spec_list:
    clean_spec64 = clean_spec.double()  # float64 keeps the running sums accurate
    value_count += clean_spec64.numel()
    value_sum += clean_spec64.sum().item()
    value_sq_sum += clean_spec64.square().sum().item()

if value_count < 2:
    raise ValueError("Need at least two spectrogram values to compute global statistics")

global_mean = value_sum / value_count
# Unbiased (n - 1) variance, matching the default of torch.Tensor.std().
global_var = (value_sq_sum - value_count * global_mean ** 2) / (value_count - 1)
global_std = max(global_var, 0.0) ** 0.5  # clamp tiny negative rounding error

if global_std == 0:
    raise ValueError("Spectrogram values are constant; cannot z-score normalize")

print(f"Global mean: {global_mean:.4f}, Global std: {global_std:.4f}")

# Step 2: Normalize each spectrogram (Python-float statistics keep each tensor's dtype)
normalized_spec_list = [(spec - global_mean) / global_std for spec in clean_spec_list]

In [None]:
#SAVING NORMALIZED LIST
# torch.save(normalized_spec_list, "/kaggle/working/normalized_spec_list.pt")

In [None]:
#LOAD normalized list
# normalized_spec_list = torch.load("/kaggle/input/normalized-spec-listtt/normalized_spec_list.pt")

In [None]:
# Inspect the shape of the first normalized spectrogram.
normalized_spec_list[0].shape

In [None]:
# The model works on 16x16 patches, so the time axis is cut to 9216 frames
# (a multiple of 16). The list is updated in place.
normalized_spec_list[:] = [spec[:, :9216] for spec in normalized_spec_list]

In [None]:
# Inspect the shape of the first normalized spectrogram again.
normalized_spec_list[0].shape

In [None]:
# The transformer takes (128, 1024) inputs, so every spectrogram is cut into
# 9 consecutive 1024-frame chunks; each chunk inherits the labels of its song,
# which also enlarges the dataset.
chunk_size = 1024
num_chunks = 9

# Flattened per-chunk lists
segmented_specs = []
segmented_valence = []
segmented_arousal = []
for song_idx, song_spec in enumerate(normalized_spec_list):  # song_spec: [128, 9216]
    song_valence = normalized_valence_targets[song_idx]
    song_arousal = normalized_arousal_targets[song_idx]

    for offset in range(0, num_chunks * chunk_size, chunk_size):
        segmented_specs.append(song_spec[:, offset:offset + chunk_size])  # [128, 1024]
        segmented_valence.append(song_valence)
        segmented_arousal.append(song_arousal)

# Let's start with dataset preparation

In [None]:
class EmotionAudioDataset(Dataset):
    """Pairs each spectrogram with its label for use with a DataLoader.

    Args:
        specs: indexable collection of 2-D spectrograms (tensors or array-likes).
        labels: indexable collection of per-sample labels, aligned with `specs`.
    """

    def __init__(self, specs, labels):
        self.specs = specs
        self.labels = labels

    def __len__(self):
        return len(self.specs)

    def __getitem__(self, idx):
        """Return {"input_values": float32 [1, H, W], "labels": float32 label tensor}."""
        # as_tensor accepts existing tensors without the copy-construct
        # UserWarning (and extra copy) that torch.tensor(tensor) triggers.
        spec = torch.as_tensor(self.specs[idx], dtype=torch.float32)
        spec = spec.unsqueeze(0)  # add channel dim
        label = torch.as_tensor(self.labels[idx], dtype=torch.float32)
        return {"input_values": spec, "labels": label}

In [None]:
# Train / validation / test split (80 / 10 / 10), performed per SONG.
#
# Every song was cut into `num_chunks` consecutive segments that all share the
# same labels. Splitting the segments at random would put chunks of one song
# into train, validation and test at once, leaking information and inflating
# the evaluation scores. So songs are split first and each song's segments
# follow it.

# zip pairs valence with arousal, giving one (valence, arousal) tuple per segment
all_labels = list(zip(segmented_valence, segmented_arousal))

assert len(segmented_specs) % num_chunks == 0, "expected num_chunks segments per song"

# Segments were appended song by song, so integer division recovers the song index.
segment_song_ids = (np.arange(len(segmented_specs)) // num_chunks).tolist()
unique_song_ids = np.unique(segment_song_ids)

# First split: Train vs Temp (val + test)
train_songs, temp_songs = train_test_split(
    unique_song_ids, test_size=0.2, random_state=42
)

# Second split: Validation vs Test
val_songs, test_songs = train_test_split(
    temp_songs, test_size=0.5, random_state=42
)


def _select_segments(song_ids):
    """Return (specs, labels) for every segment belonging to one of `song_ids`."""
    wanted = set(song_ids.tolist())
    positions = [pos for pos, song in enumerate(segment_song_ids) if song in wanted]
    return (
        [segmented_specs[pos] for pos in positions],
        [all_labels[pos] for pos in positions],
    )


X_train, y_train = _select_segments(train_songs)
X_val, y_val = _select_segments(val_songs)
X_test, y_test = _select_segments(test_songs)

In [None]:
# Wrap each split in the Dataset class defined above.
train_dataset, val_dataset, test_dataset = (
    EmotionAudioDataset(split_specs, split_labels)
    for split_specs, split_labels in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

In [None]:

batch_size = 16  # decided after multiple iterations

# Only the training batches are shuffled; validation and test keep their order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader, test_loader = (
    DataLoader(eval_dataset, batch_size=batch_size)
    for eval_dataset in (val_dataset, test_dataset)
)

# MODEL IN PROGRESS


In [None]:

class ASTForEmotionRegression(nn.Module):
    """Pretrained AST encoder followed by a small MLP that regresses [valence, arousal].

    Args:
        pretrained_model_name: Hugging Face checkpoint passed to
            `ASTModel.from_pretrained`.
    """

    def __init__(self, pretrained_model_name='MIT/ast-finetuned-audioset-10-10-0.4593'):
        super().__init__()
        self.ast = ASTModel.from_pretrained(pretrained_model_name)

        self.regression_head = nn.Sequential(
            nn.Linear(self.ast.config.hidden_size, 128),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(128, 2)  # Output: [valence, arousal]
        )

    def forward(self, input_values):
        """Predict valence and arousal for a batch of spectrogram segments.

        Args:
            input_values: Tensor of shape [B, 1, 128, 1024]
                (batch, channel, mel bins, time frames), as produced by
                EmotionAudioDataset.

        Returns:
            Tensor of shape [B, 2] holding [valence, arousal].
        """
        # Drop the channel dim: [B, 1, 128, 1024] -> [B, 128, 1024]
        input_values = input_values.squeeze(1)

        # ASTModel documents input_values as (batch, max_length, num_mel_bins),
        # i.e. time first. Feeding [B, mel, time] raises no error because the
        # patch count is the same, but the patches would then be paired with
        # the wrong pretrained position embeddings. Swap to [B, 1024, 128].
        # NOTE(review): checkpoints trained with the previous (unswapped)
        # orientation were fit to that layout and should be retrained.
        input_values = input_values.transpose(1, 2)

        outputs = self.ast(input_values=input_values)
        hidden_states = outputs.last_hidden_state  # [B, tokens, H]

        pooled = hidden_states.mean(dim=1)  # average over tokens -> [B, H]
        output = self.regression_head(pooled)  # [B, 2]
        return output


In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Build the model and place it on the chosen device
model = ASTForEmotionRegression()
model.to(device)

# Regression objective: mean squared error
criterion = nn.MSELoss()

# Adam over all model parameters
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [None]:

num_epochs = 15


def _run_epoch(loader, training):
    """Run one pass over `loader` and return the mean per-batch loss.

    With training=True the model is put in train mode and the optimizer is
    stepped on every batch; otherwise the model is in eval mode and gradients
    are disabled.
    """
    model.train(training)
    loss_total = 0.0

    with torch.set_grad_enabled(training):
        for batch in loader:
            specs = batch["input_values"].to(device)
            labels = batch["labels"].to(device)

            if training:
                optimizer.zero_grad()

            loss = criterion(model(specs), labels)

            if training:
                loss.backward()
                optimizer.step()

            loss_total += loss.item()

    return loss_total / len(loader)


for epoch in range(num_epochs):
    avg_train_loss = _run_epoch(train_loader, training=True)   # training phase
    avg_val_loss = _run_epoch(val_loader, training=False)      # validation phase

    print(f"Epoch {epoch+1}/{num_epochs} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")


In [None]:
# torch.save(model.state_dict(), "ast_emotion_regression_moresamples_batch16_epochs15.pth")

In [None]:
# torch.save(model, "ast_emotion_regression_full_moresamples_batch16_epochs15.pt")

In [None]:
# To use the saved model weights directly, uncomment this
# model = ASTForEmotionRegression()

# # Step 3: Load the weights
# state_dict = torch.load("/kaggle/input/ast_emotion_detection/pytorch/default/1/ast_emotion_regression_moresamples_batch16_epochs10.pth", map_location="cpu")
# model.load_state_dict(state_dict)

# # Define loss function
# criterion = nn.MSELoss()  # Mean Squared Error for regression

# # Define optimizer
# optimizer = optim.Adam(model.parameters(), lr=1e-4)
# # Step 4: Move model to device
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model.to(device)


In [None]:
# Evaluate on the held-out test set: accumulate the batch losses and keep the
# predictions and targets on the CPU.
model.eval()
all_preds, all_targets = [], []
test_loss = 0.0

with torch.no_grad():
    for batch in test_loader:
        specs, labels = (batch[key].to(device) for key in ("input_values", "labels"))
        outputs = model(specs)

        batch_loss = criterion(outputs, labels)
        test_loss += batch_loss.item()

        all_preds.append(outputs.cpu())
        all_targets.append(labels.cpu())

avg_test_loss = test_loss / len(test_loader)  # mean of the per-batch losses
print(f"Test MSE Loss: {avg_test_loss:.4f}")

In [None]:
import torch
from sklearn.metrics import mean_squared_error, mean_absolute_error

# Collect test-set predictions and report MSE / MAE separately for each target.
model.eval()
pred_batches, target_batches = [], []

with torch.no_grad():
    for batch in test_loader:
        inputs = batch["input_values"].to(device)
        targets = batch["labels"].to(device)

        outputs = model(inputs)

        pred_batches.append(outputs.cpu())
        target_batches.append(targets.cpu())

# Concatenate the batches and move to numpy
preds = torch.cat(pred_batches).numpy()
labels = torch.cat(target_batches).numpy()

# Column 0 holds valence, column 1 holds arousal
valence_true, arousal_true = labels[:, 0], labels[:, 1]
valence_pred, arousal_pred = preds[:, 0], preds[:, 1]

valence_mse = mean_squared_error(valence_true, valence_pred)
valence_mae = mean_absolute_error(valence_true, valence_pred)

arousal_mse = mean_squared_error(arousal_true, arousal_pred)
arousal_mae = mean_absolute_error(arousal_true, arousal_pred)

print(f"Valence MSE: {valence_mse:.4f} | MAE: {valence_mae:.4f}")
print(f"Arousal MSE: {arousal_mse:.4f} | MAE: {arousal_mae:.4f}")
