<a href="https://colab.research.google.com/github/wjzhan/ML_Lib/blob/main/ML2023/DNN_VoiceClassification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Phoneme Classification**


Objectives:
* Solve a classification problem with deep neural networks (DNNs).
* Understand recursive neural networks (RNNs).

source: https://speech.ee.ntu.edu.tw/~hylee/ml/2023-spring.php

# Mount to Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# %cd /content/drive/MyDrive/ML2023
# !mkdir ClassificationDNN
# %cd ./ClassificationDNN
%cd /content/drive/MyDrive/ML2023/ClassificationDNN

/content/drive/MyDrive/ML2023/ClassificationDNN


In [None]:
!pwd

/content/drive/MyDrive/ML2023/ClassificationDNN


# Download Data
Download data from google drive, then unzip it.

You should have
- `libriphone/train_split.txt`: training metadata
- `libriphone/train_labels`: training labels
- `libriphone/test_split.txt`: testing metadata
- `libriphone/feat/train/*.pt`: training feature
- `libriphone/feat/test/*.pt`:  testing feature

after running the following block.

> **Notes: if the google drive link is dead, you can download the data directly from [Kaggle](https://www.kaggle.com/c/ml2023spring-hw2/data) and upload it to the workspace.**


In [None]:
# !pip install --upgrade gdown

# Main link
# !gdown --id '1N1eVIDe9hKM5uiNRGmifBlwSDGiVXPJe' --output libriphone.zip
# !gdown --id '1qzCRnywKh30mTbWUEjXuNT2isOCAPdO1' --output libriphone.zip

# !unzip -q libriphone.zip
# !ls libriphone

Downloading...
From: https://drive.google.com/uc?id=1qzCRnywKh30mTbWUEjXuNT2isOCAPdO1
To: /content/drive/MyDrive/ML2023/ClassificationDNN/libriphone.zip
100% 384M/384M [00:08<00:00, 44.9MB/s]
feat  test_split.txt  train_labels.txt	train_split.txt


In [None]:
# !rm libriphone.zip

# Import packages

In [None]:
# Numerical Operations
# import math
import numpy as np
import random

# Reading/Writing Data
# import pandas as pd
import os
# import csv

# For Progress Bar
from tqdm import tqdm

# Pytorch
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# Garbage Collection
import gc

# For plotting learning curve
from torch.utils.tensorboard import SummaryWriter

# Some Utility Functions
**Fixes random number generator seeds for reproducibility.**

In [None]:
def same_seed(seed):
    '''Fixes random number generator seeds for reproducibility.'''
    random.seed(seed)
    torch.manual_seed(seed)
    np.random.seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

**Helper functions to pre-process the training data from raw MFCC features of each utterance.**

A phoneme may span several frames and is dependent to past and future frames. \
Hence we concatenate neighboring phonemes for training to achieve higher accuracy. The **concat_feat** function concatenates past and future k frames (total 2k+1 = n frames), and we predict the center frame.

Feel free to modify the data preprocess functions, but **do not drop any frame** (if you modify the functions, remember to check that the number of frames are the same as mentioned in the slides)

In [None]:
def load_feat(path):
    feat = torch.load(path)
    return feat

def shift(x, n):
    if n < 0:
        left = x[0].repeat(-n, 1)
        right = x[:n]
    elif n > 0:
        right = x[-1].repeat(n, 1)
        left = x[n:]
    else:
        return x

    return torch.cat((left, right), dim=0)

def concat_feat(x, concat_n):
    assert concat_n % 2 == 1 # n must be odd
    if concat_n < 2:
        return x
    seq_len, feature_dim = x.size(0), x.size(1)
    x = x.repeat(1, concat_n)
    x = x.view(seq_len, concat_n, feature_dim).permute(1, 0, 2) # concat_n, seq_len, feature_dim
    mid = (concat_n // 2)
    for r_idx in range(1, mid+1):
        x[mid + r_idx, :] = shift(x[mid + r_idx], r_idx)
        x[mid - r_idx, :] = shift(x[mid - r_idx], -r_idx)

    return x.permute(1, 0, 2).view(seq_len, concat_n * feature_dim)

def preprocess_data(split, feat_dir, phone_path, concat_nframes, train_ratio=0.8, random_seed=1213):
    class_num = 41 # NOTE: pre-computed, should not need change

    if split == 'train' or split == 'val':
        mode = 'train'
    elif split == 'test':
        mode = 'test'
    else:
        raise ValueError('Invalid \'split\' argument for dataset: PhoneDataset!')

    label_dict = {}
    if mode == 'train':
        for line in open(os.path.join(phone_path, f'{mode}_labels.txt')).readlines():
            line = line.strip('\n').split(' ')
            # label_dict key is pt file's name
            label_dict[line[0]] = [int(p) for p in line[1:]]

        # split training and validation data
        usage_list = open(os.path.join(phone_path, 'train_split.txt')).readlines()
        random.seed(random_seed)
        random.shuffle(usage_list)
        train_len = int(len(usage_list) * train_ratio)
        usage_list = usage_list[:train_len] if split == 'train' else usage_list[train_len:]

    elif mode == 'test':
        usage_list = open(os.path.join(phone_path, 'test_split.txt')).readlines()

    usage_list = [line.strip('\n') for line in usage_list]
    print('[Dataset] - # phone classes: ' + str(class_num) + ', number of utterances for ' + split + ': ' + str(len(usage_list)))

    max_len = 3000000
    X = torch.empty(max_len, 39 * concat_nframes)
    if mode == 'train':
        y = torch.empty(max_len, dtype=torch.long)

    idx = 0
    for i, fname in tqdm(enumerate(usage_list)):
        feat = load_feat(os.path.join(feat_dir, mode, f'{fname}.pt'))
        cur_len = len(feat)
        feat = concat_feat(feat, concat_nframes)
        if mode == 'train':
          label = torch.LongTensor(label_dict[fname])

        X[idx: idx + cur_len, :] = feat
        if mode == 'train':
          y[idx: idx + cur_len] = label

        idx += cur_len

    X = X[:idx, :]
    if mode == 'train':
      y = y[:idx]

    print(f'[INFO] {split} set')
    print(X.shape)
    if mode == 'train':
      print(y.shape)
      return X, y
    else:
      return X


# Dataset

In [None]:
class LibriDataset(Dataset):
    def __init__(self, X, y=None):
        self.data = X
        if y is not None:
            self.label = torch.LongTensor(y)
        else:
            self.label = None

    def __getitem__(self, idx):
        if self.label is not None:
            return self.data[idx], self.label[idx]
        else:
            return self.data[idx]

    def __len__(self):
        return len(self.data)


# Model
Feel free to modify the structure of the model.

In [None]:
class BasicBlock(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(BasicBlock, self).__init__()

        # apply batch normalization and dropout for strong baseline.
        # Reference: https://pytorch.org/docs/stable/generated/torch.nn.BatchNorm1d.html (batch normalization)
        #       https://pytorch.org/docs/stable/generated/torch.nn.Dropout.html (dropout)
        self.block = nn.Sequential(
            nn.Linear(input_dim, output_dim),
            nn.BatchNorm1d(output_dim, track_running_stats=False),
            nn.ReLU(),
            nn.Dropout(0.5),
        )

    def forward(self, x):
        x = self.block(x)
        return x


class Classifier(nn.Module):
    def __init__(self, input_dim, output_dim=41, hidden_layers=1, hidden_dim=256):
        super(Classifier, self).__init__()

        self.fc = nn.Sequential(
            BasicBlock(input_dim, hidden_dim),
            *[BasicBlock(hidden_dim, hidden_dim) for _ in range(hidden_layers)],
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        x = self.fc(x)
        return x

# Training Loop

In [None]:
def trainer(train_loader, valid_loader, model, config, device):

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=config['learning_rate'])

    writer = SummaryWriter() # Writer of tensoboard.

    if not os.path.isdir('./models'):
        os.mkdir('./models') # Create directory of saving models.

    num_epoch, best_acc, step, early_stop_count = config['num_epoch'], 0.0, 0, 0

    for epoch in range(num_epoch):
        train_acc = 0.0
        train_loss = 0.0
        val_acc = 0.0
        val_loss = 0.0
        train_batch = 0
        val_batch = 0

        # training
        model.train() # set the model to training mode
        train_pbar = tqdm(train_loader)
        for features, labels in train_pbar:
              optimizer.zero_grad()   # Set gradient to zero.
              features, labels = features.to(device), labels.to(device)   # Move data to device.
              outputs = model(features)
              loss = criterion(outputs, labels)
              loss.backward()
              optimizer.step()
              step += 1
              train_batch += 1

              # Display current epoch number and loss on tqdm progress bar.
              train_pbar.set_description(f'Epoch [{epoch+1}/{num_epoch}]')
              train_pbar.set_postfix({'loss': loss.detach().item()})

              _, train_pred = torch.max(outputs, 1) # get the index of the class with the highest probability
              train_acc += ((train_pred.detach() == labels.detach()).sum().item())/len(labels)
              train_loss += loss.item()

        mean_train_loss = train_loss/train_batch
        writer.add_scalar('Loss/train', mean_train_loss, step)
        mean_train_acc = train_acc/train_batch
        writer.add_scalar('ACC/train', mean_train_acc, step)


        # validation
        model.eval() # set the model to evaluation mode
        val_pbar = tqdm(val_loader)
        with torch.no_grad():
            for features, labels in val_pbar:
                features, labels = features.to(device), labels.to(device)
                outputs = model(features)
                loss = criterion(outputs, labels)
                val_batch += 1
                _, val_pred = torch.max(outputs, 1)
                val_acc += ((val_pred.cpu() == labels.cpu()).sum().item())/len(labels) # get the index of the class with the highest probability
                val_loss += loss.item()

        mean_valid_loss = val_loss/val_batch
        writer.add_scalar('Loss/valid', mean_valid_loss, step)
        mean_valid_acc = val_acc/val_batch
        writer.add_scalar('ACC/valid', mean_valid_acc, step)

        print(f'[{epoch+1:03d}/{num_epoch:03d}] Train Acc: {mean_train_acc:3.5f} Loss: {mean_train_loss:3.5f} | Val Acc: {mean_valid_acc:3.5f} loss: {mean_valid_loss:3.5f}')

        # if the model improves, save a checkpoint at this epoch
        if val_acc > best_acc:
            best_acc = val_acc
            torch.save(model.state_dict(), config['model_path'])
            print(f'saving model with acc {best_acc/len(val_set):.5f}')
            early_stop_count = 0
        else:
            early_stop_count += 1

        if early_stop_count >= config['early_stop']:
            print('\nModel is not improving, so we halt the training session.')
            return

# Hyper-parameters

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'DEVICE: {device}')

config = {
    # data prarameters
    # change the value of "concat_nframes" for medium baseline
    'concat_nframes': 3,   # the number of frames to concat with, n must be odd (total 2k+1 = n frames)
    'train_ratio': 0.75,   # the ratio of data used for training, the rest will be used for validation

    # training parameters
    'seed': 1213,          # random seed
    'batch_size': 512,        # batch size
    'num_epoch': 10,         # the number of training epoch
    'learning_rate': 1e-4,      # learning rate
    'early_stop': 6,
    'model_path': './models/model.ckpt',  # the path where the checkpoint will be saved

    # model parameters
    # change the value of "hidden_layers" or "hidden_dim" for medium baseline
    'hidden_layers': 3,          # the number of hidden layers
    'hidden_dim': 256           # the hidden dim
}

# the input dim of the model, you should not change the value
config['input_dim'] = 39 * config['concat_nframes']

DEVICE: cuda


# Dataloader

In [None]:
same_seed(config['seed'])

# preprocess data
train_X, train_y = preprocess_data(split='train', feat_dir='./libriphone/feat', phone_path='./libriphone', concat_nframes=config['concat_nframes'], train_ratio=config['train_ratio'], random_seed=config['seed'])
val_X, val_y = preprocess_data(split='val', feat_dir='./libriphone/feat', phone_path='./libriphone', concat_nframes=config['concat_nframes'], train_ratio=config['train_ratio'], random_seed=config['seed'])

# get dataset
train_set = LibriDataset(train_X, train_y)
val_set = LibriDataset(val_X, val_y)

# remove raw feature to save memory
del train_X, train_y, val_X, val_y
gc.collect()

# get dataloader
train_loader = DataLoader(train_set, batch_size=config['batch_size'], shuffle=True)
val_loader = DataLoader(val_set, batch_size=config['batch_size'], shuffle=False)

[Dataset] - # phone classes: 41, number of utterances for train: 2571


2571it [00:07, 343.41it/s]


[INFO] train set
torch.Size([1588590, 117])
torch.Size([1588590])
[Dataset] - # phone classes: 41, number of utterances for val: 858


858it [00:01, 453.42it/s]

[INFO] val set
torch.Size([528204, 117])
torch.Size([528204])





# Training

In [None]:
# create model, define a loss function, and optimizer
model = Classifier(input_dim=config['input_dim'], hidden_layers=config['hidden_layers'], hidden_dim=config['hidden_dim']).to(device)

trainer(train_loader, val_loader, model, config, device)

In [None]:
del train_set, val_set
del train_loader, val_loader
gc.collect()

703

# Plot learning curves with `tensorboard` (optional)

`tensorboard` is a tool that allows you to visualize your training progress.

If this block does not display your learning curve, please wait for few minutes, and re-run this block. It might take some time to load your logging information.

In [None]:
%reload_ext tensorboard
%tensorboard --logdir=./runs/

# Testing
Create a testing dataset, and load model from the saved checkpoint.

In [None]:
# load data
test_X = preprocess_data(split='test', feat_dir='./libriphone/feat', phone_path='./libriphone', concat_nframes=config['concat_nframes'])
test_set = LibriDataset(test_X, None)
test_loader = DataLoader(test_set, batch_size=config['batch_size'], shuffle=False)

[Dataset] - # phone classes: 41, number of utterances for test: 857


857it [03:19,  4.29it/s]

[INFO] test set
torch.Size([527364, 117])





In [None]:
# load model
model = Classifier(input_dim=config['input_dim'], hidden_layers=config['hidden_layers'], hidden_dim=config['hidden_dim']).to(device)
model.load_state_dict(torch.load(config['model_path']))

<All keys matched successfully>

Make prediction.

In [None]:
pred = np.array([], dtype=np.int32)

model.eval()
with torch.no_grad():
    for i, batch in enumerate(tqdm(test_loader)):
        features = batch
        features = features.to(device)

        outputs = model(features)

        _, test_pred = torch.max(outputs, 1) # get the index of the class with the highest probability
        pred = np.concatenate((pred, test_pred.cpu().numpy()), axis=0)


Write prediction to a CSV file.

After finish running this block, download the file `prediction.csv` from the files section on the left-hand side and submit it to Kaggle.

In [None]:
with open('prediction.csv', 'w') as f:
    f.write('Id,Class\n')
    for i, y in enumerate(pred):
        f.write('{},{}\n'.format(i, y))