In [89]:
import sys
import os
import json
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio.transforms as T
import torchaudio
from torch.utils.data import Dataset, DataLoader

current_directory = os.getcwd()
data_folder_path = os.path.join(current_directory, 'data')
datasets_path_file = os.path.join(data_folder_path, 'datasets_path.json')

In [91]:
with open(datasets_path_file, 'r', encoding='utf-8') as file:
    datasets_path = json.load(file)

path_to_UrbanSound8K = datasets_path.get("UrbanSound8K", None)

In [93]:
def collate_fn(batch):
    """ Function to bring audio files to the same length """
    waveforms, labels = zip(*batch)

    max_length = max([w.shape[1] for w in waveforms])

    # add zeros or cut off
    padded_waveforms = []
    for w in waveforms:
        if w.shape[1] < max_length:
            pad = torch.zeros((1, max_length - w.shape[1]))
            padded_waveforms.append(torch.cat((w, pad), dim=1))
        else:
            padded_waveforms.append(w[:, :max_length])

    return torch.stack(padded_waveforms), torch.tensor(labels)

In [95]:
class UrbanSound8KDataset(Dataset):
    def __init__(self, data_dir, metadata_file, folds, target_sample_rate=22050, target_length=22050):
        self.data_dir = data_dir
        self.target_sample_rate = target_sample_rate
        self.target_length = target_length
        metadata = pd.read_csv(metadata_file)
        self.metadata = metadata[metadata['fold'].isin(folds)].reset_index(drop=True)

    def __len__(self):
        return len(self.metadata)

    def __getitem__(self, idx):
        row = self.metadata.iloc[idx]
        file_path = os.path.join(self.data_dir, f"fold{row['fold']}", row['slice_file_name'])
        label = torch.tensor(row['classID'], dtype=torch.long)

        # upload the audio file
        waveform, sample_rate = torchaudio.load(file_path)

        # convert to mono (if stereo)
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)

        return waveform, label

In [97]:
data_dir = os.path.join(path_to_UrbanSound8K, 'audio')
metadata_file = os.path.join(path_to_UrbanSound8K, 'metadata//UrbanSound8K.csv')

train_folds = list(range(1, 10))
test_folds = [10] 

train_dataset = UrbanSound8KDataset(data_dir, metadata_file, train_folds)
test_dataset = UrbanSound8KDataset(data_dir, metadata_file, test_folds)

train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, collate_fn=collate_fn)

for batch in train_loader:
    waveform, label = batch 
    print("Waveform shape:", waveform.shape)
    print("Labels:", label)
    break

Waveform shape: torch.Size([8, 1, 192000])
Labels: tensor([7, 3, 8, 7, 3, 0, 2, 0])


In [111]:
class MelSpectrogramClassifier(nn.Module):
    def __init__(self, sample_rate=16000, n_mels=64, num_classes=10):
        super(MelSpectrogramClassifier, self).__init__()

        # audio → Mel Spectrogram
        self.mel_spec = T.MelSpectrogram(sample_rate=sample_rate, n_mels=n_mels)

        # linear classifier
        self.fc = nn.Linear(n_mels * 375, num_classes)  # 375 — фиксированное количество временных шагов

    def forward(self, x):
        # audio → Mel Spectrogram
        x = self.mel_spec(x)  # (batch, 1, n_mels, time)

        # remove 1-channel size
        x = x.squeeze(1)  # (batch, n_mels, time)

        # trim/fall to a fixed size (375 time-steps)
        if x.shape[-1] > 375:
            x = x[..., :375]
        else:
            pad_size = 375 - x.shape[-1]
            x = torch.nn.functional.pad(x, (0, pad_size))

        # expand into a vector
        x = x.reshape(x.size(0), -1)  # (batch, features)

        # classification
        x = self.fc(x)  # (batch, num_classes)
        return x

# checking the model
model = MelSpectrogramClassifier()
dummy_waveform = torch.randn(8, 1, 192000)  # Твои данные
output = model(dummy_waveform)

print("Output shape:", output.shape)  # Ожидаемый размер (8, 10)

Output shape: torch.Size([8, 10])


In [113]:
import torch.nn.functional as F

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MelSpectrogramClassifier().to(device)

# optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [115]:
def train_model(model, train_loader, val_loader, optimizer, criterion, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        train_loss, correct, total = 0, 0, 0
        
        for waveforms, labels in train_loader:
            waveforms, labels = waveforms.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(waveforms)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            total += labels.size(0)

        train_acc = 100. * correct / total
        val_acc, val_loss = evaluate_model(model, val_loader, criterion)
        print(f"Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss/len(train_loader):.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")

def evaluate_model(model, loader, criterion):
    model.eval()
    loss, correct, total = 0, 0, 0
    with torch.no_grad():
        for waveforms, labels in loader:
            waveforms, labels = waveforms.to(device), labels.to(device)
            outputs = model(waveforms)
            loss += criterion(outputs, labels).item()
            _, predicted = outputs.max(1)
            correct += predicted.eq(labels).sum().item()
            total += labels.size(0)

    acc = 100. * correct / total
    return acc, loss / len(loader)

In [117]:
train_model(model, train_loader, test_loader, optimizer, criterion, num_epochs=10)

Epoch 1/10: Train Loss: 149.9836, Train Acc: 31.20%, Val Loss: 209.5145, Val Acc: 28.55%
Epoch 2/10: Train Loss: 95.2716, Train Acc: 40.00%, Val Loss: 286.5601, Val Acc: 33.81%
Epoch 3/10: Train Loss: 59.6150, Train Acc: 47.14%, Val Loss: 328.2712, Val Acc: 30.35%
Epoch 4/10: Train Loss: 67.0583, Train Acc: 48.03%, Val Loss: 279.9898, Val Acc: 30.47%
Epoch 5/10: Train Loss: 36.1193, Train Acc: 53.98%, Val Loss: 218.5443, Val Acc: 34.41%
Epoch 6/10: Train Loss: 46.1874, Train Acc: 54.15%, Val Loss: 305.8724, Val Acc: 35.72%
Epoch 7/10: Train Loss: 38.7649, Train Acc: 56.12%, Val Loss: 278.4018, Val Acc: 29.15%
Epoch 8/10: Train Loss: 31.4592, Train Acc: 58.87%, Val Loss: 314.4935, Val Acc: 29.63%
Epoch 9/10: Train Loss: 32.4565, Train Acc: 59.53%, Val Loss: 311.5493, Val Acc: 34.17%
Epoch 10/10: Train Loss: 38.6307, Train Acc: 58.71%, Val Loss: 303.4810, Val Acc: 32.50%


In [118]:
test_acc, test_loss = evaluate_model(model, test_loader, criterion)
print(f"Final Test Accuracy: {test_acc:.2f}%")

Final Test Accuracy: 32.50%
