In [1]:
import os
import sys

# Make the project's `src` package importable. The parent of the current
# working directory is treated as the project root. (The previous
# `os.path.dirname("test-split-audio")` always evaluated to "", so this is the
# same path, spelled directly.)
PROJECT_ROOT = os.path.abspath(os.pardir)
# Re-running this cell must not pile up duplicate entries on sys.path.
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

import pandas as pd
import numpy as np
import torch
import json
import random
import torch.nn as nn
from src.utils import read_feature, pad_features
from src.features import extract_melspectrogram, extract_mfcc

# making sure the experiments are reproducible
seed = 2109
random.seed(seed)
np.random.seed(seed)
# Seed torch's default generator as well: without this call only the CUDA
# generator was seeded, so anything drawn from the global torch RNG on CPU
# (e.g. layer weight initialisation) changed from run to run.
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
# NOTE: PYTHONHASHSEED is read at interpreter start-up, so setting it here does
# not change hashing in this kernel; it is only inherited by child processes.
os.environ["PYTHONHASHSEED"] = str(seed)
# Disable cuDNN auto-tuning and request deterministic cuDNN algorithms.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

def seed_worker(worker_id: int):
    """Re-seed NumPy and the stdlib `random` module from torch's seed.

    The value returned by `torch.initial_seed()` is reduced modulo 2**32
    (NumPy only accepts seeds below 2**32) and applied to both libraries.

    Args:
        worker_id: Not used by the body; presumably present only to match
            the DataLoader `worker_init_fn` callback signature (TODO confirm
            against the DataLoader construction, which is not shown here).
    """
    derived_seed = torch.initial_seed() % (1 << 32)
    random.seed(derived_seed)
    np.random.seed(derived_seed)

# Stand-alone torch generator seeded with the same `seed` as the global RNGs.
# Presumably handed to a DataLoader (`generator=g`) together with `seed_worker`
# so that shuffling is reproducible — the DataLoader itself is not created in
# this part of the notebook (TODO confirm).
g = torch.Generator()
g.manual_seed(seed)

<torch._C.Generator at 0x7fab4bdfb830>

In [2]:
features_path = "../features/propor2022/"

# training split, fold "0" (inputs are read first, then the labels)
X_train, y_train = (
    read_feature(path=features_path, fold="0", name=file_name)
    for file_name in ("X_train.pth", "y_train.pth")
)
print(f"Train: {X_train.shape}, {y_train.shape}")

# validation split, same fold
X_valid, y_valid = (
    read_feature(path=features_path, fold="0", name=file_name)
    for file_name in ("X_valid.pth", "y_valid.pth")
)
print(f"Valid: {X_valid.shape}, {y_valid.shape}")

# test split, read with `fold=None`
X_test, y_test = (
    read_feature(path=features_path, fold=None, name=file_name)
    for file_name in ("X_test.pth", "y_test.pth")
)
print(f"Test: {X_test.shape}, {y_test.shape}")

Train: torch.Size([500, 1, 256000]), torch.Size([500, 3])
Valid: torch.Size([125, 1, 256000]), torch.Size([125, 3])
Test: torch.Size([308, 1, 256000]), torch.Size([308, 3])


In [3]:
# Cut every training waveform into consecutive frames of 8000 samples; the
# resulting layout is (items, frames, channels, 8000).
# NOTE: despite its name, `test` is a view of the *training* tensor `X_train`.
test = X_train.view(X_train.shape[0], -1, X_train.shape[1], 8000)
test.size()

torch.Size([500, 32, 1, 8000])

In [4]:
import pywt
from copy import deepcopy

total_batches = test.shape[0]
total_frames = test.shape[1]
feats = []
feature = "mfcc"

# Fail fast on an unsupported feature name. Without this check the loop below
# would either raise a NameError on its first iteration or, when the cell is
# re-run, silently append a stale `feat` left over from a previous execution.
if feature not in ("mfcc", "mel_spectrogram"):
    raise ValueError(
        f"Unsupported feature {feature!r}; expected 'mfcc' or 'mel_spectrogram'"
    )

for i in range(total_batches):

    for j in range(total_frames):
        # Copy the frame so the NumPy array does not share memory with `test`.
        audio = deepcopy(test[i, j, :, :].detach().squeeze().numpy())

        # 4-level discrete wavelet decomposition of the frame: one
        # approximation band plus four detail bands (five arrays in total).
        coeffs = pywt.wavedec(
            data=audio,
            wavelet="db8",
            mode="symmetric",
            level=4
        )

        # Back to torch, with a leading axis of size 1 on every sub-band.
        coeffs = [torch.from_numpy(c).unsqueeze(0) for c in coeffs]

        # NOTE(review): every sub-band is described to the extractor with
        # sample_rate=8000, although the sub-bands returned by the DWT are
        # shorter than the input frame — confirm this is intended.
        for coeff in coeffs:
            if feature == "mfcc":
                feat = extract_mfcc(
                    audio=coeff,
                    sample_rate=8000,
                    n_fft=512,
                    hop_length=256,
                    n_mfcc=64,
                    f_min=0,
                    f_max=None
                )
            else:  # feature == "mel_spectrogram" (validated above)
                feat = extract_melspectrogram(
                    audio=coeff,
                    sample_rate=8000,
                    n_fft=512,
                    hop_length=256,
                    n_mels=128
                )

            feats.append(feat)

# padding the extracted features (MFCC or mel spectrogram) to be the same size;
# the target is the largest height/width found among them
max_height = max([x.size(1) for x in feats])
max_width = max([x.size(2) for x in feats])

feats = pad_features(
    features=feats,
    max_height=max_height,
    max_width=max_width
)
feats = torch.concat(feats, dim=0)
feats = feats.permute(0, 2, 1) # time and frequency axis permutation
# Features were appended item by item, frame by frame, sub-band by sub-band,
# so the flat leading axis can be regrouped as (items, frames, sub-bands).
feats = feats.view(test.shape[0], test.shape[1], -1, feats.shape[1], feats.shape[2])
feats.shape

torch.Size([500, 32, 5, 16, 64])

In [5]:
# All credits to: https://discuss.pytorch.org/t/any-pytorch-function-can-work-as-keras-timedistributed/1346
class TimeDistributed(nn.Module):
    """
    Runs a wrapped module over an input that carries extra leading axes,
    in the spirit of the Keras TimeDistributed layer.

    Args:
        module: the layer applied to the reshaped input.
        batch_first: when True the first two axes of the input are restored
            on the output; otherwise the output is viewed as
            (-1, x.size(1), last_dim).
        layer_name: "convolutional" or "max_pooling" restores an output with
            three trailing axes; any other value restores a single trailing
            axis.

    NOTE(review): the input is always viewed as (-1, *last three axes). For a
    rank-5 input this merges the first two axes; for a rank-3 input it only
    prepends an axis of size 1, so the wrapped module must accept that extra
    leading axis (nn.Linear does).
    """

    def __init__(self, module: torch.nn.Module, batch_first: bool, layer_name: str):
        super().__init__()
        self.module = module
        self.batch_first = batch_first
        self.layer_name = layer_name

    def forward(self, x):
        # Rank <= 2: nothing to squash, apply the module directly.
        if x.dim() <= 2:
            return self.module(x)

        # Squash samples and timesteps into a single leading axis while
        # keeping the last three axes untouched.
        squashed = x.contiguous().view(-1, *x.shape[-3:])
        y = self.module(squashed)

        # Same reshape for every layer type when batch_first is False.
        if not self.batch_first:
            return y.view(-1, x.size(1), y.size(-1))

        if self.layer_name in ("convolutional", "max_pooling"):
            # Output keeps three trailing axes.
            restored_shape = (
                x.size(0), x.size(1), y.size(-3), y.size(-2), y.size(-1)
            )
        else:
            # Output keeps a single trailing axis.
            restored_shape = (x.size(0), x.size(1), y.size(-1))

        return y.contiguous().view(restored_shape)

In [None]:
class Extract_LSTM_Output(nn.Module):
    """
    Converts the tuple returned by a recurrent layer into a 2-D tensor.

    The second element of the input tuple is unpacked and only its first
    item is kept. For `nn.LSTM` that item is `h_n`, the final hidden state
    of each direction/layer — not the per-timestep output sequence. It is
    moved to batch-major order and everything after the batch axis is
    flattened.
    """

    def forward(self, x):
        hidden_state, _ = x[1]
        batch_major = hidden_state.permute(1, 0, 2)
        return batch_major.flatten(start_dim=1)
    
class CNN(nn.Module):
    """
    Time-distributed CNN followed by a bidirectional LSTM and a linear head.

    Every frame passes through two Conv2d -> BatchNorm2d -> ReLU -> MaxPool2d
    stages, is flattened and projected to 128 features; the frame sequence is
    then fed to a BiLSTM whose final hidden states (both directions) go
    through the last linear layer.

    Args:
        in_channels: number of channels of each frame (default 5, the number
            of sub-bands produced by a 4-level wavelet decomposition).
        input_size: (height, width) of each frame. The default (16, 64)
            reproduces the previously hard-coded 5760 flattened features;
            pass e.g. (16, 128) for wider feature maps.
        num_classes: size of the output layer (default 3).

    Raises:
        ValueError: if `input_size` is too small to survive both
            convolution/pooling stages.

    Calling `CNN()` with no arguments builds exactly the same layers, in the
    same order, as before the parameters were introduced.
    """

    @staticmethod
    def _conv_pool_size(size: int) -> int:
        """Length of one axis after Conv2d(kernel_size=2) + MaxPool2d(kernel_size=2)."""
        # conv (kernel 2, stride 1, no padding): size - 1
        # pool (kernel 2, stride 2, floor mode):  previous // 2
        return (size - 1) // 2

    def __init__(
        self,
        in_channels: int = 5,
        input_size: tuple = (16, 64),
        num_classes: int = 3
    ) -> None:
        super().__init__()

        # Derive the flattened feature count from the input size instead of
        # hard-coding it, so other feature shapes can be used.
        in_height, in_width = input_size
        out_height = self._conv_pool_size(self._conv_pool_size(in_height))
        out_width = self._conv_pool_size(self._conv_pool_size(in_width))
        if out_height < 1 or out_width < 1:
            raise ValueError(
                f"input_size {input_size!r} is too small for two conv/pool stages"
            )
        flat_features = 128 * out_height * out_width

        self.model = nn.Sequential(
            TimeDistributed(
                nn.Conv2d(in_channels=in_channels, out_channels=64, kernel_size=2),
                batch_first=True,
                layer_name="convolutional"
            ),
            TimeDistributed(
                nn.BatchNorm2d(64),
                batch_first=True,
                layer_name="convolutional"
            ),
            nn.ReLU(),
            TimeDistributed(
                nn.MaxPool2d(kernel_size=2),
                batch_first=True,
                layer_name="max_pooling"
            ),
            TimeDistributed(
                nn.Conv2d(in_channels=64, out_channels=128, kernel_size=2),
                batch_first=True,
                layer_name="convolutional"
            ),
            TimeDistributed(
                nn.BatchNorm2d(128),
                batch_first=True,
                layer_name="convolutional"
            ),
            nn.ReLU(),
            TimeDistributed(
                nn.MaxPool2d(kernel_size=2),
                batch_first=True,
                layer_name="max_pooling"
            ),
            TimeDistributed(
                nn.Flatten(),
                batch_first=True,
                layer_name="flatten"
            ),
            TimeDistributed(
                nn.Linear(in_features=flat_features, out_features=128),
                batch_first=True,
                layer_name="dense",
            ),
            nn.LSTM(
                input_size=128,
                hidden_size=128,
                num_layers=1,
                batch_first=True,
                bidirectional=True,
            ),
            # keeps the final hidden state of both directions: 2 * 128 features
            Extract_LSTM_Output(),
            nn.Linear(
                in_features=256,
                out_features=num_classes
            )
        )

    def forward(self, X: torch.Tensor) -> torch.Tensor:
        """
        Args:
            X: tensor laid out as (batch, frames, in_channels, height, width).

        Returns:
            The raw outputs of the last linear layer, one row of
            `num_classes` values per batch item.
        """
        return self.model(X)

# Smoke test: build the network and push the whole feature tensor through it
# once to check that the layer shapes line up. The last layer has
# out_features=3, so the result should have 3 values per item.
model = CNN()
output = model(feats)
output.shape