<a href="https://colab.research.google.com/github/ukenia/quantization-cnn/blob/starter-code/Wav2Letter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Show the NVIDIA GPU (if any) attached to this runtime.
!nvidia-smi

In [None]:
# Mount Google Drive at /content/gdrive; the project paths defined in this notebook live under that mount point.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
# Project root on the mounted drive and the sub-folders derived from it.
BASE_PATH = "/content/gdrive/MyDrive/idl_project/"
DATA_PATH = f"{BASE_PATH}data/"
MODEL_PATH = f"{BASE_PATH}models/"
PREDICTION_PATH = f"{BASE_PATH}predictions/"

In [None]:
# Dataset files, all located directly under DATA_PATH on the drive.
train_filename = f"{DATA_PATH}train.npy"
train_transcripts_filename = f"{DATA_PATH}train_transcripts.npy"

dev_filename = f"{DATA_PATH}dev.npy"
dev_transcripts_filename = f"{DATA_PATH}dev_transcripts.npy"

test_filename = f"{DATA_PATH}test.npy"

In [7]:
!pip install torchaudio



In [9]:
import torchaudio

In [None]:
# Installing CTC Decoder:
# clone parlance/ctcdecode together with its git submodules, then pip-install it from the checkout.
!git clone --recursive https://github.com/parlance/ctcdecode.git
!cd ctcdecode && pip install .

In [None]:
# Code starts here!

In [8]:
# Import necessary libraries

import timeit
import os
import sys
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import torch.optim as optim
from torch.nn.utils.rnn import *

# from ctcdecode import CTCBeamDecoder
from datetime import datetime as dt

In [5]:
# Detect CUDA once; the DataLoader worker count and the torch device both follow from it.

cuda = torch.cuda.is_available()
num_workers = 4 if cuda else 0
print(f"Cuda = {cuda} with num_workers = {num_workers}")

def get_device():
    """Return 'cuda:0' when CUDA is available, otherwise 'cpu'."""
    return 'cuda:0' if torch.cuda.is_available() else 'cpu'
device = get_device()

Cuda = False with num_workers = 0


In [12]:
# Scratch check of torchaudio's `models` namespace (the commented lines would build and print its Wav2Letter).
# NOTE(review): the recorded output of this cell is an AttributeError, so the installed torchaudio
# presumably does not expose `models` — confirm the installed version, or drop this cell.
# model = torchaudio.models.Wav2Letter()
# print(model)

torchaudio.models

AttributeError: ignored

In [None]:
class Wav2LetterDataset(Dataset):
    """Map-style dataset pairing the entries of two `.npy` files by index.

    Args:
        x_path: path of the `.npy` file holding the input samples.
        y_path: path of the `.npy` file holding the matching targets.

    Item `i` is the tuple `(X[i], y[i])`, returned exactly as stored in the files;
    any padding or tensor conversion is left to the DataLoader's collate function.
    """

    def __init__(self, x_path, y_path):
        # NOTE: allow_pickle=True unpickles object arrays, which can execute arbitrary
        # code — only load files from a trusted source.
        self.X = np.load(x_path, allow_pickle=True)
        self.y = np.load(y_path, allow_pickle=True)

        # The dataset size is the length of the first axis of the inputs.
        self.length = self.X.shape[0]

    def __len__(self):
        """Return the number of samples."""
        return self.length

    def __getitem__(self, index):
        """Return the `(input, target)` pair stored at `index`."""
        x = self.X[index]
        y = self.y[index]

        return x, y

def pad_collate():
  """Placeholder collate function for the DataLoaders; currently does nothing and returns None.

  NOTE(review): `DataLoader` calls its `collate_fn` with the list of samples that make up
  a batch, so this zero-argument stub will need a `batch` parameter (and a real return
  value) once the padding logic is written — confirm before running the loaders.
  """
  # Need to pad based on the model for Wav2Letter
  return

In [None]:
# Training configuration read by the DataLoaders and the optimizer.
hyperparameters = {
    "batch_size": 32,
    "epochs": 50,
    "learning_rate": 5e-3,
    "weight_decay": 1e-5,
}

In [None]:
# Training split: reshuffled every epoch; a trailing partial batch is dropped.
train_data = Wav2LetterDataset(train_filename, train_transcripts_filename)
train_args = dict(shuffle=True, batch_size=hyperparameters["batch_size"], num_workers=num_workers, drop_last=True, collate_fn=pad_collate)
train_loader = DataLoader(train_data, **train_args)

# Validation split: fixed order and no dropped samples, so every epoch scores the same full set.
val_data = Wav2LetterDataset(dev_filename, dev_transcripts_filename)
val_args = dict(shuffle=False, batch_size=hyperparameters["batch_size"], num_workers=num_workers, drop_last=False, collate_fn=pad_collate)
val_loader = DataLoader(val_data, **val_args)

In [None]:
class ConvBlock(nn.Module):
    """A 1-D convolution followed by an in-place ReLU.

    Args:
        in_channels: number of channels of the input, laid out as (batch, in_channels, length).
        out_channels: number of channels produced by the convolution.
        kernel_size: width of the convolution kernel.
        stride: stride of the convolution.
        padding: zero-padding added to both ends of the length axis.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, padding):
        super(ConvBlock, self).__init__()
        # Build the layer from the constructor arguments. It must be assigned as a module
        # (not wrapped in a tuple) so its parameters are registered and it stays callable.
        self.conv = nn.Conv1d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
        )
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Apply the convolution and then the ReLU to `x`; return the activated tensor."""
        out = self.conv(x)
        out = self.relu(out)

        return out

In [None]:
class Wav2Letter(nn.Module):
    """Fully convolutional acoustic model built from a stack of `ConvBlock`s.

    Args:
        num_classes: number of output classes (channels of the last block).
        num_features: number of input feature channels.

    The stack is: one stride-2 block (kernel 48), seven identical kernel-7 blocks,
    one wide kernel-32 block expanding to 2000 channels, one 1x1 block, and a 1x1
    block projecting to `num_classes`. A log-softmax over the channel axis follows.
    """

    def __init__(self, num_classes = 42, num_features = 40):
        super(Wav2Letter, self).__init__()

        model = nn.Sequential(
            ConvBlock(in_channels=num_features, out_channels=250, kernel_size=48, stride=2, padding=23),

            # Seven identical 250 -> 250 blocks; unpacked so the Sequential indices stay 1..7.
            *[
                ConvBlock(in_channels=250, out_channels=250, kernel_size=7, stride=1, padding=3)
                for _ in range(7)
            ],

            ConvBlock(in_channels=250, out_channels=2000, kernel_size=32, stride=1, padding=16),
            ConvBlock(in_channels=2000, out_channels=2000, kernel_size=1, stride=1, padding=0),
            ConvBlock(in_channels=2000, out_channels=num_classes, kernel_size=1, stride=1, padding=0),
        )

        self.model = model
        # dim=1 is the channel (class) axis of the (batch, channels, length) conv output.
        self.log_softmax = nn.LogSoftmax(dim=1)

    def forward(self, x):
        """Return per-frame log-probabilities for `x`.

        Input - (batch_size, num_features, input_length).
        Output - (batch_size, num_classes, output_length), normalised over dim 1; the
        time axis is shortened by the stride-2 first block.
        """
        out = self.model(x)
        out = self.log_softmax(out)
        return out


In [None]:
# Build the network with its default sizes (the class defined in this notebook is `Wav2Letter`)
# and move it to the selected device.
model = Wav2Letter()
model.to(device)

In [None]:
# Loss, optimizer and learning-rate schedule used by the training loop.
criterion = nn.CTCLoss().to(device)
optimizer = optim.Adam(
    model.parameters(),
    lr=hyperparameters["learning_rate"],
    weight_decay=hyperparameters["weight_decay"],
)
# OR can use some other scheduler
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5)

In [None]:
# Train the model - Change based on the model

def train_model(train_loader, model):
    """Run one training epoch and return the mean loss per batch.

    Uses the notebook-level `device`, `optimizer` and `criterion`.

    Args:
        train_loader: iterable of batches. Each batch starts with `(inputs, targets)`;
            any further items (e.g. the input and target lengths that `nn.CTCLoss`
            requires) are forwarded unchanged to `criterion` after the targets.
        model: network to train; it is switched to training mode.

    Returns:
        Sum of the batch losses divided by `len(train_loader)`.
    """
    training_loss = 0

    # Set model in 'Training mode'
    model.train()

    # enumerate mini batches
    for inputs, targets, *loss_args in train_loader:

        inputs = inputs.to(device)
        targets = targets.to(device)

        # clear the gradients
        optimizer.zero_grad()

        # compute the model output
        out = model(inputs.float())

        # nn.CTCLoss expects log-probs as (time, batch, classes); the model in this
        # notebook emits (batch, classes, time), so reorder the axes for that loss.
        # NOTE(review): lengths supplied by the loader must refer to the model's output
        # frames (the first layer has stride 2), not the raw input frames — confirm.
        if isinstance(criterion, nn.CTCLoss):
            out = out.permute(2, 0, 1)

        # calculate loss
        loss = criterion(out, targets, *loss_args)

        # Backward pass
        loss.backward()

        # Update model weights
        optimizer.step()

        training_loss += loss.item()
    training_loss /= len(train_loader)
    return training_loss

In [None]:
# Evaluate the model - Change based on the model

def evaluate_model(val_loader, model):
    """Return the mean loss of `model` over every batch of `val_loader`.

    Uses the notebook-level `device` and `criterion`. No gradients are tracked and no
    weights are updated.

    Args:
        val_loader: iterable of batches. Each batch starts with `(inputs, targets)`;
            any further items (e.g. the input and target lengths that `nn.CTCLoss`
            requires) are forwarded unchanged to `criterion` after the targets.
        model: network to evaluate; it is switched to evaluation mode.

    Returns:
        Sum of the batch losses divided by `len(val_loader)`, as a float.
    """
    model.eval()

    total_loss = 0.0

    # Evaluation needs no autograd graph; skipping it saves memory and time.
    with torch.no_grad():
        # enumerate mini batches of the loader that was passed in
        for inputs, targets, *loss_args in val_loader:

            inputs = inputs.to(device)
            targets = targets.to(device)

            # compute the model output
            out = model(inputs.float())

            # nn.CTCLoss expects log-probs as (time, batch, classes); the model in this
            # notebook emits (batch, classes, time), so reorder the axes for that loss.
            if isinstance(criterion, nn.CTCLoss):
                out = out.permute(2, 0, 1)

            # accumulate the loss so the result reflects the whole set, not the last batch
            total_loss += criterion(out, targets, *loss_args).item()

    return total_loss / len(val_loader)

In [None]:
# Epoch loop: periodic checkpoint, one training pass, one validation pass, LR scheduling, logging.
for epoch in range(hyperparameters["epochs"]):
    print("Epoch: ", epoch)

    # Checkpoint every 10th epoch (skipping epoch 0), before that epoch's training pass.
    if epoch % 10 == 0 and epoch != 0:
        torch.save(model, MODEL_PATH + "base_model.pth")

    # Train
    starttime = timeit.default_timer()
    training_loss = train_model(train_loader, model)
    endtime = timeit.default_timer()
    print("Training time: ", (endtime - starttime)/60)

    # Validation (evaluate_model returns a single loss value; no edit distance is computed)
    starttime = timeit.default_timer()
    val_loss = evaluate_model(val_loader, model)
    endtime = timeit.default_timer()
    print("Validation time: ", (endtime - starttime)/60)

    # ReduceLROnPlateau decides from the monitored metric, so it must be passed in.
    scheduler.step(val_loss)

    # Read the learning rate from the optimizer, which works for every scheduler type.
    current_lr = optimizer.param_groups[0]["lr"]

    # Print log of loss and learning rate
    print("Epoch: "+str(epoch)+", Training loss: "+str(training_loss)+", Validation loss: "+str(val_loss)+
          ", LR: "+str(current_lr)+"\n")