<a href="https://colab.research.google.com/github/whdid502/stt_model_project/blob/decoder/attention_decoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch import Tensor, optim

import numpy as np
import time
from typing import Tuple, Optional, Any

## Encoder

In [None]:
class BaseRNN(nn.Module):
    """Base class that owns a recurrent stack and leaves ``forward`` to subclasses.

    The recurrent module is stored as ``self.rnn`` and is always built with
    ``bias=True`` and ``batch_first=True``.

    Args:
        input_size: size of each input feature vector.
        hidden_dim: dimension of the RNN's hidden state vector.
        num_layers: number of stacked recurrent layers.
        rnn_type: type of RNN cell, one of the keys of ``supported_rnns``
            (matched case-insensitively).
        dropout_p: dropout probability between recurrent layers.
        bidirectional: if True, builds a bidirectional RNN.
        device: device name stored on the instance ('cuda' or 'cpu').

    Raises:
        KeyError: if ``rnn_type`` is not a key of ``supported_rnns``.
    """

    supported_rnns = {
        'lstm': nn.LSTM,
        'gru': nn.GRU,
        'rnn': nn.RNN
    }

    def __init__(
            self,
            input_size: int,                       # size of input
            hidden_dim: int = 512,                 # dimension of RNN`s hidden state vector
            num_layers: int = 1,                   # number of recurrent layers
            rnn_type: str = 'lstm',                # type of RNN cell
            dropout_p: float = 0.3,                # dropout probability
            bidirectional: bool = True,            # if True, becomes a bidirectional rnn
            device: str = 'cuda'                   # device - 'cuda' or 'cpu'
    ) -> None:
        super().__init__()
        rnn_cell = self.supported_rnns[rnn_type.lower()]
        # Keyword arguments are required here: nn.RNN takes ``nonlinearity`` as its
        # fourth positional argument, so a positional ``True, True, ...`` tail would
        # be bound to the wrong parameters for that cell type.
        self.rnn = rnn_cell(
            input_size=input_size,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            bias=True,
            batch_first=True,
            dropout=dropout_p,
            bidirectional=bidirectional,
        )
        self.hidden_dim = hidden_dim
        self.device = device

    def forward(self, *args, **kwargs):
        """Abstract; subclasses must implement the forward pass."""
        raise NotImplementedError


In [None]:
class CNNExtractor(nn.Module):
    """Base class for convolutional feature extractors.

    Resolves the requested activation name to one of the shared module
    instances in ``supported_activations`` and stores it as ``self.activation``.
    Subclasses provide the actual ``forward``.
    """

    # Activation name -> module instance (instances are shared by all extractors).
    supported_activations = {
        'hardtanh': nn.Hardtanh(0, 20, inplace=True),
        'relu': nn.ReLU(inplace=True),
        'elu': nn.ELU(inplace=True),
        'leaky_relu': nn.LeakyReLU(inplace=True),
        'gelu': nn.GELU()
    }

    def __init__(self, activation: str = 'hardtanh') -> None:
        super().__init__()
        registry = CNNExtractor.supported_activations
        # An unknown name raises KeyError from the lookup below.
        self.activation = registry[activation]

    def forward(self, inputs: Tensor, input_lengths: Tensor) -> Optional[Any]:
        """Abstract; concrete extractors override this."""
        raise NotImplementedError

In [None]:
class VGGExtractor(CNNExtractor):
    """VGG-style stack: four 3x3 conv + batch-norm + activation stages.

    A 2x2 max-pool (stride 2) follows the second and the fourth stage, and the
    channel count grows 1 -> 64 -> 64 -> 128 -> 128.

    Args:
        activation: activation name understood by ``CNNExtractor``.
        mask_conv: stored on the instance; not consulted by ``forward``.
    """

    def __init__(self, activation: str, mask_conv: bool):
        super().__init__(activation)
        self.mask_conv = mask_conv

        # (in_channels, out_channels, add max-pool after this stage)
        stage_specs = (
            (1, 64, False),
            (64, 64, True),
            (64, 128, False),
            (128, 128, True),
        )
        stages = []
        for in_channels, out_channels, pool_after in stage_specs:
            stages.append(nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False))
            stages.append(nn.BatchNorm2d(num_features=out_channels))
            stages.append(self.activation)
            if pool_after:
                stages.append(nn.MaxPool2d(2, stride=2))
        self.conv = nn.Sequential(*stages)

    def forward(self, inputs: Tensor, input_lengths: Tensor) -> Optional[Any]:
        """Run ``inputs`` through the conv stack; ``input_lengths`` is unused."""
        return self.conv(inputs)

In [None]:
class Listener(BaseRNN):
    """Encoder: a VGG convolutional front-end followed by the RNN from ``BaseRNN``.

    Args:
        input_size: size of the feature axis of the input (e.g. number of mel bins).
        hidden_dim: dimension of the RNN's hidden state.
        device: device name ('cuda' or 'cpu'); conv features are moved to it in ``forward``.
        dropout_p: dropout probability of the RNN.
        num_layers: number of RNN layers.
        bidirectional: if True, the RNN is bidirectional.
        rnn_type: type of RNN cell.
        extractor: type of CNN extractor; only 'vgg' is supported.
        activation: activation name used by the extractor.
        mask_conv: flag stored on the instance and handed to the extractor.

    Raises:
        ValueError: if ``extractor`` is not 'vgg'.
    """

    def __init__(
            self,
            input_size: int,                       # size of input
            hidden_dim: int = 512,                 # dimension of RNN`s hidden state
            device: str = 'cuda',                  # device - 'cuda' or 'cpu'
            dropout_p: float = 0.3,                # dropout probability
            num_layers: int = 3,                   # number of RNN layers
            bidirectional: bool = True,            # if True, becomes a bidirectional encoder
            rnn_type: str = 'lstm',                # type of RNN cell
            extractor: str = 'vgg',                # type of CNN extractor
            activation: str = 'hardtanh',          # type of activation function
            mask_conv: bool = False                # flag indication whether apply mask convolution or not
    ) -> None:
        extractor_name = extractor.lower()
        if extractor_name != 'vgg':
            raise ValueError("Unsupported Extractor : {0}".format(extractor))

        # The VGG front-end max-pools the feature axis by 2 twice (floor division)
        # and ends with 128 channels, so each time step carries
        # (input_size // 4) * 128 features. The previous ``input_size << 5`` form
        # was only correct when input_size % 4 was 0 or 1.
        rnn_input_size = (input_size // 4) * 128

        # Initialise nn.Module (through BaseRNN) before assigning any attribute.
        super().__init__(
            input_size=rnn_input_size,
            hidden_dim=hidden_dim,
            num_layers=num_layers,
            rnn_type=rnn_type,
            dropout_p=dropout_p,
            bidirectional=bidirectional,
            device=device,
        )
        self.mask_conv = mask_conv
        self.extractor = extractor_name
        self.conv = VGGExtractor(activation, mask_conv)

    def forward(self, inputs: Tensor, input_lengths: Tensor) -> Tuple[Tensor, Tensor]:
        """Encode a batch of feature sequences.

        Args:
            inputs: 3-D tensor; a channel axis is inserted at dim 1 before the
                conv stack (presumably (batch, time, features) - confirm with caller).
            input_lengths: passed through to the extractor.

        Returns:
            ``(output, hidden)`` exactly as returned by the underlying RNN.
        """
        conv_feat = self.conv(inputs.unsqueeze(1), input_lengths).to(self.device)
        # Swap dims 1 and 2 so the sequence axis comes right after the batch axis.
        conv_feat = conv_feat.transpose(1, 2)

        # Merge the last two axes into one feature vector per step.
        batch_size, seq_length, num_channels, feat_dim = conv_feat.size()
        conv_feat = conv_feat.contiguous().view(batch_size, seq_length, num_channels * feat_dim)

        if self.training:
            self.rnn.flatten_parameters()

        output, hidden = self.rnn(conv_feat)

        return output, hidden


## Multi-head Attention

In [None]:
def scaled_dot_product_attention(q, k, v, mask):
    """Scaled dot-product attention over batched 3-D tensors.

    Args:
        q: queries; batch-multiplied with the transpose of ``k``.
        k: keys; the size of its last axis sets the scaling factor.
        v: values; combined using the attention weights.
        mask: optional boolean mask; positions where it is True are filled
            with -1e9 before the softmax. Pass None to disable masking.

    Returns:
        ``(output, attention_weights)``: the weighted values and the
        softmax-normalised weights (normalised over the last axis).
    """
    scale = np.sqrt(k.size(-1))
    logits = q.bmm(k.transpose(1, 2)) / scale

    if mask is not None:
        # In-place fill on the freshly computed logits.
        logits.masked_fill_(mask, -1e9)

    weights = F.softmax(logits, -1)
    return weights.bmm(v), weights

In [None]:
class MultiHeadAttention(nn.Module):
    """Multi-head attention built on ``scaled_dot_product_attention``.

    Args:
        d_model: model width; must be divisible by ``num_heads``.
        num_heads: number of attention heads.
    """

    def __init__(self, d_model=512, num_heads=8):
        super().__init__()
        self.d_model = d_model
        self.num_heads = num_heads

        assert d_model % num_heads == 0

        # Width of a single head.
        self.depth = d_model // num_heads

        # Input projections for queries, keys and values.
        self.wq = nn.Linear(d_model, d_model, bias=True)
        self.wk = nn.Linear(d_model, d_model, bias=True)
        self.wv = nn.Linear(d_model, d_model, bias=True)

        # Output projection applied after the heads are merged back together.
        self.linear = nn.Linear(d_model, d_model, bias=True)

    def _split_heads(self, projected, batch_size):
        """Reshape (batch, seq, d_model) into (num_heads * batch, seq, depth)."""
        per_head = projected.view(batch_size, -1, self.num_heads, self.depth)
        head_major = per_head.permute(2, 0, 1, 3).contiguous()
        return head_major.view(batch_size * self.num_heads, -1, self.depth)

    def forward(self, q, k, v, mask=None):
        """Attend from ``q`` over ``k``/``v``.

        Returns:
            ``(output, attention_weights)``; ``output`` has ``d_model`` as its
            last axis, ``attention_weights`` keeps the heads folded into the
            leading axis (num_heads * batch).
        """
        batch_size = v.size(0)

        q = self._split_heads(self.wq(q), batch_size)
        k = self._split_heads(self.wk(k), batch_size)
        v = self._split_heads(self.wv(v), batch_size)

        # Tile the mask along the leading axis so it lines up with the folded heads.
        head_mask = None if mask is None else mask.repeat(self.num_heads, 1, 1)

        heads_out, attention_weights = scaled_dot_product_attention(q, k, v, head_mask)

        # Undo the head folding: (heads * batch, seq, depth) -> (batch, seq, d_model).
        unfolded = heads_out.view(self.num_heads, batch_size, -1, self.depth)
        merged = unfolded.permute(1, 2, 0, 3).contiguous().view(batch_size, -1, self.d_model)

        return self.linear(merged), attention_weights

In [None]:
# temp_mha = MultiHeadAttention(d_model=512, num_heads=8)
# y = torch.rand((1, 60, 512))  # (batch_size, encoder_sequence, d_model)
# out, attn = temp_mha(y, y, y, mask=None)

# display(out.shape, attn.shape)
# display(y)
# out

## Decoder

In [None]:
class DecoderStep(nn.Module):
    """One decoding pass: embedding -> LSTM -> multi-head attention -> classifier.

    Args:
        num_classes: vocabulary size (embedding rows and output width).
        LSTM_num: number of stacked layers in the unidirectional LSTM.
        d_model: width of embeddings, LSTM state and attention.
        num_heads: number of attention heads.
        dropout_p: dropout probability for the embedding and the LSTM.
        device: device the embedded input is moved to.
    """

    def __init__(self, num_classes, LSTM_num=1, d_model=1024, num_heads=4, dropout_p=0.3, device='cuda'):
        super().__init__()
        self.d_model = d_model
        self.device = device

        self.embedding = nn.Embedding(num_classes, d_model)
        self.input_dropout = nn.Dropout(dropout_p)

        self.uniDirLSTM = nn.LSTM(
            input_size=d_model,
            hidden_size=d_model,
            num_layers=LSTM_num,
            bias=True,
            batch_first=True,
            dropout=dropout_p,
            bidirectional=False,
        )

        self.mha = MultiHeadAttention(d_model=d_model, num_heads=num_heads)

        self.layernorm1 = nn.LayerNorm(d_model, eps=1e-6)
        self.layernorm2 = nn.LayerNorm(d_model, eps=1e-6)

        self.linear1 = nn.Linear(d_model, d_model, bias=True)
        self.linear2 = nn.Linear(d_model, num_classes, bias=False)

    def forward(self, input_var, hidden, enc_output):
        """Decode ``input_var`` against the encoder output.

        Args:
            input_var: token ids, indexed as (batch, target_len).
            hidden: previous LSTM state, or None to start fresh.
            enc_output: encoder output used as attention keys and values
                (expected to be (batch, input_seq_len, d_model)).

        Returns:
            ``(log_probs, hidden, attention_weights)``. ``log_probs`` is
            (batch, target_len, num_classes) with the length axis squeezed
            away when target_len is 1.
        """
        n_batch = input_var.size(0)
        n_steps = input_var.size(1)

        tokens = self.input_dropout(self.embedding(input_var).to(self.device))

        if self.training:
            self.uniDirLSTM.flatten_parameters()

        lstm_out, hidden = self.uniDirLSTM(tokens, hidden)

        # Attend from the LSTM states over the encoder output.
        context, attn_weights_block = self.mha(lstm_out, enc_output, enc_output)

        # Residual + layer norm, flattened to (batch * target_len, d_model).
        attended = self.layernorm1(context + lstm_out).view(-1, self.d_model)

        # Position-wise projection with a second residual + layer norm.
        projected = self.linear1(attended)
        normed = self.layernorm2(projected + attended).view(n_batch, -1, self.d_model)

        # Classifier over the vocabulary, normalised to log-probabilities.
        logits = self.linear2(torch.tanh(normed).contiguous().view(-1, self.d_model))
        log_probs = F.log_softmax(logits, dim=1)
        log_probs = log_probs.view(n_batch, n_steps, -1).squeeze(1)

        return log_probs, hidden, attn_weights_block

In [None]:
class Decoder(nn.Module):
    """Greedy autoregressive decoder that repeatedly applies one ``DecoderStep``.

    Args:
        num_classes: vocabulary size.
        max_length: accepted for interface compatibility; the number of steps
            is taken from the ``inputs`` passed to ``forward``.
        d_model: model width handed to ``DecoderStep``.
        num_heads: number of attention heads.
        LSTM_num: number of LSTM layers in the step module.
        dropout_p: dropout probability.
        device: device name handed to ``DecoderStep``.
    """

    def __init__(self, num_classes, max_length=150, d_model=1024, num_heads=4, LSTM_num=2, dropout_p=0.3, device='cuda'):
        super().__init__()

        self.d_model = d_model

        self.dec_layer = DecoderStep(num_classes=num_classes, LSTM_num=LSTM_num, d_model=d_model, num_heads=num_heads, dropout_p=dropout_p, device=device)

    def forward(self, inputs, enc_outputs):
        """Decode ``inputs.size(1) - 1`` steps, feeding back the arg-max token.

        Only the first column of ``inputs`` (the start-of-sequence symbol) is
        consumed; every later step uses the previous step's top-1 prediction.

        Args:
            inputs: token ids indexed as (batch, target_len); column 0 is the
                start symbol.
            enc_outputs: encoder output passed to every decoding step.

        Returns:
            A list with one step output per decoded position.

        Raises:
            ValueError: if ``inputs`` or ``enc_outputs`` is None.
        """
        # Both arguments are dereferenced below, so both must be present.
        if inputs is None or enc_outputs is None:
            raise ValueError("Decoder.forward requires both inputs and enc_outputs")

        hidden = None
        result = []

        num_steps = inputs.size(1) - 1  # minus the start of sequence symbol
        input_var = inputs[:, 0].unsqueeze(1)

        for _ in range(num_steps):
            step_output, hidden, _ = self.dec_layer(input_var, hidden, enc_outputs)
            result.append(step_output)
            # Greedy choice: index of the highest-scoring class, shape (batch, 1).
            input_var = step_output.topk(1)[1]

        return result

In [None]:
class LAS(nn.Module):
    """Listen-Attend-Spell style model: a ``Listener`` encoder plus a ``Decoder``.

    Args:
        num_classes: vocabulary size of the decoder.
        input_size: size of the input feature axis given to the encoder.
        hidden_dim: hidden size of the encoder RNN; the decoder width is twice
            this value to match the encoder's default bidirectional output.
        dropout_p: dropout probability for encoder and decoder.
        mask_conv: forwarded to the encoder as a bool (None is treated as False).
        max_len: forwarded to the decoder as ``max_length``.
        num_heads: number of attention heads in the decoder.
        dec_num_layers: number of decoder LSTM layers.
        enc_num_layers: number of encoder RNN layers.
        device: device name handed to both sub-modules.
    """

    def __init__(self, num_classes, input_size=80, hidden_dim=512, dropout_p=0.15, mask_conv=None, max_len=150, num_heads=4,
                 dec_num_layers=2, enc_num_layers=3, device='cuda'):
        super().__init__()

        self.encoder = Listener(
            input_size=input_size,
            hidden_dim=hidden_dim,
            device=device,
            dropout_p=dropout_p,
            num_layers=enc_num_layers,
            mask_conv=bool(mask_conv),
        )
        # num_heads is forwarded explicitly; previously the argument was accepted
        # but the decoder always fell back to its own default.
        self.decoder = Decoder(
            num_classes=num_classes,
            max_length=max_len,
            d_model=hidden_dim << 1,
            num_heads=num_heads,
            LSTM_num=dec_num_layers,
            dropout_p=dropout_p,
            device=device,
        )

    def forward(self, inputs, input_lengths, targets=None):
        """Encode ``inputs`` and decode against ``targets``.

        Returns:
            The list of per-step outputs produced by the decoder.
        """
        output, hidden = self.encoder(inputs, input_lengths)
        result = self.decoder(targets, output)

        return result

    def flatten_parameters(self):
        """Call ``flatten_parameters`` on the encoder RNN and the decoder LSTM."""
        self.encoder.rnn.flatten_parameters()
        self.decoder.dec_layer.uniDirLSTM.flatten_parameters()

## data 준비

In [None]:
# TODO
# 0. Using the data paths in the csv file, convert audio -> feature vectors, then bundle the lists of feature vectors, labels, len(feature_vectors) and len(labels) into a single tuple
# 1. Pad to max length
# 2. An id2char dictionary is required

In [None]:
# !pip install python-Levenshtein
# Third-party edit-distance library used by the character-error-rate code below.
import Levenshtein as Lev

# Module-level counters named for total edit distance and total reference length.
total_dist = 0.0
total_length = 0.0
# Label id at which label_to_string stops decoding.
EOS_ID = 0
# Maps label id -> character; empty placeholder here.
id2char = {} # TODO : delete

def label_to_string(labels, vocab=None, eos_id=None):
    """Convert a sequence of label ids into a string, stopping at end-of-sequence.

    Args:
        labels: iterable of label ids; elements may be 0-d tensors / numpy
            scalars (anything with ``.item()``) or plain ints.
        vocab: mapping from label id to character. Defaults to the
            module-level ``id2char``.
        eos_id: label id that terminates the sentence. Defaults to the
            module-level ``EOS_ID``.

    Returns:
        The decoded string; empty when ``labels`` is empty or starts with
        the end-of-sequence id.

    Raises:
        KeyError: if a label id before end-of-sequence is missing from ``vocab``.
    """
    if vocab is None:
        vocab = id2char
    if eos_id is None:
        eos_id = EOS_ID

    chars = []
    for label in labels:
        idx = label.item() if hasattr(label, 'item') else label
        if idx == eos_id:
            break
        chars.append(vocab[idx])

    return "".join(chars)

def charErrorRate(targets, hypothesises):
    """Compute the character error rate of a batch of hypotheses.

    Each target/hypothesis pair is decoded with ``label_to_string``, spaces
    are removed, and the Levenshtein distance is accumulated. The totals are
    local to this call, so the result covers only the sequences passed in.

    Args:
        targets: iterable of reference label sequences.
        hypothesises: iterable of predicted label sequences, paired with
            ``targets`` by position.

    Returns:
        Total edit distance divided by total reference length. When the
        references are all empty, returns 0.0 if the hypotheses are empty
        too and ``inf`` otherwise.
    """
    dist_sum = 0
    ref_len_sum = 0

    for target, hypothesis in zip(targets, hypothesises):
        # Spaces are stripped so that only characters are compared.
        reference = label_to_string(target).replace(' ', '')
        candidate = label_to_string(hypothesis).replace(' ', '')

        # TODO : check whether the sentences contain '_' and, if so, strip it too

        dist_sum += Lev.distance(candidate, reference)
        ref_len_sum += len(reference)

    if ref_len_sum == 0:
        # Avoid ZeroDivisionError when there is nothing to compare against.
        return 0.0 if dist_sum == 0 else float('inf')

    return dist_sum / ref_len_sum

## Train

In [None]:
def train_step(model, epoch, train_dataset, loss_func, optimizer, device='cuda'):
    """Run one optimisation step on a single batch.

    Args:
        model: the network, optionally wrapped in ``nn.DataParallel``; the
            underlying module must expose ``flatten_parameters()``.
        epoch: current epoch index (not used by the computation).
        train_dataset: tuple ``(inputs, input_lengths, targets, target_lengths)``.
        loss_func: callable taking (batch * steps, num_classes) scores and
            (batch * steps,) target ids.
        optimizer: optimizer over the model parameters.
        device: device the model and tensors are moved to.

    Returns:
        ``(step_loss, cer)``: the scalar loss value and the character error
        rate of the arg-max hypotheses for this batch.
    """
    inputs, input_lengths, targets, target_lengths = train_dataset

    model.train()  # switch the model to training mode

    inputs = inputs.to(device)
    targets = targets.to(device)
    model = model.to(device)

    # DataParallel hides the wrapped module's own methods behind ``.module``.
    core = model.module if isinstance(model, nn.DataParallel) else model
    core.flatten_parameters()

    result = model(inputs, input_lengths, targets)
    # Stack the per-step list along dim=1 into a single tensor.
    result = torch.stack(result, dim=1).to(device)
    hypothesises = result.max(-1)[1]

    # Loss: drop column 0 of the targets (the start symbol fed to the decoder).
    targets = targets[:, 1:]
    loss = loss_func(result.contiguous().view(-1, result.size(-1)), targets.contiguous().view(-1))
    step_loss = loss.item()  # plain Python scalar of the loss

    # Accuracy metric
    cer = charErrorRate(targets, hypothesises)

    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=400)
    # Apply the clipped gradients; without this call the weights never change.
    optimizer.step()

    # torch.cuda.empty_cache() # TODO : check that this is necessary

    return step_loss, cer

In [None]:
def train(model, batch_size, num_epochs, lr, weight_decay):
    """Training loop driven by randomly generated placeholder batches.

    Args:
        model: network to train; may be wrapped in ``nn.DataParallel`` or not.
        batch_size: number of sequences in each generated batch.
        num_epochs: number of epochs (one ``train_step`` per epoch for now).
        lr: Adam learning rate.
        weight_decay: Adam weight decay.
    """
    print("[INFO] train start")

    # NOTE(review): the decoder already emits log_softmax outputs; applying
    # CrossEntropyLoss to normalised log-probabilities gives the same value as
    # NLLLoss, so this is redundant rather than wrong.
    loss_func = nn.CrossEntropyLoss(reduction='sum')
    # ``model.parameters()`` works with and without the DataParallel wrapper
    # (the previous ``model.module`` access required the wrapper).
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    for epoch in range(num_epochs):
        # TODO
        # 1. Load batch_size samples of real data
        # -> using the data paths in the csv file, convert audio -> feature vectors and bundle
        #    feature vectors, labels, len(feature_vectors) and len(labels) into one tuple
        # Placeholder batch; sized by the ``batch_size`` argument rather than a global.
        tmp_input = torch.rand((batch_size, 951, N_MELS), dtype=torch.float64).uniform_(0, 200)
        tmp_input_length = torch.randint(0, N_MELS, size=(batch_size,))
        tmp_target = torch.rand((batch_size, 59), dtype=torch.float64).uniform_(0, 49)
        tmp_target_length = torch.randint(0, N_MELS, size=(batch_size,))

        tmp_input = tmp_input.float()
        tmp_target = tmp_target.long()

        train_dataset = (tmp_input, tmp_input_length, tmp_target, tmp_target_length)

        # train
        epoch_loss, epoch_cer = train_step(model, epoch, train_dataset, loss_func, optimizer)
        print("[INFO] epoch {0}/{1} - loss : {2}, cer : {3}".format(epoch + 1, num_epochs, epoch_loss, epoch_cer))

        # save checkpoint
        if (epoch + 1) % 5 == 0:
            pass  # TODO : checkpoint saving is not implemented yet

## 실행

In [None]:
# hyper-parameter
# Model settings; these are passed to the LAS constructor in the run cell below.
N_MELS = 80  # input feature size (LAS input_size); also the last axis of the dummy training input
HIDDEN_DIM = 256
DROPOUT_P = 0.15
MAX_LEN = 150  # NOTE(review): the run cell passes max_len=12 instead of this constant
NUM_HEADS = 4
ENC_NUM_LAYERS = 3
DEC_NUM_LAYERS = 2
DEVICE = 'cuda'

# Training settings.
NUM_CLASSES = 50 # TODO : set this to the number of labels in the dataset
LEARNING_RATE = 1e-06
WEIGHT_DECAY = 1e-05
BATCH_SIZE = 8
NUM_EPOCHS = 20  # NOTE(review): the run cell trains for 1 epoch instead of this constant

In [None]:
# Build the LAS model from the hyper-parameters, wrap it in DataParallel and move it to the GPU.
# NOTE(review): max_len is hard-coded to 12 here rather than MAX_LEN.
model = nn.DataParallel(LAS(num_classes=NUM_CLASSES, input_size=N_MELS, hidden_dim=HIDDEN_DIM, dropout_p=DROPOUT_P, max_len=12, 
                                  num_heads=NUM_HEADS, dec_num_layers=DEC_NUM_LAYERS, enc_num_layers=ENC_NUM_LAYERS, device=DEVICE)).to('cuda')

# The message below reports that model initialisation succeeded.
print("model 초기화 성공")

# Train for a single epoch (NUM_EPOCHS is not used here) - presumably a smoke test.
train(model, BATCH_SIZE, 1, LEARNING_RATE, WEIGHT_DECAY)

model 초기화 성공
[INFO] train start
🎶 input_var size :  torch.Size([8, 1])


1838.8216552734375