In [14]:
import torch
import os
import librosa
import numpy as np
import torchaudio
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models import resnet18 
import random

In [6]:
conda install pytorch torchvision torchaudio -c pytorch -c=conda-forge -c apple

Channels:
 - pytorch
 - conda-forge
 - apple
 - defaults
Platform: osx-arm64
Collecting package metadata (repodata.json): done
Solving environment: done

## Package Plan ##

  environment location: /Users/karolina/anaconda3

  added / updated specs:
    - pytorch
    - torchaudio
    - torchvision


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    certifi-2024.2.2           |     pyhd8ed1ab_0         157 KB  conda-forge
    conda-24.4.0               |  py311h267d04e_0         1.2 MB  conda-forge
    libexpat-2.6.2             |       hebf3989_0          62 KB  conda-forge
    libsqlite-3.45.3           |       h091b4b1_0         805 KB  conda-forge
    libzlib-1.2.13             |       h53f4e23_5          47 KB  conda-forge
    openssl-3.3.0              |       h0d3ecfb_0         2.8 MB  conda-forge
    python-3.11.8              |hdf0ec26_0_cpython        13.9 MB  conda-forge
    pyth

In [None]:
pip install coremltools

In [13]:
torch.__version__

torch.tensor([1,2,3], device="mps")

tensor([1, 2, 3], device='mps:0')

In [3]:
if torch.backends.mps.is_available():
    mps_device= torch.device("mps")
    x= torch.ones(1, device= mps_device)
    print(x)
else:
    print("MPS not found")

tensor([1.], device='mps:0')


  nonzero_finite_vals = torch.masked_select(


In [5]:
os.environ["TORCH_METAL_LAUNCH_TIME_DEFAULT"] = "1"

if torch.cuda.is_available():
    device = torch.device('cuda')
    print("CUDA is available. Using GPU:", torch.cuda.get_device_name(0))
else:
    device = torch.device('mps')
    print("CUDA is not available. Falling back to MPS.")

x = torch.ones(1, device=device)
print("Tensor device:", x.device)

CUDA is not available. Falling back to MPS.
Tensor device: mps:0


In [4]:
class AudioDataset(Dataset):
    def __init__(self, directory):
        self.file_paths, self.labels = self.load_data(directory)
        self.melspectrograms = []
        for file_path in tqdm(self.file_paths, desc="Processing audio files"):
            melspectrogram = self.load_and_process_audio(file_path)
            self.melspectrograms.append(melspectrogram)

    def load_data(self, directory):
        file_paths = [
            os.path.join(directory, f)
            for f in os.listdir(directory)
            if f.endswith(".mp3")
        ]
        file_paths.sort(key=lambda f: int(os.path.splitext(os.path.basename(f))[0]))
        with open("train_labels.npy", "rb") as f:
            labels = np.load(f, allow_pickle=True)
            labels = [int(label) for label in labels] 
            labels = np.array(labels)       
        return file_paths, labels

    def load_and_process_audio(self, file_path):
        audio, sample_rate = librosa.load(file_path, sr=None, mono=True)
        audio_tensor = torch.from_numpy(audio).float().unsqueeze(0)

        mel_transform = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate,
            n_fft=2048,
            win_length=1024,
            hop_length=512,
            n_mels=128,
        )
        melspectrogram = mel_transform(audio_tensor)
        melspectrogram_db = torchaudio.transforms.AmplitudeToDB()(melspectrogram)
        return melspectrogram_db

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        melspectrogram_db = self.melspectrograms[idx]
        label = self.labels[idx]
        label = torch.tensor(label, dtype=torch.long)
        return melspectrogram_db, label

In [5]:
def create_dataloader(dataset, batch_size=256, shuffle=True, split_ratio=0.8):

    if split_ratio == 1:
        # Create dataloader for the entire dataset
        dataloader = DataLoader(
            dataset, batch_size=batch_size, shuffle=shuffle
        )
        return dataloader, None
    
    train_size = int(split_ratio * len(dataset))
    test_size = len(dataset) - train_size
    
    train_dataset, test_dataset = torch.utils.data.random_split(
        dataset, [train_size, test_size]
    )
    train_dataloader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=shuffle
    )
    test_dataloader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False
    )
    return train_dataloader, test_dataloader

In [33]:
class Res(nn.Module):
    def __init__(self, num_classes):
        super(Res, self).__init__()
        self.resnet = resnet18(pretrained=False)
        self.resnet.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.resnet.fc = nn.Sequential(
            nn.Linear(512, 256), 
            nn.ReLU(inplace=True), 
            nn.Linear(256, num_classes)  
        )

    def forward(self, x):
        return self.resnet(x)

def train(train_loader, val_loader, lr=0.001, num_epochs=10, transform=None, save_path=None):
    device = torch.device("cuda" if torch.cuda.is_available() else "mps")
    model = Res(num_classes=4).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=0.001)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=5)
    
    train_loss_list = []
    train_accuracy_list = []
    val_loss_list = []
    val_accuracy_list = []

    best_val_accuracy = 0.0
    epochs_without_improvement = 0

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        correct_train = 0
        total_train = 0
        for images, labels in tqdm(train_loader):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()

        train_accuracy = correct_train / total_train
        train_loss /= len(train_loader)
        train_loss_list.append(train_loss)
        train_accuracy_list.append(train_accuracy)
        
        model.eval()
        val_loss = 0.0
        correct_val = 0
        total_val = 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).sum().item()

            val_accuracy = correct_val / total_val
            val_loss /= len(val_loader)
            val_loss_list.append(val_loss)
            val_accuracy_list.append(val_accuracy)

            # Check if current model has best validation accuracy
            if val_accuracy > best_val_accuracy:
                best_val_accuracy = val_accuracy
                if save_path is not None:
                    torch.save(model.state_dict(), save_path)
                epochs_without_improvement = 0
            else:
                epochs_without_improvement += 1
                # Randomly decide whether to stop training
                if epochs_without_improvement >= random.randint(5, 10):
                    print("Random stopping triggered.")
                    break  # Stop training

        scheduler.step()

    return model, train_loss_list, train_accuracy_list, val_loss_list, val_accuracy_list

num_classes = 4

In [38]:
label_dataset = "/Users/karolina/Desktop/Machine Learning/competition/train_label.npy"

train_directory = "/Users/karolina/Desktop/Machine Learning/competition/train_mp3s"
dataset = AudioDataset(train_directory)

Loading and processing audio files: 100%|█| 11886/11886 [00:58<00:00, 201.96it/s


In [39]:
from torchaudio.transforms import FrequencyMasking, TimeMasking, TimeStretch
from torchvision.transforms import RandomHorizontalFlip
import random

def apply_transforms(x):
    frequency_mask = torchaudio.transforms.FrequencyMasking(freq_mask_param=100)  # Increase frequency masking
    time_mask = torchaudio.transforms.TimeMasking(time_mask_param=100)  # Increase time masking

    x = frequency_mask(x)
    x = time_mask(x)

    if random.random() < 0.9:
        # noise
        noise = torch.randn_like(x) * 0.5 
        x += noise
    
    if random.random() < 0.8:
        # Increase pitch
        pitch_shift = random.uniform(-10, 10) 
        x = torchaudio.transforms.PitchShift(sample_rate=44100, n_steps=pitch_shift)(x)
    
    if random.random() < 0.7:
        #Stretching
        time_stretch_factor = random.uniform(0.7, 1.3)  # Stretch or compress time
        x = torchaudio.transforms.TimeStretch(sample_rate=44100, fixed_rate=time_stretch_factor)(x)
    
    return x

def audio_transforms(batch):
    results = [apply_transforms(sample) for sample in batch]
    return torch.stack(results)

In [40]:
train_loader, val_loader = create_dataloader(dataset, split_ratio=0.85)
trained_model, train_loss_list, train_accuracy_list, val_loss_list, val_accuracy_list = train(train_loader, val_loader, num_epochs=12, transform=None)

for epoch in range(len(train_loss_list)):
    print(f"Resnet - Epoch {epoch + 1}/{len(train_loss_list)}:")
    print(f"  Train Loss: {train_loss_list[epoch]:.4f}, Accuracy: {train_accuracy_list[epoch]:.4f}")
    print(f"  Validation Loss: {val_loss_list[epoch]:.4f}, Accuracy: {val_accuracy_list[epoch]:.4f}")

100%|███████████████████████████████████████████| 40/40 [08:28<00:00, 12.71s/it]
100%|███████████████████████████████████████████| 40/40 [19:33<00:00, 29.34s/it]
100%|███████████████████████████████████████████| 40/40 [16:48<00:00, 25.22s/it]
100%|███████████████████████████████████████████| 40/40 [08:14<00:00, 12.36s/it]
100%|███████████████████████████████████████████| 40/40 [14:30<00:00, 21.77s/it]
100%|███████████████████████████████████████████| 40/40 [18:49<00:00, 28.23s/it]
100%|███████████████████████████████████████████| 40/40 [07:51<00:00, 11.78s/it]
100%|███████████████████████████████████████████| 40/40 [11:25<00:00, 17.13s/it]
100%|███████████████████████████████████████████| 40/40 [10:12<00:00, 15.31s/it]
100%|███████████████████████████████████████████| 40/40 [21:50<00:00, 32.76s/it]
100%|███████████████████████████████████████████| 40/40 [15:29<00:00, 23.24s/it]
100%|███████████████████████████████████████████| 40/40 [08:16<00:00, 12.41s/it]


Resnet - Epoch 1/12:
  Train Loss: 0.8186, Accuracy: 0.6333
  Validation Loss: 0.8202, Accuracy: 0.6629
Resnet - Epoch 2/12:
  Train Loss: 0.5491, Accuracy: 0.7651
  Validation Loss: 0.7439, Accuracy: 0.6870
Resnet - Epoch 3/12:
  Train Loss: 0.4437, Accuracy: 0.8151
  Validation Loss: 0.5835, Accuracy: 0.7336
Resnet - Epoch 4/12:
  Train Loss: 0.3695, Accuracy: 0.8471
  Validation Loss: 0.4889, Accuracy: 0.8037
Resnet - Epoch 5/12:
  Train Loss: 0.2699, Accuracy: 0.8915
  Validation Loss: 0.3484, Accuracy: 0.8620
Resnet - Epoch 6/12:
  Train Loss: 0.2331, Accuracy: 0.9098
  Validation Loss: 0.3391, Accuracy: 0.8716
Resnet - Epoch 7/12:
  Train Loss: 0.2384, Accuracy: 0.9048
  Validation Loss: 0.4118, Accuracy: 0.8284
Resnet - Epoch 8/12:
  Train Loss: 0.2774, Accuracy: 0.8873
  Validation Loss: 0.4109, Accuracy: 0.8441
Resnet - Epoch 9/12:
  Train Loss: 0.3758, Accuracy: 0.8451
  Validation Loss: 0.4432, Accuracy: 0.8245
Resnet - Epoch 10/12:
  Train Loss: 0.3700, Accuracy: 0.8486
  V

In [44]:
torch.save(trained_model.state_dict(), 'trained_model.pth')

In [45]:
test_directory = "./test_mp3s"
test_dataset = AudioDataset(test_directory)
data_loader, _ = create_dataloader(test_dataset, 64, split_ratio=1, shuffle=False)

Loading and processing audio files: 100%|██| 2447/2447 [00:11<00:00, 217.77it/s]


In [46]:
import csv
def predict_and_save(model_path, test_loader, output_csv):
    # Load the saved model
    model = Res(num_classes)
    model.load_state_dict(torch.load(model_path))
    model.eval() 
    
    predictions = []
    file_names = []
    file_id_counter = 0

    for data, _ in test_loader:
        batch_size = data.size(0)
        file_ids = list(range(file_id_counter, file_id_counter + batch_size))
        output = model(data)
        _, predicted = torch.max(output, 1)
        predictions.extend(predicted.tolist())
        file_names.extend(file_ids)
        file_id_counter += batch_size

    # Write predictions to CSV
    with open(output_csv, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(['id', 'category'])
        for file_id, prediction in zip(file_names, predictions):
            writer.writerow([file_id, prediction])

# Usage
output_csv = "resnet4.csv"
predict_and_save('trained_model.pth', data_loader, output_csv)