In [None]:
import csv
from typing import List, Dict, Union, Tuple

# (session_id, text1_ids, text2_ids, label); label is -1.0 when the CSV provides none.
ExampleType = Tuple[str, List[int], List[int], float]


def load_examples(dataset_path: str) -> List[ExampleType]:
    """Read sentence-pair examples from a CSV file.

    The file must have the columns ``id``, ``sentence1`` and ``sentence2``;
    ``label`` is optional. Each sentence cell holds whitespace-separated
    integer token ids. Every id is shifted by +1 on load (the Encoder in this
    notebook builds its embedding with ``padding_idx=0``).

    Args:
        dataset_path: path of the CSV file to read.

    Returns:
        One ``(id, text1_ids, text2_ids, label)`` tuple per CSV row, in file
        order. ``label`` is ``-1.0`` when the column is absent or the cell is
        empty.

    Raises:
        KeyError: if a required column is missing from the header.
        ValueError: if a token or a non-empty label is not numeric.
    """
    examples: List[ExampleType] = []
    # newline="" is what the csv module asks for so that it can do its own
    # newline handling (embedded newlines in quoted fields stay intact).
    with open(dataset_path, newline="") as f:
        for row in csv.DictReader(f):
            # str.split() with no argument tolerates repeated / leading /
            # trailing blanks and maps an empty cell to an empty id list.
            # `or ""` covers short rows, where DictReader fills in None.
            text1_ids = [int(token_id) + 1 for token_id in (row["sentence1"] or "").split()]
            text2_ids = [int(token_id) + 1 for token_id in (row["sentence2"] or "").split()]

            raw_label = row.get("label")
            has_label = raw_label is not None and raw_label.strip() != ""
            label = float(raw_label) if has_label else -1.0

            examples.append((row["id"], text1_ids, text2_ids, label))
    return examples

In [None]:
# Everything read from train.csv; the leading 80% is used for training and
# the trailing 20% is held out as the dev set (rows keep their file order).
trainable_examples = load_examples("train.csv")
dev_split_index = int(0.8 * len(trainable_examples))
train_examples, dev_examples = (
    trainable_examples[:dev_split_index],
    trainable_examples[dev_split_index:],
)

# Examples read from test.csv.
test_examples = load_examples("test.csv")

In [1]:
import torch
from torch import nn
from torch.nn import functional as fnn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from torch.optim import Adadelta
#from transformers import get_constant_schedule_with_warmup
from torch.nn.utils.clip_grad import clip_grad_norm_
from tqdm.notebook import tqdm

from torch.nn.utils.rnn import pad_sequence

In [None]:
# Model hyperparameters. The model classes in this notebook read several of
# these as module-level globals instead of taking them as arguments.

# Dropout probability that Encoder hands to its PreNet.
dropout = 0.5

embedding_size = 256
# Read as a global by Vocoder.__init__ and Decoder.forward.
# NOTE(review): a later cell rebinds hidden_size to 256.
hidden_size = 128

# CBHG inside Encoder: bank size K and conv-projection output width.
enc_bank_size, enc_proj_size = 16, 128

# CBHG inside Vocoder: bank size K and conv-projection output width.
ppn_bank_size, ppn_proj_size = 8, 80

In [None]:
class PreNet(nn.Module):
    """Two fully connected ReLU layers, each followed by dropout.

    Layer widths are ``input_size -> 2 * hidden_size -> hidden_size // 2``;
    both dropout layers use the same probability ``dropout``.
    """

    def __init__(self, input_size: int, hidden_size: int, dropout: float):
        super().__init__()
        wide_dim = 2 * hidden_size
        narrow_dim = hidden_size // 2
        self.fc1 = nn.Linear(input_size, wide_dim)
        self.fc2 = nn.Linear(wide_dim, narrow_dim)
        self.dropout1 = nn.Dropout(p=dropout)
        self.dropout2 = nn.Dropout(p=dropout)

    def forward(self, inputs):
        """Map ``(..., input_size)`` features to ``(..., hidden_size // 2)``."""
        expanded = self.dropout1(F.relu(self.fc1(inputs)))
        return self.dropout2(F.relu(self.fc2(expanded)))

In [None]:
class CBHG(nn.Module):
    """Conv1D bank + highway network + bidirectional GRU.

    Data flow (B = batch, T = sequence length, H = ``hidden_size``):

    1. K parallel ``Conv1d`` layers with kernel widths 1..K, padded so each
       keeps length T; their outputs are concatenated on the channel axis
       and batch-normalised -> ``(B, K*H, T)``.
    2. Max-pooling over time (kernel 2, stride 1), length kept at T.
    3. Two width-3 ``Conv1d`` projections ending in ``proj_out`` channels,
       mapped back to H channels by a 1x1 convolution when
       ``proj_out != H``, then added to the block input (residual) and
       batch-normalised.
    4. Four gated highway layers.
    5. One bidirectional GRU layer.
    """

    def __init__(self, hidden_size, K, proj_out):
        '''
        Args:
            hidden_size(obj: int)
                feature width of the input; also the GRU hidden width per
                direction
            K(obj: int)
                bank size; kernel widths 1..K are used
            proj_out(obj: int)
                dimension of output of conv1d projection

        Raises:
            ValueError: if ``K`` is smaller than 1.
        '''
        super().__init__()
        if K < 1:
            raise ValueError(f"K must be a positive bank size, got {K}")

        # Kernel widths run 1..K (a width of 0 is not a valid convolution).
        # Padding (k-1)//2 on the left and k//2 on the right adds exactly
        # k - 1 zeros, so every bank member keeps the input length for both
        # odd and even k and the outputs can be concatenated.
        self.conv1d_list = nn.ModuleList([
            nn.Sequential(
                nn.ConstantPad1d(((k - 1) // 2, k // 2), 0),
                nn.Conv1d(hidden_size, hidden_size, k)
            )
            for k in range(1, K + 1)])
        # The concatenated bank has K * hidden_size channels, so it needs
        # its own normalisation layer.
        self.bank_norm = nn.BatchNorm1d(K * hidden_size)

        # padding=1 yields T + 1 steps; forward() trims back to T.
        self.maxpool = nn.MaxPool1d(kernel_size=2, stride=1, padding=1)

        # padding=1 keeps the length unchanged for the width-3 kernels.
        self.conv1d_proj = nn.Sequential(
            nn.Conv1d(K * hidden_size, 2 * hidden_size, 3, padding=1),
            nn.Conv1d(2 * hidden_size, proj_out, 3, padding=1)
        )
        # The residual is taken with the block input (hidden_size channels),
        # so the projection is mapped back when its width differs.
        if proj_out == hidden_size:
            self.residual_proj = nn.Identity()
        else:
            self.residual_proj = nn.Conv1d(proj_out, hidden_size, 1)
        self.batch_norm = nn.BatchNorm1d(hidden_size)

        # Highway layer: y = T(x) * H(x) + (1 - T(x)) * x, with H the ReLU
        # transform below and T the sigmoid gate.
        self.highway = nn.ModuleList([
            nn.Sequential(
                nn.Linear(hidden_size, hidden_size),
                nn.ReLU()
            )
            for _ in range(4)])
        self.highway_gate = nn.ModuleList([
            nn.Sequential(
                nn.Linear(hidden_size, hidden_size),
                nn.Sigmoid()
            )
            for _ in range(4)])

        self.gru = nn.GRU(hidden_size, hidden_size, num_layers = 1, batch_first = True, bidirectional = True)

    def forward(self, inputs):
        '''
        Args:
            inputs: tensor of shape (batch_size, seq_length, hidden_size)

        Returns:
            tensor of shape (batch_size, seq_length, 2 * hidden_size): the
            per-step outputs of the bidirectional GRU (its final hidden
            state is discarded).
        '''
        if inputs.dim() != 3:
            raise ValueError(
                f"expected a (batch, seq, hidden) tensor, got {tuple(inputs.shape)}")
        seq_length = inputs.size(1)
        # Conv1d / BatchNorm1d / MaxPool1d work on (batch, channels, time).
        channels_first = inputs.transpose(1, 2)

        #Conv1D bank, concatenated on the channel axis
        bank_output = torch.cat(
            [conv(channels_first) for conv in self.conv1d_list], dim=1)
        bank_output = self.bank_norm(bank_output)

        #Maxpooling (drop the extra step produced by the padding)
        maxpool_output = self.maxpool(bank_output)[:, :, :seq_length]

        #Conv1D projection and Residual connection
        proj_output = self.residual_proj(self.conv1d_proj(maxpool_output))
        proj_output = self.batch_norm(proj_output + channels_first)

        #Highway network, back in (batch, time, features) layout
        highway_output = proj_output.transpose(1, 2)
        for transform, gate in zip(self.highway, self.highway_gate):
            candidate = transform(highway_output)
            carry = gate(highway_output)
            highway_output = carry * candidate + (1.0 - carry) * highway_output

        #bidirectional GRU
        CBHG_output, _ = self.gru(highway_output)

        return CBHG_output


In [None]:
class TanhAttention(nn.Module):
    """Additive (tanh) attention.

    Scores every key position with ``v(tanh(W1(key) + W2(query)))``,
    normalises the scores over the key positions with a softmax and returns
    the weighted sum of the keys.
    """

    def __init__(self, hidden_dim) -> None:
        '''
        Args:
            hidden_dim(`int`):
                hidden_size of encoder outputs
        '''
        # Subclassing nn.Module (and calling its __init__) is what registers
        # W1 / W2 / v as parameters of any parent module and makes the
        # instance callable.
        super().__init__()
        self.W1 = nn.Linear(hidden_dim, hidden_dim)
        self.W2 = nn.Linear(hidden_dim, hidden_dim)
        self.v = nn.Linear(hidden_dim, 1)
        self.tanh = nn.Tanh()

    def forward(self, key, query):
        '''
        Args:
            key: tensor of shape (batch, key_len, hidden_dim)
            query: tensor of shape (batch, hidden_dim) or
                (batch, 1, hidden_dim)

        Returns:
            context tensor of shape (batch, hidden_dim)
        '''
        if query.dim() == 2:
            # (batch, 1, hidden_dim): broadcast over the key positions in the
            # addition below, so no explicit repeat is needed.
            query = query.unsqueeze(1)

        attention = self.v(self.tanh(self.W1(key) + self.W2(query)))  # (batch, key_len, 1)
        weight = torch.softmax(attention, dim=1)
        # (batch, 1, key_len) @ (batch, key_len, hidden_dim) -> (batch, 1, hidden_dim)
        context = torch.matmul(weight.transpose(1, 2), key).squeeze(1)

        return context

In [None]:
class Encoder(nn.Module):
    """Token encoder: embedding -> PreNet -> CBHG.

    Reads the module-level settings ``dropout``, ``enc_bank_size`` and
    ``enc_proj_size``.

    Args:
        vocab_size: number of rows in the embedding table; index 0 is the
            padding index.
        hidden_size: hidden size handed to PreNet.
        embedding_size: width of the token embeddings.
    """

    def __init__(self, vocab_size: int, hidden_size: int, embedding_size:int):
        super().__init__()
        self.char_emb = nn.Embedding(vocab_size, embedding_size, padding_idx=0)
        self.pre_net = PreNet(embedding_size, hidden_size, dropout)
        # PreNet's last layer is narrower than hidden_size, so the CBHG is
        # sized from the width PreNet actually emits; passing hidden_size
        # here would make the first CBHG layer see the wrong feature count.
        prenet_width = self.pre_net.fc2.out_features
        self.cbhg = CBHG(prenet_width, enc_bank_size, enc_proj_size)

    def forward(self, inputs):
        """Encode token ids.

        Args:
            inputs: integer tensor of token ids; assumed to be
                (batch, seq_length) — TODO confirm against the caller.

        Returns:
            Whatever ``self.cbhg`` returns for the PreNet features.
        """
        word_emb = self.char_emb(inputs)
        prenet_outputs = self.pre_net(word_emb)
        enc_outputs = self.cbhg(prenet_outputs)

        return enc_outputs

In [None]:
class Decoder(nn.Module):
    """Autoregressive attention decoder.

    Each step runs: PreNet -> attention GRU -> TanhAttention over the
    encoder outputs -> two stacked 2-layer GRUs -> linear layer producing one
    frame of width ``2 * hidden_size``.

    Reads the module-level ``dropout`` setting, as Encoder does.

    Args:
        hidden_size: base width; frames, attention and the GRUs all use
            ``2 * hidden_size`` features.
    """

    def __init__(self, hidden_size):
        super().__init__()
        frame_size = 2 * hidden_size
        self.pre_net = PreNet(frame_size, hidden_size, dropout)
        # TanhAttention has to be an nn.Module for its weights to be
        # registered on this decoder and for the call in forward() to work.
        self.attention = TanhAttention(2*hidden_size)
        # The attention GRU emits 2*hidden_size features so that its output
        # can be used as the attention query and, concatenated with the
        # context, fills the 4*hidden_size input of dec_rnn1. Its input width
        # follows whatever PreNet emits.
        self.attn_rnn = nn.GRU(self.pre_net.fc2.out_features, 2*hidden_size, num_layers = 1)
        self.dec_rnn1 = nn.GRU(4*hidden_size, 2*hidden_size, num_layers = 2)
        self.dec_rnn2 = nn.GRU(2*hidden_size, 2*hidden_size, num_layers = 2)
        # The decoder GRUs emit 2*hidden_size features, so that is the
        # input width of the output layer.
        self.fc = nn.Linear(2*hidden_size, frame_size)

    def forward(self, enc_outputs, dec_inputs, reduction):
        '''
        Args:
            enc_outputs (batch_size, enc_steps, 2 * hidden_size)
                attention keys; an nn.GRU-style (output, h_n) pair is also
                accepted and its first element is used
            dec_inputs (batch_size, 2 * hidden_size, dec_steps)
                target frames (256 wide for hidden_size = 128). In training
                mode they are the teacher-forced inputs; in eval mode only
                their batch size, step count, dtype and device are used.
            reduction (int >= 1)
                the decoder runs dec_steps // reduction steps

        Returns:
            tensor (batch_size, 2 * hidden_size, dec_steps // reduction),
            one predicted frame per decoder step.

        NOTE(review): the teacher-forced input of step i > 0 is
        dec_inputs[:, :, i * reduction - 1], i.e. the frame just before the
        i-th group of `reduction` frames; for reduction == 1 this is the
        usual shift-right. Confirm this matches the training targets.
        '''
        if reduction < 1:
            raise ValueError(f"reduction must be >= 1, got {reduction}")
        if isinstance(enc_outputs, tuple):
            enc_outputs = enc_outputs[0]

        batch_size = dec_inputs.size(0)
        total_steps = dec_inputs.size(2) // reduction
        frame_size = self.fc.out_features

        # All-zero "go" frame for the first step, on the inputs' device/dtype.
        prev_frame = dec_inputs.new_zeros(batch_size, frame_size)

        # None lets nn.GRU start from an all-zero hidden state of the right
        # (num_layers, batch, hidden) shape.
        attn_hidden = None
        dec_hidden1 = None
        dec_hidden2 = None

        outputs = []
        for i in range(total_steps):
            if self.training and i > 0:
                # teacher forcing: ground-truth frame instead of the prediction
                prev_frame = dec_inputs[:, :, i * reduction - 1]

            prenet_outputs = self.pre_net(prev_frame)

            # The GRUs are sequence-first, so each step is a length-1 sequence.
            attn_outputs, attn_hidden = self.attn_rnn(prenet_outputs.unsqueeze(0), attn_hidden)
            attn_outputs = attn_outputs.squeeze(0)
            context = self.attention(enc_outputs, attn_outputs)

            rnn_inputs = torch.cat([attn_outputs, context], dim = -1).unsqueeze(0)
            # No residual around dec_rnn1: its input (4*hidden_size) and
            # output (2*hidden_size) widths differ.
            rnn_outputs, dec_hidden1 = self.dec_rnn1(rnn_inputs, dec_hidden1)

            residual = rnn_outputs
            rnn_outputs, dec_hidden2 = self.dec_rnn2(rnn_outputs, dec_hidden2)
            rnn_outputs = rnn_outputs + residual

            prev_frame = self.fc(rnn_outputs.squeeze(0))

            outputs.append(prev_frame)

        if not outputs:
            # torch.stack rejects an empty list; return an empty time axis.
            return dec_inputs.new_zeros(batch_size, frame_size, 0)
        return torch.stack(outputs, dim=-1)



In [None]:
from torchaudio.transforms import GriffinLim
# Audio framing / Griffin-Lim settings.
sample_rate = 24000    # samples per second
frame_shift = 12.5     # milliseconds between successive frames
frame_length = 50      # milliseconds per analysis window
n_iter = 30            # passed to GriffinLim as n_iter
# NOTE(review): 2480 is an unusual FFT size — confirm it is not a typo for 2048.
n_fft = 2480           # passed to GriffinLim as n_fft
# Milliseconds -> samples is ms / 1000 * sample_rate. Dividing by sample_rate
# (as this code previously did) truncates both values to 0. Multiplying
# before dividing keeps the arithmetic exact for these values; round() guards
# against float noise for other settings.
hop_length = int(round(frame_shift * sample_rate / 1000))     # 300 samples
win_length = int(round(frame_length * sample_rate / 1000))    # 1200 samples

class Vocoder(nn.Module):
    """CBHG post-processing followed by Griffin-Lim waveform reconstruction.

    Reads the module-level settings ``hidden_size``, ``ppn_bank_size``,
    ``ppn_proj_size``, ``n_fft``, ``n_iter``, ``win_length`` and
    ``hop_length`` when it is constructed.
    """

    def __init__(self):
        super().__init__()
        self.cbhg = CBHG(hidden_size, ppn_bank_size, ppn_proj_size)
        # Griffin-Lim needs n_fft // 2 + 1 frequency bins per frame, which is
        # not the width the CBHG emits, so a linear layer maps between them.
        # The CBHG output width is read off its GRU.
        gru = self.cbhg.gru
        cbhg_width = gru.hidden_size * (2 if gru.bidirectional else 1)
        self.linear = nn.Linear(cbhg_width, n_fft // 2 + 1)
        self.griffinlim = GriffinLim(n_fft, n_iter, win_length, hop_length, window_fn=torch.hann_window)

    def forward(self, inputs):
        """Turn decoder features into a waveform.

        Args:
            inputs: tensor handed to the CBHG; assumed to be
                (batch, frames, hidden_size) — TODO confirm against the caller.

        Returns:
            The tensor produced by ``GriffinLim`` for the predicted
            spectrogram.

        NOTE(review): the projection is clamped at zero and interpreted as a
        power spectrogram because ``GriffinLim`` is built with its default
        ``power``; confirm this matches how the spectrogram targets are
        scaled.
        """
        features = self.cbhg(inputs)
        if isinstance(features, tuple):
            # CBHG.forward may hand back nn.GRU's (output, h_n) pair.
            features = features[0]

        # (batch, frames, bins) -> (batch, bins, frames): GriffinLim expects
        # the frequency axis before the time axis.
        spectrogram = torch.relu(self.linear(features)).transpose(1, 2)
        outputs = self.griffinlim(spectrogram)

        return outputs


        

In [None]:
# Training-time settings.
batch_size = 32
learning_rate = 1e-3
# NOTE(review): this rebinds the module-level hidden_size that an earlier
# cell set to 128; code that reads the global after this cell runs sees 256.
# Confirm that is intended.
hidden_size = 256