### Notebook zawiera kod z atakiem adwersalnym na sieć ResNET-18 wytrenowaną na datasecie nEMO. 
### Atak adwersalny (ang. adversarial attack) na sieć neuronową to technika, która polega na celowym modyfikowaniu danych wejściowych w taki sposób, aby wprowadzić w błąd model uczenia maszynowego. Przekształcenia danych wejściowych są często niemal niewidoczne dla człowieka.



### 1. Import bibliotek

In [4]:
import sys
import os

# Dodaj katalog główny projektu do sys.path
current_dir = (
    os.path.dirname(os.path.abspath(__file__))
    if "__file__" in globals()
    else os.getcwd()
)
project_root = os.path.abspath(os.path.join(current_dir, "..", ".."))

if project_root not in sys.path:
    sys.path.insert(0, project_root)

print(f"Katalog główny projektu: {project_root}")
print(f"Czy katalog src istnieje: {os.path.exists(os.path.join(project_root, 'src'))}")


import librosa
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from tqdm import tqdm
import seaborn as sns
import soundfile as sf
from datasets import load_from_disk


from src.helpers.early_stopping import EarlyStopping
from src.helpers.augmentation import AugmentedAudioDataset
from src.helpers.resnet_model_definition import AudioResNet
from src.config import (
    BATCH_SIZE,
    EARLY_STOPPING_PATIENCE,
    MODEL_PATH,
    MAX_LENGTH,
    SEED,
    DATASET_PATH,
)
from src.create_data import download_and_save_dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report
import datetime

Katalog główny projektu: c:\Users\kubas\Desktop\Projekt dyplomowy\Audio-Emotion-Recognition
Czy katalog src istnieje: True


### 2. Ładowanie datasetu

In [5]:
# Weryfikacja istnienia folderu z danymi oraz załadowanie zbioru danych
dataset_path = DATASET_PATH
if os.path.exists(dataset_path):
    try:
        print("Rozpoczęcie ładowania datasetu z dysku...")
        dataset = load_from_disk(dataset_path)
    except Exception as e:
        print(f"Wystąpił błąd podczas ładowania datasetu: {e}")
        print("Inicjowanie ponownego pobierania datasetu...")
        dataset = download_and_save_dataset()
else:
    print("Folder 'data' nie został znaleziony. Inicjowanie pobierania datasetu...")
    dataset = download_and_save_dataset()

Rozpoczęcie ładowania datasetu z dysku...


### 3. Przetwarzanie próbek audio

In [4]:
# Ustawienie seed dla powtarzalności wyników
torch.manual_seed(SEED)
np.random.seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# Przetwarzanie danych audio
print("Przetwarzanie próbek audio...")
features = []
labels = []

for sample in tqdm(dataset["train"], desc="Przetwarzanie próbek audio"):
    audio_array = sample["audio"]["array"]
    sr = sample["audio"]["sampling_rate"]

    # Ujednolicenie długości
    target_length = int(MAX_LENGTH * sr)
    if len(audio_array) > target_length:
        audio_array = audio_array[:target_length]
    else:
        padding = np.zeros(target_length - len(audio_array))
        audio_array = np.concatenate([audio_array, padding])

    # Ekstrakcja melspektrogramu
    S = librosa.feature.melspectrogram(y=audio_array, sr=sr, n_mels=128)
    S_db = librosa.power_to_db(S, ref=np.max)
    features.append(S_db)

    # Dodanie etykiety
    labels.append(sample["emotion"])

Przetwarzanie próbek audio...


Przetwarzanie próbek audio: 100%|██████████| 4481/4481 [00:38<00:00, 115.94it/s]


#### Konwersja i normalizacja danych audio oraz etykiet

In [5]:
# Konwersja list na tablice numpy
features = np.array(features)
features = features.reshape(features.shape[0], 1, features.shape[1], features.shape[2])

# Normalizacja danych (standardyzacja)
mean = np.mean(features)
std = np.std(features)
features = (features - mean) / std

# Konwersja etykiet na liczby
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(labels)
num_classes = len(np.unique(labels))
print(f"Liczba klas emocji: {num_classes}")
print(
    f"Mapowanie klas: {dict(zip(label_encoder.classes_, label_encoder.transform(label_encoder.classes_)))}"
)

Liczba klas emocji: 6
Mapowanie klas: {'anger': 0, 'fear': 1, 'happiness': 2, 'neutral': 3, 'sadness': 4, 'surprised': 5}


### 4. Podział danych na zbiory treningowe, walidacyjne i testowe


In [6]:
# Podział na zbiory
X_train, X_test, y_train, y_test = train_test_split(
    features, labels, test_size=0.2, random_state=SEED, stratify=labels
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.2, random_state=SEED, stratify=y_train
)

### 5. Inicjalizacja modelu i optymalizatora

In [7]:
# Inicjalizacja modelu, funkcji straty i optymalizatora
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Używane urządzenie: {device}")

# Inicjalizacja modelu
model = AudioResNet(num_classes=num_classes, dropout_rate=0.5)
model = model.to(device)

# Inicjalizacja funkcji straty i optymalizatora
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(
    model.parameters(), lr=0.001, weight_decay=1e-5
)  # Dodanie regularyzacji L2
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode="min", factor=0.5, patience=3
)

Używane urządzenie: cpu


 #### Przygotowanie zestawów danych z augmentacją i ładowanie ich do DataLoaderów

In [8]:
# Przygotowanie zestawów danych z augmentacją
train_dataset = AugmentedAudioDataset(X_train, y_train, augment=True)
val_dataset = AugmentedAudioDataset(X_val, y_val, augment=False)
test_dataset = AugmentedAudioDataset(X_test, y_test, augment=False)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

# Ścieżka do zapisywania modelu
early_stopping = EarlyStopping(patience=EARLY_STOPPING_PATIENCE, path=MODEL_PATH)

# Śledzenie historii treningu
history = {"train_loss": [], "val_loss": [], "val_accuracy": []}

### 6. Ładowanie wytrenowanego modelu

In [9]:
def load_resnet18_model(model_path, num_classes, device, dropout_rate=0.5):
    """
    Ładuje istniejący model ResNet-18 z podanej ścieżki

    Parametry:
    - model_path: ścieżka do katalogu lub pliku z modelem
    - num_classes: liczba klas wyjściowych (emocji)
    - device: urządzenie (CPU/GPU)
    - dropout_rate: współczynnik dropout (domyślnie 0.5)

    Zwraca:
    - załadowany model ResNet-18
    """
    print(f"Ładowanie modelu ResNet-18 z: {model_path}")

    # Sprawdzenie, czy podana ścieżka jest katalogiem czy plikiem
    if os.path.isdir(model_path):
        # Jeśli katalog, znajdź plik z modelem
        model_files = [f for f in os.listdir(model_path) if f.endswith((".pt", ".pth"))]
        if not model_files:
            raise FileNotFoundError(
                f"Nie znaleziono plików modelu w katalogu {model_path}"
            )

        # Wybierz najnowszy plik
        model_files.sort(
            key=lambda x: os.path.getmtime(os.path.join(model_path, x)), reverse=True
        )
        model_file = os.path.join(model_path, model_files[0])
        print(f"Wybrany plik modelu: {model_files[0]}")
    else:
        # Jeśli bezpośrednia ścieżka do pliku
        model_file = model_path

    # WAŻNE: Inicjalizacja modelu z tymi samymi parametrami co podczas treningu
    model = AudioResNet(num_classes=num_classes, dropout_rate=dropout_rate)

    try:
        # Ładowanie wag modelu
        checkpoint = torch.load(model_file, map_location=device)

        # Sprawdzenie typu zapisanego modelu
        if isinstance(checkpoint, dict):
            if "model_state_dict" in checkpoint:
                model.load_state_dict(checkpoint["model_state_dict"])
                print("Załadowano model_state_dict z checkpointu")
            elif "state_dict" in checkpoint:
                model.load_state_dict(checkpoint["state_dict"])
                print("Załadowano state_dict z checkpointu")
            else:
                # Próba załadowania bezpośrednio
                model.load_state_dict(checkpoint)
                print("Załadowano checkpoint jako state_dict")
        else:
            # Bezpośredni state_dict
            model.load_state_dict(checkpoint)
            print("Załadowano bezpośredni state_dict")

    except Exception as e:
        print(f"Błąd podczas ładowania modelu: {e}")
        print("Sprawdzam strukturę checkpointu...")

        # Diagnostyka - sprawdź co jest w checkpoincie
        if isinstance(checkpoint, dict):
            print(f"Klucze w checkpoincie: {list(checkpoint.keys())}")

        # Próba załadowania z ignorowaniem niekompatybilnych warstw
        try:
            model.load_state_dict(checkpoint, strict=False)
            print("Załadowano model z strict=False")
        except Exception as e2:
            print(f"Nie udało się załadować modelu nawet z strict=False: {e2}")
            raise e2

    model = model.to(device)
    model.eval()  # Ustawienie modelu w trybie ewaluacji dla ataku

    print(f"Model ResNet-18 załadowany pomyślnie na urządzenie: {device}")

    # Sprawdzenie architektury modelu
    total_params = sum(p.numel() for p in model.parameters())
    print(f"Całkowita liczba parametrów: {total_params:,}")

    return model

 #### Funkcja sprawdzająca gotowość modelu do ataku adwersalnego

In [10]:
def verify_model_compatibility(model, test_loader, device):
    """
    Sprawdza kompatybilność załadowanego modelu z danymi testowymi
    """
    print("Sprawdzanie kompatybilności modelu...")

    try:
        # Pobierz jedną próbkę
        for batch_features, batch_labels in test_loader:
            batch_features = batch_features.to(device)
            batch_labels = batch_labels.to(device)

            # Test forward pass
            with torch.no_grad():
                outputs = model(batch_features)
                print(f"Wejście: {batch_features.shape}")
                print(f"Wyjście: {outputs.shape}")
                print(f"Oczekiwana liczba klas: {outputs.shape[1]}")

            break

        print("✓ Model jest kompatybilny z danymi")
        return True

    except Exception as e:
        print(f"✗ Błąd kompatybilności: {e}")
        return False

In [11]:
# Poprawiona ścieżka do katalogu z modelem ResNet wytrenowanym na mel-spectrogramach
MODEL_PATH = os.path.join(project_root, "src", "ResNet_mel", "model_outputs")
NUM_CLASSES = 6
DROPOUT_RATE = 0.5  # Upewnij się, że to jest ta sama wartość co podczas treningu

# Załaduj model
model = load_resnet18_model(MODEL_PATH, NUM_CLASSES, device, DROPOUT_RATE)

# Sprawdź kompatybilność
if verify_model_compatibility(model, test_loader, device):
    print("Model gotowy do ataku adwersalnego!")
else:
    print("Sprawdź konfigurację modelu i danych!")

Ładowanie modelu ResNet-18 z: c:\Users\kubas\Desktop\Projekt dyplomowy\Audio-Emotion-Recognition\src\ResNet_mel\model_outputs
Wybrany plik modelu: best_model_20250515_090117.pt
Załadowano checkpoint jako state_dict
Model ResNet-18 załadowany pomyślnie na urządzenie: cpu
Całkowita liczba parametrów: 11,173,318
Sprawdzanie kompatybilności modelu...
Wejście: torch.Size([32, 1, 128, 141])
Wyjście: torch.Size([32, 6])
Oczekiwana liczba klas: 6
✓ Model jest kompatybilny z danymi
Model gotowy do ataku adwersalnego!


## ATAK ADWERSALNY

### Wykorzystaną metodą ataku adwersalnego jest Fast Gradient Sign Method (FGSM).
Jest to jedna z najprostszych i najszybszych metod. FGSM modyfikuje wejście tak, aby maksymalizować stratę, czyli wprowadza model w błąd.  
Etapy ataku:
1. Obliczamy gradient funkcji straty względem danych wejściowych.
2. Tworzymy perturbację (zakłócenie) poprzez pomnożenie znaku gradientu przez niski parametr ε.
3. Dodajemy perturbację do oryginalnego obrazu.


In [12]:
def fgsm_attack(model, data, target, epsilon, criterion):
    """
    Implementacja ataku Fast Gradient Sign Method (FGSM)

    Parametry:
    - model: model neural network
    - data: tensor wejściowy (batch of mel-spectrograms)
    - target: prawdziwe etykiety
    - epsilon: siła ataku
    - criterion: funkcja straty

    Zwraca:
    - perturbed_data: dane po ataku
    - perturbation: dodana perturbacja
    """
    # Upewnij się, że dane wymagają gradientów
    data.requires_grad = True

    # Forward pass
    output = model(data)

    # Oblicz stratę
    loss = criterion(output, target)

    # Backward pass - oblicz gradienty
    model.zero_grad()
    loss.backward()

    # Zbierz gradienty z danych wejściowych
    data_grad = data.grad.data

    # Stwórz zakłócenie używając znaku gradientu
    sign_data_grad = data_grad.sign()

    # Stwórz zakłócone dane przez dodanie epsilon * sign(gradient)
    perturbed_data = data + epsilon * sign_data_grad

    # Opcjonalnie: ogranic wartości do odpowiedniego zakresu (dla normalized mel-spectrograms)
    # Zakładając, że dane wejściowe są znormalizowane (mean=0, std=1)
    # perturbed_data = torch.clamp(perturbed_data, -3, 3)  # ±3 std deviations

    # Oblicz rzeczywistą perturbację
    perturbation = perturbed_data - data

    return perturbed_data.detach(), perturbation.detach()

#### Denormalizacja spektrogramów
Aby odtworzyć dźwięk z melspektrogramu po ataku adwersalnym należy przywrócić do oryginalnego zakresu wartości, w jakim był przed normalizacją.

In [13]:
def denormalize_melspectrogram(normalized_melspec, mean, std):
    """
    Denormalizuje mel-spektrogram

    Parametry:
    - normalized_melspec: znormalizowany mel-spektrogram
    - mean: średnia użyta do normalizacji
    - std: odchylenie standardowe użyte do normalizacji

    Zwraca:
    - denormalized_melspec: denormalizowany mel-spektrogram
    """
    return normalized_melspec * std + mean

#### Zamiana spetrogramów z powrotem na dźwięk

In [14]:
def melspectrogram_to_audio_advanced(
    melspec_db, sr=24000, n_fft=2048, hop_length=512, n_mels=128, n_iter=32, length=None
):
    """
    Zaawansowana konwersja mel-spektrogramu na audio z lepszą jakością

    Parametry:
    - melspec_db: mel-spektrogram w dB
    - sr: sampling rate
    - n_fft: FFT window size
    - hop_length: hop length
    - n_mels: liczba mel bins
    - n_iter: iteracje Griffin-Lim
    - length: docelowa długość

    Zwraca:
    - audio: zrekonstruowany sygnał
    """
    # Konwersja z dB na power
    melspec_power = librosa.db_to_power(melspec_db)

    # Tworzymy mel filter bank
    mel_basis = librosa.filters.mel(sr=sr, n_fft=n_fft, n_mels=n_mels)

    # Próba odwrócenia mel-skali (pseudo-inverse)
    mel_basis_inv = np.linalg.pinv(mel_basis)

    # Rekonstrukcja spektrogramu
    spec_power = np.dot(mel_basis_inv, melspec_power)

    # Upewnij się, że wartości nie są negatywne
    spec_power = np.abs(spec_power)

    # Griffin-Lim do rekonstrukcji fazy
    audio = librosa.griffinlim(
        spec_power, n_iter=n_iter, hop_length=hop_length, length=length
    )

    return audio

#### Funkcja poniżej przeprowadza atak FGSM na całą partię danych z loadera, zapisuje audio przed i po ataku dla kilku przykładów i liczy statystyki:
    - dokładność modelu przed i po ataku
    - skuteczność ataku (ile predykcji się zmieniło)

In [15]:
def process_adversarial_batch(
    model,
    data_loader,
    epsilon,
    criterion,
    device,
    output_dir="adversarial_audio",
    mean=0,
    std=1,
    sr=24000,
):
    """
    Przetwarza całą partię danych przez atak FGSM i zapisuje audio

    Parametry:
    - model: trenowany model
    - data_loader: DataLoader z danymi
    - epsilon: siła ataku
    - criterion: funkcja straty
    - device: urządzenie obliczeniowe
    - output_dir: katalog na pliki wyjściowe
    - mean, std: parametry normalizacji
    - sr: sampling rate

    Zwraca:
    - results: słownik z wynikami
    """
    model.eval()
    os.makedirs(output_dir, exist_ok=True)

    results = {
        "original_predictions": [],
        "adversarial_predictions": [],
        "original_accuracy": 0,
        "adversarial_accuracy": 0,
        "attack_success_rate": 0,
    }

    correct_original = 0
    correct_adversarial = 0
    attack_success = 0
    total_samples = 0

    for batch_idx, (data, target) in enumerate(
        tqdm(data_loader, desc=f"FGSM ε={epsilon}")
    ):
        data, target = data.to(device), target.to(device)

        # Oryginalny forward pass
        with torch.no_grad():
            original_output = model(data)
            original_pred = original_output.argmax(dim=1)

        # Atak FGSM - tutaj tensor będzie wymagał gradientów
        perturbed_data, perturbation = fgsm_attack(
            model, data, target, epsilon, criterion
        )

        # Forward pass na danych po ataku (perturbed_data już jest detached w fgsm_attack)
        with torch.no_grad():
            adversarial_output = model(perturbed_data)
            adversarial_pred = adversarial_output.argmax(dim=1)

        # Statystyki
        correct_original += (original_pred == target).sum().item()
        correct_adversarial += (adversarial_pred == target).sum().item()
        attack_success += (original_pred != adversarial_pred).sum().item()
        total_samples += data.size(0)

        # Zapisz przykłady audio (pierwszy przykład z każdej partii)
        if batch_idx < 5:  # Zapisz tylko pierwsze 5 partii
            for i in range(min(3, data.size(0))):  # Pierwsze 3 próbki z partii
                try:
                    # Denormalizuj spektrogramy - używaj .detach().cpu().numpy()
                    orig_melspec = denormalize_melspectrogram(
                        data[i].detach().cpu().numpy().squeeze(), mean, std
                    )
                    adv_melspec = denormalize_melspectrogram(
                        perturbed_data[i].detach().cpu().numpy().squeeze(), mean, std
                    )

                    # Konwertuj na audio
                    orig_audio = melspectrogram_to_audio_advanced(orig_melspec, sr=sr)
                    adv_audio = melspectrogram_to_audio_advanced(adv_melspec, sr=sr)

                    # Zapisz pliki audio
                    orig_filename = (
                        f"{output_dir}/original_batch{batch_idx}_sample{i}.wav"
                    )
                    adv_filename = f"{output_dir}/adversarial_batch{batch_idx}_sample{i}_eps{epsilon}.wav"

                    sf.write(orig_filename, orig_audio, sr)
                    sf.write(adv_filename, adv_audio, sr)

                except Exception as e:
                    print(
                        f"Błąd podczas konwersji audio (batch {batch_idx}, sample {i}): {e}"
                    )
                    continue

        # Zapisz predykcje - upewnij się, że są detached
        results["original_predictions"].extend(original_pred.detach().cpu().tolist())
        results["adversarial_predictions"].extend(
            adversarial_pred.detach().cpu().tolist()
        )

    # Oblicz końcowe statystyki
    results["original_accuracy"] = correct_original / total_samples
    results["adversarial_accuracy"] = correct_adversarial / total_samples
    results["attack_success_rate"] = attack_success / total_samples

    print(f"\nWyniki dla ε = {epsilon}:")
    print(f"Dokładność oryginalna: {results['original_accuracy']:.4f}")
    print(f"Dokładność po ataku: {results['adversarial_accuracy']:.4f}")
    print(f"Sukces ataku: {results['attack_success_rate']:.4f}")

    return results

#### Testowanie różnych wartości epsilon

In [16]:
def test_multiple_epsilons(
    model,
    test_loader,
    epsilons,
    criterion,
    device,
    output_dir="fgsm_results",
    mean=0,
    std=1,
    sr=24000,
):
    """
    Testuje atak FGSM dla różnych wartości epsilon

    Parametry:
    - model: model do testowania
    - test_loader: DataLoader z danymi testowymi
    - epsilons: lista wartości epsilon
    - criterion: funkcja straty
    - device: urządzenie obliczeniowe
    - output_dir: katalog wyjściowy
    - mean, std: parametry normalizacji
    - sr: sampling rate

    Zwraca:
    - all_results: wyniki dla wszystkich epsilon
    """
    os.makedirs(output_dir, exist_ok=True)
    all_results = {}

    for epsilon in epsilons:
        epsilon_dir = os.path.join(output_dir, f"epsilon_{epsilon}")
        results = process_adversarial_batch(
            model, test_loader, epsilon, criterion, device, epsilon_dir, mean, std, sr
        )
        all_results[epsilon] = results

    # Wygeneruj wykres podsumowujący
    plot_adversarial_results(all_results, output_dir)

    return all_results

#### Wizualizacja wyników ataku adwersalnego

In [17]:
def plot_adversarial_results(results_dict, output_dir):
    """
    Tworzy wykres wyników ataku adwersalnego
    """
    epsilons = list(results_dict.keys())
    original_acc = [results_dict[eps]["original_accuracy"] for eps in epsilons]
    adversarial_acc = [results_dict[eps]["adversarial_accuracy"] for eps in epsilons]
    attack_success = [results_dict[eps]["attack_success_rate"] for eps in epsilons]

    plt.figure(figsize=(12, 8))

    # Wykres 1: Dokładność vs Epsilon
    plt.subplot(2, 2, 1)
    plt.plot(epsilons, original_acc, "b-o", label="Oryginalna dokładność")
    plt.plot(epsilons, adversarial_acc, "r-o", label="Dokładność po ataku")
    plt.xlabel("Epsilon")
    plt.ylabel("Dokładność")
    plt.title("Dokładność vs Siła ataku")
    plt.legend()
    plt.grid(True)

    # Wykres 2: Sukces ataku vs Epsilon
    plt.subplot(2, 2, 2)
    plt.plot(epsilons, attack_success, "g-o", label="Sukces ataku")
    plt.xlabel("Epsilon")
    plt.ylabel("Wskaźnik sukcesu ataku")
    plt.title("Skuteczność ataku vs Epsilon")
    plt.legend()
    plt.grid(True)

    # Wykres 3: Degradacja wydajności
    plt.subplot(2, 2, 3)
    performance_drop = [orig - adv for orig, adv in zip(original_acc, adversarial_acc)]
    plt.plot(epsilons, performance_drop, "m-o", label="Spadek wydajności")
    plt.xlabel("Epsilon")
    plt.ylabel("Spadek dokładności")
    plt.title("Degradacja wydajności")
    plt.legend()
    plt.grid(True)

    # Tabela wyników
    plt.subplot(2, 2, 4)
    plt.axis("tight")
    plt.axis("off")

    table_data = []
    for eps in epsilons:
        table_data.append(
            [
                f"{eps:.3f}",
                f"{results_dict[eps]['original_accuracy']:.3f}",
                f"{results_dict[eps]['adversarial_accuracy']:.3f}",
                f"{results_dict[eps]['attack_success_rate']:.3f}",
            ]
        )

    table = plt.table(
        cellText=table_data,
        colLabels=["Epsilon", "Org. Acc", "Adv. Acc", "Attack Success"],
        cellLoc="center",
        loc="center",
    )
    table.auto_set_font_size(False)
    table.set_fontsize(9)
    plt.title("Podsumowanie wyników")

    plt.tight_layout()
    plt.savefig(
        os.path.join(output_dir, "adversarial_attack_results.png"),
        dpi=300,
        bbox_inches="tight",
    )
    plt.close()

    print(f"Wykres został zapisany w: {output_dir}/adversarial_attack_results.png")

In [18]:
def visualize_adversarial_examples(
    all_results, epsilons, label_encoder, data_loader, model, device, output_dir
):
    """
    Wizualizuje przykłady adwersalne dla melspektrogramów

    Parametry:
    - all_results: słownik z wynikami ataku dla różnych wartości epsilon
    - epsilons: lista wartości epsilon
    - label_encoder: enkoder etykiet
    - data_loader: DataLoader z danymi testowymi
    - model: model do testowania
    - device: urządzenie obliczeniowe
    - output_dir: katalog do zapisywania wyników
    """

    class_names = label_encoder.classes_

    # Utworzenie katalogu dla wizualizacji
    visualization_dir = os.path.join(output_dir, "visualizations")
    os.makedirs(visualization_dir, exist_ok=True)
    print(f"Utworzono katalog: {visualization_dir}")

    # Znajdź przykłady, które zostały błędnie sklasyfikowane po ataku
    model.eval()
    cnt = 0

    # Dla każdego epsilon (pomijając 0)
    for eps in epsilons:
        if eps == 0 or eps not in all_results:
            continue

        print(f"Przetwarzam epsilon={eps}")

        # Utwórz katalog dla danego epsilon
        eps_dir = os.path.join(visualization_dir, f"epsilon_{eps}")
        os.makedirs(eps_dir, exist_ok=True)

        # Poszukaj przykładów, dla których atak był skuteczny
        example_count = 0
        for batch_idx, (data, target) in enumerate(
            tqdm(data_loader, desc=f"Szukanie przykładów dla ε={eps}")
        ):
            if example_count >= 5:  # Ograniczenie do 5 przykładów na każdy epsilon
                break

            data, target = data.to(device), target.to(device)

            # Oryginalny forward pass
            with torch.no_grad():
                original_output = model(data)
                original_pred = original_output.argmax(dim=1)

            # Atak FGSM
            perturbed_data, perturbation = fgsm_attack(
                model, data, target, eps, criterion
            )

            # Forward pass na danych po ataku
            with torch.no_grad():
                adversarial_output = model(perturbed_data)
                adversarial_pred = adversarial_output.argmax(dim=1)

            # Znajdź przykłady, dla których predykcja się zmieniła
            for i in range(data.size(0)):
                if original_pred[i] != adversarial_pred[i]:
                    # Znaleziono przykład, dla którego atak był skuteczny
                    orig_img = data[i].detach().cpu().squeeze(0).numpy()
                    adv_img = perturbed_data[i].detach().cpu().squeeze(0).numpy()
                    diff_img = adv_img - orig_img

                    # Wizualizacja
                    fig, axes = plt.subplots(1, 3, figsize=(15, 5))

                    # Oryginalny melspektrogram
                    im0 = axes[0].imshow(
                        orig_img, aspect="auto", origin="lower", cmap="viridis"
                    )
                    axes[0].set_title(
                        f"Oryginalny\nKlasa: {class_names[original_pred[i]]}"
                    )
                    axes[0].set_ylabel("Mel Bins")
                    axes[0].set_xlabel("Frames")

                    # Perturbacja (różnica)
                    im1 = axes[1].imshow(
                        diff_img, aspect="auto", origin="lower", cmap="coolwarm"
                    )
                    axes[1].set_title(f"Perturbacja\nEpsilon: {eps}")
                    axes[1].set_xlabel("Frames")

                    # Przykład adwersalny
                    im2 = axes[2].imshow(
                        adv_img, aspect="auto", origin="lower", cmap="viridis"
                    )
                    axes[2].set_title(
                        f"Adwersalny\nKlasa: {class_names[adversarial_pred[i]]}"
                    )
                    axes[2].set_xlabel("Frames")

                    # Dodaj paski kolorów
                    plt.colorbar(im0, ax=axes[0], fraction=0.046, pad=0.04)
                    plt.colorbar(im1, ax=axes[1], fraction=0.046, pad=0.04)
                    plt.colorbar(im2, ax=axes[2], fraction=0.046, pad=0.04)

                    plt.tight_layout()
                    plt.savefig(
                        os.path.join(
                            eps_dir, f"adversarial_example_{example_count}.png"
                        )
                    )
                    plt.close()

                    example_count += 1
                    cnt += 1

                    if (
                        example_count >= 5
                    ):  # Ograniczenie do 5 przykładów na każdy epsilon
                        break

            if cnt >= 15:  # Ograniczenie do 15 przykładów łącznie
                return

    # Jeśli nie znaleziono żadnych przykładów
    if cnt == 0:
        print("Nie znaleziono żadnych przykładów, dla których atak byłby skuteczny.")
        print("Spróbuj zwiększyć wartości epsilon lub użyć innego modelu.")

### Uruchomienie ataku dla różnych wartości epsilon

In [19]:
criterion = nn.CrossEntropyLoss()


epsilons = [0, 0.001, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2]
results = test_multiple_epsilons(
    model=model,
    test_loader=test_loader,
    epsilons=epsilons,
    criterion=criterion,
    device=device,
    output_dir="fgsm_attack_results",
    mean=mean,
    std=std,
    sr=24000,
)

FGSM ε=0: 100%|██████████| 29/29 [00:33<00:00,  1.15s/it]



Wyniki dla ε = 0:
Dokładność oryginalna: 0.9130
Dokładność po ataku: 0.9130
Sukces ataku: 0.0000


FGSM ε=0.001: 100%|██████████| 29/29 [00:33<00:00,  1.14s/it]



Wyniki dla ε = 0.001:
Dokładność oryginalna: 0.9130
Dokładność po ataku: 0.8986
Sukces ataku: 0.0145


FGSM ε=0.005: 100%|██████████| 29/29 [00:33<00:00,  1.15s/it]



Wyniki dla ε = 0.005:
Dokładność oryginalna: 0.9130
Dokładność po ataku: 0.7982
Sukces ataku: 0.1148


FGSM ε=0.01: 100%|██████████| 29/29 [00:34<00:00,  1.19s/it]



Wyniki dla ε = 0.01:
Dokładność oryginalna: 0.9130
Dokładność po ataku: 0.6132
Sukces ataku: 0.2999


FGSM ε=0.02: 100%|██████████| 29/29 [00:34<00:00,  1.19s/it]



Wyniki dla ε = 0.02:
Dokładność oryginalna: 0.9130
Dokładność po ataku: 0.3133
Sukces ataku: 0.6020


FGSM ε=0.05: 100%|██████████| 29/29 [00:35<00:00,  1.23s/it]



Wyniki dla ε = 0.05:
Dokładność oryginalna: 0.9130
Dokładność po ataku: 0.0268
Sukces ataku: 0.8896


FGSM ε=0.1: 100%|██████████| 29/29 [00:35<00:00,  1.21s/it]



Wyniki dla ε = 0.1:
Dokładność oryginalna: 0.9130
Dokładność po ataku: 0.0011
Sukces ataku: 0.9175


FGSM ε=0.2: 100%|██████████| 29/29 [00:35<00:00,  1.23s/it]



Wyniki dla ε = 0.2:
Dokładność oryginalna: 0.9130
Dokładność po ataku: 0.0000
Sukces ataku: 0.9186
Wykres został zapisany w: fgsm_attack_results/adversarial_attack_results.png


In [20]:
visualize_adversarial_examples(
    all_results=results,
    epsilons=[0.001, 0.005, 0.02],
    label_encoder=label_encoder,
    data_loader=test_loader,
    model=model,
    device=device,
    output_dir="fgsm_attack_results",
)

Utworzono katalog: fgsm_attack_results\visualizations
Przetwarzam epsilon=0.001


Szukanie przykładów dla ε=0.001:  38%|███▊      | 11/29 [00:14<00:23,  1.32s/it]


Przetwarzam epsilon=0.005


Szukanie przykładów dla ε=0.005:   3%|▎         | 1/29 [00:03<01:41,  3.61s/it]


Przetwarzam epsilon=0.02


Szukanie przykładów dla ε=0.02:   0%|          | 0/29 [00:03<?, ?it/s]


#### Funkcja podgłaśniająca nagrania wyekstrahowane ze spektrogramów po ataku adwersalnym (pierwotne były bardzo ciche)

In [21]:
def boost_audio_folder(input_folder, output_folder, target_amp=0.9):
    """
    Podgłaśnia wszystkie pliki WAV w folderze input_folder
    i zapisuje je w output_folder z maksymalną amplitudą target_amp.
    """
    if not os.path.exists(output_folder):
        os.makedirs(output_folder)

    for filename in os.listdir(input_folder):
        if filename.lower().endswith(".wav"):
            input_path = os.path.join(input_folder, filename)
            output_path = os.path.join(output_folder, filename)

            # Wczytanie audio
            audio, sr = sf.read(input_path)

            # Obliczenie obecnej max amplitudy
            max_amp = np.max(np.abs(audio))
            if max_amp == 0:
                print(f"Plik {filename} jest pusty lub ma zero amplitudy — pominięty.")
                continue

            # Obliczenie wzmocnienia i podgłośnienie
            gain = target_amp / max_amp
            audio_boosted = audio * gain

            # Zapisanie podgłośnionego nagrania
            sf.write(output_path, audio_boosted, sr)
            print(f"Podgłośniono: {filename} (gain: {gain:.2f})")

    print("Podgłaśnianie zakończone.")


# Przykład użycia:
boost_audio_folder(
    "fgsm_attack_results/epsilon_0.2", "fgsm_attack_results/epsilon_0.2/new"
)

Podgłośniono: adversarial_batch0_sample0_eps0.2.wav (gain: 9.33)
Podgłośniono: adversarial_batch0_sample1_eps0.2.wav (gain: 8.59)
Podgłośniono: adversarial_batch0_sample2_eps0.2.wav (gain: 8.07)
Podgłośniono: adversarial_batch1_sample0_eps0.2.wav (gain: 9.83)
Podgłośniono: adversarial_batch1_sample1_eps0.2.wav (gain: 6.03)
Podgłośniono: adversarial_batch1_sample2_eps0.2.wav (gain: 16.56)
Podgłośniono: adversarial_batch2_sample0_eps0.2.wav (gain: 9.72)
Podgłośniono: adversarial_batch2_sample1_eps0.2.wav (gain: 22.24)
Podgłośniono: adversarial_batch2_sample2_eps0.2.wav (gain: 4.88)
Podgłośniono: adversarial_batch3_sample0_eps0.2.wav (gain: 5.90)
Podgłośniono: adversarial_batch3_sample1_eps0.2.wav (gain: 8.49)
Podgłośniono: adversarial_batch3_sample2_eps0.2.wav (gain: 10.43)
Podgłośniono: adversarial_batch4_sample0_eps0.2.wav (gain: 5.72)
Podgłośniono: adversarial_batch4_sample1_eps0.2.wav (gain: 7.26)
Podgłośniono: adversarial_batch4_sample2_eps0.2.wav (gain: 6.65)
Podgłośniono: original

#### Analiza podatności poszczególnych klas na atak FGSM dla różnych wartości epsilon
Wynikiem działania funkcji są raporty klasyfikacji oraz macierze pomyłek dla modeli "zaatakowanych" poszczególnymi wartościami epsilon i porównanie dokładności modelu bazowego do dokładności modeli po ataku. 

In [27]:
# Utworzenie katalogu dla wyników ataku FGSM
TIMESTAMP = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
OUTPUT_DIR = os.path.join(project_root, "src", "Atak_adw", "model_outputs")
FGSM_DIR = os.path.join(OUTPUT_DIR, f"fgsm_audio_results_{TIMESTAMP}")
os.makedirs(FGSM_DIR, exist_ok=True)  # Utworzenie katalogu dla wykresów i raportów
AUDIO_FGSM_DIR = os.path.join(OUTPUT_DIR, f"fgsm_audio_results_{TIMESTAMP}")
os.makedirs(AUDIO_FGSM_DIR, exist_ok=True)  # Utworzenie katalogu dla audio

# Analiza najbardziej podatnych klas na atak dla wielu wartości epsilon
print(
    "\nAnalizuję podatność poszczególnych klas na atak FGSM dla różnych wartości epsilon..."
)

# Lista wartości epsilon do analizy podatności klas
target_epsilons = [0.005, 0.01, 0.02]

# Słownik do przechowywania wyników dla każdej wartości epsilon
class_accuracy_results = {}

# Dla każdej wartości epsilon wykonaj analizę
for target_epsilon in target_epsilons:
    print(f"\nAnaliza dla epsilon = {target_epsilon}")

    # Przechowywanie macierzy konfuzji dla ataku
    all_preds_adv = []
    all_labels_adv = []

    # Uruchomienie ataku z wybraną wartością epsilon
    model.eval()
    for data, target in tqdm(test_loader, desc=f"Atak FGSM (eps={target_epsilon})"):
        data, target = data.to(device), target.to(device)

        # Przeprowadzenie ataku FGSM
        # Przeprowadzenie ataku FGSM
        perturbed_data, _ = fgsm_attack(
            model, data.clone(), target, target_epsilon, criterion
        )  # Predykcja na przykładach adwersalnych
        with torch.no_grad():
            output = model(perturbed_data)

        # Wyznaczenie predykcji
        _, predicted = torch.max(output.data, 1)

        # Zapisanie predykcji i rzeczywistych etykiet
        all_preds_adv.extend(predicted.cpu().numpy())
        all_labels_adv.extend(target.cpu().numpy())

    # Macierz konfuzji dla ataku
    cm_adv = confusion_matrix(all_labels_adv, all_preds_adv)
    class_names = label_encoder.classes_

    # Wizualizacja macierzy konfuzji dla ataku
    plt.figure(figsize=(10, 8))
    cm_normalized_adv = cm_adv.astype("float") / cm_adv.sum(axis=1)[:, np.newaxis]
    sns.heatmap(
        cm_normalized_adv,
        annot=True,
        fmt=".2f",
        cmap="Reds",
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.title(f"Znormalizowana macierz konfuzji po ataku FGSM (ε={target_epsilon})")
    plt.ylabel("Rzeczywista etykieta")
    plt.xlabel("Przewidziana etykieta")
    plt.tight_layout()
    plt.savefig(os.path.join(FGSM_DIR, f"confusion_matrix_fgsm_{target_epsilon}.png"))
    plt.close()

    # Raport klasyfikacji dla ataku
    report_adv = classification_report(
        all_labels_adv, all_preds_adv, target_names=class_names, output_dict=True
    )
    report_df_adv = pd.DataFrame(report_adv).transpose()
    print(f"\nRaport klasyfikacji po ataku FGSM (epsilon={target_epsilon}):")
    print(report_df_adv)
    report_df_adv.to_csv(
        os.path.join(FGSM_DIR, f"classification_report_fgsm_{target_epsilon}.csv")
    )

    # Zapisanie dokładności dla każdej klasy przy danej wartości epsilon
    class_accuracies = {name: report_adv[name]["recall"] for name in class_names}
    class_accuracy_results[target_epsilon] = class_accuracies

# Utworzenie DataFrame z wynikami dla wszystkich wartości epsilon
comparison_df = pd.DataFrame(index=class_names)

all_preds_orig = []
all_labels_orig = []
model.eval()
with torch.no_grad():
    for data, target in tqdm(test_loader, desc="Ewaluacja na oryginalnych danych"):
        data, target = data.to(device), target.to(device)
        output = model(data)
        _, predicted = torch.max(output.data, 1)
        all_preds_orig.extend(predicted.cpu().numpy())
        all_labels_orig.extend(target.cpu().numpy())

# Bazowa dokładność (bez ataku)
report = classification_report(
    all_labels_orig, all_preds_orig, target_names=class_names, output_dict=True
)
baseline_accuracies = {name: report[name]["recall"] for name in class_names}
comparison_df["Dokładność bazowa"] = pd.Series(baseline_accuracies)

# Dodanie kolumn dla każdej wartości epsilon
for eps in target_epsilons:
    comparison_df[f"Dokładność (ε={eps})"] = pd.Series(class_accuracy_results[eps])
    comparison_df[f"Spadek (ε={eps})"] = (
        comparison_df["Dokładność bazowa"] - comparison_df[f"Dokładność (ε={eps})"]
    )

# Sortowanie według średniego spadku dokładności
comparison_df["Średni spadek"] = comparison_df[
    [f"Spadek (ε={eps})" for eps in target_epsilons]
].mean(axis=1)
comparison_df = comparison_df.sort_values("Średni spadek", ascending=False)

print(
    "\nPorównanie dokładności klas przed i po ataku FGSM dla różnych wartości epsilon:"
)
print(comparison_df)
comparison_df.to_csv(os.path.join(FGSM_DIR, "accuracy_comparison_all_epsilons.csv"))

# Wizualizacja spadku dokładności dla każdej klasy w funkcji epsilon
plt.figure(figsize=(12, 8))
for class_name in class_names:
    accuracies = [baseline_accuracies[class_name]] + [
        class_accuracy_results[eps][class_name] for eps in target_epsilons
    ]
    plt.plot(
        [0] + target_epsilons, accuracies, marker="o", linewidth=2, label=class_name
    )

plt.xlabel("Epsilon (siła ataku)")
plt.ylabel("Dokładność klasyfikacji")
plt.title("Spadek dokładności klasyfikacji emocji pod wpływem ataku FGSM")
plt.legend()
plt.grid(True)
plt.ylim(0, 1.05)
plt.savefig(os.path.join(FGSM_DIR, "accuracy_vs_epsilon_by_class.png"))
plt.close()

print(f"\nAnaliza zakończona. Wszystkie wyniki zostały zapisane w katalogu: {FGSM_DIR}")


Analizuję podatność poszczególnych klas na atak FGSM dla różnych wartości epsilon...

Analiza dla epsilon = 0.005


Atak FGSM (eps=0.005): 100%|██████████| 29/29 [00:20<00:00,  1.41it/s]



Raport klasyfikacji po ataku FGSM (epsilon=0.005):
              precision    recall  f1-score     support
anger          0.917293  0.813333  0.862191  150.000000
fear           0.885906  0.897959  0.891892  147.000000
happiness      0.666667  0.680000  0.673267  150.000000
neutral        0.797619  0.827160  0.812121  162.000000
sadness        0.871166  0.922078  0.895899  154.000000
surprised      0.641221  0.626866  0.633962  134.000000
accuracy       0.798216  0.798216  0.798216    0.798216
macro avg      0.796645  0.794566  0.794889  897.000000
weighted avg   0.799464  0.798216  0.798116  897.000000

Analiza dla epsilon = 0.01


Atak FGSM (eps=0.01): 100%|██████████| 29/29 [00:20<00:00,  1.39it/s]



Raport klasyfikacji po ataku FGSM (epsilon=0.01):
              precision    recall  f1-score     support
anger          0.745283  0.526667  0.617188  150.000000
fear           0.754967  0.775510  0.765101  147.000000
happiness      0.441176  0.500000  0.468750  150.000000
neutral        0.619048  0.641975  0.630303  162.000000
sadness        0.757576  0.811688  0.783699  154.000000
surprised      0.386861  0.395522  0.391144  134.000000
accuracy       0.613155  0.613155  0.613155    0.613155
macro avg      0.617485  0.608560  0.609364  897.000000
weighted avg   0.621785  0.613155  0.613793  897.000000

Analiza dla epsilon = 0.02


Atak FGSM (eps=0.02): 100%|██████████| 29/29 [00:21<00:00,  1.34it/s]



Raport klasyfikacji po ataku FGSM (epsilon=0.02):
              precision    recall  f1-score     support
anger          0.407407  0.220000  0.285714  150.000000
fear           0.526316  0.544218  0.535117  147.000000
happiness      0.192513  0.240000  0.213650  150.000000
neutral        0.248521  0.259259  0.253776  162.000000
sadness        0.506579  0.500000  0.503268  154.000000
surprised      0.083333  0.097015  0.089655  134.000000
accuracy       0.313266  0.313266  0.313266    0.313266
macro avg      0.327445  0.310082  0.313530  897.000000
weighted avg   0.330877  0.313266  0.316829  897.000000


Ewaluacja na oryginalnych danych: 100%|██████████| 29/29 [00:05<00:00,  5.20it/s]


Porównanie dokładności klas przed i po ataku FGSM dla różnych wartości epsilon:
           Dokładność bazowa  Dokładność (ε=0.005)  Spadek (ε=0.005)  \
surprised           0.813433              0.626866          0.186567   
anger               0.906667              0.813333          0.093333   
happiness           0.853333              0.680000          0.173333   
neutral             0.950617              0.827160          0.123457   
sadness             0.980519              0.922078          0.058442   
fear                0.959184              0.897959          0.061224   

           Dokładność (ε=0.01)  Spadek (ε=0.01)  Dokładność (ε=0.02)  \
surprised             0.395522         0.417910             0.097015   
anger                 0.526667         0.380000             0.220000   
happiness             0.500000         0.353333             0.240000   
neutral               0.641975         0.308642             0.259259   
sadness               0.811688         0.168831       




### PODSUMOWANIE
1.	Krytyczny próg epsilon - wartość ε=0.02 wydaje się być punktem krytycznym, przy którym dokładność spada poniżej 50%.
2.	Podatność na klasy - model wykazuje znacząco różną odporność dla różnych emocji. Klasy "zaskoczenie", "złość" i "szczęście" są znacznie bardziej podatne na ataki niż "strach", "smutek" i "neutralność".
3.	Praktyczna interpretacja - nawet niewielkie perturbacje rzędu ε=0.01 (odpowiadające nieznacznym zmianom pikseli, często niewidocznym dla ludzkiego oka) powodują spadek dokładności o około 26%, co wskazuje na istotną podatność modelu ResNet-18 na ataki adwersalne.
4.	Implikacje dla zastosowań - model wymaga dodatkowych mechanizmów obrony przed atakami adwersalnymi, szczególnie w zastosowaniach krytycznych lub związanych z bezpieczeństwem, gdzie rozpoznawanie emocji może być kluczowym elementem systemu.
Przedstawione wyniki potwierdzają znaną w dziedzinie uczenia maszynowego podatność sieci neuronowych na ataki adwersalne typu FGSM, nawet przy zastosowaniu zaawansowanej architektury takiej jak ResNet-18.
 