In [None]:
import random
import matplotlib.pyplot as plt
import os
from tqdm import tqdm
import pickle
import pandas as pd
import numpy as np
import soundfile as sf
import torch.nn.functional as F
import torch.nn as nn
import torch
import torch.optim as optim
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score
from torch.utils.data import Dataset
import torchaudio
import torchvision
from torchaudio import transforms
from efficientnet_pytorch import EfficientNet
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score

In [None]:
# Seed every random number generator the notebook relies on so runs are repeatable.
SEED = 42
random.seed(SEED)             # Python's built-in generator
np.random.seed(SEED)          # NumPy's global generator (drives the random crop offsets)
torch.manual_seed(SEED)       # PyTorch default generator
torch.cuda.manual_seed(SEED)  # generator of the current CUDA device
# Restrict cuDNN to deterministic algorithms.
torch.backends.cudnn.deterministic = True

In [None]:
# Directory containing the training audio clips (joined with each 'fname' when a clip is loaded).
train_fld = './data/audio_train/train/'
# Name of the training annotation CSV inside ./data; its 'fname' and 'label' columns are used below.
train_fname = 'train.csv'
# Directory containing the test audio clips.
test_fld = './data/audio_test/test/'
# Name of the submission template CSV inside ./data; its 'fname' column lists the test clips.
test_fname = 'sample_submission.csv'


In [None]:
# Load the training annotations and the submission template from ./data (in that order).
train_frame, test_frame = (
    pd.read_csv(os.path.join('./data', csv_name))
    for csv_name in (train_fname, test_fname)
)


In [None]:
# Number of distinct (non-null) class names in the training annotations.
noc = train_frame['label'].nunique()
# Class name -> integer id, numbered in the order returned by unique().
label_to_id = dict(
    (class_name, class_idx)
    for class_idx, class_name in enumerate(train_frame['label'].unique())
)
# Integer-encoded target column; used for the stratified split and as the training target.
train_frame['id'] = train_frame['label'].map(label_to_id)

In [None]:
# https://github.com/lukemelas/EfficientNet-PyTorch
class BaseLineModel(nn.Module):
    """Audio classifier: mel-spectrogram -> conv adapter -> EfficientNet-B0 -> MLP head.

    Args:
        sample_rate: Sampling rate passed to the ``MelSpectrogram`` transform.
            NOTE(review): the dataset class in this notebook does not resample,
            so this value should match the audio files - confirm.
        n_classes: Number of output classes (width of the final linear layer).
    """

    def __init__(self, sample_rate = 16000, n_classes = 41):
        super().__init__()
        # Waveform -> mel-spectrogram with 64 mel bands.
        self.ms = torchaudio.transforms.MelSpectrogram(sample_rate, n_mels = 64)

        # Conv adapter 1 -> 20 -> 40 -> 20 -> 3 channels. Kernel 3 with padding 1
        # keeps the spatial size; the 3 output channels feed the image backbone.
        self.cnn1 = nn.Conv2d(in_channels = 1, out_channels = 20, kernel_size = 3, padding = 1)
        self.cnn2 = nn.Conv2d(in_channels = 20, out_channels = 40, kernel_size = 3, padding = 1)
        self.cnn3 = nn.Conv2d(in_channels = 40, out_channels = 20, kernel_size = 3, padding = 1)
        self.cnn4 = nn.Conv2d(in_channels = 20, out_channels = 3, kernel_size = 3, padding = 1)

        # Pretrained backbone; lin1 below consumes its 1000-dimensional output.
        self.features = EfficientNet.from_pretrained('efficientnet-b0')

        # MLP head 1000 -> 333 -> 111 -> n_classes.
        self.lin1 = nn.Linear(1000, 333)
        self.lin2 = nn.Linear(333, 111)
        # The attribute is called lin4 (there is no lin3); the name is kept so
        # previously saved state dicts still load. The width comes from the
        # constructor argument rather than from a notebook-level global.
        self.lin4 = nn.Linear(111, n_classes)

    def forward(self, x):
        """Map a batch of waveforms to unnormalised class scores.

        Args:
            x: Batch of waveforms. ``cnn1`` takes a single input channel, so this
                is presumably shaped ``(batch, 1, samples)`` - confirm against the
                data loader.

        Returns:
            Tensor of shape ``(batch, n_classes)``; no softmax is applied.
        """
        x = self.ms(x)

        x = F.relu(self.cnn1(x))
        x = F.relu(self.cnn2(x))
        x = F.relu(self.cnn3(x))
        x = F.relu(self.cnn4(x))

        x = self.features(x)

        # Flatten everything except the batch dimension before the MLP head.
        x = x.view(x.shape[0], -1)
        x = F.relu(x)

        x = F.relu(self.lin1(x))
        x = F.relu(self.lin2(x))
        return self.lin4(x)

In [None]:
def sample_or_pad(wave, wav_len = 32000):
    """Return a clip of exactly ``wav_len`` samples taken from ``wave``.

    Shorter inputs are right-padded with zeros, longer inputs are cropped at a
    random offset drawn from NumPy's global RNG, and inputs that already have
    the requested length are returned unchanged (the same tensor object).

    Args:
        wave: 2-D tensor; the second dimension is treated as time.
        wav_len: Target length along the second dimension.

    Returns:
        Tensor of shape ``(wave.shape[0], wav_len)``.
    """
    n_channels, n_samples = wave.shape
    if n_samples < wav_len:
        # Match the input's channel count, dtype and device so multi-channel
        # or non-default-dtype clips can be padded too.
        padded_wav = torch.zeros(n_channels, wav_len, dtype = wave.dtype, device = wave.device)
        padded_wav[:, :n_samples] = wave
        return padded_wav
    if n_samples > wav_len:
        # randint's upper bound is exclusive; the +1 makes the last full window
        # (offset == n_samples - wav_len) reachable.
        offset = np.random.randint(0, n_samples - wav_len + 1)
        return wave[:, offset:offset + wav_len]
    return wave

class EventDetectionDataset(Dataset):
    """Map-style dataset that loads audio files as fixed-length waveforms.

    Args:
        data_path: Directory that contains the audio files.
        x: Indexable collection of file names relative to ``data_path``.
        y: Optional indexable collection of targets aligned with ``x``. When it
            is ``None`` each item is the waveform alone.
    """

    def __init__(self,data_path,x,y = None):
        self.data_path = data_path
        self.x = x
        self.y = y

    def __len__(self):
        """Number of audio files in the dataset."""
        return len(self.x)

    def __getitem__(self, idx):
        """Load clip ``idx``, cut or pad it, and return it (with its target if available)."""
        # torchaudio also returns the file's sampling rate; it is not used here.
        wave, _ = torchaudio.load(os.path.join(self.data_path, self.x[idx]), normalize = True)
        wave = sample_or_pad(wave)

        if self.y is None:
            return wave
        return wave, self.y[idx]


In [None]:
# Hold out 20% of the labelled clips for validation, stratified on the class id.
X_t, X_v, y_t, y_v = train_test_split(train_frame['fname'].values, train_frame['id'].values,
                                      test_size = 0.2, random_state = 42, stratify = train_frame['id'])
# Reshuffle the training clips every epoch so mini-batches differ between epochs.
train_loader = DataLoader(EventDetectionDataset(train_fld, X_t, y_t),
                          batch_size = 41, shuffle = True)
# Validation and test order stays fixed; test predictions are later written back
# to test_frame row by row, so the test order must follow test_frame.
val_loader = DataLoader(EventDetectionDataset(train_fld, X_v, y_v),
                        batch_size = 41, shuffle = False)
test_loader = DataLoader(EventDetectionDataset(test_fld, test_frame['fname'].values, None),
                         batch_size = 41, shuffle = False)

In [None]:
def eval_model(model, eval_dset):
    """Compute the macro-averaged F1 score of ``model`` over labelled batches.

    The model is switched to evaluation mode and left in it; gradient tracking
    is disabled while predicting.

    Args:
        model: Module that maps a batch of waveforms to per-class scores of
            shape ``(batch, n_classes)``.
        eval_dset: Iterable yielding ``(waveforms, labels)`` batches, where
            ``labels`` is a tensor of class ids.

    Returns:
        The macro F1 score over all samples yielded by ``eval_dset``.
    """
    model.eval()
    true_labs, forecast = [], []
    with torch.no_grad():
        for wavs, labs in tqdm(eval_dset):
            # Call the module itself rather than .forward() so registered hooks run.
            outputs = model(wavs)
            # The predicted class is the index of the highest score in each row.
            forecast.extend(outputs.cpu().numpy().argmax(axis = 1))
            true_labs.extend(labs.cpu().numpy())
    # scikit-learn's signature is f1_score(y_true, y_pred).
    return f1_score(true_labs, forecast, average = 'macro')

In [None]:
# Cross-entropy over the unnormalised class scores returned by the model.
criterion = nn.CrossEntropyLoss()
# Size the output layer from the number of classes found in the training labels.
model = BaseLineModel(n_classes = noc)
# Initial learning rate; the training loop decays it after every epoch.
lr = 1e-3
optimizer = torch.optim.Adam(model.parameters(), lr = lr)

In [None]:
n_epoch = 50
# Start below any attainable score so the first epoch always writes a checkpoint,
# even if the validation F1 is exactly 0; otherwise 'model.pt' could be missing
# (or stale from an earlier run) when it is loaded for inference.
best_f1 = float('-inf')
f1s = []        # validation macro F1 per epoch
f1s_train = []  # training macro F1 per epoch
for epoch in range(n_epoch):
    # eval_model leaves the network in eval mode, so re-enable training mode each epoch.
    model.train()
    for wavs, labs in tqdm(train_loader):
        optimizer.zero_grad()
        outputs = model(wavs)
        loss = criterion(outputs,labs)
        loss.backward()
        optimizer.step()
    f1 = eval_model(model, val_loader)
    f1_train = eval_model(model, train_loader)
    f1s.append(f1)
    f1s_train.append(f1_train)
    print('epoch: {}, f1_val: {}, f1_train: {}'.format(epoch, f1, f1_train))
    # Keep only the weights of the best validation epoch (the earliest one on ties).
    if f1 > best_f1:
        best_f1 = f1
        torch.save(model.state_dict(),'model.pt')
    # Exponential decay: shrink the learning rate by 5% after every epoch.
    lr *= 0.95
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

In [None]:
# Rebuild the network and restore the checkpoint written by the training loop.
model = BaseLineModel(n_classes = noc)
model.load_state_dict(torch.load('model.pt'))
model.eval()
forecast = []
with torch.no_grad():
    for wavs in tqdm(test_loader):
        # Call the module itself (not .forward) so hooks run, and move the scores
        # to the CPU before the NumPy conversion, as eval_model does.
        outputs = model(wavs)
        forecast.extend(outputs.cpu().numpy().argmax(axis = 1))
# Translate predicted class ids back to their label names.
decoder = {class_id : label for label, class_id in label_to_id.items()}
forecast = pd.Series(forecast).map(decoder)
# Assign by position rather than by index alignment: predictions follow the
# (unshuffled) order of test_frame's rows.
test_frame['label'] = forecast.values
test_frame.to_csv('sample_submission.csv', index = False)