# Imports

In [None]:
import torch
import torch.nn as nn               
import torch.nn.functional as F       
import torch.optim as optim            
from torch.utils.data import Dataset, DataLoader 

#
import torchaudio
import torchaudio.transforms as T   
import numpy as np                     
import pandas as pd                   

import matplotlib.pyplot as plt

import os
from collections import OrderedDict, defaultdict

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"


# BASE MODEL ARCHITECTURE - test


In [None]:
import torch
import torch.nn as nn

class CNNClassifier(nn.Module):
    """Small two-stage CNN followed by a two-layer classifier head.

    The convolutional stack halves height and width twice (two ``MaxPool2d(2)``
    layers), and the size of the first linear layer is derived by pushing a
    dummy input of ``input_shape`` through that stack.

    Args:
        num_classes: Number of output logits.
        input_shape: ``(channels, height, width)`` of a single input sample,
            without the batch dimension. Defaults to ``(1, 128, 801)``, the
            shape that used to be hard-coded here.

    Raises:
        ValueError: If ``input_shape`` does not have exactly three entries.
    """

    def __init__(self, num_classes, input_shape=(1, 128, 801)):
        super().__init__()

        self.input_shape = tuple(input_shape)
        if len(self.input_shape) != 3:
            raise ValueError(
                "input_shape must be (channels, height, width), "
                f"got {self.input_shape!r}"
            )
        in_channels = self.input_shape[0]

        self.conv = nn.Sequential(
            nn.Conv2d(in_channels, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

        # Dynamically determine the correct input size for the Linear layer
        self.flattened_size = self._get_flattened_size()

        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(self.flattened_size, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def _get_flattened_size(self):
        """Return the per-sample feature count produced by ``self.conv``."""
        with torch.no_grad():
            # One dummy sample with the configured input shape.
            dummy = torch.zeros(1, *self.input_shape)
            return self.conv(dummy).flatten(start_dim=1).shape[1]

    def forward(self, x):
        """Map a batch shaped ``(batch, *input_shape)`` to class logits."""
        x = self.conv(x)
        x = self.fc(x)
        return x

# HYPERPARAMETERS

In [None]:
# Training configuration shared by the cells below.
EPOCHS, BATCH_SIZE = 25, 512
INIT_LR, WEIGHT_DECAY = 1e-3, 1e-2
# Classification loss, moved to the selected device up front.
LOSS = nn.CrossEntropyLoss().to(device)



#model = CNNClassifier(num_classes=206)
#optimizer = optim.AdamW(model.parameters(), lr=INIT_LR, weight_decay=1e-2)

# Dataset Creation

In [None]:
import json
class SpectogramDataset(Dataset):
    """Chunk-level dataset over per-file spectrogram tensors stored as ``.pt``.

    Every file contributes one sample per chunk. The number of chunks per file
    is read from a JSON mapping (file stem -> chunk count) instead of opening
    each tensor file up front. Loaded files are kept in a small LRU cache.

    Args:
        audio_dir: Despite the name, this is iterated as a collection of
            ``.pt`` file paths (one entry per file), not a directory string.
        label_to_idx: Maps the label (file stem up to the first ``_``) to the
            integer class index returned by ``__getitem__``.
        max_cache_size: Maximum number of loaded files kept in memory.
        chunk_counts_path: JSON file mapping file stem to chunk count.
            Defaults to the previously hard-coded ``./dataset_init.json``.

    Raises:
        KeyError: If a file stem has no entry in the chunk-count JSON.
    """

    def __init__(self, audio_dir:str, label_to_idx:dict, max_cache_size: int = 5,
                 chunk_counts_path: str = './dataset_init.json'):
        self.label_to_idx = label_to_idx
        self.audio_dir = audio_dir
        self.chunk_index_pairs = []
        self.cache = OrderedDict()
        self.max_cache_size = max_cache_size

        with open(chunk_counts_path, 'r') as file:
            chunk_counts = json.load(file)

        for path in self.audio_dir:
            file_name = os.path.basename(path)
            # Strip only a trailing ".pt"; a ".pt" elsewhere in the name stays.
            stem = file_name[:-3] if file_name.endswith(".pt") else file_name
            try:
                amount_of_chunks = chunk_counts[stem]
            except KeyError:
                raise KeyError(
                    f"No chunk count for '{stem}' in {chunk_counts_path}"
                ) from None
            label = stem.split('_')[0]
            self.chunk_index_pairs.extend(
                (path, label, n) for n in range(amount_of_chunks)
            )

    def load_cached_tensor(self, file_path):
        """Return the tensor stored at ``file_path``, using the LRU cache."""
        if file_path in self.cache:
            # Mark as most recently used.
            self.cache.move_to_end(file_path)
            return self.cache[file_path]

        # Load onto the CPU, matching the other dataset class in this file.
        tensor = torch.load(file_path, map_location="cpu")
        self.cache[file_path] = tensor
        if len(self.cache) > self.max_cache_size:
            # Drop the least recently used entry.
            self.cache.popitem(last=False)
        # Return the local reference: with max_cache_size == 0 the entry has
        # just been evicted, so a cache lookup here would raise KeyError.
        return tensor

    def __len__(self):
        """Total number of chunks across all files."""
        return len(self.chunk_index_pairs)

    def __getitem__(self, idx:int):
        """Return ``(chunk, label_index)`` for the ``idx``-th chunk.

        The chunk is cast to ``torch.float32``. An unknown label raises
        ``KeyError`` from the ``label_to_idx`` lookup.
        """
        file_path, label, chunk_index = self.chunk_index_pairs[idx]
        tensor = self.load_cached_tensor(file_path)
        chunk = tensor[chunk_index].to(torch.float32)

        label_index = self.label_to_idx[label]

        #if chunk.size(0) == 1:
        #    chunk = chunk.repeat(3,1,1)
        #chunk = F.interpolate(chunk.unsqueeze(0), size=(224, 224), mode='bilinear', align_corners=False).squeeze(0)
        return chunk, label_index

In [None]:
import pandas as pd
# Build the label vocabulary from the processed metadata: labels are compared
# as strings and sorted, and each label's position becomes its class index.
metadata = pd.read_csv("./data/processed_data.csv")
unique_labels = sorted(metadata["primary_label"].astype(str).unique())
index_to_label = dict(enumerate(unique_labels))
label_to_index = {label: idx for idx, label in index_to_label.items()}
print(unique_labels[:10])
# In theory the labels could also be sorted numerically.

# OLD-TEST: Reworked ChunkDataset by ino (path issue)

In [None]:
import os
import torch
from torch.utils.data import Dataset

class inoChunkedSpectrogramDataset(Dataset):
    """Chunk-level dataset that indexes every ``.pt`` file by opening it once.

    At construction each file is loaded to count its chunks (first tensor
    dimension). Files that fail to load are reported and skipped. On access the
    whole file is loaded again and the requested chunk is returned.

    Args:
        file_list: Paths of the ``.pt`` tensor files.
        label_to_idx: Maps the label name (file name without a trailing
            ``.pt``) to the integer class index.
    """

    # Fallback used only when no shape was recorded for a file.
    _DEFAULT_CHUNK_SHAPE = (1, 128, 216)

    def __init__(self, file_list:list, label_to_idx: dict):
        self.file_list = file_list
        self.label_to_idx = label_to_idx
        self.index_list = []
        # Per-file (chunk shape, dtype), recorded while indexing, so the
        # load-failure fallback in __getitem__ matches the real chunks.
        self._chunk_meta = {}

        for tensor_path in self.file_list:
            file_name = os.path.basename(tensor_path)
            label = file_name[:-3] if file_name.endswith(".pt") else file_name
            # Best effort: an unreadable file is reported and skipped.
            try:
                tensor = torch.load(tensor_path, map_location="cpu")
                n_chunks = tensor.shape[0]
                self._chunk_meta[tensor_path] = (tuple(tensor.shape[1:]), tensor.dtype)
            except Exception as e:
                print(f"Fehler beim Laden von {tensor_path}: {e}")
                continue
            self.index_list.extend((tensor_path, label, i) for i in range(n_chunks))

    def __len__(self):
        """Total number of indexed chunks."""
        return len(self.index_list)

    def __getitem__(self, idx):
        """Return ``(chunk, label_index)`` for the ``idx``-th indexed chunk.

        If the file cannot be loaded any more, an all-zero chunk with the shape
        and dtype recorded at indexing time is returned instead.

        Raises:
            ValueError: If the chunk's label is not in ``label_to_idx``.
        """
        tensor_path, label_name, chunk_idx = self.index_list[idx]

        # Fail fast on an unknown label before paying for the file load.
        if label_name not in self.label_to_idx:
            raise ValueError(f"Unbekanntes Label: {label_name}")
        label_index = self.label_to_idx[label_name]

        try:
            all_chunks = torch.load(tensor_path, map_location="cpu")
            chunk = all_chunks[chunk_idx]
        except Exception as e:
            print(f"Fehler beim Laden von Chunk {chunk_idx} für Label {label_name}: {e}")
            shape, dtype = self._chunk_meta.get(
                tensor_path, (self._DEFAULT_CHUNK_SHAPE, torch.float32)
            )
            chunk = torch.zeros(shape, dtype=dtype)

        return chunk, label_index

# DataLoader for ChunkSpectogramDataset

In [None]:
import pandas as pd
# Rebuild the label -> class-index mapping from the processed metadata
# (string labels, sorted; the position in the sorted list is the index).
metadata = pd.read_csv("./data/processed_data.csv")
unique_labels = sorted(metadata["primary_label"].astype(str).unique())
label_to_idx = dict(zip(unique_labels, range(len(unique_labels))))
print(len(label_to_idx))

# Directory holding the processed tensors.
tensor_dir = "./data/processed_train_audio/"

In [None]:
import torch

# Sanity check: load one processed file and show its shape.
sample_path = './data/processed_train_audio/21038/iNat65519.pt'
tensor = torch.load(sample_path)
print(tensor.shape)


In [None]:
import os 
import random 
import os 
import random 

# Split the processed files into train / test lists by file-name marker and
# derive the class index of every label from the train files.
processed_dir = './data/processed_train_audio'

train_files, test_files = [], []
label_to_idx = dict()
for file in sorted(os.listdir(processed_dir)):
    path = f'{processed_dir}/{file}'

    if '_train' in file:
        train_files.append(path)
        # Number labels in order of first appearance. With strictly paired
        # "<label>_test" / "<label>_train" files this equals the old
        # position-based `idx // 2`, but it stays contiguous when the
        # directory contains a stray or unpaired entry.
        label_to_idx.setdefault(file.split('_')[0], len(label_to_idx))

    elif '_test' in file:
        test_files.append(path)


assert not set(train_files).intersection(set(test_files)), "train/test files overlap"
assert len(label_to_idx) == 205, f"expected 205 labels, found {len(label_to_idx)}"
#train_dataset = inoChunkedSpectrogramDataset(train_files, label_to_idx)
#test_dataset = inoChunkedSpectrogramDataset(test_files, label_to_idx)

train_dataset = SpectogramDataset(train_files, label_to_idx)
test_dataset = SpectogramDataset(test_files, label_to_idx)

# DATALOADER

In [None]:
# Shuffled training batches, loaded by three persistent worker processes.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    pin_memory=True,
    num_workers=3,
    persistent_workers=True,
)

In [None]:
# Evaluation batches. No shuffling: the metrics do not depend on sample order,
# and walking the dataset in index order visits the chunks of one file
# consecutively, so the dataset's per-file cache is actually reused.
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    pin_memory=True,
    num_workers=3,
    persistent_workers=True,
)

# Train Loop inoChunkedSpectogramDataset

In [None]:
import torch

import tqdm

# Train / evaluate loop for the CNN classifier.
# NOTE(review): `model` and `optimizer` must already exist when this cell runs;
# their definitions in the hyperparameter cell are commented out.
loss_fn = LOSS.to(device)

model = model.to(device)

for epoch in range(EPOCHS):

    # ---- training pass ----
    model.train()
    train_loss = 0
    # Wrap the loader itself (not enumerate) so tqdm knows the total length.
    for data, target in tqdm.tqdm(train_loader):

        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        output = model(data)  # shape: [batch, num_classes]
        loss = loss_fn(output, target)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # Mean loss per batch; max(..., 1) avoids dividing by zero on an empty loader.
    train_loss /= max(len(train_loader), 1)

    # ---- evaluation pass ----
    model.eval()
    test_loss = 0
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in tqdm.tqdm(test_loader):
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = loss_fn(output, target)
            test_loss += loss.item()

            pred = torch.argmax(output, dim=1)
            correct += (pred == target).sum().item()
            total += data.size(0)

    test_loss /= max(len(test_loader), 1)
    accuracy = 100. * correct / max(total, 1)

    print(f"\nEpoch: {epoch}, Train loss: {train_loss:.4f}, Test loss: {test_loss:.4f}, Accuracy: {correct}/{total} ({accuracy:.0f}%)")

In [None]:
# Number of entries in the processed-audio directory (shown as the cell output).
len(os.listdir('./data/processed_train_audio'))

# ResNet

In [None]:
import torch.nn.functional as F
import tqdm
from sklearn.metrics import f1_score

RESNET_NUM_CLASSES = 205
RESNET_EPOCHS = 5


def _to_three_channels(batch):
    """Expand a single-channel image batch to three channels.

    The ResNet stem expects 3-channel input, while the CNN in this file is fed
    1-channel chunks. `expand` repeats the channel as a view (no copy). Batches
    that are not 4-D single-channel are returned unchanged.
    """
    if batch.dim() == 4 and batch.size(1) == 1:
        return batch.expand(-1, 3, -1, -1)
    return batch


def _to_multi_hot(labels, num_classes):
    """Return float targets shaped [batch, num_classes] for BCEWithLogitsLoss.

    A 1-D batch of class indices is one-hot encoded; labels that already have
    a class dimension are only cast to float.
    """
    if labels.dim() == 1:
        return F.one_hot(labels.long(), num_classes=num_classes).float()
    return labels.float()


resnet_model = torch.hub.load('pytorch/vision:v0.10.0', 'resnet18', pretrained=True)

# Fine-tune only layer3, layer4 and the classifier head; freeze everything else.
for name, layer in resnet_model.named_parameters():
    if not ('fc' in name or 'layer4' in name or 'layer3' in name):
        layer.requires_grad = False

# Replace the head (created after freezing, so it stays trainable).
resnet_model.fc = nn.Sequential(
    nn.Linear(512, 512),
    nn.ReLU(),
    nn.Dropout(0.2),
    nn.Linear(512, RESNET_NUM_CLASSES)
)

loss_fn = nn.BCEWithLogitsLoss().to(device)

train_loader = DataLoader(train_dataset,batch_size=256, shuffle=True, pin_memory=True, num_workers = 2, persistent_workers=False)
test_loader = DataLoader(test_dataset, batch_size=256, pin_memory=True, num_workers = 2, persistent_workers=False)
resnet_model = resnet_model.to(device)
# Only hand the trainable parameters to the optimizer.
optimizer = torch.optim.AdamW(
    [param for param in resnet_model.parameters() if param.requires_grad],
    weight_decay=WEIGHT_DECAY,
    lr=3e-3,
)

scaler = torch.amp.GradScaler(device)

for epoch in range(RESNET_EPOCHS):
    epoch_loss = 0
    test_loss = 0

    # ---- training pass (mixed precision) ----
    resnet_model.train()
    # Wrap the loader itself so tqdm can show the total number of batches.
    for data, label in tqdm.tqdm(train_loader):
        data = _to_three_channels(data.to(device, non_blocking=True))
        label = _to_multi_hot(label.to(device, non_blocking=True), RESNET_NUM_CLASSES)
        # Every sample needs at least one positive class (one check per batch
        # instead of one Python-level assert per sample).
        assert bool((label.sum(dim=-1) >= 1).all())
        optimizer.zero_grad()
        with torch.amp.autocast(device):
            model_output = resnet_model(data)
            loss_value = loss_fn(model_output, label)
        scaler.scale(loss_value).backward()
        scaler.step(optimizer)
        scaler.update()
        epoch_loss += loss_value.item()

    print(f"Loss of {epoch} epoch: {epoch_loss/len(train_loader)}")

    # ---- evaluation pass ----
    resnet_model.eval()
    all_probs, all_labels = [], []
    with torch.no_grad():
        for idx, (data, label) in enumerate(tqdm.tqdm(test_loader)):
            data = _to_three_channels(data.to(device, non_blocking=True))
            label = _to_multi_hot(label.to(device, non_blocking=True), RESNET_NUM_CLASSES)
            test_output = resnet_model(data)
            loss_value = loss_fn(test_output, label)
            test_loss += loss_value.item()
            probabilities = torch.sigmoid(test_output)
            prediction = (probabilities > 0.5).float()
            if epoch == 0 and idx == 0:
                print("Raw logits:", test_output[0][:10].cpu())
                print("Sigmoid:", torch.sigmoid(test_output[0])[:10].cpu())
                print("Prediction:", prediction[0][:10])
                print("Label:     ", label[0][:10])
                print("Prediction unique:", prediction.unique())
                print("Label unique:", label.unique())
            # Keep the probabilities (not the 0/1 predictions) so the
            # threshold sweep below works on real scores.
            all_probs.append(probabilities.float().cpu())
            all_labels.append(label.cpu())

    all_probs = torch.cat(all_probs).numpy()
    all_labels = torch.cat(all_labels).numpy()
    all_preds = (all_probs > 0.5).astype("float32")
    if epoch == 0:
        # Sweep decision thresholds over the sigmoid probabilities.
        for thresh in [0.1, 0.2, 0.3, 0.4, 0.5]:
            preds = (all_probs > thresh).astype("float32")
            f1 = f1_score(all_labels, preds, average='samples')
            print(f"Threshold {thresh:.1f} → F1: {f1:.4f}")
    f1 = f1_score(all_labels, all_preds, average='samples')
    # The reported metric is the sample-averaged F1 at threshold 0.5.
    print(f'F1 score of {epoch}th epoch: {f1:.4f}')
    print(f'Test-Loss of {epoch}th epoch: {test_loss/len(test_loader)}')


torch.save(resnet_model.state_dict(), './resnet_model.pth')



IndentationError: expected an indented block after 'if' statement on line 71 (26651095.py, line 72)

# Map from id to name for the visualization

In [None]:
import pandas as pd

taxonomy_df = pd.read_csv("./data/taxonomy.csv")

# Map each primary label to its common name. Pairing the two columns directly
# avoids building a Series per row with iterrows(); as in the row loop, a
# repeated primary_label keeps the last common_name.
id_to_name = dict(zip(taxonomy_df["primary_label"], taxonomy_df["common_name"]))

# Used later for the visualization.

print(list(id_to_name.items())[:5])