# In [5]:
from datasets import Dataset, Audio, ClassLabel, Features, load_dataset
import pandas as pd
import numpy as np
import os
import torchaudio
import torch.nn as nn
import torch
from pathlib import Path
# Folder that holds the annotation CSV (absolute, machine-specific path).
source_dir = Path(r'C:\Users\pepij\OneDrive - Delft University of Technology\THESIS\data\csv')
path = source_dir / 'first_try.csv'

# One row per clip; the bare expression below only displays the frame in a notebook.
metadata = pd.read_csv(path)
metadata

# Names of the eight attribute columns that serve as regression targets.
paqs = [
    "pleasant",
    "chaotic",
    "vibrant",
    "uneventful",
    "calm",
    "annoying",
    "eventful",
    "monotonous",
]

# Scale the label columns by a constant factor of 1/5 before training.
metadata[paqs] = metadata[paqs].div(5.0)
metadata
from datasets import Audio
from transformers import ASTFeatureExtractor
def load_audio(row):
    """Load one clip from disk and package it as a dataset record.

    Args:
        row: A metadata row (e.g. a pandas Series) that provides
            ``audio_path``, ``GroupID`` and the eight attribute columns.

    Returns:
        A dict with three entries:
        ``filename`` (``"<GroupID>.wav"``), ``labels`` (attribute name ->
        value taken from the row) and ``audio`` (``path``, a 1-D ``array``
        of samples and the file's ``sampling_rate``).
    """
    label_names = [
        'pleasant', 'chaotic', 'vibrant', 'uneventful',
        'calm', 'annoying', 'eventful', 'monotonous',
    ]
    # torchaudio.load returns the waveform as (channels, frames) by default.
    waveform, sr = torchaudio.load(row['audio_path'])
    # Down-mix to mono by averaging over the channel axis. Flattening a
    # multi-channel tensor instead would place the channels one after the
    # other and produce a clip that is too long and acoustically wrong.
    # For single-channel files the mean is the channel itself.
    mono = waveform.mean(dim=0) if waveform.dim() > 1 else waveform
    return {
        'filename': f"{row['GroupID']}.wav",
        'labels': {k: row[k] for k in label_names},
        'audio': {
            'path': row['audio_path'],
            'array': mono.numpy().reshape(-1),
            'sampling_rate': sr
        }
    }

# Decode every clip exactly once and collect the per-row records as a list of
# dicts. (Running an additional DataFrame.apply(load_audio, axis=1) pass whose
# result is thrown away would read every audio file from disk a second time.)
records = [load_audio(row) for _, row in metadata.iterrows()]

# Turn the records into a DataFrame and from there into a Hugging Face dataset.
new_df = pd.DataFrame(records)
dataset = Dataset.from_pandas(new_df)
new_df  # notebook display only

# Cast the "audio" column to the Audio feature with a 16 kHz sampling rate.
dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))

# Sanity checks on the first example (notebook display only).
dataset[0]
dataset[0]['audio']['array'].shape
# dataset[0]
# The old format, where you would first build a numpy array yourself; that turns out not to be necessary (see above).
# Attribute columns, in the order they are stacked into the label tensor.
label_keys = [
    "pleasant",
    "chaotic",
    "vibrant",
    "uneventful",
    "calm",
    "annoying",
    "eventful",
    "monotonous",
]

# Checkpoint to fine-tune and the feature extractor that belongs to it.
pretrained_model = "MIT/ast-finetuned-audioset-10-10-0.4593"
feature_extractor = ASTFeatureExtractor.from_pretrained(pretrained_model)

# Values read off the extractor: the sampling rate it is configured with and
# the first of its model input names (used as the key for the model inputs).
SAMPLING_RATE = feature_extractor.sampling_rate
model_input_name = feature_extractor.model_input_names[0]

# Preprocessing function
def preprocess_audio(batch):
    """Convert a batch of decoded clips into model inputs and label tensors.

    Args:
        batch: Mapping with an ``"input_values"`` list of audio dicts (each
            carrying an ``"array"``) and a ``"labels"`` list of per-example
            dicts keyed by attribute name.

    Returns:
        A dict holding the feature extractor's output under
        ``model_input_name`` and a float32 ``"labels"`` tensor whose columns
        follow the order of ``label_keys``.
    """
    waveforms = []
    for clip in batch["input_values"]:
        waveforms.append(clip["array"])
    features = feature_extractor(waveforms, sampling_rate=SAMPLING_RATE, return_tensors="pt")

    targets = []
    for label_row in batch["labels"]:
        targets.append([label_row[key] for key in label_keys])

    return {
        model_input_name: features[model_input_name],
        "labels": torch.tensor(targets, dtype=torch.float32),
    }

# Split into train/test exactly once. At this point `dataset` is a plain
# Dataset; a membership test such as `"test" in dataset` has no key lookup to
# use there and instead walks over (and decodes) every row, so decide by type.
# After the split `dataset` is a dict of splits and the branch is skipped.
if isinstance(dataset, Dataset):
    dataset = dataset.train_test_split(
        test_size=0.2, shuffle=True, seed=0)

# Use the sampling rate the feature extractor is configured with, and expose
# the audio column under the name the preprocessing function reads.
dataset = dataset.cast_column("audio", Audio(sampling_rate=feature_extractor.sampling_rate))
dataset = dataset.rename_column("audio", "input_values")

# Apply the preprocessing on the fly; only the columns it returns are kept.
dataset["train"].set_transform(preprocess_audio, output_all_columns=False)
dataset["test"].set_transform(preprocess_audio, output_all_columns=False)
dataset['test'][0]  # notebook display / smoke test of the transform
import evaluate
from transformers import ASTConfig, ASTForAudioClassification, TrainingArguments, Trainer
# Load the checkpoint's configuration, but size the head for this task: one
# regression output per attribute in `label_keys`. Keeping the label count in
# the config (instead of swapping the classifier module afterwards) keeps the
# saved config and the actual head in agreement, so a checkpoint written by
# this model can be reloaded.
config = ASTConfig.from_pretrained(
    pretrained_model,
    num_labels=len(label_keys),
    problem_type="regression",
)
config.id2label = {idx: name for idx, name in enumerate(label_keys)}
config.label2id = {name: idx for idx, name in enumerate(label_keys)}

# ignore_mismatched_sizes=True lets the pretrained weights load even though
# the output layer now has a different shape than in the checkpoint; that
# layer is freshly initialised while all matching weights are kept. No manual
# init_weights() call is made afterwards, so pretrained weights are not
# touched again after loading.
model = ASTForAudioClassification.from_pretrained(pretrained_model, config=config, ignore_mismatched_sizes=True)
from transformers import Trainer
import torch.nn as nn

class RegressionTrainer(Trainer):
    """Trainer variant that scores the model with mean squared error."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the MSE between the model's logits and the batch labels.

        Args:
            model: The model being trained; called with the remaining inputs.
            inputs: Batch dict; its ``"labels"`` entry is removed before the
                forward pass.
            return_outputs: When true, the model outputs are returned as well.
            num_items_in_batch: Accepted for signature compatibility; unused.

        Returns:
            The loss, or a ``(loss, outputs)`` tuple if ``return_outputs``.
        """
        targets = inputs.pop("labels")
        outputs = model(**inputs)
        criterion = nn.MSELoss()
        loss = criterion(outputs.logits, targets)
        if return_outputs:
            return (loss, outputs)
        return loss


# Configure training arguments
training_args = TrainingArguments(
    output_dir="./runs/ast_regressor_test",
    logging_dir="./logs/ast_regressor_test",
    report_to="none",
    # Fine-tuning a pretrained transformer: 5e-2 is orders of magnitude above
    # the usual fine-tuning range and is expected to diverge, so use 5e-5.
    learning_rate=5e-5,
    push_to_hub=False,
    num_train_epochs=3,  # Number of epochs
    per_device_train_batch_size=8,  # Batch size
    eval_strategy="epoch",  # Evaluate after each epoch
    save_strategy="no",  # No checkpoints, to save time
    save_total_limit=2,  # Only relevant once checkpoint saving is enabled
    load_best_model_at_end=False,  # Set false to save time
    metric_for_best_model="rmse",
    # RMSE is an error: lower is better. Without this, enabling
    # load_best_model_at_end later would select the checkpoint with the
    # HIGHEST rmse.
    greater_is_better=False,
    logging_strategy="steps",
    logging_steps=5,  # Log every 5 steps
    # Keep all dataset columns: the on-the-fly transform needs the raw
    # audio/label columns that the model's forward signature does not list.
    remove_unused_columns=False,
)
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np

def compute_metrics(eval_pred):
    """Summarise regression quality for one evaluation pass.

    Args:
        eval_pred: Object exposing ``predictions`` (model outputs) and
            ``label_ids`` (targets).

    Returns:
        Dict with ``mse``, ``rmse`` (square root of ``mse``), ``mae`` and
        ``r2``, each computed over all outputs with the metric functions'
        default settings.
    """
    predicted = eval_pred.predictions
    expected = eval_pred.label_ids

    mse = mean_squared_error(expected, predicted)
    return {
        "mse": mse,
        "rmse": np.sqrt(mse),
        "mae": mean_absolute_error(expected, predicted),
        "r2": r2_score(expected, predicted),
    }

# OPTIONAL
# If you ever want per-attribute RMSE for better model diagnostics:

# def compute_metrics(eval_pred):
#     logits = eval_pred.predictions
#     labels = eval_pred.label_ids

#     metrics = {}
#     for i, name in enumerate(["pleasant", "chaotic", "vibrant", "uneventful", "calm", "annoying", "eventful", "monotonous"]):
#         mse = mean_squared_error(labels[:, i], logits[:, i])
#         rmse = np.sqrt(mse)
#         metrics[f"{name}_rmse"] = rmse

#     metrics["rmse"] = np.sqrt(mean_squared_error(labels, logits))
#     return metrics

# Everything the trainer needs: model, arguments, both splits and the metrics.
trainer_kwargs = {
    "model": model,
    "args": training_args,
    "train_dataset": dataset["train"],
    "eval_dataset": dataset["test"],
    "compute_metrics": compute_metrics,
}
trainer = RegressionTrainer(**trainer_kwargs)

# Fine-tune, then run the held-out split through the trained model.
trainer.train()

# Bare expression: in a notebook this displays the prediction output.
trainer.predict(dataset["test"])