In [1]:
!pip install -qqq pytorch-lightning

In [2]:
from pytorch_lightning.loggers import WandbLogger

# Pytorch modules
import torch
from torch.nn import functional as F
from torch import nn
from torch.optim import Adam
from torch.utils.data import DataLoader, random_split

# Pytorch-Lightning
from pytorch_lightning import LightningDataModule, LightningModule, Trainer
import pytorch_lightning as pl

In [3]:
import os
import torch
from torch.utils.data import Dataset
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt 
from keras.utils import to_categorical
import librosa
%matplotlib inline

# Directory that holds 'mfccs.npy' and 'label.csv'; the empty string means the working directory.
base_path = ''

class VoiceDataset(Dataset):
    """Pre-computed MFCC features with word labels, sliced into train/valid/test.

    The feature array and the label column are loaded, paired row by row,
    shuffled with a fixed seed (42) and then cut into three contiguous slices:

    * ``'train'``  -> rows ``[0, train_size * N)``
    * ``'valid'``  -> rows ``[train_size * N, (1 - test_size) * N)``
    * any other ``mode`` -> rows ``[(1 - test_size) * N, N)`` (the test slice)

    Because the shuffle seed is fixed, separate instances built from the same
    files with different ``mode`` values see consistent, non-overlapping slices.

    Args:
        path_npy: ``.npy`` file whose first axis indexes samples.
        label_path: CSV file with a ``label`` column, one row per sample.
        num_classes: Stored on the instance; not used for the split itself.
        mode: Which slice to expose (see above).
        train_size: Fraction of rows in the train slice, in ``[0, 1]``.
        test_size: Fraction of rows in the test slice, in ``[0, 1]``.

    Raises:
        AssertionError: If ``train_size`` or ``test_size`` is outside ``[0, 1]``.
        ValueError: If the feature array and the label column differ in length.
    """

    def __init__(self, path_npy = os.path.join(base_path, 'mfccs.npy'), label_path=os.path.join(base_path, 'label.csv'), num_classes=5, mode='train', train_size=0.8, test_size=0.1):
        # Validate the fractions before touching the disk.
        assert 0.0 <= train_size <= 1.0
        assert 0.0 <= test_size <= 1.0

        self.mode = mode
        self.num_classes = num_classes
        self.train_size = train_size
        self.test_size = test_size
        self.labels_unique = ['can ho', 'canh sat', 'com', 'hoc sinh', 'nguoi']
        # Both lookup tables are derived from the ordered label list so they cannot drift apart.
        self.labels_str2nb = {name: nb for nb, name in enumerate(self.labels_unique)}
        self.labels_nb2str = {nb: name for nb, name in enumerate(self.labels_unique)}

        self.mfccs = np.load(path_npy)
        self.labels = list(pd.read_csv(label_path)['label'])
        # zip() would silently drop the surplus rows and hide a corrupted export.
        if len(self.mfccs) != len(self.labels):
            raise ValueError(
                f"feature/label count mismatch: {len(self.mfccs)} mfccs vs {len(self.labels)} labels"
            )
        tupes = list(zip(self.mfccs, self.labels))

        self.data = pd.DataFrame(tupes, columns=['mfcc','label'])
        # shuffle data (fixed seed keeps the three modes consistent with each other)
        self.data = self.data.sample(frac=1, random_state=42).reset_index(drop=True)

        # split dataset
        frac1 = int(train_size * len(self.data))
        frac2 = int((1 - test_size) * len(self.data))
        if mode == 'train':
            subset = self.data[:frac1]
        elif mode == 'valid':
            subset = self.data[frac1:frac2]
        else:
            subset = self.data[frac2:]
        self.data = subset.reset_index(drop=True)

    def wav2mfcc(self, path, n_mfcc=20, max_len=30):
        """Load an audio file and return an ``(n_mfcc, max_len)`` MFCC matrix.

        Args:
            path: Audio file to read with ``librosa.load``.
            n_mfcc: Number of MFCC coefficients (rows of the result).
            max_len: Number of frames (columns); shorter clips are zero-padded
                on the right, longer ones are truncated.
        """
        wave, sr = librosa.load(path, mono=True, sr=None)
        # Keep every third sample; together with sr=16000 below this presumably
        # assumes 48 kHz recordings -- TODO confirm against the data.
        wave = np.asfortranarray(wave[::3])
        # The audio is passed by keyword, which every librosa release accepts.
        mfcc = librosa.feature.mfcc(y=wave, sr=16000, n_mfcc=n_mfcc)

        # If maximum length exceeds mfcc lengths then pad the remaining ones
        if (max_len > mfcc.shape[1]):
            pad_width = max_len - mfcc.shape[1]
            mfcc = np.pad(mfcc, pad_width=((0, 0), (0, pad_width)), mode='constant')

        # Else cutoff the remaining parts
        else:
            mfcc = mfcc[:, :max_len]

        return mfcc

    def to_one_hot(self, label) -> tuple:
        """Return the one-hot tuple for a label string from ``labels_unique``."""
        one_hot = [0] * len(self.labels_unique)
        one_hot[self.labels_str2nb[label]] = 1
        return tuple(one_hot)

    def __getitem__(self, idx: int) -> tuple:
        """Return ``(features, label)`` for the sample at position ``idx``.

        ``features`` is the stored MFCC matrix with a leading channel axis
        added; ``label`` is a scalar tensor holding the class index.
        """
        # Positional access: out-of-range indices raise IndexError.
        label = self.labels_str2nb[self.data['label'].iloc[idx]]
        mfcc = self.data['mfcc'].iloc[idx]
        # Add the channel axis in front: (rows, cols) -> (1, rows, cols).
        return torch.tensor(mfcc[np.newaxis, ...]), torch.tensor(label)

    def __len__(self) -> int:
        """Number of samples in the selected slice."""
        return len(self.data)

In [5]:
import multiprocessing as mproc
import pytorch_lightning as pl
from torch.utils.data import DataLoader

class VoiceDM(pl.LightningDataModule):
    """Lightning data module that serves the three ``VoiceDataset`` splits.

    Args:
        batch_size: Batch size used by every dataloader.
        num_workers: Worker processes per dataloader; ``None`` means one per
            CPU reported by ``multiprocessing.cpu_count()``.
    """

    def __init__(
        self,
        batch_size: int = 64,
        num_workers: int = None,
    ):
        super().__init__()
        self.batch_size = batch_size
        self.num_workers = num_workers if num_workers is not None else mproc.cpu_count()
        # Populated by setup().
        self.train_dataset = None
        self.valid_dataset = None
        self.test_dataset = None

    def prepare_data(self):
        """Nothing to download or pre-process; the features are read in ``setup``."""
        pass

    def wav2mfcc(self, path, n_mfcc=20, max_len=30):
        """Load an audio file and return an ``(n_mfcc, max_len)`` MFCC matrix.

        Clips with fewer than ``max_len`` frames are zero-padded on the right;
        longer clips are truncated.
        """
        wave, sr = librosa.load(path, mono=True, sr=None)
        # Keep every third sample; together with sr=16000 below this presumably
        # assumes 48 kHz recordings -- TODO confirm against the data.
        wave = np.asfortranarray(wave[::3])
        # The audio is passed by keyword, which every librosa release accepts.
        mfcc = librosa.feature.mfcc(y=wave, sr=16000, n_mfcc=n_mfcc)

        # If maximum length exceeds mfcc lengths then pad the remaining ones
        if (max_len > mfcc.shape[1]):
            pad_width = max_len - mfcc.shape[1]
            mfcc = np.pad(mfcc, pad_width=((0, 0), (0, pad_width)), mode='constant')

        # Else cutoff the remaining parts
        else:
            mfcc = mfcc[:, :max_len]

        return mfcc

    @property
    def num_classes(self) -> int:
        """Class count of the training split; only available after ``setup``."""
        return self.train_dataset.num_classes

    def setup(self, stage=None):
        """Instantiate the train/valid/test datasets and print their sizes.

        ``stage`` is accepted for the Lightning hook signature but not used.
        """
        self.train_dataset = VoiceDataset(mode='train')
        print(f"training dataset: {len(self.train_dataset)}")
        self.valid_dataset = VoiceDataset(mode='valid')
        print(f"validation dataset: {len(self.valid_dataset)}")
        self.test_dataset = VoiceDataset(mode='test')
        print(f"test dataset: {len(self.test_dataset)}")

    def _make_loader(self, dataset, shuffle):
        """Build a DataLoader with the module-wide batch size and worker count."""
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            num_workers=self.num_workers,
            shuffle=shuffle,
        )

    def train_dataloader(self):
        """Shuffled loader over the training split."""
        return self._make_loader(self.train_dataset, shuffle=True)

    def val_dataloader(self):
        """Ordered loader over the validation split."""
        return self._make_loader(self.valid_dataset, shuffle=False)

    def test_dataloader(self):
        """Ordered loader over the held-out test split."""
        # Serve the test split, so test metrics are not a repeat of validation.
        return self._make_loader(self.test_dataset, shuffle=False)


# ==============================
# ==============================

# Smoke-test the data module: build it, create the three dataset splits
# (setup() prints each split's size) and show the number of classes.
dm = VoiceDM()
dm.prepare_data()
dm.setup()
print(dm.num_classes)

training dataset: 2158
validation dataset: 270
test dataset: 270
5


In [16]:
class LitVoice(pl.LightningModule):
    """Two-layer CNN that classifies an MFCC map into one of the spoken words.

    The first linear layer expects ``16 * 5 * 7`` features, i.e. a
    ``(batch, 1, 20, 30)`` input (the ``wav2mfcc`` defaults) after two
    2x2 max-poolings.

    Args:
        n_classes: Size of the output layer.
        lr: Learning rate handed to Adam.
    """

    def __init__(self, n_classes=5, lr=1e-3):
        super().__init__()

        self.conv1 = nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 16, kernel_size=3, stride=1, padding=1)
        self.linear1 = nn.Linear(16*5*7, 64)
        self.linear2 = nn.Linear(64, n_classes)
        # Registered as a submodule so that eval() switches it off. Dropout has
        # no parameters, so existing checkpoints still load unchanged.
        self.dropout = nn.Dropout(p=0.2)

        self.lr = lr

        self.labels_str2nb = {'can ho': 0,
                              'canh sat': 1,
                              'com': 2,
                              'hoc sinh': 3,
                              'nguoi': 4}
        self.labels_nb2str = {0: 'can ho',
                              1: 'canh sat',
                              2: 'com',
                              3: 'hoc sinh',
                              4: 'nguoi'}

        # NOTE(review): `pl.metrics` only exists in older pytorch-lightning
        # releases -- confirm the installed version provides it.
        self.accuracy = pl.metrics.Accuracy()

        self.save_hyperparameters()

    def forward(self, x):
        """Return unnormalised class scores for a 4-D ``(batch, 1, H, W)`` input."""
        # Unpacking enforces a 4-D input; only the batch size is needed below.
        batch_size, _channels, _rows, _cols = x.size()

        x = F.max_pool2d(F.relu(self.conv1(x)), kernel_size=2, stride=2)
        x = F.max_pool2d(F.relu(self.conv2(x)), kernel_size=2, stride=2)
        x = x.reshape(batch_size, -1)
        x = F.relu(self.linear1(x))
        x = self.dropout(x)
        return self.linear2(x)

    def _shared_step(self, batch, stage):
        """Compute NLL loss and accuracy for one batch and log both under ``stage``."""
        x, y = batch
        log_probs = F.log_softmax(self(x), dim=1)
        loss = F.nll_loss(log_probs, y)
        preds = torch.argmax(log_probs, dim=1)

        self.log(f'{stage} loss', loss)
        self.log(f'{stage} accuracy', self.accuracy(preds, y))
        return loss

    def training_step(self, batch, batch_idx):
        """Log 'Train loss' / 'Train accuracy' and return the loss to optimise."""
        return self._shared_step(batch, 'Train')

    def validation_step(self, batch, batch_idx):
        """Log 'Validate loss' / 'Validate accuracy' for one validation batch."""
        self._shared_step(batch, 'Validate')

    def on_validation_end(self):
        """Save a checkpoint after each validation run and upload it to W&B."""
        # Imported here because no cell in this notebook imports wandb.
        import wandb

        # Use the trainer attached to this module, not a notebook-level global.
        self.trainer.save_checkpoint('voice_recognition_cnn.pth')
        wandb.save('voice_recognition_cnn.pth')

    def test_step(self, batch, batch_idx):
        """Log 'Test loss' / 'Test accuracy' for one test batch."""
        self._shared_step(batch, 'Test')

    def configure_optimizers(self):
        """Adam over all parameters with the configured learning rate."""
        return Adam(self.parameters(), lr=self.lr)

    def wav2mfcc(self, path, n_mfcc=20, max_len=30):
        """Load an audio file and return an ``(n_mfcc, max_len)`` MFCC matrix.

        Clips with fewer than ``max_len`` frames are zero-padded on the right;
        longer clips are truncated.
        """
        wave, sr = librosa.load(path, mono=True, sr=None)
        # Keep every third sample; together with sr=16000 below this presumably
        # assumes 48 kHz recordings -- TODO confirm against the data.
        wave = np.asfortranarray(wave[::3])
        # The audio is passed by keyword, which every librosa release accepts.
        mfcc = librosa.feature.mfcc(y=wave, sr=16000, n_mfcc=n_mfcc)

        # If maximum length exceeds mfcc lengths then pad the remaining ones
        if (max_len > mfcc.shape[1]):
            pad_width = max_len - mfcc.shape[1]
            mfcc = np.pad(mfcc, pad_width=((0, 0), (0, pad_width)), mode='constant')

        # Else cutoff the remaining parts
        else:
            mfcc = mfcc[:, :max_len]

        return mfcc

    def prepare_inference_input(self, paths):
        """Turn audio file paths into a float ``(len(paths), 1, n_mfcc, max_len)`` tensor.

        Raises:
            ValueError: If ``paths`` is empty.
        """
        mfccs = [self.wav2mfcc(path) for path in paths]
        if not mfccs:
            raise ValueError("prepare_inference_input needs at least one path")
        # Stack once in numpy, then add the channel axis the conv layers expect.
        return torch.from_numpy(np.stack(mfccs)).float().unsqueeze(1)

    def nb_2_label(self, nbs):
        """Map a tensor of class indices to their label strings."""
        return [self.labels_nb2str[nb] for nb in nbs.tolist()]

    def predict(self, paths):
        """Return the predicted label string for every audio file in ``paths``.

        Runs in eval mode without gradient tracking and restores the previous
        train/eval mode afterwards. An empty ``paths`` yields an empty list.
        """
        paths = list(paths)
        if not paths:
            return []

        mfccs = self.prepare_inference_input(paths).to(self.device)
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                log_probs = F.log_softmax(self(mfccs), dim=1)
        finally:
            self.train(was_training)
        return self.nb_2_label(torch.argmax(log_probs, dim=1))

In [17]:
# setup data (a fresh data module; setup() is not called in this cell)
dm = VoiceDM()

# setup model
model = LitVoice()
# torch.load unpickles the file, so only load checkpoints from a trusted source.
# map_location keeps every tensor on the CPU regardless of where it was saved.
# NOTE(review): this file name differs from the 'voice_recognition_cnn.pth'
# written by LitVoice.on_validation_end -- confirm it is the intended checkpoint.
ckpt = torch.load('voice_recognition_cnn_tv.pth', map_location=torch.device('cpu'))

In [18]:
# Copy the weights stored under the checkpoint's 'state_dict' key into the model.
model.load_state_dict(ckpt['state_dict'])

<All keys matched successfully>

In [33]:
# Disabled alternative: collect every file under data/bed/ instead of a single clip.
# paths = []
# for path in os.listdir('data/bed/'):
#     paths.append('data/bed/' + path)

# Single clip to classify.
paths = ['all/com/010.wav']

# Disabled manual forward pass; model.predict below goes from paths to labels.
# outs = model(x)
# outs = F.log_softmax(outs, dim=1)
# _, preds = torch.max(outs, dim=1)
# preds.bincount()
# predict() returns one label string per input path.
preds = model.predict(paths)
preds

['com']

In [36]:
!pip install IPython



In [49]:
import IPython.display as ipd

# Load a clip without passing `sr` and print the sample rate librosa reports.
# NOTE(review): wav2mfcc loads with sr=None instead, so this rate need not
# match the one used for the model's features -- confirm which is intended.
sig, sr = librosa.load('happy3.wav')
print(sr)

22050
