In [None]:
import os

import librosa
import librosa.display
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
from IPython.display import display
from torch.utils.data import DataLoader, Dataset
from torch.utils.data import Dataset, DataLoader
from torchvision.models.inception import BasicConv2d
from tqdm import tqdm

%matplotlib inline

In [None]:
! pip install tqdm

# Dataset preprocessing

Data source: the `ismir04_genre` audio collection, downloaded from https://zenodo.org/record/1302992

In [None]:
# List the contents of the downloaded dataset folder (absolute, machine-specific path).
! ls /Users/rafalpilarczyk/Downloads/ismir04_genre/

In [None]:
# Root folder of the audio files. Set the ISMIR04_GENRE_AUDIO_DIR environment
# variable to point at another location; without it the original
# machine-specific path is used, so existing setups keep working.
DATASET_DIR = os.environ.get(
    'ISMIR04_GENRE_AUDIO_DIR',
    '/Users/rafalpilarczyk/Downloads/ismir04_genre/audio',
)

In [None]:
def get_files_list(directory):
    """Recursively collect the paths of all non-hidden files below ``directory``.

    Args:
        directory: Root folder to walk.

    Returns:
        list[str]: ``os.path.join(root, name)`` for every file yielded by
        ``os.walk(directory, topdown=False)`` whose name does not start with
        a dot. A missing directory yields an empty list.
    """
    return [
        os.path.join(root, name)
        for root, _, files in os.walk(directory, topdown=False)
        for name in files
        # Check the file name itself instead of splitting the joined path on
        # '/', which only works where '/' is the path separator.
        if not name.startswith('.')
    ]

In [None]:
import os

# Each split lives in its own sub-folder of the dataset root.
train_dir, eval_dir = (os.path.join(DATASET_DIR, split) for split in ('training', 'evaluation'))

# Class names are the entries of the training folder that do not start with a dot.
classes = [entry for entry in os.listdir(train_dir) if entry[0] != '.']


# Collects one metadata record (dict) per audio file.
cc = []

def seconds_of_file(file_path):
    """Return the duration of an audio file in whole seconds.

    Args:
        file_path: Path handed to ``librosa.get_duration``.

    Returns:
        int: The duration truncated to an integer, or 0 when it cannot be
        determined. Failures are reported on stdout instead of being raised,
        so a single unreadable file does not abort the scan that calls this.
    """
    try:
        return int(librosa.get_duration(filename=file_path))
    except Exception as e:  # deliberate best effort: any failure maps to 0 seconds
        # Include the exception text so the cause of the failure is visible.
        print(f"error with {file_path}: {e}")
        return 0

# For every class, scan the training split first and the evaluation split second,
# adding one record per file: path, class name, split flag and duration.
for _cls in classes:
    print(_cls)
    for split_dir, is_train in ((train_dir, True), (eval_dir, False)):
        files_dir = os.path.join(split_dir, _cls)
        files_list = get_files_list(files_dir)
        cc.extend([
            {'file_name': path, 'class': _cls, 'train': is_train, 'seconds': seconds_of_file(path)}
            for path in files_list
        ])


In [None]:
import pandas as pd

# One row per audio file: path, class label, split flag and duration in seconds.
df = pd.DataFrame(cc)
# Only the last expression of a cell is rendered automatically, so the preview
# has to be displayed explicitly to be visible at all.
display(df.head())
len(df)

In [None]:
# Keep only files longer than 10 seconds.
df = df.loc[df['seconds'] > 10]
len(df)

In [None]:
# Write the rows flagged as training data; the boolean column is used directly as the mask.
df.loc[df['train']].to_csv('training.csv')

In [None]:
# Write the remaining rows (train flag is False) as the test split.
df.loc[~df['train']].to_csv('testing.csv')

In [None]:
# Path of the seventh (position 6) file among the non-training rows.
sample = df.loc[~df['train'], 'file_name'].iloc[6]

In [None]:
# Decode the sample file; `data` is the waveform and `fs` its sampling rate.
data, fs = librosa.load(sample)

In [None]:
plt.figure(figsize=(14, 5))
# Newer librosa releases no longer ship `waveplot`; use it when it exists and
# fall back to its successor `waveshow` otherwise. The sampling rate returned
# by the loader is passed explicitly instead of relying on the default.
_plot_wave = getattr(librosa.display, 'waveplot', None) or librosa.display.waveshow
_plot_wave(data, sr=fs)

In [None]:
import IPython.display as ipd
ipd.Audio(sample) # embed an audio player for the file at path `sample`


In [None]:
plt.figure(figsize=(14, 5))
# Magnitude of the STFT, converted to decibels relative to its maximum.
magnitude = np.abs(librosa.stft(data))
D = librosa.amplitude_to_db(magnitude, ref=np.max)
librosa.display.specshow(D, y_axis='linear')
plt.colorbar(format='%+2.0f dB')
plt.title('Linear-frequency power spectrogram')


# PyTorch loaders and class helpers

In [None]:
class LibrosaLoader(object):
    """Load a randomly positioned, fixed-length excerpt of an audio file.

    Args:
        duration: Length of the excerpt in seconds.
        sr: Sampling rate handed to ``librosa.load``. The default of 22050 Hz
            matches librosa's own default, so existing callers receive the
            same audio as before this parameter existed.
    """

    def __init__(self, duration=3, sr=22050):
        self.duration = duration
        self.sr = sr

    def __call__(self, path, max_size):
        """Read ``duration`` seconds of ``path`` starting at a random whole second.

        Args:
            path: Audio file to read.
            max_size: Total length of the file in seconds.

        Returns:
            The float32 sample array returned by ``librosa.load``.
        """
        # Exclusive upper bound of the start offset, with one second kept in
        # reserve. Truncating to int gives np.random.randint a clean integer
        # bound even when ``duration`` is fractional.
        upper = int(max_size - self.duration - 1)
        # A file that leaves no slack is read from its beginning rather than
        # letting np.random.randint raise ValueError on an empty range.
        start = np.random.randint(0, upper) if upper > 0 else 0
        data, _ = librosa.load(path, sr=self.sr, dtype=np.float32,
                               duration=self.duration, offset=start)
        return data

In [None]:
class STFT(object):
    """Turn a waveform into a standardized magnitude spectrogram.

    Args:
        fft: FFT size (``n_fft``) handed to ``librosa.stft``.
        sr: Nominal sampling rate. It is only stored; the transform itself
            does not read it.
        hop: Hop length in samples.
        win_length: Analysis window length in samples.
        clip: When true, keep only the first ``max_frames`` time frames.
        max_frames: Number of frames kept when ``clip`` is enabled.
    """

    def __init__(self, fft=1024, sr=16000, hop=160, win_length=400, clip=True, max_frames=300):
        self.size = fft
        self.sr = sr
        self.hop = hop
        self.win_l = win_length
        self.clip = clip
        self.max_frames = max_frames

    def __call__(self, audio_file):
        """Transform ``audio_file``; for 2-D input only the first row is used."""
        if len(audio_file.shape) == 1:
            audio_file = np.expand_dims(audio_file, axis=0)
        return self._stft(audio_file[0, :])

    def _stft(self, audio_file):
        """Compute, crop and standardize the magnitude spectrogram of a 1-D signal."""
        y = np.abs(librosa.stft(audio_file, hop_length=self.hop, n_fft=self.size, win_length=self.win_l))

        # Skip frequency row 0 and keep rows 1 .. n_fft // 2 (rows 1..512 for
        # the default n_fft of 1024, exactly as the previous fixed slice did).
        n_bins = self.size // 2 + 1
        if self.clip:
            y = y[1:n_bins, :self.max_frames]
        else:
            y = y[1:n_bins, :]

        # Per-example standardization at run time.
        mean, std = y.mean(), y.std()
        if std == 0:
            # A constant spectrogram (e.g. digital silence) would otherwise
            # turn into NaNs through 0 / 0; return the centred values instead.
            return y - mean
        return (y - mean) / std
    
class ArrayToTensor(object):
    """Transform converting a NumPy array into a float tensor with an extra leading axis."""

    def __call__(self, audio_file):
        # NumPy array -> tensor -> float32 -> new size-1 dimension at position 0.
        return torch.from_numpy(audio_file).float().unsqueeze(0)

    def __repr__(self):
        # The transform has no parameters, so the class name is the whole repr.
        return type(self).__name__

In [None]:
def cls_to_idx_dict(list_of_speakers):
    """Map each label to its position in the sorted label list.

    The input list is not modified; sorting works on a copy.
    """
    return {label: position for position, label in enumerate(sorted(list_of_speakers))}


class AudioSegmentBaseDataset(Dataset):
    """Dataset serving random audio excerpts described by a CSV index.

    The CSV must provide the columns ``file_name``, ``class`` and ``seconds``.

    Args:
        csv_file: Path of the CSV index, read with ``pd.read_csv``.
        loader: Callable ``loader(path, seconds)`` returning the raw audio.
        transform: Optional callable applied to the loaded audio.
        sample_audio: Unused; kept so existing call sites remain valid.
    """

    def __init__(self, csv_file, loader, transform=None, sample_audio=None):
        self.dataset_df = pd.read_csv(csv_file)
        self.transform = transform
        self.loader = loader

        self._size_df = len(self.dataset_df)

        self.classes = list(self.dataset_df['class'].unique())
        self.class_to_idx = cls_to_idx_dict(self.classes)
        print(self.class_to_idx)

    def __getitem__(self, indice):
        """Return ``(audio, class_index)`` for a randomly drawn row.

        ``indice`` is ignored: every access draws a row uniformly at random,
        so the requested index only determines how many samples are produced.
        """
        # The upper bound of randint is exclusive, so passing the row count
        # makes every row eligible (the previous ``size - 1`` bound never
        # selected the last row and raised for a single-row index).
        sample_index = np.random.randint(0, self._size_df)

        row = self.dataset_df.iloc[sample_index]

        selected_class = row['class']
        audio_name = row['file_name']
        length = row['seconds']

        cls = self.class_to_idx[selected_class]
        audio = self.loader(audio_name, length)

        if self.transform:
            audio = self.transform(audio)

        return audio, cls

    def __len__(self):
        # Report the number of indexed files so that one pass of a DataLoader
        # produces one sample per file instead of a single sample.
        return self._size_df

In [None]:
# Excerpt length in seconds shared by the training and test loaders.
SEGMENT_SECONDS = 3.71

# Both datasets use the same transform pipeline: spectrogram, then tensor.
trs_train = transforms.Compose([STFT(clip=False), ArrayToTensor()])

ds_train = AudioSegmentBaseDataset(
    'training.csv',
    loader=LibrosaLoader(duration=SEGMENT_SECONDS),
    transform=trs_train,
)
print(len(ds_train))
ds_test = AudioSegmentBaseDataset(
    'testing.csv',
    loader=LibrosaLoader(duration=SEGMENT_SECONDS),
    transform=trs_train,
)

In [None]:
# Size of one transformed sample after adding a leading batch dimension.
sample_batch = ds_train[0][0].unsqueeze(0)
sample_batch.size()

# Model definition

In [None]:
import torchvision.models as models
from torchvision.models.resnet import ResNet

import time
import torch
import torch.nn as nn
import torchvision.models as models

class inception_modified(models.Inception3):
    """Inception v3 variant for single-channel input with an embedding layer.

    Differences from the torchvision base class:

    * the first convolution takes 1 input channel instead of 3;
    * the pooled 2048-d feature vector is projected to 128 dimensions (``emb``);
    * ``fc`` classifies from that 128-d embedding.

    The auxiliary classifier is disabled (``aux_logits=False``).

    Args:
        num_classes: Number of output classes.
    """

    def __init__(self, num_classes=8):
        super(inception_modified, self).__init__(num_classes=num_classes, aux_logits=False)
        # Not read anywhere in this class; kept so the attribute still exists.
        self.inplanes = 64
        # Replace the stem convolution so the network accepts 1-channel input.
        self.Conv2d_1a_3x3 = BasicConv2d(1, 32, kernel_size=3, stride=2)

        self.emb = nn.Linear(2048, 128)
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return class logits computed from the ReLU-activated embedding."""
        emb = F.relu(self.extract_embedding(x), inplace=True)
        return self.fc(emb)

    def extract_embedding(self, x):
        """Return the 128-d embedding (before the ReLU applied in ``forward``).

        Args:
            x: Batch with exactly one channel, as required by the replaced
                stem convolution.

        Returns:
            Tensor of shape ``(batch, 128)``.
        """
        # The base class' optional ``transform_input`` re-normalization and
        # auxiliary-classifier branches are not reproduced here: the former
        # indexes three colour channels that this 1-channel model never
        # receives, and the latter is disabled in ``__init__``.

        # Stem.
        x = self.Conv2d_1a_3x3(x)
        x = self.Conv2d_2a_3x3(x)
        x = self.Conv2d_2b_3x3(x)
        x = F.max_pool2d(x, kernel_size=3, stride=2)
        x = self.Conv2d_3b_1x1(x)
        x = self.Conv2d_4a_3x3(x)
        x = F.max_pool2d(x, kernel_size=3, stride=2)

        # Inception blocks, applied in the same order as before.
        for block in (
            self.Mixed_5b, self.Mixed_5c, self.Mixed_5d,
            self.Mixed_6a, self.Mixed_6b, self.Mixed_6c, self.Mixed_6d, self.Mixed_6e,
            self.Mixed_7a, self.Mixed_7b, self.Mixed_7c,
        ):
            x = block(x)

        # Global average over the whole feature map. For an 8 x 8 map this
        # equals the former fixed ``avg_pool2d(kernel_size=8)``; for other
        # input sizes it still yields one 2048-d vector per example instead
        # of averaging a single 8 x 8 window or breaking the ``emb`` layer.
        x = F.adaptive_avg_pool2d(x, (1, 1))
        x = F.dropout(x, training=self.training)
        # (batch, 2048, 1, 1) -> (batch, 2048)
        x = x.view(x.size(0), -1)

        # (batch, 2048) -> (batch, 128)
        return self.emb(x)
        
# Module-level model instance; the next two cells run it on a single sample.
net = inception_modified()

In [None]:
# Forward pass on a single-sample batch.
single_batch = ds_train[0][0].unsqueeze(0)
net(single_batch)

In [None]:
# Size of the embedding produced for a single-sample batch.
embedding = net.extract_embedding(ds_train[0][0].unsqueeze(0))
embedding.size()

In [None]:
def validate(net, data_val_loader, device, criterion, cuda=False):
    """Evaluate ``net`` on every batch of ``data_val_loader`` without gradients.

    The model is switched to evaluation mode for the duration of the call, so
    layers that behave differently while training (such as the dropout used
    by the model in this notebook) are inactive. Its previous mode is restored
    afterwards.

    Args:
        net: Model to evaluate.
        data_val_loader: Iterable yielding ``(inputs, targets)`` batches.
        device: Device the batches are moved to when ``cuda`` is true.
        criterion: Loss function called as ``criterion(outputs, targets)``.
        cuda: Whether to move each batch to ``device``.

    Returns:
        tuple: ``(loss_sum, correct_top1, correct_top5, total)`` where
        ``loss_sum`` is the sum of the per-batch loss values, the two
        ``correct_*`` entries are the accumulated results of
        ``accuracy_top1_5`` and ``total`` is the number of targets seen.
    """
    loss_sum = 0
    total = 0
    correct_top1 = 0
    correct_top5 = 0

    was_training = net.training
    net.eval()
    try:
        with torch.no_grad():
            for inputs, targets in tqdm(data_val_loader):
                if cuda:
                    inputs, targets = inputs.to(device), targets.to(device)
                outputs = net(inputs)
                prec_1, prec_5 = accuracy_top1_5(outputs, targets, topk=(1, 5))
                correct_top1 += prec_1
                correct_top5 += prec_5
                loss_sum += criterion(outputs, targets).item()
                total += targets.size(0)
    finally:
        # Hand the model back in the mode the caller left it in.
        net.train(was_training)
    return loss_sum, correct_top1, correct_top5, total

In [None]:
def set_seeds_for_workers(worker_no):
    """Worker initialisation hook: reseed NumPy's global random generator.

    No explicit seed is supplied, so NumPy chooses one itself; ``worker_no``
    is accepted only because the hook is called with it.
    """
    np.random.seed(None)
    
class AverageMeter(object):
    """Keep the most recent value and the running weighted mean of a metric."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Zero the latest value, the mean, the weighted sum and the total weight."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` with weight ``n`` and refresh the running mean."""
        self.val = val
        self.count += n
        self.sum += val * n
        self.avg = self.sum / self.count

def accuracy_top1_5(output, target, topk=(1,)):
    """Count how many targets appear among the top-k predictions.

    Args:
        output: Score tensor of shape ``(batch, num_classes)``.
        target: Tensor of ``batch`` class indices.
        topk: Iterable of k values to evaluate.

    Returns:
        list: One single-element float tensor per entry of ``topk`` holding
        the number (not the percentage) of examples whose target is among
        the k highest-scoring classes.
    """
    with torch.no_grad():
        maxk = max(topk)
        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))
        res = []
        for k in topk:
            # ``correct`` derives from a transposed tensor, so its slices are
            # not guaranteed to be contiguous; ``reshape`` copes with that
            # where ``view`` raises for batches of more than one example.
            correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
            res.append(correct_k)
    return res

def show_metrics(prefix, index, size, loss_sum, performance):
    """Print one metrics line: iteration, size, loss and performance as a percentage.

    ``performance`` is expected as a fraction and is multiplied by 100 for display.
    """
    percent = 100.0 * performance
    body = "Iter %d, size %d, loss %6.6f, performance %3.2f percent" % (
        index, size, loss_sum, percent)
    print(prefix + body)

In [1]:

def train(cuda=False, epochs=50, output_path='inception_modified.pth'):
    """Train a fresh ``inception_modified`` on ``ds_train`` and checkpoint it.

    Relies on the notebook-level objects ``ds_train`` and ``ds_test``.

    Args:
        cuda: When true, additionally call ``.cuda()`` on the model and the
            criterion and enable cuDNN benchmarking. The device that the
            model and the batches are moved to is chosen automatically (CUDA
            when available) regardless of this flag.
        epochs: Number of passes over ``ds_train``.
        output_path: File that receives the model ``state_dict`` at the end
            of every epoch (overwritten each time). This used to be an
            undefined name, which made the first save fail.
    """
    net = inception_modified()
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("Selected device {} ".format(device))

    print("Cuda devices found {}".format(torch.cuda.device_count()))
    if cuda:
        net = net.cuda()
        cudnn.benchmark = True

    net.to(device)

    optimizer = optim.SGD(net.parameters(), lr=0.1)
    criterion = torch.nn.CrossEntropyLoss()
    if cuda:
        criterion = criterion.cuda()
        print("Criterion {} ".format(criterion))

    # Pinned host memory only helps transfers to a CUDA device.
    data_train_loader = DataLoader(ds_train, 1,
                                   num_workers=1,
                                   pin_memory=(device.type == "cuda"),
                                   worker_init_fn=set_seeds_for_workers)

    print(len(ds_train), len(ds_test))

    for epoch in range(epochs):
        net.train()

        start_epoch_time = time.time()
        losses = AverageMeter()  # running mean of the batch losses
        accs = AverageMeter()    # running mean of the batch accuracies

        for batch_idx, (inputs, targets) in enumerate(data_train_loader):
            inputs, targets = inputs.to(device), targets.to(device)
            optimizer.zero_grad()
            outputs = net(inputs)

            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

            loss_batch = loss.item()
            _, predicted = outputs.max(1)
            total_batch = targets.size(0)
            correct_batch = predicted.eq(targets).sum().item()

            losses.update(loss_batch, total_batch)
            accs.update(correct_batch / total_batch, total_batch)
            show_metrics("Batch top 1: ", batch_idx, total_batch, loss_batch, correct_batch / total_batch)

        # Report the epoch averages and duration that were previously
        # computed but never shown.
        epoch_seconds = time.time() - start_epoch_time
        show_metrics("Epoch top 1 (%.1f s): " % epoch_seconds, epoch, losses.count, losses.avg, accs.avg)
        torch.save(net.state_dict(), output_path)

In [None]:
# Start training with the default arguments of train().
train()