IMPORT LIBRARIES

In [1]:
import os
import numpy as np
import librosa
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import torch.nn as nn
import torch.nn.functional as F
from torch import optim
from torch.optim import Adam
from sklearn.metrics import accuracy_score
from torchvision import models

LOAD LABELS

In [2]:
train_label_path = r'C:\Users\ramis\OneDrive - nyu.edu\Desktop\New ML Project\train_label.txt'
with open(train_label_path, "r") as f:
    train_labels = f.readlines()
    train_labels = [int(label.strip()) for label in train_labels]
    train_labels = np.array(train_labels)

FUNCTION TO PROCESS MEL SPEC

In [32]:
def mel_filter_banks(path):
    # Load audio file
    y, sr = librosa.load(path, sr=16000)

    # Define frame parameters
    frame_size, frame_stride = 0.025, 0.01
    frame_length, frame_step = int(round(sr * frame_size)), int(round(sr * frame_stride))
    signal_length = 3 * sr  # Extract 0-3 seconds part
    frame_num = int(np.ceil((signal_length - frame_length) / frame_step)) + 1
    pad_frame = (frame_num - 1) * frame_step + frame_length - signal_length
    pad_y = np.append(y, np.zeros(pad_frame))
    signal_len = signal_length + pad_frame

    # Frame splitting
    indices = np.tile(np.arange(0, frame_length), (frame_num, 1)) + np.tile(np.arange(0, frame_num * frame_step, frame_step), (frame_length, 1)).T
    frames = pad_y[indices]
    frames *= np.hamming(frame_length)

    # FFT and power spectra
    NFFT = 1024
    mag_frames = np.absolute(np.fft.rfft(frames, NFFT))
    pow_frames = mag_frames ** 2 / NFFT

    # Define Mel filter bank parameters
    mel_N = 128
    mel_low, mel_high = 0, (2595 * np.log10(1 + (sr / 2) / 700))
    mel_freq = np.linspace(mel_low, mel_high, mel_N + 2)
    hz_freq = (700 * (10 ** (mel_freq / 2595) - 1))
    bins = np.floor((NFFT + 1) * hz_freq / sr)

    # Construct Mel filter bank
    fbank = np.zeros((mel_N, int(NFFT / 2 + 1)))
    for m in range(1, mel_N + 1):
        f_m_minus = int(bins[m - 1])  # left
        f_m = int(bins[m])  # center
        f_m_plus = int(bins[m + 1])  # right

        for k in range(f_m_minus, f_m):
            fbank[m - 1, k] = (k - bins[m - 1]) / (bins[m] - bins[m - 1])
        for k in range(f_m, f_m_plus):
            fbank[m - 1, k] = (bins[m + 1] - k) / (bins[m + 1] - bins[m])

    # Apply Mel filter bank
    filter_banks = np.matmul(pow_frames, fbank.T)
    filter_banks = np.where(filter_banks == 0, np.finfo(float).eps, filter_banks)  # Avoid log of zero
    filter_banks = 20 * np.log10(filter_banks)
    filter_banks = (filter_banks - np.mean(filter_banks)) / np.std(filter_banks)

    return filter_banks

FUNCTION TO ADD TIME SHIFT

In [4]:
def time_shift(audio, shift_limit):
    shift_amt = int(np.random.uniform(-shift_limit, shift_limit) * len(audio))
    return np.roll(audio, shift_amt)


Train data Loaded Successfully


FUNCTION TO LOAD FILES

In [None]:
def load_audio_files(file_paths, augment=False):
    melspectrograms = []
    for file_path in file_paths:
        y, sr = librosa.load(file_path)
        if augment:
            y = time_shift(y, 0.05)
        melspectrogram = mel_filter_banks(file_path)
        melspectrograms.append(melspectrogram)
    return melspectrograms

train_file_paths = [f'C:\\Users\\ramis\\OneDrive - nyu.edu\\Desktop\\New ML Project\\train_output\\{i}\\vocals.wav' for i in range(11886)]
train_mel_spectrograms = load_audio_files(train_file_paths)
train_audios = np.array(train_mel_spectrograms)
print("Train data loaded successfully.")


In [5]:
class AudioDataset(Dataset):
    def __init__(self, audios, labels):
        self.audios = audios
        self.labels = labels

    def __len__(self):
        return len(self.audios)

    def __getitem__(self, idx):
        audio = self.audios[idx]
        audio = np.expand_dims(audio, axis=0)
        audio = torch.tensor(audio, dtype=torch.float32)
        label = self.labels[idx]
        label = torch.tensor(label, dtype=torch.long)
        return audio, label

MODEL

In [6]:
import torch
import torch.nn as nn

class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_channels, out_channels, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.downsample = downsample

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)
        return out

class ResNet(nn.Module):
    def __init__(self, block, layers, num_classes=4):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        self.layer1 = self._make_layer(block, 64, layers[0])
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

    def _make_layer(self, block, out_channels, blocks, stride=1):
        downsample = None
        if stride != 1 or self.in_channels != out_channels * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, out_channels * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * block.expansion),
            )

        layers = []
        layers.append(block(self.in_channels, out_channels, stride, downsample))
        self.in_channels = out_channels * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)
        return x

# Example instantiation
model = ResNet(BasicBlock, [2, 2, 2, 2], num_classes=4)


In [7]:
from torch.optim.lr_scheduler import ReduceLROnPlateau

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0004)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=5, verbose=True)
def train_model(model, train_loader, val_loader, epochs=32, lr=0.0001):
    for epoch in range(epochs):
        model.train()
        train_correct = 0
        train_total = 0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            _, predicted = torch.max(outputs.data, 1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()

        train_accuracy = 100 * train_correct / train_total


        model.eval()
        val_correct = 0
        val_total = 0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs.data, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

        val_accuracy = 100 * val_correct / val_total
        print(f'Epoch {epoch+1}: Train Acc: {train_accuracy:.2f}%, Val Acc: {val_accuracy:.2f}%')
        scheduler.step(val_accuracy)



In [26]:
train_mel_spectrograms, val_mel_spectrograms, train_labels, val_labels = train_test_split(
    train_mel_spectrograms, train_labels, test_size=0.2, random_state=42)

# Create dataset instances for training and validation sets
train_dataset = AudioDataset(train_mel_spectrograms, train_labels)
val_dataset = AudioDataset(val_mel_spectrograms, val_labels)

# Create data loaders for batching
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)

print("Data loaders are set up and ready.")

In [27]:
train_model(model, train_loader, val_loader, epochs=40, lr=0.0001)

Epoch 1: Train Acc: 99.33%, Val Acc: 99.11%
Epoch 2: Train Acc: 99.38%, Val Acc: 98.90%
Epoch 3: Train Acc: 99.34%, Val Acc: 98.84%
Epoch 4: Train Acc: 99.38%, Val Acc: 98.90%
Epoch 5: Train Acc: 99.43%, Val Acc: 98.84%
Epoch 6: Train Acc: 99.46%, Val Acc: 98.84%
Epoch 7: Train Acc: 99.37%, Val Acc: 98.74%
Epoch 8: Train Acc: 99.43%, Val Acc: 98.84%
Epoch 9: Train Acc: 99.42%, Val Acc: 98.84%
Epoch 10: Train Acc: 99.49%, Val Acc: 98.79%
Epoch 11: Train Acc: 99.57%, Val Acc: 98.79%
Epoch 12: Train Acc: 99.50%, Val Acc: 98.74%
Epoch 13: Train Acc: 99.41%, Val Acc: 98.79%
Epoch 14: Train Acc: 99.50%, Val Acc: 98.79%
Epoch 15: Train Acc: 99.42%, Val Acc: 98.79%
Epoch 16: Train Acc: 99.34%, Val Acc: 98.74%
Epoch 17: Train Acc: 99.50%, Val Acc: 98.79%
Epoch 18: Train Acc: 99.43%, Val Acc: 98.74%
Epoch 19: Train Acc: 99.47%, Val Acc: 98.79%
Epoch 20: Train Acc: 99.45%, Val Acc: 98.84%
Epoch 21: Train Acc: 99.47%, Val Acc: 98.79%
Epoch 22: Train Acc: 99.45%, Val Acc: 98.74%
Epoch 23: Train Acc

In [31]:
#save the whole model
torch.save(model, 'a1final_model.pth')

LOAD TEST AND SAVE PREDICTIONS

In [33]:
test_file_paths = [f'C:\\Users\\ramis\\OneDrive - nyu.edu\\Desktop\\New ML Project\\test_output\\{i}\\vocals.wav' for i in range(2447)]
test_mel_spectrograms = load_audio_files(test_file_paths,augment=True)
test_audios = np.array(test_audios)
test_dataset = AudioDataset(test_audios, np.zeros(len(test_audios)))
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

In [34]:
def predict_and_save_csv(model, test_loader, output_file='final_submission6_newmodel_spleeter.csv'):
    model.eval()
    predictions = []
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    with torch.no_grad():
        for inputs, _ in test_loader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            predictions.extend(predicted.cpu().numpy())

    with open(output_file, 'w') as f:
        f.write('id,category\n')
        for i, label in enumerate(predictions):
            f.write(f'{i},{label}\n')

predict_and_save_csv(model, test_loader)
torch.save(model, "trained_model.pth")
