In [1]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import json
from tqdm import trange
from functools import lru_cache
import matplotlib.pyplot as plt; plt.ion()

from umap import UMAP
from librosa.feature import mfcc
from librosa.core.spectrum import stft

from scipy.io import wavfile as wav
from scipy.signal import spectrogram

from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
from sklearn.preprocessing import LabelEncoder

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import torch
import torch.nn as nn
import torchaudio
from torch.utils.data import Dataset, DataLoader

In [3]:
SR = 44100

In [27]:
DATA_LOC = 'data/Calls for ML/labelled_data/'

In [4]:
class Files:
    """Class to house file paths for labelled data.
    """
    data_loc = 'data/Calls for ML/'

    # create symlinks so that all the data can be seen from labelled_data
    lb_data_loc = 'data/Calls for ML/labelled_data/'

    state_dict = 'data/Calls for ML/simple_rnn_sd.pth'

    ml_test = 'ML_Test.wav'
    labels_file = 'Calls_ML.xlsx'

In [7]:
DATA_LOC = 'data/Calls for ML/labelled_data/'

class AudioDataset(torch.utils.data.Dataset):
    def __init__(self, device='cpu'):
        self.audio = {
            f.replace('.wav', ''): self.load_audio(Files.lb_data_loc + f).to(device) for f in os.listdir(Files.lb_data_loc) if '.wav' in f
        }
        self.audio_lens = {k: (len(a), len(a)/SR) for k, a in self.audio.items()}

        calls = pd.read_excel(os.path.join(Files.lb_data_loc, Files.labels_file))
        calls = calls.loc[(calls.Call_Type != 'interference'), ['File', 'Call_Type', 'Start', 'End']]
        calls = calls.loc[~calls.Call_Type.isna(), ['File', 'Call_Type', 'Start', 'End']]
        # calls['File'] = 'Calls_ML'
        calls.columns = calls.columns.str.lower()


        calls_shaldon = pd.read_excel(os.path.join(Files.lb_data_loc, 'Shaldon_Training_Labels.xlsx'))
        calls_shaldon = calls_shaldon.loc[~calls_shaldon.Call_Type.isna(), ['File', 'Call_Type', 'Start', 'End']]
        calls_shaldon['File'] = 'Shaldon_Combined'
        calls_shaldon.columns = calls_shaldon.columns.str.lower()

        calls_blackpool = pd.read_excel(os.path.join(Files.lb_data_loc, 'Blackpool_Labels.xlsx'))
        calls_blackpool = calls_blackpool.loc[~calls_blackpool.Call_Type.isna(), ['File', 'Call_Type', 'Start', 'End']]
        calls_blackpool['File'] = 'Blackpool_Combined_FINAL'
        calls_blackpool.columns = calls_blackpool.columns.str.lower()

        labels = pd.concat([calls, calls_shaldon, calls_blackpool], axis=0).reset_index(drop=True)

        labels.loc[labels.call_type.isin(['Phee', 'Trill', 'Whistle']), 'call_type'] = 'LongCalls'
        labels.loc[labels.call_type.isin(['Cheep', 'Chuck', 'Tsit']), 'call_type'] = 'ShortCalls'

        # Remove calls that have length 0
        self.labels = labels.loc[labels.end - labels.start > 0].reset_index(drop=True)

        self.X = np.vstack([
            self.process_file(*self.labels.loc[i, ['file', 'start', 'end']], sr=SR)
            for i in self.labels.index
        ])

        y = np.array(self.labels.call_type, dtype=str)
        self.le = LabelEncoder()
        self.le.fit(y)
        self.y_transformed = self.le.transform(y)


    def __len__(self):
        return 1
    
    
    def load_audio(self, file_path):
        sr, audio = self.load_audio_file(file_path)
        audio = torchaudio.functional.resample(torch.tensor(audio), sr, SR)
        return audio
    
    def process_file(self, f, start, end, sr, n_fft_prop=1/3):
        a = self.load_audio(os.path.join(DATA_LOC, f + '.wav'))[int(start * sr):int(end * sr)].numpy()
        # S = spectrogram(a, nperseg=len(a)//3, noverlap=len(a)//12, fs=sr)[-1]
        S = np.abs(stft(a,
            n_fft=int(len(a) * n_fft_prop),
            hop_length=int(len(a) * n_fft_prop/2
        )))
        mel_features = mfcc(S=S, n_mfcc=20)
        mel_features = (mel_features - mel_features.mean()) / (mel_features.std() + 1e-6)

        features = np.hstack([
            mel_features.reshape(-1),
            self.additional_features(start, end)
        ])
        return features

    def additional_features(self, start, end):
        duration = end - start
        additional_features = np.hstack([
            duration,
        ])
        return additional_features

    
    @staticmethod
    @lru_cache(maxsize=100)
    def load_audio_file(filename):
        sr, audio = wav.read(filename)
        if len(audio.shape) == 2:
            audio = audio[:, 0]  # take the first channel
        audio = audio.astype('f')/1000  # scale values down by 1000.
        return sr, audio

In [8]:
dataset = AudioDataset()


In [10]:
dataset.X.shape

(3302, 141)

In [11]:
import torch
import torch.nn as nn

In [70]:
class Classifier(torch.nn.Module):
    def __init__(self, num_classes, input_size, hidden_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        x = self.linear1(x)
        x = nn.ReLU()(x)
        x = self.linear2(x)
        x = nn.Softmax(dim=1)(x)
        return x

In [71]:
# train test split
X_train, X_test, y_train, y_test = train_test_split(dataset.X, dataset.y_transformed, test_size=0.15, random_state=1337)

In [72]:
# Scale the data
from sklearn.preprocessing import StandardScaler, MinMaxScaler
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [73]:
# make dataset and data loader
from torch.utils.data import Dataset, DataLoader

class CallsDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y
        self.n = X.shape[0]
        
    def __len__(self):
        return self.n
    
    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
    
train_dataset = CallsDataset(X_train, y_train)
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)

test_dataset = CallsDataset(X_test, y_test)
test_dataloader = DataLoader(test_dataset, batch_size=8, shuffle=True)

In [75]:
model = Classifier(num_classes=len(dataset.le.classes_), input_size=dataset.X.shape[1], hidden_size=128)
loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

# train the model
for epoch in range(100):
    for X_batch, y_batch in train_dataloader:
        y_pred = model(X_batch.float())
        l = loss(y_pred, y_batch)
        l.backward()
        optimizer.step()
        optimizer.zero_grad()
    if epoch % 10 == 0:
        print(f'Epoch {epoch}: {l.item():.6f}')

Epoch 0: 1.663270
Epoch 10: 1.515315
Epoch 20: 1.442126
Epoch 30: 1.322674
Epoch 40: 1.566721
Epoch 50: 1.295004
Epoch 60: 1.276551
Epoch 70: 1.277666
Epoch 80: 1.623746
Epoch 90: 1.411573


In [76]:
# evaluate the model
model.eval()
with torch.no_grad():
    y_pred = model(torch.from_numpy(X_test).float())
    l = loss(y_pred, torch.from_numpy(y_test))
    print(f'Loss: {l.item():.6f}')
    y_pred = y_pred.argmax(dim=1).numpy()
    print(f'{100*(y_pred == y_test).sum() / len(y_test):.2f}% accuracy')

Loss: 1.401187
88.10% accuracy


In [25]:
dataset.le.classes_

array(['Jagged Trill', 'LongCalls', 'Moan', 'Resonate', 'Resonating Note',
       'ShortCalls', 'Sneeze', 'Whistle '], dtype='<U15')

In [80]:
# get unique labels counts from dataset.labels
dataset.labels.call_type.value_counts()

call_type
ShortCalls         1264
LongCalls           843
Resonating Note     667
Resonate            285
Moan                100
Jagged Trill         96
Whistle              24
Sneeze               23
Name: count, dtype: int64

In [78]:
counts

array([  82,  705,   84,  247,  563, 1088,   18,   19])

In [79]:
unique

array([0, 1, 2, 3, 4, 5, 6, 7])