# Import librerie

In [None]:
!git clone https://github.com/marika-rago/projectID2ML
%cd projectID2ML

In [None]:
import os
import datetime
import pprint
import random
import warnings
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import autocast, GradScaler
import torchaudio
import torchaudio.transforms as T
import librosa
import soundfile as sf
import matplotlib.pyplot as plt
import scipy.signal
from IPython.display import Audio, display
from torch import amp
from tqdm.notebook import tqdm
from scipy.signal import get_window

In [None]:
warnings.filterwarnings("ignore", message=".*TorchCodec.*", category=UserWarning)
warnings.filterwarnings("ignore", message=".*StreamingMediaDecoder.*", category=UserWarning)
warnings.filterwarnings("ignore", message=".*torch.hann_window.*", category=UserWarning)
warnings.filterwarnings("ignore", category=UserWarning, module="librosa")
warnings.filterwarnings("ignore", category=FutureWarning, module="librosa")
warnings.filterwarnings("ignore", message="PySoundFile failed. Trying audioread instead.")

warnings.filterwarnings("ignore")
os.environ["FFMPEG_LOG_LEVEL"]="quiet"

# Riproducibilità

Definiamo i parametri di configurazione principali e fissiamo un `seed` globale per la riproducibilità. Questo permette di avere risultati coerenti tra le varie esecuzioni.

Nel dizionario `CONFIG` ci sono gli iperparametri comuni a tutta la pipeline, tra cui:
- **Frequenza di campionamento** (`sample_rate`)
- **Modalità di padding** (`pad_mode`): con "reflect", le estremità del segnale vengono riflesse, riducendo gli artefatti ai bordi.
- **Parametri della STFT** (`n_fft`, `hop_length`, `win_length`, `window`)  
- **Parametri di normalizzazione** (`top_db`, `epsilon`)  
- **Frazione del dataset** utilizzata per l’esperimento  
- **Seed** per la riproducibilità  

Alla fine selezioniamo automaticamente il dispositivo di calcolo (GPU se disponibile).


In [None]:
# Riproducibilità
def set_seed(seed=42):
    random.seed(seed) # seed per random
    np.random.seed(seed) # seed per NumPy
    torch.manual_seed(seed) # seed per PyTorch (CPU)
    torch.cuda.manual_seed_all(seed) # seed per cuda
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    print(f"Seed globale impostato a {seed}")


>**ATTENZIONE!!!**
>
>All'interno del dizionario CONFIG è presente la voce `skip_training`. Se si >vuole eseguire il training cambiare la voce in `False` (di base è settata a `True`).



In [None]:
CONFIG = {
    "sample_rate": 32000,     # Frequenza di campionamento audio
    "pad_mode": "reflect",    # Modalità di padding
    "n_fft": 1024,            # Dimensione della FFT
    "hop_length": 256,        # Passo tra due finestre consecutive
    "win_length": 1024,       # Lunghezza effettiva della finestra di analisi
    "window": "hann",         # Tipo di finestra (Hann per ridurre le discontinuità)
    "center": True,           # Centra ogni frame rispetto al segnale originale
    "top_db": 80.0,           # Range dinamico per la conversione in dB
    "epsilon": 1e-8,          # Termine di sicurezza per evitare divisioni per zero
    "seed": 42,               # Seed di riproducibilità
    "dataset_fraction": 0.16, # Percentuale del dataset usata
    "skip_training": True,    # Salto del training loop se è true
}

set_seed(CONFIG["seed"])

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Download Dataset

Per il training e la valutazione del mio modello è stato utilizzato il dataset **FMA**, in particolare la versiona *fma_small*, che contiene $8.000$ clip da $30$ secondi ciascuna in formati $.mp3$.

Quello che facciamo nelle seguenti celle è:

1.   scaricare e decomprimere il dataset FMA;
2.   contare il numero di file $.mp3$ per capire quanti utilizzarne;
3.   selezionare casualmente una porzione del dataset, definita con il parametro `CONFIG["dataset_fraction"]`;
4.   dividiamo i file selezionati in tre sottoinsimi, train_files, val_files e test_files;
5.   salviamo sul disco la lista dei file selezionati.   





In [None]:
%cd /content
!wget https://os.unil.cloud.switch.ch/fma/fma_small.zip
!unzip fma_small.zip

In [None]:
root_dir = "/content/fma_small"  # cartella che contiene il dataset compresso

# Conta tutti i file .mp3 nelle sottocartelle
count = sum(len([f for f in files if f.endswith(".mp3")]) for _, _, files in os.walk(root_dir))

print(f"Numero totale di file MP3: {count}")

In [None]:
seed = CONFIG["seed"]
fraction = CONFIG["dataset_fraction"]

# Imposta il seed per la riproducibilità
random.seed(seed)

selected_files = []

# Scorri tutte le sottocartelle e seleziona un tot di file per ciascuna
for subdir, _, files in os.walk(root_dir):
    mp3_files = [os.path.join(subdir, f) for f in files if f.endswith(".mp3")]
    if not mp3_files: # Salta le cartelle vuote
        continue
    n_select = max(1, int(len(mp3_files) * fraction))
    chosen = random.sample(mp3_files, n_select)
    selected_files.extend(chosen)

print(f"Totale file selezionati: {len(selected_files)} ({fraction*100:.0f}% del dataset)")

# Salva la lista su file
with open("selected_files.txt", "w") as f:
    for path in selected_files:
        f.write(path + "\n")

print("Lista salvata in 'selected_files.txt'")

In [None]:
# Mischia la lista in modo riproducibile
random.seed(42)
random.shuffle(selected_files)

# Suddivisione (907 + 150 + 150 = 1207)
train_files = selected_files[:907]
val_files = selected_files[907:1057]
test_files = selected_files[1057:]

print(f"Train files: {len(train_files)}")
print(f"Val files: {len(val_files)}")
print(f"Test files: {len(test_files)}")


## Classe Degradazioni

La classe `AudioDegradationDataset` va a definire il dataset che poi utilizziamo per addestrare il modello.
L'obiettivo di questa classe è simulare vari tipi di degradazioni audio realistiche a partire dalle clip pulite del datset FMA. In questo modo avremo coppie *(degraded, clean)* su cui il modello deve imparare a ricostruire l'audio eliminando gli artefatti.

Le degradazioni vengono applicate dinamicamente ad ogni clip, aumentando così la variabilità dei dati.

---

**Struttura della Classe**


1. *Caricamento e segmentazione* :
  ogni clip viene caricata a $32 kHz$ e ridotta a $3$ secondi, in modo da avere input di lunghezza breve e non appesantire il training. Se la clip è più corta da $3$ secondi viene riempita con zeri (padded). Inoltre i segmenti vengono normalizzati.

2. *identity_prob* :
  con una probabilità del $15\%$ la clip non viene degradata e rimane uguale a clean. Questo serve per far capire al modello che non deve alterare segnali che sono già buoni di partenza.

3. *Degradazioni* :
  il metodo `degradation` applica casualmente una delle degradazioni specificate in `degradation_types`. Le varie degradazioni che possono essere applicate sono:

      * **Quantizzazione (`quantize`)**: simula la riduzione delle profondità di bit, come avviene nei formati compressi. L'audio viene scalato e arrotondato a una risoluzione di 6, 8 o 10 bit, viene inoltre aggiunto un piccolo `dither` per evitare quantizzazione troppo regolare.

      * **Low-pass (`lowpass`)**: simula la perdita delle alte frequenze, l'effetto è che il suono risulta più "ovattato". Applica un filtro passa-basso con frequenza di taglio casuale tra $2.5$ e $7$  $kHz$.

      * **Clipping (`clipping`)**: simila la distorsione da saturazione del segnale. Vengono troncati tutti i valori oltre una soglia casuale tra $0.6$ e $0.9$.

      * **Rumore (`noise`)**: simula un rumore di fondo bianco o rosa, l'effetto è un "fruscio" costante.

      * **Reverbero (`reverb`)**: simula l'effetto di una stanza. Viene creato una specie di eco con decadimento esponenziale.

      * **Distorsione armonica (`distort`)**: simula l'effetto di saturazione dei dispositivi analogici. Applica una non linearita, `tanh`.

      * **Tonal Stripes (`tonal_stripes`)**: simula inferenza sinusoidali periodiche. Genera alcune sinusoidi a frequenze casuali (tra $200$ e $800$ $Hz$) e le somma al segnale con ampiezza limitata.

4. *Conversione in spettrogrammi lineari* :  
  Dopo la degradazione, entrambi i segnali (pulito e degradato) vengono trasformati tramite STFT in spettrogrammi lineari di ampiezza.
  Le magnitudini vengono convertite in decibel (dB) e poi normalizzate tra 0 e 1. I risultati vengono infine convertiti in tensori PyTorch, pronti per essere utilizzati nel modello.





In [None]:
class AudioDegradationDataset(Dataset):

    def __init__(self, audio_files, sample_rate=32000, segment_length=2.0,
                 degradation_types=['quantize', 'lowpass', 'noise', 'tonal_stripes'],
                 identity_prob=0.15,
                 deterministic=False,
                 seed=42):
        self.audio_files = audio_files
        self.sr = sample_rate
        self.segment_samples = int(segment_length * sample_rate)
        self.degradation_types = degradation_types
        self.identity_prob = identity_prob
        self.deterministic = deterministic # Se True blocca la randomizzazione
        self.seed = seed

    def __len__(self):
        return len(self.audio_files)


    # Tonal stripes
    def add_tonal_stripes(self, audio, sr, num_tones=3, amp_range=(0.02, 0.05),
                          min_freq=200, max_freq=8000):
        n = len(audio)
        t = np.linspace(0, n / sr, n, endpoint=False)
        audio = audio.copy()

        for _ in range(random.randint(1, num_tones)):
            freq = random.uniform(min_freq, max_freq)
            amp = random.uniform(*amp_range)
            stripe = np.sin(2 * np.pi * freq * t)

            # Applica dissolvenze casuali
            if random.random() < 0.5:
                fade_len = random.randint(int(0.2*n), int(0.6*n))
                start = random.randint(0, n - fade_len)
                fade = np.linspace(0, 1, fade_len)
                stripe[start:start+fade_len] *= fade[::-1] if random.random() < 0.5 else fade

            audio += amp * stripe

        # Evita saturazione
        return np.clip(audio, -1.0, 1.0)


    # Degradazione
    def degradation(self, audio):
        x = audio.copy()

        # Scelta casuale
        degradation = random.choice(self.degradation_types)

        # Quantizzazione
        if degradation == 'quantize':
            bits = np.random.choice([6, 8, 10])
            q = 2 ** bits
            dither = np.random.uniform(-1/q, 1/q, size=x.shape)
            x = np.round(x * q) / q + dither

        # Low-pass
        elif degradation == 'lowpass':
            cutoff = float(np.random.uniform(2500, 7000))
            xt = torch.from_numpy(x).unsqueeze(0)
            xt = torchaudio.functional.lowpass_biquad(xt, self.sr, cutoff)
            x = xt.squeeze(0).numpy()

        # Clipping
        elif degradation == 'clipping':
            thr = float(np.random.uniform(0.6, 0.9))
            x = np.clip(x, -thr, thr)

        # Rumore bianco o rosa
        elif degradation == 'noise':
            if np.random.rand() < 0.5:
                std = np.random.uniform(0.01, 0.05)
                x += np.random.randn(len(x)) * std
            else:
                white = np.random.randn(len(x))
                b = np.cumsum(white)
                pink = b / np.max(np.abs(b))
                std = np.random.uniform(0.01, 0.04)
                x += pink * std

        # Reverbero
        elif degradation == 'reverb':
            decay = np.random.uniform(0.3, 0.9)
            ir_len = np.random.randint(2000, 6000)
            ir = np.exp(-np.linspace(0, decay, ir_len))
            x = np.convolve(x, ir, mode='same')
            x = x / (np.max(np.abs(x)) + 1e-8)

        # Distorsione
        elif degradation == 'distort':
            gain = np.random.uniform(1.5, 3.0)
            x = np.tanh(x * gain)

        # Tonal stripes
        elif degradation == 'tonal_stripes':
            x = self.add_tonal_stripes(x, self.sr)

        return np.clip(x, -1.0, 1.0)


    # Caricamento di un file audio
    def file_load(self, path):
        try:
            audio, _ = librosa.load(path, sr=self.sr, mono=True)
            return audio

        except Exception:
            from pydub import AudioSegment
            audio = AudioSegment.from_file(path)
            samples = np.array(audio.get_array_of_samples()).astype(np.float32)
            samples = samples / (np.max(np.abs(samples)) + 1e-8)
            return samples


    # Get item
    def __getitem__(self, idx):

        # Se deterministic=True imposta un seed fisso
        if self.deterministic:
            np.random.seed(self.seed + idx)
            random.seed(self.seed + idx)

        path = self.audio_files[idx]

        # Gestisce eventuali errori nel caricamento del file audio
        try:
            audio = self.file_load(path)
        except Exception as e:
            print(f"[WARN] Errore nel file {path}: {e}")
            new_idx = (idx + 1) % len(self.audio_files)
            return self.__getitem__(new_idx)

        # Segmentazione
        if len(audio) > self.segment_samples:
            start = np.random.randint(0, len(audio) - self.segment_samples)
            audio = audio[start:start + self.segment_samples]
        else:
            audio = np.pad(audio, (0, self.segment_samples - len(audio)))

        # Normalizzazione
        max_val = np.max(np.abs(audio)) + CONFIG["epsilon"]
        audio = audio / max_val

        # Identità stocastica
        if np.random.rand() < self.identity_prob:
            degraded = audio.copy()
        else:
            degraded = self.degradation(audio)

        # STFT
        win = scipy.signal.get_window(CONFIG["window"], CONFIG["win_length"], fftbins=True)
        S_clean = librosa.stft(audio, n_fft=CONFIG["n_fft"], hop_length=CONFIG["hop_length"],
                               win_length=CONFIG["win_length"], window=win,
                               center=CONFIG["center"], pad_mode=CONFIG["pad_mode"])
        S_deg = librosa.stft(degraded, n_fft=CONFIG["n_fft"], hop_length=CONFIG["hop_length"],
                             win_length=CONFIG["win_length"], window=win,
                             center=CONFIG["center"], pad_mode=CONFIG["pad_mode"])

        # Magnitudine e conversione in dB
        Mag_clean = np.abs(S_clean)
        Mag_deg = np.abs(S_deg)

        S_db_clean = librosa.amplitude_to_db(np.maximum(Mag_clean, CONFIG["epsilon"]),
                                             ref=np.max, top_db=CONFIG["top_db"])
        S_db_deg = librosa.amplitude_to_db(np.maximum(Mag_deg, CONFIG["epsilon"]),
                                           ref=np.max, top_db=CONFIG["top_db"])

        # Normalizzazione su [0, 1]
        Mag_clean_01 = np.clip((S_db_clean + CONFIG["top_db"]) / CONFIG["top_db"], 0.0, 1.0)
        Mag_deg_01 = np.clip((S_db_deg + CONFIG["top_db"]) / CONFIG["top_db"], 0.0, 1.0)

        # Conversione in tensori PyTorch
        clean_spec = torch.from_numpy(Mag_clean_01).unsqueeze(0).float()
        degraded_spec = torch.from_numpy(Mag_deg_01).unsqueeze(0).float()

        return degraded_spec, clean_spec


In [None]:
# Gruppi di degradazioni
gruppo_A = ['quantize', 'tonalstripes', 'noise']
gruppo_B = ['clipping', 'reverb', 'lowpass', 'distort']

### Dataset Gruppo A

In [None]:
# Richiamiamo la classe AudioDegradationDataset sulle degradazioni del gruppo A
train_dataset_A = AudioDegradationDataset(
    train_files,
    sample_rate=32000,
    segment_length=3.0,
    degradation_types=gruppo_A,
    deterministic=False
)
val_dataset_A = AudioDegradationDataset(
    val_files,
    sample_rate=32000,
    segment_length=3.0,
    degradation_types=gruppo_A,
    deterministic=True, seed=23
)
test_dataset_A = AudioDegradationDataset(
    test_files,
    sample_rate=32000,
    segment_length=3.0,
    degradation_types=gruppo_A,
    deterministic=True,
    seed=19
)

print(f"Training samples: {len(train_dataset_A)}")
print(f"Validation samples: {len(val_dataset_A)}")
print(f"Test samples: {len(test_dataset_A)}")


### Dataset Gruppo B

In [None]:
# Richiamiamo la classe AudioDegradationDataset sulle degradazioni del gruppo B
train_dataset_B = AudioDegradationDataset(
    train_files,
    sample_rate=32000,
    segment_length=3.0,
    degradation_types=gruppo_B,
    deterministic=False
)
val_dataset_B = AudioDegradationDataset(
    val_files,
    sample_rate=32000,
    segment_length=3.0,
    degradation_types=gruppo_B,
    deterministic=True,
    seed=23
)
test_dataset_B = AudioDegradationDataset(
    test_files,
    sample_rate=32000,
    segment_length=3.0,
    degradation_types=gruppo_B,
    deterministic=True,
    seed=19
)

print(f"Training samples: {len(train_dataset_B)}")
print(f"Validation samples: {len(val_dataset_B)}")
print(f"Test samples: {len(test_dataset_B)}")


## Visualizzazione esempio

Con la funzione `show_example_spectrogram` visualizziamo la rappresentazione spettrale (STFT lineare) dei segnali clean e degraded.
Gli spettrogrammi sono visualizzati in scala di intensità normalizzata tra
$[0, 1]$ ottenuta dalla conversione in decibel (dB).

Sull'asse delle $x$ abbiamo il tempo (frame), sull'asse delle $y$ abbiamo la frequenza.

---

Con la funzione `reconstruct_audio` ricostruiamo un segnale audio (che possiamo sentire) a partire dallo spettrogramma di magnitudine.
Poiché la STFT utilizzata nel dataset non conserva la fase, usiamo l’algoritmo di Griffin-Lim, che stima iterativamente una fase coerente a partire dalla magnitudine.

Questa fase permette di valutare in modo percettivo la qualità delle degradazioni.

In [None]:
def show_example_spectrogram(dataset, idx=0):

    # Estrae la coppia (degraded, clean)
    degraded_sp, clean_sp = dataset[idx]

    # Rimuove la dimensione del canale
    clean_sp = clean_sp.squeeze().numpy()
    degraded_sp = degraded_sp.squeeze().numpy()

    print(f"Shape clean_spec: {clean_sp.shape} | Shape degraded_spec: {degraded_sp.shape}")

    fig, axes = plt.subplots(2, 1, figsize=(12, 8))

    # Plot clean
    im1 = axes[0].imshow(clean_sp, aspect='auto', origin='lower')
    axes[0].set_title('Clean Linear Spectrogram (dB --> [0,1])'); fig.colorbar(im1, ax=axes[0])

    # Plot degraded
    im2 = axes[1].imshow(degraded_sp, aspect='auto', origin='lower')
    axes[1].set_title('Degraded Linear Spectrogram (dB --> [0,1])'); fig.colorbar(im2, ax=axes[1])

    plt.tight_layout();
    plt.show()

In [None]:
def reconstruct_audio(dataset, idx=0, n_iter=32):

    # Estrae la coppia (degraded, clean)
    degraded_spec, clean_spec = dataset[idx]

    # Converti i tensori in numpy [F, T]
    degraded_spec = degraded_spec.squeeze(0).numpy()
    clean_spec = clean_spec.squeeze(0).numpy()

    # Da [0,1] --> dB --> ampiezza
    S_db_clean = clean_spec * CONFIG["top_db"] - CONFIG["top_db"]
    S_db_degraded = degraded_spec * CONFIG["top_db"] - CONFIG["top_db"]

    Mag_clean = librosa.db_to_amplitude(S_db_clean)
    Mag_degraded = librosa.db_to_amplitude(S_db_degraded)

    # Griffin-Lim per ricostruire fase e segnale
    win = scipy.signal.get_window(CONFIG["window"], CONFIG["win_length"], fftbins=True)

    y_clean = librosa.griffinlim(
        Mag_clean,
        n_iter=n_iter,
        hop_length=CONFIG["hop_length"],
        win_length=CONFIG["win_length"],
        window=win,
        center=CONFIG["center"]
    )

    y_degraded = librosa.griffinlim(
        Mag_degraded,
        n_iter=n_iter,
        hop_length=CONFIG["hop_length"],
        win_length=CONFIG["win_length"],
        window=win,
        center=CONFIG["center"]
    )

    # Normalizza
    y_clean = y_clean / (np.max(np.abs(y_clean)) + CONFIG["epsilon"])
    y_degraded = y_degraded / (np.max(np.abs(y_degraded)) + CONFIG["epsilon"])

    # Ascolto
    print(f"Clean ricostruito con Griffin-Lim ({n_iter} iterazioni):")
    display(Audio(y_clean, rate=CONFIG["sample_rate"]))

    print(f"Degraded ricostruito con Griffin-Lim ({n_iter} iterazioni):")
    display(Audio(y_degraded, rate=CONFIG["sample_rate"]))

    # Per confronto, ascolta anche la versione originale dal dataset
    path = dataset.audio_files[idx]
    original_audio = dataset.file_load(path)
    max_val = np.max(np.abs(original_audio)) + CONFIG["epsilon"]
    original_audio = original_audio / max_val
    print("Audio originale (caricato da file):")
    display(Audio(original_audio, rate=CONFIG["sample_rate"]))


### Esempio Gruppo A

In [None]:
idx = 4
show_example_spectrogram(train_dataset_A, idx=idx)
reconstruct_audio(train_dataset_A, idx=idx, n_iter=64)

### Esempio Gruppo B

In [None]:
idx = 4
show_example_spectrogram(train_dataset_B, idx=idx)
reconstruct_audio(train_dataset_B, idx=idx, n_iter=64)

# Dataloader

Dopo la definizione dei dataset per i gruppi $A$ e $B$, definiamo ora i corrispondenti `DataLoader`, che gestiscono il caricamento e la preparazione dei batch per il training e la validazione.

---

Settiamo i parametri generali con `LOADER_CONFIG`:

* `batch_size` = $16$, è il numero di esempi per batch.

* `num_workers` = $0$, lo impostiamo a $0$ per compatibilità con Colab evitando di saturare la RAM.

* `pin_memory` = $True$, blocca la memoria dei batch in RAM, in modo da trasferirli più velocemente sulla GPU.

* `persistent_workers` = $False$, evita che i processi di caricamento restino attivi dopo ogni epoca.







In [None]:
# Parametri generali
LOADER_CONFIG = {
    "batch_size": 16,
    "num_workers": 0,
    "pin_memory": True,
    "persistent_workers": False,
}

## Dataloader Gruppo A

In [None]:
train_loader_A = DataLoader(
    train_dataset_A,
    batch_size=LOADER_CONFIG["batch_size"],
    shuffle=True, # Mescola i campioni ad ogni epoca
    num_workers=LOADER_CONFIG["num_workers"],
    pin_memory=LOADER_CONFIG["pin_memory"],
    persistent_workers=LOADER_CONFIG["persistent_workers"]
)

val_loader_A = DataLoader(
    val_dataset_A,
    batch_size=LOADER_CONFIG["batch_size"],
    shuffle=False, # non mescola in modo da vere una validazione deterministica
    num_workers=LOADER_CONFIG["num_workers"],
    pin_memory=LOADER_CONFIG["pin_memory"],
    persistent_workers=LOADER_CONFIG["persistent_workers"]
)

test_loader_A = DataLoader(
    test_dataset_A,
    batch_size=1, # analizza un file per volta
    shuffle=False,
    num_workers=0,
    pin_memory=True
)

print(f"Train batches: {len(train_loader_A)}, Val batches: {len(val_loader_A)}, Test batches: {len(test_loader_A)}")

## Dataloader Gruppo B

In [None]:
train_loader_B = DataLoader(
    train_dataset_B,
    batch_size=LOADER_CONFIG["batch_size"],
    shuffle=True,
    num_workers=LOADER_CONFIG["num_workers"],
    pin_memory=LOADER_CONFIG["pin_memory"],
    persistent_workers=LOADER_CONFIG["persistent_workers"]
)

val_loader_B = DataLoader(
    val_dataset_B,
    batch_size=LOADER_CONFIG["batch_size"],
    shuffle=False,
    num_workers=LOADER_CONFIG["num_workers"],
    pin_memory=LOADER_CONFIG["pin_memory"],
    persistent_workers=LOADER_CONFIG["persistent_workers"]
)

test_loader_B = DataLoader(
    test_dataset_B,
    batch_size=1,
    shuffle=False,
    num_workers=0,
    pin_memory=True
)

print(f"Train batches: {len(train_loader_B)}, Val batches: {len(val_loader_B)}, Test batches: {len(test_loader_B)}")

# Modello

Il mio modello prende in input uno spettrogramma normalizzato in $[0,1]$ (lineare o Mel, 1 canale) e produce uno spettrogramma "enhanced" della stessa forma.
Segue lo stile di una U-Net, e integra un contesto multi-scala.

---

La funzione `crop_to_match` serve per fare combaciare i due tensori che prende in input. Serve perchè le operazioni di downsampling e upsampling possono generare piccole differenze di pixel, e prima di concatenare le skip connections occorre che le dimensioni siano uguali.

---

La calsse `MultiScaleCNNBlock` applica tre convoluzioni 2d in parallelo con kernel $3 \times 3$, $5 \times 5$, $7 \times 7$. Successivamente fa una fusione con conv $1 \times 1$ e poi facoltativamente applica ReLU.

Tutto questo viene fatto perchè gli artefatti possono essere locali o più larghi.

---

La classe `ResidualBlock` applica due conv $3 \times 3$ con BatchNorm e skip residual, e infine applica ReLU. Questo viene fatto per preservare informazioni ed evitare vanishing gradient.

---

La classe `UpBlock` fa upsampling bilineare seguita da conv $3 \times 3$, BatchNorm e infine ReLU.

---

La classe `SpctralEnhancementNet` è composta da tre parti principali: Encoder, Bottleneck e Decoder.


L'**Encoder** riduce progressivamente la risoluzione spaziale dello spettrogramma. Ogni blocco MultiScakeCNNBlock estrae feature locali e globali come detto precedentemente. Dopo ogni blocco, una convoluzione con stride $2$ dimezza le simesioni dello spettro, comprimendo l'informazione e ampliando il campo percettivo. Qui la rete impara a vedere il quadro complessivo del segnale.


Nella parte centrale, **Bottleneck**, tre blocchi residiali lavorano mantenendo la profondità costante.
Questi blocchi permettono di rielaborare le caratteristiche apprese mantenendo l’informazione originale, grazie alle skip connection tra input e output del blocco.
Questo meccanismo stabilizza l’addestramento e migliora la capacità della rete di affinare dettagli senza distruggere la struttura armonica del segnale.
Viene applicato anche un Dropout2D per ridurre l’overfitting.


Il **Decoder** ricostruisce progressivamente la risoluzione originale dello spettrogramma, invertendo il processo di compressione.
Ogni livello effettua un upsampling bilineare seguito da una convoluzione $3 \times 3$ e da una fusione con le feature corrispondenti dell’encoder.
Le skip connections collegano i livelli simmetrici dell’encoder e del decoder, in modo che la rete possa recuperare dettagli fini persi durante il downsampling.
Dopo la concatenazione, un nuovo MultiScaleCNNBlock elabora le informazioni combinate, permettendo alla rete di fondere dettagli locali e contesto globale.


Infine, un’uscita convoluzionale $1 \times 1$ riduce i canali a uno solo, producendo il mel-spettrogramma enhanced.
L’attivazione finale è lineare, il clamp a $[0, 1]$ viene applicato solo durante il training e il test per coerenza con la normalizzazione dei dati.



In [None]:
def crop_to_match(source, target):

    _, _, h, w = source.shape
    _, _, h_t, w_t = target.shape

    # Se le dimensioni non coincidono, le ritaglia a quella minima
    if h != h_t or w != w_t:
        h_min = min(h, h_t)
        w_min = min(w, w_t)
        source = source[:, :, :h_min, :w_min]
        target = target[:, :, :h_min, :w_min]
    return source, target

In [None]:
class MultiScaleCNNBlock(nn.Module):
    def __init__(self, in_channels, out_channels, last_relu=True):
        super().__init__()

        # Suddivide i canali di outout tra i tre rami
        base = out_channels // 3
        rem = out_channels - 3 * base
        ch1, ch2, ch3 = base, base, base + rem  # branch3 assorbe il resto

        # Ramo 1 kernel 3x3
        self.branch1 = nn.Sequential(
            nn.Conv2d(in_channels, ch1, kernel_size=3, padding=1, padding_mode='reflect'),
            nn.BatchNorm2d(ch1),
            nn.ReLU(inplace=True),
        )

        # Ramo 2 kernel 5x5
        self.branch2 = nn.Sequential(
            nn.Conv2d(in_channels, ch2, kernel_size=5, padding=2, padding_mode='reflect'),
            nn.BatchNorm2d(ch2),
            nn.ReLU(inplace=True),
        )

        # Ramo 3 kernel 7x7
        self.branch3 = nn.Sequential(
            nn.Conv2d(in_channels, ch3, kernel_size=7, padding=3, padding_mode='reflect'),
            nn.BatchNorm2d(ch3),
            nn.ReLU(inplace=True),
        )

        # Dopo concatenazione riduce i canali con con 1x1
        fused_in = ch1 + ch2 + ch3
        layers = [nn.Conv2d(fused_in, out_channels, kernel_size=1)]
        if last_relu:
            layers.append(nn.ReLU(inplace=True))
        self.fusion = nn.Sequential(*layers)

    def forward(self, x):

        # Calcola le tre rappresentazioni parallele
        b1 = self.branch1(x)
        b2 = self.branch2(x)
        b3 = self.branch3(x)

        # Concatena lungo la dimensione dei canali
        out = torch.cat([b1, b2, b3], dim=1)

        # Fusione e riduzione dei canali
        return self.fusion(out)

In [None]:
class ResidualBlock(nn.Module):
    def __init__(self, channels):
        super().__init__()
        self.conv1 = nn.Conv2d(channels, channels, 3, padding=1, padding_mode='reflect')
        self.bn1 = nn.BatchNorm2d(channels)
        self.conv2 = nn.Conv2d(channels, channels, 3, padding=1, padding_mode='reflect')
        self.bn2 = nn.BatchNorm2d(channels)

    def forward(self, x):
        residual = x # Skip Connection
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = out + residual # Somma residua
        return F.relu(out)

In [None]:
class UpBlock(nn.Module):

    def __init__(self, in_ch, out_ch):
        super().__init__()
        self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False)
        self.conv = nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1, padding_mode='reflect')
        self.bn = nn.BatchNorm2d(out_ch)
        self.act = nn.ReLU(inplace=True)

    def forward(self, x):
        x = self.up(x) # Upsampling (fa interpolazione bilineare)
        x = self.conv(x)
        x = self.bn(x)
        x = self.act(x)
        return x

In [None]:
class SpectralEnhancementNet(nn.Module):
    def __init__(self):
        super().__init__()

        # Encoder --> estrae feature
        self.enc1 = MultiScaleCNNBlock(1, 32)
        self.down1 = nn.Conv2d(32, 32, kernel_size=3, stride=2, padding=1, padding_mode='reflect')

        self.enc2 = MultiScaleCNNBlock(32, 64)
        self.down2 = nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1, padding_mode='reflect')

        self.enc3 = MultiScaleCNNBlock(64, 128)
        self.down3 = nn.Conv2d(128, 128, kernel_size=3, stride=2, padding=1, padding_mode='reflect')

        # Bottleneck --> 3 blocchi residuali
        self.res_blocks = nn.Sequential(
            ResidualBlock(128),
            ResidualBlock(128),
            ResidualBlock(128),
        )

        # Decoder --> upsampling progressivo e concatenazione con feture encoder
        self.up3 = UpBlock(128, 64)
        self.dec3 = MultiScaleCNNBlock(64 + 128, 64)

        self.up2 = UpBlock(64, 32)
        self.dec2 = MultiScaleCNNBlock(32 + 64, 32)

        self.up1 = UpBlock(32, 16)
        self.dec1 = MultiScaleCNNBlock(16 + 32, 16, last_relu=False)

        # Output --> spettrogramma migliorato (un canale)
        self.out = nn.Conv2d(16, 1, kernel_size=1)
        self.dropout = nn.Dropout2d(p=0.1) # Regolarizzazione nel bottleneck

    def forward(self, x):

        # Encoder
        e1 = self.enc1(x)
        e1d = self.down1(e1)
        e2 = self.enc2(e1d)
        e2d = self.down2(e2)
        e3 = self.enc3(e2d)
        e3d = self.down3(e3)

        # Bottleneck
        b = self.res_blocks(e3d)
        b = self.dropout(b)

        # Decoder
        d3 = self.up3(b)
        d3, e3 = crop_to_match(d3, e3)
        d3 = torch.cat([d3, e3], dim=1)
        d3 = self.dec3(d3)

        d2 = self.up2(d3)
        d2, e2 = crop_to_match(d2, e2)
        d2 = torch.cat([d2, e2], dim=1)
        d2 = self.dec2(d2)

        d1 = self.up1(d2)
        d1, e1 = crop_to_match(d1, e1)
        d1 = torch.cat([d1, e1], dim=1)
        d1 = self.dec1(d1)

        # Output
        enhanced = self.out(d1)
        return enhanced

# Loss

La loss che utilizzo nel mio modello
`HybridLoss` combina diverse componenti, ognuna della quali si occupa di un aspetto diverso della qualità audio.

La loss totale è una somma pesata dei vari termini:

$$L_{tot} = w_c \cdot L_{charb} + w_{hf} \cdot L_{hf} + w_{dnh} \cdot L_{dnh} + w_{f} \cdot L_{flat} + w_{bw} \cdot L_{bw} + w_e \cdot L_{energy} $$

Dove:

* **Charbonnier Loss**: penalizza le differenze punto per punto tra lo spettrogramma predetto ($x$) e il target ($y$). È robusta agli outliers.

$$L_{charb} = \frac{1}{N} \sum_{f,t}
\sqrt{(x_{f,t} - y_{f,t})^2 + \varepsilon^2}$$

* **High Frequency Loss**: enfatizza le regioni ad alta frequenza e ad alta energia, poichè mi sono accorta che il modello tendeva a smussare nella zona alta.

$$L_{hf} = \frac{1}{N} \sum_{f,t}
w(f) \cdot m_{f,t} \, |x_{f,t} - y_{f,t}|$$

* **Do No Harm Loss**: scoraggia la rete a modificare le zone già "buone".

$$L_{dnh} = \frac{1}{N} \sum_{f,t}
(1 - n_{f,t}) \, |x_{f,t} - d_{f,t}| $$

* **Spectral Flatness e Tonal Penalty Loss**: mantiene la coerenza tonale, evitando che l'output risulti "metallico" o troppo "piatto".

$$SF(S) = \frac{\exp \left( \frac{1}{T} \sum_{t} \log(S_{f,t} + \varepsilon) \right)}
{\frac{1}{T} \sum_{t} (S_{f,t} + \varepsilon)} $$

$$L_{flat} = \frac{1}{F} \sum_{f}
|SF(x_f) - SF(y_f)| $$

* **Band Weighted L1 Loss**: rafforza la coerenza nelle bande di frequenza più importanti per l'udito umano.

$$L_{bw} = \frac{1}{N} \sum_{f,t}
w(f) \, |x_{f,t} - y_{f,t}| $$

* **Energy Consistency Loss**: preserva il volume e il bilanciamento energetico del segnale.

$$L_{energy} = \frac{1}{B} \sum_{b=1}^{B}
\left| E(x_b) - E(y_b) \right|
\quad \text{con} \quad
E(S) = \frac{1}{F T} \sum_{f,t} S_{f,t}^2 $$








In [None]:
class HybridLoss(nn.Module):
    def __init__(self, weights=None, eps=1e-6, flat_win=9):

        super().__init__()
        self.eps = eps
        self.flat_win = flat_win

        # Pesi bilanciati per enfatizzare alte frequenze e fedeltà strutturale
        self.w = weights or {
            "charb": 1.0,
            "hf": 0.4,
            "dnh": 0.25,
            "flat":0.05,
            "bw": 0.8,
            "energy": 0.1
        }


        if weights is not None:
            self.w.update(weights)

    # Charbonnier loss
    def charbonnier(self, x, y):
        diff = x - y
        return torch.mean(torch.sqrt(diff*diff + self.eps*self.eps))


    # High-frequency
    def hf_guided(self, pred, target):
        B, C, F, T = pred.shape
        # peso crescente lungo la frequenza
        w_f = torch.linspace(0.8, 1.2, F, device=pred.device).view(1,1,F,1)  # [1,1,F,1]
        # Normalizzazione del target per generare una maschera percettiva
        t_norm = (target - target.min(dim=2, keepdim=True)[0].min(dim=3, keepdim=True)[0]) \
                 / (target.max(dim=2, keepdim=True)[0].max(dim=3, keepdim=True)[0] -
                    target.min(dim=2, keepdim=True)[0].min(dim=3, keepdim=True)[0] + self.eps)
        m = torch.clamp(t_norm, 0.0, 1.0)  # [B,1,F,T]
        return torch.mean(w_f * m * torch.abs(pred - target))

    # Do-no-harm
    def do_no_harm(self, pred, target, degraded, tau=0.05):
        # Calcola dove degraded è già simile al target
        need = torch.abs(target - degraded)
        need = need / (need.amax(dim=(2, 3), keepdim=True) + self.eps)
        mask_no_need = 1.0 - need
        # Penalizza modifiche inutili per evitare di peggiorare cose buone
        return torch.mean(mask_no_need * torch.abs(pred - degraded))


    # Spectral flatness penalty
    def spectral_flatness(self, S):
        # Evita log(0)
        S = torch.clamp(S, min=self.eps)
        # Media aritmetica e geometrica lungo il tempo
        am = S.mean(dim=3)
        gm = torch.exp((torch.log(S)).mean(dim=3))
        sf = gm / (am + self.eps)
        return sf

    def tonal_penalty(self, pred, target):
        # Differenza di flatness tra predizione e target
        sf_p = self.spectral_flatness(pred)
        sf_t = self.spectral_flatness(target)
        return torch.mean(torch.abs(sf_p - sf_t))


    # Band-weighted L1
    def band_weighted_l1(self, pred, target):
        B, C, F, T = pred.shape
        # Curva sinusoidale
        weights = 1.0 + 0.3 * torch.sin(torch.linspace(0, 3.14, F, device=pred.device))
        weights = weights.view(1, 1, F, 1)
        return torch.mean(weights * torch.abs(pred - target))


    # Energy consistency
    def energy_consistency(self, pred, target):
        # Energia media per spettrogramma
        e_pred = torch.mean(pred ** 2, dim=(2,3))
        e_tgt  = torch.mean(target ** 2, dim=(2,3))
        return torch.mean(torch.abs(e_pred - e_tgt))


    # Forward
    def forward(self, enhanced, target, degraded=None, return_components=False):
        # Componenti principali della loss
        charb = self.charbonnier(enhanced, target)
        hf = self.hf_guided(enhanced, target)
        bw = self.band_weighted_l1(enhanced, target)
        energy = self.energy_consistency(enhanced, target)

        # Somma pesata delle componenti principali
        total = (
            self.w["charb"] * charb +
            self.w["bw"] * bw +
            self.w["hf"] * hf +
            self.w["energy"] * energy
        )

        # Componenti opzionali
        dnh = None
        if degraded is not None and self.w["dnh"] > 0:
            dnh = self.do_no_harm(enhanced, target, degraded)
            total = total + self.w["dnh"] * dnh

        flat = None
        if self.w["flat"] > 0:
            flat = self.tonal_penalty(enhanced, target)
            total = total + self.w["flat"] * flat

        # Restituisce anche le singole componenti
        if return_components:
            out = {"total": total, "charb": charb, "bw": bw, "hf": hf, "energy": energy}
            if dnh is not None:  out["dnh"]  = dnh
            if flat is not None: out["flat"] = flat
            return out
        else:
            return total


# Training

## Metriche

Le metriche permettono di valutare quanto bene il modello sta facendo.
Ho utilizzato quattro metriche, ognuna che guarda un aspetto diverso del segnale.

---

**Mean Squared Error**

Misura la distanza quadratica media tra lo spettrogramma predetto $x$ e lo spettrogramma target $y$.

Valori più bassi indicano una ricostruzione più fedele.

$$MSE = \frac{1}{N} \sum_{f, t}
(x_{f,t} - y_{f,t})^2 $$


---

**L1, Mean Absolute Error**

Indica quanto, in media, ogni punto delle spettrogramma differisce dal corrispondente valore del target. Permette di controllare la precisione media.

Valori minori indicano una migliore ricostruzione.

$$L1 = \frac{1}{N} \sum_{f, t}|x_{f,t} - y_{f,t}|$$


---

**Cosine Similarity**

Misura l'orientamento tra i vettori. Vale:

* $1$ se i due spettrogrammi hanno la stessa forma spettrale, anche se con differente scala;
* $0$ se sono ortogonali;
* $-1$ se sono opposti.

Valori vicino a $1$ indicano una forte similarità strutturale.


$$ CosSim = \frac {\mathbf{x} \cdot \mathbf{y}}
{\|\mathbf{x}\|_2 \, \|\mathbf{y}\|_2}
= \frac{\sum_{i=1}^{N} x_i y_i}
{\sqrt{\sum_{i=1}^{N} x_i^2} \, \sqrt{\sum_{i=1}^{N} y_i^2}}
$$





In [None]:
def mse_spec(pred, target):
    return torch.mean((pred - target) ** 2).item()


def l1_spec(pred, target):
    return torch.mean(torch.abs(pred - target)).item()


def cosine_spec(pred, target):
    pred_f = pred.flatten(start_dim=1).float()
    target_f = target.flatten(start_dim=1).float()

    # Centra i vettori (rimuove il bias positivo da [0,1])
    pred_f = pred_f - pred_f.mean(dim=1, keepdim=True)
    target_f = target_f - target_f.mean(dim=1, keepdim=True)

    cos = F.cosine_similarity(pred_f, target_f, dim=1)
    cos = torch.clamp(cos, -1.0, 1.0)
    return torch.clamp(torch.mean(cos), -1.0, 1.0).item()



## Funzioni Ausiliarie

Durante l'elaborazione degli spettrogrammi lineari, è possibile che l'output prodotto dalla rete abbia dimensioni leggermente diverse rispetto al target. Queste discrepanze sono causate dalle convoluzioni, downsampling e upsampling, che possono modificare l'altezza o la larghezza del tensore.

Per evitare errori quando calcoliamo la loss e le metriche, è necessario che la predizione del modello e il target abbiamo esattamente la stessa forma.

La funzione `match_shape` uniforma le dimensioni dei due tensori. Confronta altezza e larghezza della predizione e del target. Se la dimensione della predizione è maggiore la taglia (crop), se è minore la riempie (pad) con zeri fino ad uguagliarla.





In [None]:
def match_shape(pred, target):
    # Estrae le dimensioni di pred e target
    _, _, h_p, w_p = pred.shape
    _, _, h_t, w_t = target.shape

    # Allinea l'altezza (frequenze)
    if h_p > h_t:
        pred = pred[:, :, :h_t, :]
    elif h_p < h_t:
        pred = F.pad(pred, (0, 0, 0, h_t - h_p))

    # Allinea la larghezza (tempo)
    if w_p > w_t:
        pred = pred[:, :, :, :w_t]
    elif w_p < w_t:
        pred = F.pad(pred, (0, w_t - w_p, 0, 0))

    return pred, target


## Training Loop

La funzione `train_spectral_model` implementa l’intero ciclo di addestramento del modello basato su spettrogrammi lineari.

L’obiettivo è ottimizzare il modello nel ripristino dello spettro pulito a partire da versioni degradate, utilizzando un insieme di metriche quantitative (MSE, L1, COS, RMSE).

La funzione è divisa in varie sezioni:

1. **Inizializzazione**: mandiamo il modello sul dispositivo (`cuda` se disponibile altrimenti `cpu`). Creiamo un `GradScaler` per utilizzare mixed precision training, e inizializzaimo le variabili che conterranno le varie train loss, val loss e i valori delle metriche.

2. **Loop per ogni epoca**: in ogni  epoca distinguiamo due fasi:
    * **Training**: mettiamo il modello in modalità `train`. Per ogni batch calcoliamo la predizione (`enhanced`), la loss e aggiornaimo i pesi del modello.
    * **Validation**: mettiamo il modello in modalità `eval`, calcoliamo la loss e le metriche. Confrontiamo i risultati tra `degraded` ed `enhanced` per valutare il miglioramento medio (`delta`).

3. **Aggiormaneto dello scheduler**: se è definito un learning rate scheduler lo aggiorniamo in base alla validation loss.

4. **Early Stopping e salvataggio Checkpoint**: se la `val_loss` migliora il modello viene salvato. Se il modello non migliora per `patience` epoche consecutive l'addestramenti si interrompe.

5. **Visualizzazione plot**: se `show_example=True` vengono mostrati gli spettrogrammi clean, degraded ed enhanced. Se `show_metrics=True` vengono mostrati i valori delle metriche per valutare i progressi.


In [None]:
def train_spectral_model(
    model,
    train_dl,
    val_dl,
    loss_fn,
    optimizer,
    scheduler=None,
    num_epochs=20,
    patience=5,
    device="cuda",
    save_path="/content/checkpoint_best.pt",
    show_example=False, # Per mostrare gli spettrogrammi
    show_metrics=True, # Per mostrare le metriche
    use_amp=True,
    log_loss_components=False # Per mostrare i valori delle varie componenti della loss
):
    model = model.to(device)
    scaler = amp.GradScaler('cuda', enabled=use_amp)

    best_val_loss = float("inf")
    best_epoch = 0
    patience_counter = 0

    train_losses, val_losses, val_metrics_list = [], [], []

    for epoch in range(1, num_epochs + 1):
        print(f"\n----- Epoch {epoch}/{num_epochs} -----")


        # TRAINING
        model.train()
        train_loss = 0.0
        train_bar = tqdm(train_dl, desc=f"Train {epoch}", leave=False)

        for spec_degraded, spec_clean in train_bar:
            # Sposta su device
            spec_degraded = spec_degraded.to(device, non_blocking=True)
            spec_clean = spec_clean.to(device, non_blocking=True)

            optimizer.zero_grad(set_to_none=True)

            # Forward
            with amp.autocast(device_type='cuda', dtype=torch.float16, enabled=use_amp):
                # Predice enhanced
                enhanced = model(spec_degraded)
                enhanced = torch.clamp(enhanced, 0.0, 1.0)

                # Allinea le dimensioni
                enhanced, spec_clean = crop_to_match(enhanced, spec_clean)
                spec_degraded, _ = crop_to_match(spec_degraded, spec_clean)

                # Normalizzazione
                ref = spec_clean.max()
                ref = torch.clamp(ref, min=1e-8)  # sicurezza
                spec_clean_n = spec_clean / ref
                spec_degraded_n = spec_degraded / ref
                enhanced_n = enhanced / ref

                loss = loss_fn(enhanced_n, spec_clean_n, spec_degraded_n)

            # Controlla stabilità numerica
            if torch.isnan(loss) or torch.isinf(loss):
                print(" NaN/Inf rilevato — training interrotto.")
                return model, train_losses, val_losses, val_metrics_list

            # Backpropagation
            scaler.scale(loss).backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            scaler.step(optimizer)
            scaler.update()

            train_loss += loss.detach().item()
            train_bar.set_postfix({"train_loss": f"{loss.item():.4f}"})

        train_loss /= max(1, len(train_dl))
        train_losses.append(train_loss)


        # VALIDATION
        model.eval()
        val_loss = 0.0
        metrics_sum = {"MSE": 0, "L1": 0, "COS": 0}
        n_batches = 0
        do_no_harm_ratios = []  # Debug

        val_bar = tqdm(val_dl, desc=f"Val {epoch}", leave=False)
        with torch.no_grad():
            delta_sum = {"MSE": 0, "L1": 0, "COS": 0}

            for spec_degraded, spec_clean in val_bar:
                spec_degraded = spec_degraded.to(device, non_blocking=True)
                spec_clean = spec_clean.to(device, non_blocking=True)

                with amp.autocast(device_type='cuda', dtype=torch.float16, enabled=use_amp):
                    enhanced = model(spec_degraded)
                    enhanced = torch.clamp(enhanced, 0.0, 1.0)

                    enhanced, spec_clean = crop_to_match(enhanced, spec_clean)
                    spec_degraded, _ = crop_to_match(spec_degraded, spec_clean)

                    # Normalizzazione
                    ref = spec_clean.max()
                    ref = torch.clamp(ref, min=1e-8)
                    spec_clean_n = spec_clean / ref
                    spec_degraded_n = spec_degraded / ref
                    enhanced_n = enhanced / ref

                    if log_loss_components:
                        out = loss_fn(enhanced_n, spec_clean_n, degraded=spec_degraded_n, return_components=True)
                        loss = out["total"]
                        comp = {k: v.item() if torch.is_tensor(v) else v for k, v in out.items()}
                    else:
                        loss = loss_fn(enhanced_n, spec_clean_n, spec_degraded_n)
                        comp = None

                val_loss += loss.item()

                # Calcola le metriche
                restored_metrics = {
                    "MSE": mse_spec(enhanced_n.cpu(), spec_clean_n.cpu()),
                    "L1": l1_spec(enhanced_n.cpu(), spec_clean_n.cpu()),
                    "COS": cosine_spec(enhanced_n.cpu(), spec_clean_n.cpu())
                }

                baseline_metrics = {
                    "MSE": mse_spec(spec_degraded_n.cpu(), spec_clean_n.cpu()),
                    "L1": l1_spec(spec_degraded_n.cpu(), spec_clean_n.cpu()),
                    "COS": cosine_spec(spec_degraded_n.cpu(), spec_clean_n.cpu())
                }

                # Calcola il delta Δ
                delta_metrics = {k: baseline_metrics[k] - restored_metrics[k] for k in restored_metrics}


                for k in metrics_sum.keys():
                    metrics_sum[k] += restored_metrics[k]
                    delta_sum[k] += delta_metrics[k]


                n_batches += 1
                val_bar.set_postfix({"val_loss": f"{loss.item():.4f}"})

        val_loss /= n_batches
        metrics_avg = {k: v / n_batches for k, v in metrics_sum.items()}
        avg_delta = {k: delta_sum[k] / n_batches for k in delta_sum.keys()}

        print(f"\nΔ delta medio per epoca — "
              f"MSE:{avg_delta['MSE']:+.5f} | "
              f"L1:{avg_delta['L1']:+.5f} | "
              f"COS:{avg_delta['COS']:+.5f}" )


        val_losses.append(val_loss)
        val_metrics_list.append(metrics_avg)

        if scheduler is not None:
            scheduler.step(val_loss)


        # EARLY STOPPING + CHECKPOINT
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_epoch = epoch
            patience_counter = 0

            torch.save({
                "epoch": epoch,
                "model_state_dict": model.state_dict(),
                "optimizer_state_dict": optimizer.state_dict(),
                "scheduler_state_dict": scheduler.state_dict() if scheduler else None,
                "train_loss": train_loss,
                "val_loss": val_loss,
                "metrics": metrics_avg,
            }, save_path)
            print(f"Nuovo best model salvato (val_loss={val_loss:.4f})")
        else:
            patience_counter += 1
            print(f"Nessun miglioramento ({patience_counter}/{patience})")

        if patience_counter >= patience:
            print(f"Early stopping attivato alla epoca {epoch}")
            break


        # VISUALIZZAZIONE ESEMPIO
        if show_example:
            spec_degraded, spec_clean = next(iter(val_dl))
            spec_degraded, spec_clean = spec_degraded.to(device), spec_clean.to(device)

            with torch.no_grad():
                enhanced = model(spec_degraded)
                enhanced = torch.clamp(enhanced, 0.0, 1.0)  # clamping anche qui


            # Normalizzazione
            ref = spec_clean.max()
            ref = torch.clamp(ref, min=1e-8)
            spec_clean_n = spec_clean / ref
            spec_degraded_n = spec_degraded / ref
            enhanced_n = enhanced / ref

            idx = 0
            plt.figure(figsize=(14, 8))

            plt.subplot(3, 1, 1)
            im1 = plt.imshow(spec_clean_n[idx, 0].cpu().numpy() ** 0.5,
                            origin='lower', aspect='auto', cmap='viridis')
            plt.title("Clean (Target)")
            plt.colorbar(im1, fraction=0.046, pad=0.04)

            plt.subplot(3, 1, 2)
            im2 = plt.imshow(spec_degraded_n[idx, 0].cpu().numpy() ** 0.5,
                            origin='lower', aspect='auto', cmap='viridis')
            plt.title("Degraded (Input)")
            plt.colorbar(im2, fraction=0.046, pad=0.04)

            plt.subplot(3, 1, 3)
            im3 = plt.imshow(enhanced_n[idx, 0].cpu().numpy() ** 0.5,
                            origin='lower', aspect='auto', cmap='viridis',
                            vmin=0, vmax=1)  # Forza range visivo coerente
            plt.title("Enhanced (Output)")
            plt.colorbar(im3, fraction=0.046, pad=0.04)
            plt.tight_layout()
            plt.show()



        # LOGGING
        if show_metrics:
            print(f"\nValidation Summary (Epoch {epoch}) ")
            print(f"Val Loss: {val_loss:.4f}")
            print(f"MSE:{metrics_avg['MSE']:.5f} | L1:{metrics_avg['L1']:.5f} | COS:{metrics_avg['COS']:.5f} ")

            if log_loss_components and comp is not None:
                print(f"Loss components --> Total:{comp['total']:.4f} | "
                      f"charb:{comp['charb']:.4f} | hf:{comp['hf']:.4f} | "
                      f"bw:{comp['bw']:.4f} | energy:{comp['energy']:.4f} |"
                      f"dnh:{comp['dnh']:.4f} | flat:{comp['flat']:.4f} "
                )
            print("-------------------------------------------")

        print(f"Epoch {epoch} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

    print(f"\nTraining completato — Best epoch: {best_epoch} | Best val_loss={best_val_loss:.4f}")
    return model, train_losses, val_losses, val_metrics_list


> **ATTENZIONE!!!** Se la variabile `skip_training` all'interno del dizionario `CONFIG` è stata impostata a `False` i pesi scaricati dalla repo github verranno sovrascritti.

### Training Gruppo A

In [None]:
# Inizializziamo il modello
model_A = SpectralEnhancementNet().to(device)

loss_fn = HybridLoss()

optimizer = torch.optim.AdamW(
    model_A.parameters(),
    lr=1e-4,
    betas=(0.9, 0.98),
    weight_decay=0.01
)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=0.5,
    patience=3,
    min_lr=5e-6
)

path_gruppoA = "/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoA.pt"


if CONFIG["skip_training"] == True and os.path.exists(path_gruppoA):
    checkpoint = torch.load(path_gruppoA, map_location=device)
    model_A.load_state_dict(checkpoint["model_state_dict"])
    print(f"\nTraining saltato, caricamento checkpoint da: {path_gruppoA}")

elif CONFIG["skip_training"] == True:
    print(f"\nRichiesto skip training ma il checkpoint {path_gruppoA} non esiste. Avvio del training.")
    model_A, train_losses_A, val_losses_A, val_metrics_A = train_spectral_model(
        model=model_A,
        train_dl=train_loader_A,
        val_dl=val_loader_A,
        loss_fn=loss_fn,
        optimizer=optimizer,
        scheduler=scheduler,
        num_epochs=25,
        patience=5,
        device=device,
        save_path="/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoA.pt",
        show_example=True, # Visualizza uno spettrogramma per epoca
        show_metrics=True, # Mostra le metriche
        use_amp=True, # Mixed precision
        log_loss_components=True # Stampa breakdown della loss nel validation
    )

else:
    model_A, train_losses_A, val_losses_A, val_metrics_A = train_spectral_model(
        model=model_A,
        train_dl=train_loader_A,
        val_dl=val_loader_A,
        loss_fn=loss_fn,
        optimizer=optimizer,
        scheduler=scheduler,
        num_epochs=25,
        patience=5,
        device=device,
        save_path="/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoA.pt",
        show_example=True, # Visualizza uno spettrogramma per epoca
        show_metrics=True, # Mostra le metriche
        use_amp=True, # Mixed precision
        log_loss_components=True # Stampa breakdown della loss nel validation
    )

### Training Gruppo B

In [None]:
# Inizializziamo il modello
model_B = SpectralEnhancementNet().to(device)

loss_fn = HybridLoss()

optimizer = torch.optim.AdamW(
    model_B.parameters(),
    lr=1e-4,
    betas=(0.9, 0.98),
    weight_decay=0.01
)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=0.5,
    patience=3,
    min_lr=5e-6
)

path_gruppoB = "/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoB.pt"


if CONFIG["skip_training"] == True and os.path.exists(path_gruppoB):
    checkpoint = torch.load(path_gruppoB, map_location=device)
    model_B.load_state_dict(checkpoint["model_state_dict"])
    print(f"\nTraining saltato, caricamento checkpoint da: {path_gruppoB}")

elif CONFIG["skip_training"] == True:
    print(f"\nRichiesto skip training ma il checkpoint {path_gruppoB} non esiste. Avvio del training.")

    model_B, train_losses_B, val_losses_B, val_metrics_B = train_spectral_model(
        model=model_B,
        train_dl=train_loader_B,
        val_dl=val_loader_B,
        loss_fn=loss_fn,
        optimizer=optimizer,
        scheduler=scheduler,
        num_epochs=25,
        patience=5,
        device=device,
        save_path="/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoB.pt",
        show_example=True, # Visualizza uno spettrogramma per epoca
        show_metrics=True, # Mostra le metriche
        use_amp=True, # Mixed precision
        log_loss_components=True # Stampa breakdown della loss nel validation
    )


else:
    model_B, train_losses_B, val_losses_B, val_metrics_B = train_spectral_model(
      model=model_B,
      train_dl=train_loader_B,
      val_dl=val_loader_B,
      loss_fn=loss_fn,
      optimizer=optimizer,
      scheduler=scheduler,
      num_epochs=25,
      patience=5,
      device=device,
      save_path="/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoB.pt",
      show_example=True, # Visualizza uno spettrogramma per epoca
      show_metrics=True, # Mostra le metriche
      use_amp=True, # Mixed precision
      log_loss_components=True # Stampa breakdown della loss nel validation
  )

# Test

Durante la fase di Test e valutazione finale dobbiamo misurare quanto bene il modello ha ricostruito gli spettrogrammi.



## Funzioni Ausiliarie

La funzione `spec_to_audio` ricostruisce un segnale audio a partire da uno spettrogramma normalizzato nel range $[0, 1]$.

1. Converte i valori da [0,1] --> dB --> ampiezza lineare.

2. Applica la trasformata inversa di Fourier tramite l’algoritmo di Griffin-Lim, che stima iterativamente la fase mancante dello spettro.

3. Normalizza il segnale risultante per evitare clipping.





In [None]:
# Funzione di ricostruzione audio
def spec_to_audio(spec_01, griffinlim_iters):

    # Da [0,1] --> dB --> ampiezza lineare
    S_db = spec_01 * CONFIG["top_db"] - CONFIG["top_db"]
    S_amp = librosa.db_to_amplitude(S_db)

    # Finestra identica a quella usata nello STFT originale
    win = get_window(CONFIG["window"], CONFIG["win_length"], fftbins=True)

    # Ricostruzione con Griffin-Lim
    audio = librosa.griffinlim(
        S_amp,
        n_iter=griffinlim_iters,
        hop_length=CONFIG["hop_length"],
        win_length=CONFIG["win_length"],
        window=win,
        center=CONFIG["center"]
    )

    # Normalizzazione
    audio = audio / (np.max(np.abs(audio)) + CONFIG["epsilon"])
    return audio

## Test Loop

Con la funzione `test_spectral_model` eseguiamo la valutazione finale del modello sul set di test.
L’obiettivo è misurare le prestazioni del modello addestrato confrontando gli spettrogrammi enhanced con quelli clean, e verificare quanto riesca a migliorare rispetto ai degraded.

All’inizio viene caricato il checkpoint migliore salvato durante il training.
Il modello viene impostato in modalità `eval` e viene inizializzato un `GradScaler`.
Vengono poi creati i dizionari per accumulare i valori medi di loss e metriche (MSE, L1, COS).

Per ogni batch:

1. il modello genera lo spettrogramma `enhanced` a partire da `degraded`;

2. i tensori vengono allineati e normalizzati;

3. calcoliamo la loss complessiva;

4. calcoliamo le metriche;

5. confrontiamo le metriche di degraded ed enhanced calcolando il delta (Δ) per ogni metrica.

Inoltre, per un batch selezionato, vengono visualizzati e ascoltati gli spettrogrammi clean, degraded e enhanced, ricostruiti tramite algoritmo di Griffin Lim.


In [None]:
def test_spectral_model(
    model,
    test_dl,
    loss_fn,
    checkpoint_path="/content/checkpoint_best.pt",
    device="cuda",
    griffinlim_iters=32,
    save_audio=False,
    save_dir="/content/audio_tests",
    listen_to_first=True,
    log_loss_components=False,
    use_amp=True
):


    # Caricamento checkpoint
    print(f"\nCaricamento checkpoint da: {checkpoint_path}")
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint["model_state_dict"])
    model.to(device)
    model.eval()
    print(f"Checkpoint caricato (epoch {checkpoint['epoch']}, val_loss={checkpoint['val_loss']:.4f})")


    # Inizializza metriche
    total_loss = 0.0
    metrics_sum = {"MSE": 0, "L1": 0, "COS": 0}
    baseline_sum = {"MSE": 0, "L1": 0, "COS": 0}
    delta_sum = {"MSE": 0, "L1": 0, "COS": 0}
    n_batches = 0


    # Loop di test
    print("\nInizio test su set di test...")
    test_bar = tqdm(test_dl, desc="Testing", leave=False)

    scaler = amp.GradScaler('cuda', enabled=use_amp)

    with torch.no_grad():
        for batch_idx, (spec_degraded, spec_clean) in enumerate(test_bar):
            spec_degraded = spec_degraded.to(device, non_blocking=True)
            spec_clean = spec_clean.to(device, non_blocking=True)

            # Forward
            with amp.autocast(device_type='cuda', dtype=torch.float16, enabled=use_amp):
                enhanced = model(spec_degraded)
                enhanced = torch.clamp(enhanced, 0.0, 1.0)
                enhanced, spec_clean = crop_to_match(enhanced, spec_clean)
                spec_degraded, spec_clean = crop_to_match(spec_degraded, spec_clean)

                # Normalizzazione
                ref = spec_clean.max()
                ref = torch.clamp(ref, min=1e-8)
                spec_clean_n = spec_clean / ref
                spec_degraded_n = spec_degraded / ref
                enhanced_n = enhanced / ref

                # Loss
                if log_loss_components:
                    out = loss_fn(enhanced_n, spec_clean_n, degraded=spec_degraded_n, return_components=True)
                    loss = out["total"]
                    comp = {k: v.item() for k, v in out.items()}
                else:
                    loss = loss_fn(enhanced_n, spec_clean_n, spec_degraded_n)
                    comp = None


            total_loss += loss.item()

            # Metriche restored (enhanced vs clean)
            restored_metrics = {
                "MSE": mse_spec(enhanced.cpu(), spec_clean.cpu()),
                "L1": l1_spec(enhanced.cpu(), spec_clean.cpu()),
                "COS": cosine_spec(enhanced.cpu(), spec_clean.cpu())
            }

            # Baseline (degraded vs clean)
            baseline_metrics = {
                "MSE": mse_spec(spec_degraded.cpu(), spec_clean.cpu()),
                "L1": l1_spec(spec_degraded.cpu(), spec_clean.cpu()),
                "COS": cosine_spec(spec_degraded.cpu(), spec_clean.cpu())
            }

            # Delta Δ baseline - restored
            delta_metrics = {k: baseline_metrics[k] - restored_metrics[k] for k in restored_metrics}

            for k in metrics_sum.keys():
                metrics_sum[k] += restored_metrics[k]
                baseline_sum[k] += baseline_metrics[k]
                delta_sum[k] += delta_metrics[k]

            n_batches += 1
            test_bar.set_postfix({"loss": f"{loss.item():.4f}"})

            # Visualizzazione e ascolto
            if listen_to_first and batch_idx == 103:
                degraded_np = spec_degraded.squeeze().cpu().numpy()
                clean_np = spec_clean.squeeze().cpu().numpy()
                enhanced_np = enhanced.squeeze().cpu().numpy()

                enhanced_np = np.clip(enhanced_np / (enhanced_np.max() + 1e-8), 0.0, 1.0)


                plt.figure(figsize=(12, 8))
                plt.subplot(3, 1, 1)
                plt.imshow(clean_np ** 0.5, origin='lower', aspect='auto', cmap='viridis')
                plt.title("Clean (Target)")
                plt.colorbar()

                plt.subplot(3, 1, 2)
                plt.imshow(degraded_np ** 0.5, origin='lower', aspect='auto', cmap='viridis')
                plt.title("Degraded (Input)")
                plt.colorbar()

                plt.subplot(3, 1, 3)
                plt.imshow(enhanced_np ** 0.5, origin='lower', aspect='auto', cmap='viridis')
                plt.title("Enhanced (Output)")
                plt.colorbar()
                plt.tight_layout()
                plt.show()

                # Griffin-Lim
                y_clean = spec_to_audio(clean_np, griffinlim_iters)
                y_degraded = spec_to_audio(degraded_np, griffinlim_iters)
                y_enhanced = spec_to_audio(enhanced_np, griffinlim_iters)

                print("\nClean (Target)")
                display(Audio(y_clean, rate=CONFIG["sample_rate"]))
                print("Degraded (Input)")
                display(Audio(y_degraded, rate=CONFIG["sample_rate"]))
                print("Enhanced (Output)")
                display(Audio(y_enhanced, rate=CONFIG["sample_rate"]))

                if save_audio:
                    import os, soundfile as sf
                    os.makedirs(save_dir, exist_ok=True)
                    sf.write(f"{save_dir}/clean.wav", y_clean, CONFIG["sample_rate"])
                    sf.write(f"{save_dir}/degraded.wav", y_degraded, CONFIG["sample_rate"])
                    sf.write(f"{save_dir}/enhanced.wav", y_enhanced, CONFIG["sample_rate"])
                    print(f"Audio salvato in: {save_dir}")


    # Calcolo medie e report
    avg_loss = total_loss / n_batches
    restored = {k: metrics_sum[k] / n_batches for k in metrics_sum}
    baseline = {k: baseline_sum[k] / n_batches for k in baseline_sum}
    delta = {k: delta_sum[k] / n_batches for k in delta_sum}

    print("\nRISULTATI MEDI SU TEST SET:")
    print(f"Average Test Loss: {avg_loss:.4f}")
    for k in restored.keys():
        print(f"{k}: base={baseline[k]:.5f} | restored={restored[k]:.5f} | Δ={delta[k]:+.5f}")

    if log_loss_components and comp is not None:
        print("\nUltime componenti di loss viste:")
        print(f"Total: {comp['total']:.4f} | Charb: {comp['charb']:.4f} | HF: {comp['hf']:.4f}")

    return baseline, restored, delta

## Test Loop su Modello A

### Degrdazioni Gruppo A

In [None]:
model_A = SpectralEnhancementNet().to(device)

loss_fn = HybridLoss()

baseline_AA, restored_AA, delta_AA = test_spectral_model(
    model=model_A,
    test_dl=test_loader_A,
    loss_fn=loss_fn,
    checkpoint_path="/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoA.pt",
    device="cuda",
    griffinlim_iters=64,
    save_audio=False
)



### Degrdazioni Gruppo B

In [None]:
model_A = SpectralEnhancementNet().to(device)

loss_fn = HybridLoss()

baseline_AB, restored_AB, delta_AB = test_spectral_model(
    model=model_A,
    test_dl=test_loader_B,
    loss_fn=loss_fn,
    checkpoint_path="/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoA.pt",
    device="cuda",
    griffinlim_iters=64,
    save_audio=False
)

## Test Loop su Modello B

### Degrdazioni Gruppo A

In [None]:
model_B = SpectralEnhancementNet().to(device)

loss_fn = HybridLoss()

baseline_BA, restored_BA, delta_BA = test_spectral_model(
    model=model_B,
    test_dl=test_loader_A,
    loss_fn=loss_fn,
    checkpoint_path="/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoB.pt",
    device="cuda",
    griffinlim_iters=64,
    save_audio=False
)


### Degrdazioni Gruppo B

In [None]:
model_B = SpectralEnhancementNet().to(device)

loss_fn = HybridLoss()

baseline_BB, restored_BB, delta_BB = test_spectral_model(
    model=model_B,
    test_dl=test_loader_B,
    loss_fn=loss_fn,
    checkpoint_path="/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoB.pt",
    device="cuda",
    griffinlim_iters=64,
    save_audio=False
)


# Prova su output di MusicGen

Qui ho deciso di testare il modello addestrato sul gruppo A e il modello addestrato sul gruppo B su un output di MusciGen.

In [None]:
!pip install transformers accelerate torchaudio

In [None]:
from transformers import AutoProcessor, MusicgenForConditionalGeneration

print("Generazione audio con MusicGen")
# Carica modello e processor di Meta MusicGen
device = "cuda" if torch.cuda.is_available() else "cpu"
processor = AutoProcessor.from_pretrained("facebook/musicgen-small")
musicgen = MusicgenForConditionalGeneration.from_pretrained("facebook/musicgen-small").to(device)


In [None]:
# Prompt
prompt = "jazz"

# Genera audio
inputs = processor(
    text=[prompt],
    padding=True,
    return_tensors="pt"
).to(device)

audio_values = musicgen.generate(**inputs, max_new_tokens=256)

# Salva file audio
sf.write("/content/musicgen_output.wav", audio_values[0, 0].cpu().numpy(), 32000)
print("File generato: /content/musicgen_output.wav")

display(Audio("/content/musicgen_output.wav", rate=32000))

In [None]:
def enhance_musicgen_output(
    model,
    path_audio,
    save_path="/content/enhanced_musicgen.wav",
    griffinlim_iters=64,
    device="cuda"
):

    model.eval()
    model.to(device)


    # Caricamento audio
    audio, _ = librosa.load(path_audio, sr=CONFIG["sample_rate"], mono=True)
    print(f"Audio caricato: {path_audio} — Durata: {len(audio) / CONFIG['sample_rate']:.2f}s")


    # STFT e normalizzazione
    win = scipy.signal.get_window(CONFIG["window"], CONFIG["win_length"], fftbins=True)
    S = librosa.stft(
        audio,
        n_fft=CONFIG["n_fft"],
        hop_length=CONFIG["hop_length"],
        win_length=CONFIG["win_length"],
        window=win,
        center=CONFIG["center"]
    )
    Mag = np.abs(S)

    # Conversione in dB e normalizzazione [0,1]
    S_db = librosa.amplitude_to_db(np.maximum(Mag, CONFIG["epsilon"]), ref=np.max, top_db=CONFIG["top_db"])
    S_norm = (S_db + CONFIG["top_db"]) / CONFIG["top_db"]
    S_norm = np.clip(S_norm, 0.0, 1.0)


    # Conversione in tensore e normalizzazione
    spec_input = torch.from_numpy(S_norm).unsqueeze(0).unsqueeze(0).float().to(device)


    # Inference con clamp
    with torch.no_grad(), amp.autocast(device_type='cuda', dtype=torch.float16, enabled=True):
        # Riferimento
        ref = spec_input.max()
        ref = torch.clamp(ref, min=1e-8)
        spec_input_n = spec_input / ref

        enhanced = model(spec_input_n)
        enhanced = torch.clamp(enhanced, 0.0, 1.0)


    print(f"enhanced range: [{enhanced.min().item():.4f}, {enhanced.max().item():.4f}]")
    enhanced_np = enhanced.squeeze().cpu().numpy()


    # Ricostruzione audio
    enhanced_audio = spec_to_audio(enhanced_np, griffinlim_iters)

    # Normalizza l’output audio
    enhanced_audio = enhanced_audio / (np.max(np.abs(enhanced_audio)) + CONFIG["epsilon"])


    # Visualizzazione
    plt.figure(figsize=(12, 6))
    plt.subplot(2, 1, 1)
    plt.imshow(S_norm ** 0.5, origin='lower', aspect='auto', cmap='viridis', vmin=0, vmax=1)
    plt.title("Input (MusicGen Output)")
    plt.colorbar()
    plt.subplot(2, 1, 2)
    plt.imshow(enhanced_np ** 0.5, origin='lower', aspect='auto', cmap='viridis', vmin=0, vmax=1)
    plt.title("Enhanced by Model")
    plt.colorbar()
    plt.tight_layout()
    plt.show()


    # Ascolto e confronto
    print("\nAudio Originale (MusicGen):")
    display(Audio(audio, rate=CONFIG["sample_rate"]))
    print("Audio Migliorato (Enhanced):")
    display(Audio(enhanced_audio, rate=CONFIG["sample_rate"]))


    # Salvataggio
    sf.write(save_path, enhanced_audio, CONFIG["sample_rate"])
    print(f"\nOutput del modello salvato in: {save_path}")


## Modello addestrato sul Gruppo A

In [None]:
# Carica il modello

device = "cuda" if torch.cuda.is_available() else "cpu"

model_A = SpectralEnhancementNet().to(device)
ckpt_A = torch.load("/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoA.pt", map_location=device)
model_A.load_state_dict(ckpt_A["model_state_dict"])
print(f"Modello addestrato sul Gruppo A caricato (epoch {ckpt_A['epoch']} | val_loss={ckpt_A['val_loss']:.4f})")


In [None]:
# Enhancemnet con il modello
enhance_musicgen_output(model_A, "/content/musicgen_output.wav", save_path="/content/enhanced_A.wav")

## Modello addestrato sul Gruppo B

In [None]:
# Carica il modello

device = "cuda" if torch.cuda.is_available() else "cpu"

model_B = SpectralEnhancementNet().to(device)
ckpt_B = torch.load("/content/projectID2ML/modello_lineare/checkpoint_best_speclin_gruppoB.pt", map_location=device)
model_B.load_state_dict(ckpt_B["model_state_dict"])
print(f"Modello addestrato sul Gruppo B caricato (epoch {ckpt_B['epoch']} | val_loss={ckpt_B['val_loss']:.4f})")


In [None]:
# Enhancemnet con il modello
enhance_musicgen_output(model_B, "/content/musicgen_output.wav", save_path="/content/enhanced_B.wav")

# Analisi dei Risultati

Ho addestrato il modello `Spectral Enhancement` su due domini diversi, ciascuno dedicato a una diversa famiglia di degradazioni spettrali.
Il primo modello è stato addestrato sul **gruppo A** (quantize, tonal stripes, noise), caratterizzato da degradazioni additive o periodiche, mentre il secondo sul **gruppo B** (clipping, reverb, lowpass, distort), che comprende distorsioni non lineari o di tipo convolutivo.

**Modello Addestrato sul gruppo A**

Il modello addestrato sul gruppo A mostra una mostra una rapida discesa iniziale della validation loss.
La rete impara rapidamente le caratteristiche principali delle degradazioni additive, migliorando progressivamente le componenti della loss.

Già dalle prime epoche si osserva una buona coerenza spettrale, con gli spettrogrammi enhanced che recuperano le armoniche principali e riducono il rumore di fondo.
Tuttavia, la rete mostra una leggera tendenza alla sovra-levigazione, evidente nelle ultime epoche, dove alcune strutture spettrali fini vengono attenuate, portando a una lieve perdita di dettaglio. Questo effetto è coerente con l’azione regolarizzante della loss combinata e con l’uso di normalizzazione e mixed precision.

È importante notare che durante l’addestramento la validation loss risulta quasi sempre inferiore alla training loss. Questo comportamento, sebbene atipico, può avere diverse spiegazioni, come la presenza di dropout o data augmentation nel training può rendere il modello più "rumoroso" in fase di apprendimento rispetto alla validazione.


Nel test sullo stesso dominio (*gruppo A*), il modello mostra una chiara capacità di generalizzazione, le metriche migliorano. Visivamente, gli spettrogrammi enhanced risultano più puliti e regolari, con una riduzione del rumore.

Nel complesso, il modello addestrato sul gruppo A si comporta in modo stabile ed efficace, riduce il rumore e ricostruisce le componenti tonali principali, sacrificando leggermente i dettagli fini per ottenere un segnale più regolare.


Il test out-of-domain (cioè testando il modello A sul *gruppo B*) evidenzia invece la scarsa capacità di trasferimento del modello addestrato su degradazioni additive.
Le metriche mostrano un peggioramento generale.

Gli spettrogrammi rivelano che il modello non riesce a compensare distorsioni di tipo non lineare come il clipping o convolutive come il reverb.
L’output appare quindi meno coerente e, in media, non migliora rispetto al segnale degradato.

Questo comportamento conferma che il modello, pur efficace in-domain, non generalizza bene verso degradazioni di natura diversa, poiché ha appreso rappresentazioni specifiche del rumore additive e stazionarie.






**Modello Addestrato sul gruppo B**

Il modello addestrato sul gruppo B mostra un andamento di apprendimento più graduale e irregolare rispetto a quello addestrato sul gruppo A.

Anche in questo caso, la validation loss risulta sempre inferiore alla training loss. Questo comportamento può essere attribuito all’effetto regolarizzante di dropout e normalizzazione, o al fatto che il validation set presenti una variabilità minore o una minore intensità delle degradazioni rispetto al training set.


Nel test sullo stesso dominio (*gruppo B*), il modello mostra un comportamento coerente con quello osservato in validazione, le metriche mostrano lievi miglioramenti.

Il test out-of-domain (*cioè testando il modello B sul gruppo A*) mostra un calo delle prestazioni, con metriche peggiorate rispetto al degradato.
Il modello, avendo appreso pattern specifici delle degradazioni del gruppo B, non generalizza efficacemente su fenomeni di natura additiva come quantizzazione o rumore gaussiano.

Nel complesso, anche in questo caso, il modello si dimostra efficace sulle degradazioni del propio dominio, ma limitato nella capacità di generalizzare fuori dominio.

**Possibili Miglioramenti**

Per migliorare la robustezza cross-domain, un’estensione futura potrebbe includere strategie di continual learning (es. weight regularization o replay di esempi di entrambi i gruppi), per consentire al modello di adattarsi progressivamente a nuove degradazioni senza perdere le conoscenze apprese.


**Test su Output di MusicGen**

Ho deciso di testare il mio modello su un output di MusicGen, anche se questi output sono già molto buoni. Ovviamente qui non abbiamo una traccia clean su cui poterci basare, quindi possiamo fare un'analisi qualitativa, visualizzando gli spettrogrammi e ascoltando i due audio.

Il modello addestrato sul **gruppo A** mostra un comportamento conservativo, riduce artefatti a bassa intensità, come il rumore costante, senza compromettere la struttura armonica principale. Questo approccio è efficace quando gli artefatti di MusicGen sono di natura additiva.

Il modello addestrato sul **gruppo B** tende a intervenire in modo più "deciso". Riduce componenti spurie o distorsioni percepite, come clipping, ma può anche attenuare eccessivamente le alte frequenze, generando un suono più morbido ma meno brillante.
Questo comportamento è coerente con il tipo di degradazioni su cui è stato addestrato, il modello interpreta parte della brillantezza sintetica di MusicGen come distorsione e la sopprime.


Entrambi i modelli mantengono la coerenza temporale e armonica del segnale, ma differiscono nell'intervento. Il **Modello A** è più conservativo, rimuove rumore additivo e trame periodiche mantenendo l’ariosità del suono. Il **Modello B** risulta più aggressivo, riduce distorsioni e saturazioni, ma con rischio di over-smoothing nelle alte frequenze.


Poiché i risultati visivi e sonori possono variare a seconda dell’audio generato, le differenze osservate devono essere interpretate in termini qualitativi e di tendenza, non come valori assoluti.




