In [None]:
from pathlib import Path
import pandas as pd
import numpy as np
from tqdm import tqdm
from easydict import EasyDict

import torch
import torchaudio
from torch import nn
from torch.utils.data import Dataset, DataLoader

from sklearn.utils.class_weight import compute_class_weight


# Device string; used further down to move the class-weight tensor onto the GPU.
device = "cuda:0"
# Directory that every CSV / audio path in this notebook is resolved against.
DATA_ROOT = Path("Dataset")

In [None]:
from sklearn.model_selection import train_test_split

# Labelled metadata plus the pseudo-labelled extra samples.
train_data = pd.read_csv(DATA_ROOT / "meta_train.csv")
pseudo_train_data = pd.read_csv(DATA_ROOT / "pseudo_train.csv")

X_data = list(train_data["Filename"])
y_data = list(train_data["Label"])

# Hold out 20% of the labelled files for validation; the seed keeps the split repeatable.
X_train, X_val, y_train, y_val = train_test_split(
    X_data,
    y_data,
    test_size=0.2,
    shuffle=True,
    random_state=87,
)

# Pseudo data
# Labelled files get the "train/" prefix; pseudo-labelled filenames are appended as-is
# (presumably they already carry their own sub-directory -- verify against the CSV).
X_train = [f"train/{x}" for x in X_train]
X_train.extend(pseudo_train_data['Filename'])
y_train = y_train + list(pseudo_train_data['Label'])

# Per-class sample counts of both splits.
print(np.unique(y_train, return_counts=True))
print(np.unique(y_val, return_counts=True))

In [None]:
# Expand every training file into itself plus its pre-generated augmented copies.
# Per source file the order is: original, speed 1.1, speed 0.9, then noise variants 0..4.
all_X_train, all_y_train = [], []
for x, y in zip(X_train, y_train):
    label = int(y)
    variants = [
        x,
        # speed
        f"augmented/1.1/{x}",
        f"augmented/0.9/{x}",
    ]
    # noise
    variants.extend(f"augmented/noise/{x}_{i}" for i in range(5))

    all_X_train.extend(variants)
    all_y_train.extend([label] * len(variants))

print(len(all_X_train), len(all_y_train))

# Validation uses only the original (non-augmented) recordings.
all_X_val = [f"train/{name}" for name in X_val]
all_y_val = list(y_val)

In [None]:
# Balanced per-class loss weights for the six labels 0..5, computed from the
# training labels before the augmented copies are added (every file is expanded
# by the same factor, so the class ratios are unchanged).
# `classes` is given as an ndarray, which is the type scikit-learn documents for
# this argument; a plain list is not guaranteed to be accepted by every release.
class_weight = compute_class_weight(class_weight='balanced', classes=np.arange(6), y=y_train)
print('class weight', class_weight)

In [None]:
# Audio / spectrogram settings shared by the feature extractor and the dataset.
cfg = EasyDict(dict(
    clip_length=5.0,
    sample_rate=16000,
    hop_length=160,
    n_fft=400,
    n_mels=64,
    f_min=0,
    f_max=8000,
))
# Frames per clip: ceiling of (clip_length * sample_rate) / hop_length.
cfg.unit_length = int(
    (cfg.clip_length * cfg.sample_rate + cfg.hop_length - 1) // cfg.hop_length
)

print(cfg)

In [None]:
# Waveform -> mel spectrogram transform, configured entirely from `cfg`.
to_mel_spectrogram = torchaudio.transforms.MelSpectrogram(
    sample_rate=cfg.sample_rate,
    n_fft=cfg.n_fft,
    n_mels=cfg.n_mels,
    hop_length=cfg.hop_length,
    f_min=cfg.f_min,
    f_max=cfg.f_max,
)

In [None]:
# spec_list = list()
# for f in tqdm(X_train):
#     try:
#         waveform, sr = torchaudio.load(DATA_ROOT/f"{f}.wav")
#     except:
#         print((DATA_ROOT/f"{f}.wav").stat())
#         print(f)
#     if sr != cfg.sample_rate:
#         waveform = torchaudio.transforms.Resample(sr, cfg.sample_rate)(waveform)
    
#     if waveform.shape[-1] < 80000:
#         padding = 80000 - waveform.shape[-1]
#         waveform = torch.cat([waveform, torch.zeros([1, padding])], axis=-1)
#     elif waveform.shape[-1] > 80000:
#         waveform = waveform[:80000]
    
#     log_mel_spec = (to_mel_spectrogram(waveform) + torch.finfo(torch.float).eps).log()
#     spec_list.append(log_mel_spec)
    
# all_train_lms = np.hstack(spec_list)
# train_mean_std = all_train_lms.mean(), all_train_lms.std()

# ---
# Cached (mean, std) pair used to normalise log-mel spectrograms; presumably the
# output of the commented-out computation above, kept here to skip recomputing it.
train_mean_std = (-5.2729697, 5.651953)

In [None]:
def sample_length(log_mel_spec):
    """Return the size of the last axis of ``log_mel_spec`` (its number of frames)."""
    n_frames = log_mel_spec.shape[-1]
    return n_frames

class DogDataset(torch.utils.data.Dataset):
    """Dataset yielding ``(log-mel spectrogram, label)`` pairs of a fixed frame count.

    Each item is loaded from ``DATA_ROOT/<filename>.wav``, resampled to the
    configured sample rate if needed, converted with the module-level
    ``to_mel_spectrogram`` transform, log-scaled, optionally normalised, and
    finally padded / cropped along the last axis to ``cfg.unit_length`` frames.

    Args:
        cfg: Configuration object; ``sample_rate``, ``unit_length`` and
            ``n_mels`` are read from it.
        filenames: Paths relative to ``DATA_ROOT``, without the ``.wav`` suffix.
        labels: One label per filename.
        norm_mean_std: Optional ``(mean, std)`` pair applied to the log-mel
            values; ``None`` disables normalisation.
        augment: When True (the default, matching the previous behaviour) time /
            frequency masking and a random crop are applied. Pass False for
            validation / test data to get unmasked, centre-cropped and therefore
            deterministic samples.
    """

    # Maximum mask widths handed to torchaudio's masking transforms.
    # NOTE(review): the frequency mask limit may exceed the number of mel bins
    # (cfg.n_mels), in which case a single mask could cover most of the
    # frequency axis -- confirm this is intended.
    TIME_MASK_PARAM = 80
    FREQ_MASK_PARAM = 80

    def __init__(self, cfg, filenames, labels, norm_mean_std=None, augment=True):
        assert len(filenames) == len(labels), 'Inconsistent length of filenames and labels.'

        self.filenames = filenames
        self.labels = labels
        self.norm_mean_std = norm_mean_std
        self.augment = augment
        # Keep the rate from the cfg we were given instead of reading a global.
        self.sample_rate = cfg.sample_rate

        # Calculate length of clip this dataset will make
        self.unit_length = cfg.unit_length

        # Test with first file
        assert self[0][0].shape[-1] == self.unit_length, f'Check your files, failed to load {filenames[0]}'

        # Show basic info.
        print(f'Dataset will yield log-mel spectrogram {len(self)} data samples in shape [1, {cfg.n_mels}, {self.unit_length}]')

    def __len__(self):
        return len(self.filenames)

    def __getitem__(self, index):
        """Load, featurise and length-normalise the sample at ``index``.

        Returns:
            Tuple ``(log_mel_spec, label)`` where ``log_mel_spec`` is a float
            tensor whose last axis has ``self.unit_length`` frames.

        Raises:
            RuntimeError: If the audio file cannot be loaded; the offending path
                is included in the message and the original error is chained.
        """
        assert 0 <= index and index < len(self)
        f = self.filenames[index]
        path = DATA_ROOT/f"{f}.wav"
        try:
            waveform, sr = torchaudio.load(path)
        except Exception as err:
            # Fail here with the file name rather than later on an unbound `sr`.
            raise RuntimeError(f"Failed to load audio file {path}") from err
        if sr != self.sample_rate:
            waveform = torchaudio.transforms.Resample(sr, self.sample_rate)(waveform)

        mel_spec = to_mel_spectrogram(waveform)
        if self.augment:
            # Masking is applied on the linear mel spectrogram, i.e. before the log.
            mel_spec = torchaudio.transforms.TimeMasking(time_mask_param=self.TIME_MASK_PARAM)(mel_spec)
            mel_spec = torchaudio.transforms.FrequencyMasking(freq_mask_param=self.FREQ_MASK_PARAM)(mel_spec)

        # eps keeps log() finite for zero-energy (or masked) bins.
        log_mel_spec = (mel_spec + torch.finfo(torch.float).eps).log()

        # Normalise with the supplied (mean, std) statistics.
        if self.norm_mean_std is not None:
            mean, std = self.norm_mean_std
            log_mel_spec = (log_mel_spec - mean) / std

        log_mel_spec = self._fit_length(log_mel_spec)

        return log_mel_spec, self.labels[index]

    def _fit_length(self, log_mel_spec):
        """Pad or crop the last axis of ``log_mel_spec`` to ``self.unit_length``."""
        log_mel_spec = torch.as_tensor(log_mel_spec, dtype=torch.float)

        # Padding if sample is shorter than expected - both head & tail are filled with 0s
        pad_size = self.unit_length - log_mel_spec.shape[-1]
        if pad_size > 0:
            offset = pad_size // 2
            log_mel_spec = torch.nn.functional.pad(log_mel_spec, (offset, pad_size - offset))

        # Crop if the sample is longer than expected.
        crop_size = log_mel_spec.shape[-1] - self.unit_length
        if crop_size > 0:
            if self.augment:
                # Valid starts are 0..crop_size inclusive. The torch RNG is used
                # because DataLoader seeds it separately for every worker.
                start = int(torch.randint(0, crop_size + 1, (1,)))
            else:
                # Deterministic centre crop for evaluation.
                start = crop_size // 2
            log_mel_spec = log_mel_spec[..., start:start + self.unit_length]

        return log_mel_spec

In [None]:
class ResNet(nn.Module):
    """ResNet-18 encoder for single-channel input with two linear heads.

    ``forward`` returns a pair: 6-way logits from ``cf`` and a single logit
    from ``isdog``, both computed from the same 512-dimensional encoder output.
    """

    def __init__(self):
        super().__init__()

        # Untrained torchvision ResNet-18, fetched through torch.hub.
        backbone = torch.hub.load('pytorch/vision:v0.9.0', 'resnet18', pretrained=False)
        # Replace the classification layer with a 512 -> 512 projection.
        backbone.fc = nn.Linear(512, 512)
        # Swap the stem so the network accepts 1-channel input.
        backbone.conv1 = nn.Conv2d(
            1,
            64,
            kernel_size=(7, 7),
            stride=(2, 2),
            padding=(3, 3),
            bias=False,
        )
        self.encoder = backbone

        # 6-class head.
        self.cf = nn.Sequential(nn.Dropout(0.1), nn.Linear(512, 6))
        # Single-logit auxiliary head.
        self.isdog = nn.Sequential(nn.Dropout(0.1), nn.Linear(512, 1))

    def forward(self, x):
        """Encode ``x`` once and feed the features to both heads."""
        features = self.encoder(x)
        class_logits = self.cf(features)
        isdog_logit = self.isdog(features)
        return class_logits, isdog_logit

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
from pytorch_lightning.metrics.functional import accuracy
from dlcliche.torch_utils import IntraBatchMixup

class MyLearner(pl.LightningModule):
    """Lightning wrapper that trains a two-headed model with intra-batch mixup.

    The wrapped ``model`` must return ``(class_logits, is_dog_logit)``. The
    class logits are trained through ``IntraBatchMixup``'s criterion; the single
    logit is trained with an auxiliary binary cross-entropy term.

    Args:
        model: Network returning ``(class_logits, is_dog_logit)``.
        learning_rate: Learning rate for AdamW.
        mixup_alpha: ``alpha`` passed to ``IntraBatchMixup``.
        weight: Optional class-weight tensor for the mixup cross-entropy loss.
        transpose_tfm: If True, reshape input from (B, 1, F, T) to (B, T, F)
            before calling the model.
        logger: Unused; accepted only so existing call sites keep working.
    """

    def __init__(self, model, learning_rate=3e-4, mixup_alpha=0.4, weight=None, transpose_tfm=True, logger=None):

        super().__init__()
        self.learning_rate = learning_rate
        self.model = model
        self.batch_mixer = IntraBatchMixup(nn.CrossEntropyLoss(weight=weight), alpha=mixup_alpha)
        # Unweighted loss; not used by any step in this class but kept as an attribute.
        self.criterion = nn.CrossEntropyLoss()
        self.transpose_tfm = transpose_tfm

        # One dict per validation/test batch: epoch, loss and accuracy.
        self.my_log = list()

    def forward(self, x):
        if self.transpose_tfm:
            x = x.squeeze(1).transpose(-1, -2) # (B, 1, F, T) -> (B, T, F)
        x = self.model(x)
        return x

    def training_step(self, batch, batch_idx):
        """Return mixup classification loss plus 0.3 x the auxiliary binary loss."""
        x, y = batch
        x, stacked_ys = self.batch_mixer.transform(x, y, train=True)
        preds, is_dog = self(x)
        loss = self.batch_mixer.criterion(preds, stacked_ys)

        # Auxiliary target: 1 for labels 0..2, 0 otherwise.
        # NOTE(review): the target comes from the original labels `y`, while `x`
        # has been through the mixup transform -- confirm this is intended.
        is_dog_target = (y < 3).unsqueeze(-1).to(is_dog.dtype)
        # The logits form fuses sigmoid and BCE and is numerically more stable
        # than applying nn.Sigmoid followed by nn.BCELoss.
        loss2 = F.binary_cross_entropy_with_logits(is_dog, is_dog_target)
        return loss + 0.3 * loss2

    def validation_step(self, batch, batch_idx, split='val'):
        """Compute loss / accuracy for one batch and log them under ``split``."""
        x, y = batch
        x, stacked_ys = self.batch_mixer.transform(x, y, train=False)
        preds, _ = self(x)
        loss = self.batch_mixer.criterion(preds, stacked_ys)

        yhat = torch.argmax(preds, dim=1)
        acc = accuracy(yhat, y)

        self.log(f'{split}_loss', loss, prog_bar=True,logger=True)
        self.log(f'{split}_acc', acc, prog_bar=True, logger=True)

        # Keys are always "val_*", also when called through test_step.
        self.my_log.append({"epoch": self.trainer.current_epoch,"val_loss": loss.item(), "val_acc": acc.item()})
        return loss

    def test_step(self, batch, batch_idx):
        return self.validation_step(batch, batch_idx, split='test')

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.learning_rate)
        return optimizer

    # The three loader hooks return notebook-level globals, which must exist
    # before the corresponding Trainer call.
    def train_dataloader(self):
        return train_loader

    def val_dataloader(self):
        return valid_loader

    def test_dataloader(self):
        # NOTE(review): `test_loader` is not defined in the visible part of the
        # notebook -- define it before calling trainer.test().
        return test_loader

In [None]:
# Training data: every original and augmented file, reshuffled each epoch.
train_dataset = DogDataset(
    cfg,
    all_X_train,
    all_y_train,
    norm_mean_std=train_mean_std,
)
train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=512,
    shuffle=True,
    pin_memory=True,
    num_workers=8,
)

# Validation data: held-out original recordings, normalised with the same statistics.
valid_dataset = DogDataset(
    cfg,
    all_X_val,
    all_y_val,
    norm_mean_std=train_mean_std,
)
valid_loader = torch.utils.data.DataLoader(
    valid_dataset,
    batch_size=1024,
    pin_memory=True,
    num_workers=8,
)

In [None]:
from pytorch_lightning.loggers import TensorBoardLogger
# Build the network and wrap it in the Lightning learner; the class weights are
# moved to `device` by hand before being handed to the loss.
model = ResNet()
learner = MyLearner(
    model,
    learning_rate=5e-4,
    mixup_alpha=0.2,
    transpose_tfm=False,
    weight=torch.Tensor(class_weight).to(device),
)

# Keep the ten checkpoints with the lowest validation loss.
checkpoint = pl.callbacks.ModelCheckpoint(
    monitor='val_loss',
    mode="min",
    save_top_k=10,
    every_n_val_epochs=1,
)
trainer = pl.Trainer(
    gpus=[0],
    max_epochs=40,
    callbacks=[checkpoint],
)
trainer.fit(learner)