In [12]:
import os
import torch
import torchaudio
import numpy as np
import pandas as pd

In [13]:
def rechannel(aud, new_ch):
    sig, sr = aud
    if (sig.shape[0] == new_ch):
        return sig, sr
    if (new_ch == 1):
        resig = sig[:1, :]
    else:
        resig = torch.cat([sig, sig])
    return (resig, sr)

In [14]:
def resample(aud, new_sr):
    sig, sr = aud
    if (sr == new_sr):
        return sig, sr
    resampled_ch1 = torchaudio.transforms.Resample(sr, new_sr)(sig[:1,:])
    resampled_ch2 = torchaudio.transforms.Resample(sr, new_sr)(sig[1:,:])
    resampled_sig = torch.cat([resampled_ch1, resampled_ch2])
    return (resampled_sig, new_sr)

In [15]:
def resize(aud, max_ms):
    sig, sr = aud
    num_ch, sig_len = sig.shape
    max_len = sr // 1000 * max_ms
    if sig_len > max_len:
        sig = sig[:,:max_len]
    elif sig_len < max_len:
        pad_begin_len = np.random.randint(0, max_len - sig_len)
        pad_end_len = max_len - sig_len - pad_begin_len
        pad_begin = torch.zeros((num_ch, pad_begin_len))
        pad_end = torch.zeros((num_ch, pad_end_len))
        sig = torch.cat((pad_begin, sig, pad_end), 1)
    return (sig, sr)

In [16]:
def time_shift(aud, shift_limit):
    sig, sr = aud
    sig_len = sig.shape[1]
    shift_amt = int(np.random.random() * shift_limit * sig_len)
    return (sig.roll(shift_amt), sr)

In [17]:
def mel_spectrogram(aud, n_mels=64, n_fft=1024, hop_len=None):
    sig, sr = aud
    top_db = 80
    # shape [channel, n_mels, time]
    spec = torchaudio.transforms.AmplitudeToDB(top_db=top_db)(sig)
    spec = torchaudio.transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(spec)
    return spec

In [18]:
def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
    _, n_mels, n_steps = spec.shape
    mask_value = spec.mean()
    aug_spec = spec
    freq_mask_param = max_mask_pct * n_mels
    for _ in range(n_freq_masks):
        aug_spec = torchaudio.transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)
    time_mask_param = max_mask_pct * n_steps
    for _ in range(n_time_masks):
        aug_spec = torchaudio.transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)
    return aug_spec

In [19]:
class AudioDataSet(torch.utils.data.Dataset):

    def __init__(self, df_path, data_path):
        self.df = pd.read_csv(df_path)
        self.data_path = data_path
        self.duration = 2300
        self.sr = 16000
        self.channel = 1
        self.shift_pct = 0.4

    def __len__(self):
        return len(self.df)    

    def __getitem__(self, idx):
        audio_file = self.data_path + self.df.loc[idx, 'path']
        # print(audio_file)
        class_id = self.df.loc[idx, 'gender']
        # print(class_id)
        class_id = 0 if class_id == 'female' else 1
        raw_aud = torchaudio.load(audio_file)
        resr_aud = resample(raw_aud, self.sr)
        rech_aud = rechannel(resr_aud, self.channel)
        resz_aud = resize(rech_aud, self.duration)
        shft_aud = time_shift(resz_aud, self.shift_pct)
        raw_spec = mel_spectrogram(shft_aud, n_mels=64, n_fft=1024, hop_len=None)
        aug_spec = spectro_augment(raw_spec, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)
        return aug_spec, class_id #, audio_file

In [20]:
# df_path = './fluent_speech/index.csv'
# data_path = './fluent_speech/'
# myds = AudioDataSet(df_path, data_path)
# tensor, label, path = myds.__getitem__(5000)
# # !mpv $path
# tensor, label, path


In [21]:
class AudioClassifier (torch.nn.Module):

    def __init__(self):
        super().__init__()
        conv_layers = []
        # 1st conv layer
        self.conv1 = torch.nn.Conv2d(1, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
        self.relu1 = torch.nn.ReLU()
        self.bn1 = torch.nn.BatchNorm2d(8)
        torch.nn.init.kaiming_normal_(self.conv1.weight, a=0.1)
        self.conv1.bias.data.zero_()
        conv_layers += [self.conv1, self.relu1, self.bn1]
        # 2nd conv layer
        self.conv2 = torch.nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu2 = torch.nn.ReLU()
        self.bn2 = torch.nn.BatchNorm2d(16)
        torch.nn.init.kaiming_normal_(self.conv2.weight, a=0.1)
        self.conv2.bias.data.zero_()
        conv_layers += [self.conv2, self.relu2, self.bn2]
        # 3rd conv layer
        self.conv3 = torch.nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu3 = torch.nn.ReLU()
        self.bn3 = torch.nn.BatchNorm2d(32)
        torch.nn.init.kaiming_normal_(self.conv3.weight, a=0.1)
        self.conv3.bias.data.zero_()
        conv_layers += [self.conv3, self.relu3, self.bn3]
        # 4th conv layer
        self.conv4 = torch.nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu4 = torch.nn.ReLU()
        self.bn4 = torch.nn.BatchNorm2d(64)
        torch.nn.init.kaiming_normal_(self.conv4.weight, a=0.1)
        self.conv4.bias.data.zero_()
        conv_layers += [self.conv4, self.relu4, self.bn4]
        # Linear Classifier
        self.ap = torch.nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = torch.nn.Linear(in_features=64, out_features=2)
        # Wrap the Convolutional Blocks
        self.conv = torch.nn.Sequential(*conv_layers)

    def forward(self, x):
        # Run the convolutional blocks
        x = self.conv(x)
        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)
        # Linear layer
        x = self.lin(x)
        # Final output
        return x

In [22]:
def training(model, train_dl, num_epochs, max_lr):
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(),lr=max_lr)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(
        optimizer,
        max_lr=max_lr,
        steps_per_epoch=int(len(train_dl)),
        epochs=num_epochs,
        anneal_strategy='linear'
        )
    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0
        for i, data in enumerate(train_dl):
            inputs, labels = data[0].to(device), data[1].to(device)
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()
            running_loss += loss.item()
            _, prediction = torch.max(outputs,1)
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]
        num_batches = len(train_dl)
        avg_loss = running_loss / num_batches
        acc = correct_prediction / total_prediction
        print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')
    print('Finished Training')

In [23]:
def validation (model, val_dl):
    correct_prediction = 0
    total_prediction = 0
    model.eval()
    with torch.no_grad():
        for data in val_dl:
            inputs, labels = data[0].to(device), data[1].to(device)
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s
            outputs = model(inputs)
            _, prediction = torch.max(outputs, 1)
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]
    acc = correct_prediction / total_prediction
    print(f'Accuracy: {acc:.2f}, Total items: {total_prediction}')

In [24]:
def inference_preprocessor(path, sr, ch, duration, shift_pct):
    raw_aud = torchaudio.load(path)
    resr_aud = resample(raw_aud, sr)
    rech_aud = rechannel(resr_aud, ch)
    resz_aud = resize(rech_aud, duration)
    shft_aud = time_shift(resz_aud, shift_pct)
    raw_spec = mel_spectrogram(shft_aud, n_mels=64, n_fft=1024, hop_len=None)
    aug_spec = spectro_augment(raw_spec, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)
    return aug_spec

def inference (model, path, sr, ch, duration, shift_pct):
    input = inference_preprocessor(path, sr, ch, duration, shift_pct)
    model.eval()
    with torch.no_grad():
        output = model(input.unsqueeze(0).to(device))
        print(f'Prediction: {output}')
        _, prediction = torch.max(output, 1)
        print(f'Prediction: {prediction}')

In [25]:
df_path = './fluent_speech/index.csv'
data_path = './fluent_speech/'
split_ratio = 0.8
num_workers = 8
batch_size = 64
num_epochs = 5
max_lr = 1e-3

myds = AudioDataSet(df_path, data_path)
num_items = len(myds)
num_train = round(num_items * split_ratio)
num_val = num_items - num_train
train_ds, val_ds = torch.utils.data.random_split(myds, [num_train, num_val])

train_dl = torch.utils.data.DataLoader(
    train_ds, batch_size=batch_size, shuffle=True, num_workers=num_workers
    )
val_dl = torch.utils.data.DataLoader(
    val_ds, batch_size=batch_size, shuffle=False, num_workers=num_workers
    )

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
myModel = AudioClassifier()
myModel = myModel.to(device)
next(myModel.parameters()).device

training(myModel, train_dl, num_epochs, max_lr)
validation(myModel, val_dl)

Epoch: 0, Loss: 0.55, Accuracy: 0.72
Epoch: 1, Loss: 0.29, Accuracy: 0.89
Epoch: 2, Loss: 0.22, Accuracy: 0.91
Epoch: 3, Loss: 0.20, Accuracy: 0.92
Epoch: 4, Loss: 0.19, Accuracy: 0.93
Finished Training
Accuracy: 0.93, Total items: 4626


In [26]:
test_path_female = './fluent_speech/wavs/speakers/2BqVo8kVB2Skwgyb/0a3129c0-4474-11e9-a9a5-5dbec3b8816a.wav'
inference(
    model=myModel,
    path=test_path_female,
    sr=16000,
    ch=1,
    duration=2300,
    shift_pct=0.4
    )
# !mpv $test_path_female

Prediction: tensor([[33484520., 13053734.]], device='cuda:0')
Prediction: tensor([0], device='cuda:0')


In [27]:
test_path_male = './fluent_speech/wavs/speakers/g2dnA9Wpvzi2WAmZ/f3870450-4546-11e9-aa52-bf2189a03a60.wav'
inference(
    model=myModel,
    path=test_path_male,
    sr=16000,
    ch=1,
    duration=2300,
    shift_pct=0.4
    )
# !mpv $test_path_male

Prediction: tensor([[-20428352.,  56580116.]], device='cuda:0')
Prediction: tensor([1], device='cuda:0')
