<a href="https://colab.research.google.com/github/manashpratim/Speech-To-Phenome/blob/master/HW3P2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Mount Google Drive at /content/drive; later cells read the dataset from,
# and write the checkpoint and submission file to, paths under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
import numpy as np
import torch
from torch import nn
from torch.nn.utils.rnn import *
from torch.utils.data import Dataset, DataLoader, TensorDataset
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"
# Bare expression so the notebook echoes the selected device.
DEVICE

In [0]:
# Build and install the ctcdecode package (imported later for CTCBeamDecoder)
# from a fresh clone; --recursive also checks out the repository's submodules.
!git clone --recursive https://github.com/parlance/ctcdecode.git
!pip install wget
# `pip install .` must run from inside the clone, so step in and back out again.
%cd ctcdecode
!pip install .
%cd ..

In [0]:
# Change into the homework folder so `phoneme_list` can be imported from the
# working directory (presumably phoneme_list.py lives there; verify on Drive).
%cd /content/drive/My Drive/11-785-s20-hw3p2/hw3p2/
# `phonemes` is indexed by class id later on to turn predictions into strings.
from phoneme_list import PHONEME_MAP as phonemes
from phoneme_list import PHONEME_LIST as phonemes_list
# Restore the working directory used by the rest of the notebook.
%cd /content/

In [0]:
# Levenshtein.distance is used in the validation loop to score decoded
# phoneme strings against the reference strings.
!pip install python-Levenshtein
import Levenshtein

In [0]:
def _load_array(path):
    """Load one dataset file with pickling enabled and latin1 decoding.

    NOTE(review): allow_pickle=True unpickles arbitrary objects, so only point
    this at files from a trusted source.
    """
    return np.load(path, allow_pickle=True, encoding='latin1')


# NOTE(review): the train and test paths carry no '.npy' suffix while the other
# three do; confirm this matches the actual file names on Drive.
train = _load_array('/content/drive/My Drive/11-785-s20-hw3p2/hw3p2/wsj0_train')
train_labels = _load_array('/content/drive/My Drive/11-785-s20-hw3p2/hw3p2/wsj0_train_merged_labels.npy')
dev = _load_array('/content/drive/My Drive/11-785-s20-hw3p2/hw3p2/wsj0_dev.npy')
dev_labels = _load_array('/content/drive/My Drive/11-785-s20-hw3p2/hw3p2/wsj0_dev_merged_labels.npy')
test = _load_array('/content/drive/My Drive/11-785-s20-hw3p2/hw3p2/wsj0_test')

In [0]:
def preprocessing(arr,flag=False):
    """Center an array along axis 0 and optionally scale it to unit variance.

    Args:
        arr: array-like accepted by ``np.mean``; statistics are taken over
            axis 0 (one mean/std per remaining position).
        flag: when True, the centered values are also divided by the
            axis-0 standard deviation.

    Returns:
        ``arr - mean`` when ``flag`` is False, otherwise ``(arr - mean) / std``.
        Positions whose standard deviation is exactly zero are divided by 1
        instead, so a constant column yields zeros rather than 0/0 = NaN.
    """
    centered = arr - np.mean(arr, axis=0)
    if not flag:
        return centered
    std = np.std(arr, axis=0)
    # A constant column has std == 0; dividing by it would inject NaNs into
    # the features and from there into the loss.
    safe_std = np.where(std == 0, 1.0, std)
    return centered / safe_std

In [0]:
class MyDataset(Dataset):
    """In-memory dataset of (features, labels) pairs.

    Every item of ``x`` is passed through ``preprocessing(..., flag=True)`` and
    stored as a ``torch.FloatTensor``; every item of ``y`` is stored as a
    ``torch.LongTensor``. Conversion happens once, at construction time.
    """

    def __init__(self, x, y):
        features = []
        for utterance in x:
            features.append(torch.FloatTensor(preprocessing(utterance, flag=True)))
        self.X = features
        self.Y = list(map(torch.LongTensor, y))

    def __getitem__(self, i):
        """Return the ``(features, labels)`` pair stored at position ``i``."""
        return self.X[i], self.Y[i]

    def __len__(self):
        """Return the number of stored feature tensors."""
        return len(self.X)

def collate(seq_list):
    """Batch a list of ``(features, labels)`` pairs of varying length.

    Returns:
        A 4-tuple ``(inputs, targets, X_lens, Y_lens)``:
        inputs  - features padded by ``pad_sequence`` with its default layout
                  (longest length first, batch second);
        targets - labels padded batch-first, using 100000 as the fill value;
        X_lens  - LongTensor with the unpadded length of each feature sequence;
        Y_lens  - LongTensor with the unpadded length of each label sequence.
    """
    feature_seqs = []
    label_seqs = []
    for pair in seq_list:
        feature_seqs.append(pair[0])
        label_seqs.append(pair[1])

    padded_inputs = pad_sequence(feature_seqs)
    # 100000 is a sentinel fill value; consumers strip it using the lengths
    # or by filtering on the value.
    padded_targets = pad_sequence(label_seqs, batch_first=True, padding_value=100000)

    feature_lens = torch.LongTensor([len(pair[0]) for pair in seq_list])
    label_lens = torch.LongTensor([len(pair[1]) for pair in seq_list])
    return padded_inputs, padded_targets, feature_lens, label_lens

In [0]:
class TestDataset(Dataset):
    """In-memory dataset of unlabeled feature sequences.

    Every item of ``x`` is passed through ``preprocessing(..., flag=True)`` and
    stored as a ``torch.FloatTensor`` at construction time.
    """

    def __init__(self, x):
        features = []
        for utterance in x:
            features.append(torch.FloatTensor(preprocessing(utterance, flag=True)))
        self.X = features

    def __getitem__(self, i):
        """Return the feature tensor stored at position ``i``."""
        return self.X[i]

    def __len__(self):
        """Return the number of stored feature tensors."""
        return len(self.X)

def collate_test(seq_list):
    """Batch a list of unlabeled feature sequences of varying length.

    Returns:
        ``(inputs, X_lens)`` where ``inputs`` is the ``pad_sequence`` result in
        its default layout (longest length first, batch second) and ``X_lens``
        is a LongTensor with each sequence's unpadded length.
    """
    padded_inputs = pad_sequence(list(seq_list))
    lengths = torch.LongTensor([len(sequence) for sequence in seq_list])
    return padded_inputs, lengths

In [0]:
# Wrap the raw arrays; feature normalization and tensor conversion happen here.
train_dataset = MyDataset(train, train_labels)
val_dataset = MyDataset(dev, dev_labels)

# Both loaders share every setting except shuffling: training batches are
# reshuffled, validation keeps the dataset order.
_loader_options = dict(batch_size=64, collate_fn=collate, num_workers=8, pin_memory=True)
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_options)

In [0]:
# Unlabeled test split. shuffle=False keeps predictions in dataset order,
# which the submission writer relies on when it numbers the rows.
test_dataset = TestDataset(test)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=8,
    pin_memory=True,
    collate_fn=collate_test,
)

In [0]:
class Model(nn.Module):
    """Two pointwise Conv1d layers, a 6-layer bidirectional LSTM and a linear head.

    The convolutions (kernel size 1, no bias) lift each 40-dimensional frame to
    256 channels, the LSTM (hidden size 512 per direction, dropout 0.2 between
    layers) models the sequence, and the linear layer maps the concatenated
    directions to 47 output scores per frame.
    """

    def __init__(self):
        super(Model, self).__init__()
        self.cnn1 = torch.nn.Conv1d(40, 256, 1, stride=1, padding=0,bias=False)
        self.cnn2 = torch.nn.Conv1d(256, 256, 1, stride=1, padding=0,bias=False)
        self.tanh = nn.Hardtanh()
        self.lstm1 = nn.LSTM(256, 512, bidirectional=True,num_layers=6,dropout=0.2)
        self.output = nn.Linear(512*2, 47)

    def forward(self, X, lengths):
        """Compute per-frame output scores for a padded batch.

        Args:
            X: padded features laid out time-major as (T, B, 40); the packing
                call below uses the default ``batch_first=False`` and ``cnn1``
                takes 40 input channels.
            lengths: unpadded length of each sequence in the batch, as a
                tensor (any device) or a list.

        Returns:
            ``(out, out_lens)``: ``out`` has shape (T, B, 47) and holds
            unnormalized scores; ``out_lens`` are the per-sequence lengths
            reported by ``pad_packed_sequence``.
        """
        # After the transpose X is (T, 40, B). Conv1d reads dim 1 as channels,
        # and with kernel size 1 every (time, batch) frame is projected
        # independently, so treating T as the conv batch dimension is harmless.
        X = self.tanh(self.cnn1(X.transpose(1,2)))
        X = self.tanh(self.cnn2(X))
        X = X.transpose(1,2)
        # pack_padded_sequence requires the lengths on the CPU in recent
        # PyTorch releases; callers commonly move them to the GPU with the data.
        if torch.is_tensor(lengths):
            lengths = lengths.cpu()
        packed_X = pack_padded_sequence(X, lengths, enforce_sorted=False)
        packed_out = self.lstm1(packed_X)[0]
        out, out_lens = pad_packed_sequence(packed_out)
        out = self.output(out)
        return out, out_lens

In [0]:
def init_weights(m):
    """Re-initialize the weight of Conv1d and Linear modules with Xavier-uniform.

    Meant to be passed to ``Module.apply``. Modules whose exact type is neither
    ``nn.Conv1d`` nor ``nn.Linear`` are left untouched, and biases are never
    modified.
    """
    if type(m) not in (nn.Conv1d, nn.Linear):
        return
    nn.init.xavier_uniform_(m.weight.data)

In [0]:
# Build the network, re-initialize its Conv1d/Linear weights, and place it on
# the selected device.
model = Model()
model.apply(init_weights)
model = model.to(DEVICE)

# CTC loss with class index 46 reserved for the blank symbol (the model emits
# 47 scores per frame).
criterion = nn.CTCLoss(blank=46)

# Adam keeps references to the model's parameter objects, which apply() and
# to() update in place.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [0]:
from ctcdecode import CTCBeamDecoder

# The decoder only needs one label per output class here: decoded class ids
# are mapped to characters through `phonemes` afterwards, so a placeholder
# symbol is repeated 47 times. The blank index matches the one given to CTCLoss.
_placeholder_labels = ['$'] * 47
decoder = CTCBeamDecoder(
    _placeholder_labels,
    beam_width=100,
    log_probs_input=True,
    blank_id=46,
)

In [0]:
# Train for 10 epochs. After each epoch report the mean training CTC loss, then
# evaluate on the validation set: mean CTC loss per batch and mean Levenshtein
# distance per utterance between beam-search output and the reference.
criterion = criterion.to(DEVICE)
for epoch in range(10):
    # ---- training pass ----
    los = 0
    model.train()
    for inputs, targets, xlens, ylens in train_loader:
        optimizer.zero_grad()
        inputs = inputs.to(DEVICE)
        targets = targets.to(DEVICE)
        ylens = ylens.to(DEVICE)
        # xlens deliberately stays on the CPU: pack_padded_sequence inside the
        # model rejects CUDA length tensors in recent PyTorch releases.
        out, out_lens = model(inputs, xlens)
        # CTCLoss expects log-probabilities over the class dimension.
        out = out.log_softmax(2)
        loss = criterion(out, targets, out_lens, ylens)
        los = los + loss.item()
        loss.backward()
        optimizer.step()
        # Drop references to this batch's tensors before the next one is loaded.
        del inputs, loss, out, targets, out_lens, xlens, ylens

    print('Epoch', epoch + 1, 'Loss', los/ len(train_loader))

    # ---- validation pass ----
    model.eval()
    with torch.no_grad():
        dis = 0
        tot = 0
        los1 = 0
        for inputs, targets, xlens, ylens in val_loader:
            inputs = inputs.to(DEVICE)
            out, out_lens = model(inputs, xlens)
            out = out.log_softmax(2)
            loss = criterion(out, targets.to(DEVICE), out_lens, ylens.to(DEVICE))
            los1 = los1 + loss.item()
            # The beam decoder is fed CPU tensors, batch-first (hence the transpose).
            out = out.cpu()
            out_lens = out_lens.cpu()
            test_Y, _, _, test_Y_lens = decoder.decode(out.transpose(0, 1), out_lens)

            for i in range(out.size()[1]):
                # Beam 0 is taken as the prediction, truncated to its decoded length.
                best_seq = test_Y[i, 0, :test_Y_lens[i, 0]]
                best_pron = ''.join(phonemes[j] for j in best_seq.tolist())
                # 100000 is the padding value used by collate for the targets.
                best_tar = ''.join(phonemes[k] for k in targets[i].tolist() if k != 100000)
                dis = dis + Levenshtein.distance(best_pron, best_tar)
                tot = tot + 1
            # The original line had a stray `del` keyword inside the target
            # list, which is a SyntaxError and kept the whole cell from running.
            del inputs, targets, out_lens, out, xlens
        print('Distance: ',dis/tot)
        print('Loss (Val): ',los1/len(val_loader))
     
    

In [0]:
# Persist only the learned parameters (the state_dict), not the module object.
_checkpoint_path = '/content/drive/My Drive/modelhw32.pt'
_trained_weights = model.state_dict()
torch.save(_trained_weights, _checkpoint_path)

In [0]:
# Decode the test set with beam search and collect one predicted phoneme
# string per utterance, in loader order.
output = []
model.eval()
with torch.no_grad():
    for inputs, xlens in test_loader:
        inputs = inputs.to(DEVICE)
        # xlens stays on the CPU: pack_padded_sequence inside the model rejects
        # CUDA length tensors in recent PyTorch releases.
        out, out_lens = model(inputs, xlens)
        # The decoder was built with log_probs_input=True, so it must receive
        # log-probabilities, exactly as in the validation loop. The original
        # cell passed the raw linear-layer scores here.
        out = out.log_softmax(2)
        out = out.cpu()
        out_lens = out_lens.cpu()
        # The decoder is fed batch-first scores (hence the transpose).
        test_Y, _, _, test_Y_lens = decoder.decode(out.transpose(0, 1), out_lens)

        for i in range(out.size()[1]):
            # Beam 0 is taken as the prediction, truncated to its decoded length.
            best_seq = test_Y[i, 0, :test_Y_lens[i, 0]]
            best_pron = ''.join(phonemes[j] for j in best_seq.tolist())
            output.append(best_pron)
        del inputs, out_lens, out, xlens

In [0]:
# Write the submission file: a header row, then one "<row index>,<prediction>"
# line per decoded utterance, numbered from 0 in the order of `output`.
with open('/content/drive/My Drive/mbarman.csv', 'w') as w:
    w.write('Id,Predicted\n')
    for row_id, prediction in enumerate(output):
        w.write('{},{}\n'.format(row_id, prediction))