In [None]:
import os
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
import torch
import torchaudio
import matplotlib.pyplot as plt
import random
from torch.utils.data import Dataset, DataLoader, random_split
from efficientnet_pytorch import EfficientNet
import torch.nn as nn
import time
import copy
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

<h1> 1. Preprocessing </h1>

<h3>1.1 Load the training data</h3>

In [None]:
# Directory that holds train.csv, the recording metadata loaded into `metadata` below.
DIR_PATH = '../input/birdsong-recognition'

In [None]:
def get_dataset_path(ebird_code, relative_path):
    """Prefix ``relative_path`` with the resampled-audio shard directory of a species.

    The shard suffix is picked from the first character of ``ebird_code``:
    a-b -> 00, c-f -> 01, g-m -> 02, n-r -> 03, s-y -> 04. Any other first
    character leaves the directory name without a suffix.
    """
    shard_ranges = (
        ('a', 'b', '00'),
        ('c', 'f', '01'),
        ('g', 'm', '02'),
        ('n', 'r', '03'),
        ('s', 'y', '04'),
    )
    initial = ebird_code[0]
    suffix = next(
        (shard for low, high, shard in shard_ranges if low <= initial <= high),
        '',
    )
    return '../input/birdsong-resampled-train-audio-' + suffix + relative_path

In [None]:
# Load the recording metadata and keep only high quality recordings (rating of 4.5 or more).
metadata = (
    pd.read_csv(f'{DIR_PATH}/train.csv')
    .query('rating >= 4.5')
)

In [None]:
# Count recordings per species with a single groupby so each species is paired
# with its own count. Zipping `unique()` (first-appearance order) against
# `groupby().count()` (key-sorted order) only lines up when the CSV happens to
# be sorted by ebird_code. sort=False keeps first-appearance order, which is
# also the tie-break order of the stable sort below.
birds_count = metadata.groupby("ebird_code", sort=False).size().to_dict()

birds = sorted(birds_count.items(), key=lambda kv: kv[1], reverse=True)[:20] # Select top 20 most represented birds
birds = [key for key, value in birds]
metadata = metadata.query('ebird_code in @birds')

In [None]:
# Show the selected species codes, ordered from most to fewest recordings.
print(birds)

In [None]:
# Training table: one row per recording with the path of its .wav file and its species code.
df = pd.DataFrame()
# Swap only the trailing ".mp3" extension. A plain 'mp3' -> 'wav' substring
# replace would also rewrite an "mp3" that occurs elsewhere in the file name.
df['path'] = (
    "/" + metadata['ebird_code'].astype(str) + "/"
    + metadata['filename'].str.replace(r'\.mp3$', '.wav', regex=True)
)
df['class'] = metadata['ebird_code']
# Prepend the shard directory that get_dataset_path selects for the species.
df['path'] = df.apply(lambda row: get_dataset_path(row['class'], row['path']), axis=1)

In [None]:
# Replace the index inherited from the filtered metadata with 0..n-1;
# the label-based lookup df['path'][0] further down needs a row labelled 0.
df = df.reset_index(drop=True)

In [None]:
# Encode class names as integers; fitting and transforming happen in one call.
le = LabelEncoder()
df["class"] = le.fit_transform(df["class"])

<h3>1.2 Convert audio to mel spectrograms</h3>

In [None]:
def read(file_path):
    """Load an audio file with torchaudio and return ``(waveform, sample_rate)``."""
    signal, rate = torchaudio.load(file_path)
    return (signal, rate)

def rechannel(audio):
    """Reduce a waveform to a single channel by keeping only its first channel.

    ``audio`` is a ``(waveform, sample_rate)`` pair; a waveform that already
    has one channel is returned untouched.
    """
    signal, rate = audio
    if signal.shape[0] != 1:
        # Keep channel 0 only (the other channels are dropped, not averaged).
        signal = signal[:1, :]
    return signal, rate

def resize(audio, seconds=10):
    """Crop or zero-pad a waveform to exactly ``seconds`` of audio.

    Args:
        audio: ``(waveform, sample_rate)`` pair; ``waveform`` is indexed as
            ``(channels, samples)``.
        seconds: Target duration. May be fractional; the target sample count
            is truncated to an integer.

    Returns:
        ``(waveform, sample_rate)`` whose waveform has
        ``int(sample_rate * seconds)`` samples. Longer input is cropped at a
        random offset drawn from the ``random`` module, shorter input is
        centred between zero padding, and exact-length input is returned as is.
    """
    waveform, sample_rate = audio
    channels, length = waveform.shape[0], waveform.shape[1]
    # int() keeps the sample count usable as a size/index for fractional durations.
    max_length = int(sample_rate * seconds)

    if length > max_length:
        trim_start = random.randint(0, length - max_length)
        waveform = waveform[:, trim_start:trim_start + max_length]

    elif length < max_length:
        pad_start_length = (max_length - length) // 2
        pad_end_length = max_length - length - pad_start_length

        # new_zeros matches the waveform's channel count, dtype and device, so
        # multi-channel or non-float32 input concatenates cleanly.
        pad_start = waveform.new_zeros((channels, pad_start_length))
        pad_end = waveform.new_zeros((channels, pad_end_length))

        waveform = torch.cat((pad_start, waveform, pad_end), 1)

    return waveform, sample_rate

def high_pass(audio, f_min=1400):
    """Apply torchaudio's biquad high-pass filter with cutoff ``f_min`` to a waveform.

    ``audio`` is a ``(waveform, sample_rate)`` pair; the sample rate is passed
    through unchanged alongside the filtered waveform.
    """
    signal, rate = audio
    filtered = torchaudio.functional.highpass_biquad(signal, rate, f_min)
    return filtered, rate

def preprocess(file_path):
    """Load a file and run it through rechannel, resize and high_pass, in that order.

    Returns the ``(waveform, sample_rate)`` pair produced by the last step.
    """
    audio = read(file_path)
    audio = rechannel(audio)
    audio = resize(audio)
    return high_pass(audio)

def get_spectrogram(file_path, n_mels=128, n_fft=1024, hop_len=1024, top_db=80):
    """Return the dB-scaled mel spectrogram of the preprocessed audio at ``file_path``.

    Args:
        file_path: Audio file handed to ``preprocess``.
        n_mels: Number of mel bands.
        n_fft: FFT size.
        hop_len: Hop between frames, forwarded as ``hop_length``.
        top_db: Forwarded to ``AmplitudeToDB``.
    """
    waveform, sample_rate = preprocess(file_path)
    to_mel = torchaudio.transforms.MelSpectrogram(
        sample_rate, n_mels=n_mels, n_fft=n_fft, hop_length=hop_len
    )
    to_db = torchaudio.transforms.AmplitudeToDB(top_db=top_db)
    return to_db(to_mel(waveform))

In [None]:
# Sanity check: render the spectrogram of the first recording as an image.
spec = get_spectrogram(df['path'][0])
# NOTE: plt.imshow returns the image artist, so `ax` here is not an Axes object.
ax = plt.imshow(spec.squeeze().numpy())

<h1> 2. Training </h1>

<h3>2.1 Define datasets</h3>

In [None]:
class BirdsongDataset(Dataset):
    """Dataset that turns rows of a (path, label) table into spectrogram images.

    Column 0 of ``df`` is read as the audio path and column 1 as the label;
    columns are addressed by position, not by name.
    """

    def __init__(self, df, transform=None):
        self.df = df
        self.transform = transform

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        audio_path, target = self.df.iloc[index, 0], self.df.iloc[index, 1]
        image = get_spectrogram(audio_path)
        if self.transform:
            image = self.transform(image)
        # Expand to a 3 channel (RGB) image; expand returns a view, no copy is made.
        return image.expand(3, -1, -1), target

In [None]:
# Wrap the full (path, class) table; no transform is applied to the spectrograms.
dataset = BirdsongDataset(df)

<h3>2.2 Split dataset into training, validation, and test sets</h3>

In [None]:
# Fixed seed so train/val/test membership is identical on every run. With an
# unseeded split, restarting the kernel reshuffles the sets, and a checkpoint
# reloaded later could be scored on samples it was trained on.
SPLIT_SEED = 42
split_generator = torch.Generator().manual_seed(SPLIT_SEED)

num_train = round(0.8 * len(dataset)) # 80% train set
num_holdout = len(dataset) - num_train
num_val = num_holdout // 2 # 10% validation set
num_test = num_holdout - num_val # 10% test set

# One three-way split; the three sizes always sum to len(dataset).
train_ds, val_ds, test_ds = random_split(
    dataset, [num_train, num_val, num_test], generator=split_generator
)

image_datasets = {
    'train': train_ds,
    'val': val_ds,
    'test': test_ds
}

# Only the training loader shuffles; evaluation order stays fixed.
dataloaders = {
    phase: DataLoader(subset, batch_size=16, shuffle=(phase == 'train'))
    for phase, subset in image_datasets.items()
}

# Sample counts per split, used to turn running sums into per-sample averages.
dataset_sizes = {phase: len(subset) for phase, subset in image_datasets.items()}

<h3>2.3 Load pretrained model</h3>

In [None]:
# Pretrained EfficientNet-B7 with one output per class: the encoded labels
# run from 0 to max, hence max + 1 outputs.
model = EfficientNet.from_pretrained('efficientnet-b7', num_classes=df['class'].max() + 1)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

<h3>2.4 Train the model</h3>

In [None]:
# Cross-entropy over the class logits, optimised with SGD plus momentum.
criterion = torch.nn.CrossEntropyLoss()
optimizer_ft = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
# Multiply the learning rate by 0.1 every 10 scheduler steps (train_model steps it once per epoch).
exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer_ft, step_size=10, gamma=0.1)

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=50):
    """Train ``model`` and return it loaded with its best-validation weights.

    Each epoch runs a training pass and a validation pass over the
    module-level ``dataloaders``. Batches are moved to the module-level
    ``device`` and per-sample averages are computed with ``dataset_sizes``.
    Whenever validation accuracy improves, the weights are snapshotted in
    memory and also written to ``state-<epoch>-<timestamp>.pt`` in the
    working directory.

    Args:
        model: Network to optimise (expected to already live on ``device``).
        criterion: Loss, called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        scheduler: LR scheduler, stepped once per epoch after the training pass.
        num_epochs: Number of epochs to run.

    Returns:
        The same ``model`` object with the best validation weights loaded.
    """
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_training = phase == 'train'
            if is_training:
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluation mode

            running_loss = 0.0
            # Kept as a plain Python int so the accuracy maths below works even
            # when the loader yields no batches, and best_acc stays a float
            # rather than a tensor pinned to the device.
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward; track history only if training
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # Backward + optimize only if in training phase
                    if is_training:
                        loss.backward()
                        optimizer.step()

                # Stats: the loss is a batch mean, so weight it by the batch size.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            if is_training:
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            print('{} Loss: {:.4f} Accuracy: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # Save best weights
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                torch.save(best_model_wts, f'state-{epoch}-{time.time()}.pt')
        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    # '.4f' (four decimals) matches the per-epoch log; the former '4f' was a width, not a precision.
    print('Best validation accuracy: {:.4f}'.format(best_acc))

    # Load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [None]:
# Train for 75 epochs (overriding the default of 50); train_model returns the
# model with its best-validation weights loaded.
model = train_model(model, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=75)

<h1> 3. Testing </h1>

<h3>3.1 Evaluate model on test set</h3>

In [None]:
def predict():
    """Run the module-level ``model`` over ``dataloaders['test']``.

    Returns:
        ``(y_pred, y_true)`` as NumPy arrays: the arg-max class of every
        sample and the matching labels, in loader order. Both arrays are
        empty when the test loader yields no batches.
    """
    model.eval()
    pred_batches = []
    true_batches = []
    # A single no_grad scope for the whole pass; inference needs no autograd history.
    with torch.no_grad():
        for images, labels in dataloaders['test']:
            outputs = model(images.to(device))
            _, preds = torch.max(outputs, 1)
            pred_batches.append(preds.cpu())
            true_batches.append(labels.cpu())

    if not pred_batches:
        return np.array([], dtype=np.int64), np.array([], dtype=np.int64)

    # Concatenate once at the end instead of re-copying a growing tensor per batch.
    return torch.cat(pred_batches).numpy(), torch.cat(true_batches).numpy()

In [None]:
# Predicted and true class ids for the held-out test set, as NumPy arrays.
y_pred, y_true = predict()

<h3>3.2 Generate confusion matrix and classification report</h3>

In [None]:
# Pin the label set to every encoded class so the matrix is always
# n_classes x n_classes, with row/column i standing for class id i, even when
# a class is absent from this particular test split.
conf_matrix = confusion_matrix(y_true, y_pred, labels=np.arange(len(le.classes_)))
print(conf_matrix)

In [None]:
# Confusion-matrix rows/columns follow the LabelEncoder's class ids, whose
# names are le.classes_ (alphabetical). `birds` is ordered by recording count
# and would put the wrong species name on most ticks.
class_names = list(le.classes_)
plt.figure(figsize=(11, 9))
# fmt='d' prints whole counts; the default format renders large cells as 1e+02.
ax = sns.heatmap(conf_matrix, annot=True, fmt='d')
ax.xaxis.set_ticklabels(class_names)
ax.yaxis.set_ticklabels(class_names)
# sklearn's confusion matrix has true labels on rows and predictions on columns.
ax.set_xlabel('Predicted species')
ax.set_ylabel('True species')
plt.xticks(rotation=90)
plt.yticks(rotation=0)
plt.savefig('confusion_matrix.png')

In [None]:
# target_names must follow class-id order, i.e. le.classes_ (not the
# count-ordered `birds`). Passing labels keeps ids and names paired even if a
# class is missing from the test split.
print(classification_report(
    y_true,
    y_pred,
    labels=np.arange(len(le.classes_)),
    target_names=list(le.classes_),
))