## Note
- size: 特徴量
- length: 時系列

In [1]:
from torch.utils.data import Dataset

import torch
from torch import nn, optim
import pandas as pd
import torchaudio
import librosa
import numpy as np
import math

from torchmetrics.functional import char_error_rate

import glob
import os
import re
import copy


In [2]:
tkwargs_int = {
    "dtype": torch.int32,
    "device": "cuda",
}
tkwargs_float = {
    "dtype": torch.float32,
    "device": "cuda",
}

In [3]:
class YesNoDataset(Dataset):
    def __init__(self, wav_dir_path, model_sample_rate):
        super().__init__()

        dataset = []
        columns = ["path", "text_idx"]
        self.labels = ["y", "e", "s", "n", "o", "<space>", "_"]
        self.label_to_idx = {label: i for i, label in enumerate(self.labels)}
        for wav_file_path in glob.glob(wav_dir_path + "*.wav"):
            file_name = os.path.splitext(os.path.basename(wav_file_path))[0]
            text_idx = []
            for c in file_name:
                if c == "1":
                    text_idx += [self.label_to_idx[ic] for ic in "yes"] 
                elif c == "0":
                    text_idx += [self.label_to_idx[ic] for ic in "no"] 
                elif c == "_":
                    text_idx.append(self.label_to_idx["<space>"])
                else:
                    raise ValueError("Invalid Dir Path")
            dataset.append([wav_file_path, text_idx])
        
        self.dataset = pd.DataFrame(dataset, columns=columns)
        self.model_sample_rate = model_sample_rate
        self.spectrogram_transformer = torchaudio.transforms.MelSpectrogram(
            # スペクトル設定
            sample_rate=self.model_sample_rate,
            n_fft=1024,
            # スペクトログラム設定
            win_length= None,
            hop_length= 512,
            window_fn= torch.hann_window,
            # メルスペクトログラム設定
            n_mels=40,
            power=2.0,
        )
    
    
    def __len__(self):
        return self.dataset.shape[0]
    
    def __getitem__(self, idx):
        wav_file_path = self.dataset.iloc[idx, 0]
        text_idx = self.dataset.iloc[idx, 1]
        wav_data, sample_rate = torchaudio.load(wav_file_path)
        if sample_rate != self.model_sample_rate:
            wav_data = torchaudio.functional.resample(wav_data, sample_rate, self.model_sample_rate)
            sample_rate = self.model_sample_rate
        spectrogram = self.spectrogram_transformer(wav_data)
        spectrogram_db = librosa.amplitude_to_db(spectrogram)

        return spectrogram_db[0].transpose(1,0), torch.tensor(text_idx)

        

In [4]:
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

def collate_fn(batch):
    # spectrogram_db: tensor[Time, Melbins]
    # text_idx: tensor[text_len]
    spectrogram_dbs, text_idxs = zip(*batch)
   
    original_spectrogram_db_lens = torch.tensor(np.array([len(spectrogram_db) for spectrogram_db in spectrogram_dbs]))
    original_text_idx_lens = torch.tensor(np.array([len(text_idx) for text_idx in text_idxs]))

    # padding and packing for spectrogram_db
    padded_spectrogram_dbs = []
    for spectrogram_db in spectrogram_dbs:
        padded_spectrogram_db = np.pad(spectrogram_db, ((0,max(original_spectrogram_db_lens)-spectrogram_db.shape[0]),(0,0)), "constant", constant_values=0)
        padded_spectrogram_dbs.append(padded_spectrogram_db)
    
    padded_spectrogram_dbs = torch.tensor(np.array(padded_spectrogram_dbs))

    # padding and packing for text_idx
    padded_text_idxs = pad_sequence(text_idxs, batch_first=True, padding_value=-1)
    #packed_padded_texts = pack_padded_sequence(padded_texts, original_text_idx_lens, batch_first=True, enforce_sorted=False)

    # テキストはCTCロス計算でしか使わず、RNNに入力しないのでpackingによるマスクは不要
    return padded_spectrogram_dbs, padded_text_idxs, original_spectrogram_db_lens, original_text_idx_lens

In [5]:
model_sample_rate = 8000
wav_dir_path = "../dataset/waves_yesno/"
dataset = YesNoDataset(wav_dir_path, model_sample_rate)

In [6]:
from torch.utils.data import random_split, DataLoader
# 学習データとテストデータに分割
## 合計サイズが元のサイズと同一になるように注意
train_size = int(len(dataset) * 0.7)
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(
    dataset, [train_size, test_size]
)
BATCH_SIZE = 2
train_dataloader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    # 不完全なバッチの無視
    drop_last=True,
    # 高速化?
    pin_memory=True,
    collate_fn=collate_fn
)
test_dataloader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    # 不完全なバッチの無視
    drop_last=True,
    # 高速化?
    pin_memory=True,
    collate_fn=collate_fn
)

In [7]:
from my_modules import Conv2DSubSampling, PositionalEncoding

In [16]:
class TransformerEncoderLayer(nn.Module):
    def __init__(self, input_size=256, input_size_hidden=2048, nhead=4, dropout=0.1, norm_first=True):
        # input_size: 入力の特徴量次元
        # input_size_hidden: 
        #   Linearにおける中間層の次元
        #   Attention is all you needにおいてはReLU(x*W_1 + b_1)*W_2 + b_2にて計算される
        super(TransformerEncoderLayer, self).__init__()
        # embed_dimは特徴量次元のサイズであり、vdim, kdimを指定しない場合はembed_dimと同じになる
        # 出力は[B, T, F]となるようにそれぞれのattentionが行われるぽい・・・
        self.norm_first = norm_first
        self.norm1 = nn.LayerNorm(input_size)
        self.norm2 = nn.LayerNorm(input_size)
        self.multi_head_self_attn = nn.MultiheadAttention(embed_dim=input_size, num_heads=nhead, dropout=dropout, batch_first=True)
        self.dropout = nn.Dropout(dropout)

        self.linear1 = nn.Linear(input_size, input_size_hidden)
        self.linear2 = nn.Linear(input_size_hidden, input_size)
        self.relu = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
    def forward(self, x, attn_mask=None, key_padding_mask=None):
        # args:
        #   x: [B, T, F]
        #   attn_msxk: [B*H, T, F](Hはヘッド数) or [T, F]
        #       計算したAttentionWeightに対して加算するマスク
        #       マスクされたAttentionWeightは無視される
        #       オフラインエンコーダーでは一般にシーケンスすべてを見るためNoneでよい
        #   key_padding_mask: [B, T]
        #       パディングされた部分をTrueにすることで無視するようにする
        #       サブサンプリング済みであれば特に指定しなくてよさそう
        # return:
        #  x: [B, T, F]
        if self.norm_first:
            x = self.norm1(x)
            x = x + self._multi_head_self_attn(x, attn_mask, key_padding_mask)
            x = self.norm2(x)
            x = x + self._feed_forward(x)
        else:
            # Attention is all you needにおける実装
            x = x + self._multi_head_self_attn(x, attn_mask, key_padding_mask)
            x = self.norm1(x)
            x = x + self._feed_forward(x)
            x = self.norm2(x)
        return x

    def _multi_head_self_attn(self, x, attn_mask=None, key_padding_mask=None):
        x = self.multi_head_self_attn(x, x, x, attn_mask=attn_mask, key_padding_mask=key_padding_mask, need_weights=False)[0]
        x = self.dropout(x)
        return x

    def _feed_forward(self, x):
        # Attention is all you needではReLU(x*W_1 + b_1)*W_2 + b_2で計算される
        # dropout: 各線形層ごとに適用する
        x = self.linear1(x)
        x = self.relu(x)
        x = self.dropout1(x)
        x = self.linear2(x)
        x = self.dropout2(x)

        return x
    


In [17]:
class TransformerEncoder(nn.Module):
    def __init__(self, input_size, subsampled_input_size=256, num_layers=12, n_head=4, input_hidden_size=2048, dropout=0.1, norm_first=True) -> None:
        # input_size: 入力の特徴量次元
        # subsampled_input_size: サブサンプリング後の入力特徴量次元
        # num_layers: TransformerEncoderLayerの数
        # n_head: MultiHeadAttentionのヘッド数
        # input_hidden_size: TransformerEncoderLayerの線形層における中間層の次元
        # dropout: Dropoutの割合
        # norm_first: LayerNormを先に行うかどうか
        super(TransformerEncoder, self).__init__()
        self.num_layers = num_layers
        self.conv_subsample = Conv2DSubSampling(input_size, subsampled_input_size)
        self.positional_encoding = PositionalEncoding(subsampled_input_size)

        transformer_encoder_layer = TransformerEncoderLayer(input_size=subsampled_input_size, input_hidden_size=input_hidden_size, nhead=n_head, dropout=dropout, norm_first=norm_first)
        self.transformer_encoder = nn.ModuleList(
            # オブジェクトの共有を防ぐためにdeepcopyを使う
            [copy.deepcopy(transformer_encoder_layer) for _ in range(num_layers)]
        )
        self.norm_first = norm_first
        if self.norm_first:
            self.norm = nn.LayerNorm(subsampled_input_size)
    
    def forward(self, x, x_lengths):
        # args:
        #   x: [B, T, input_size]
        #   x_lengths: [B]
        # return:
        x = self.positional_encoding(x) # [B, T', subsampled_input_size]
        key_padding_mask = self._make_key_padding_mask(x_lengths) # [B, T']
        for i, layer in enumerate(self.transformer_encoder):
            x = layer(x, attn_mask=None, key_padding_mask=key_padding_mask)
            if i == self.num_layers // 2:
                x_inter = x
        if self.norm_first:
            x = self.norm(x)
            if x_inter is not None:
                x_inter = self.norm(x_inter)
                
        return x, x_inter
    
    def _create_key_padding_mask(self, x_lengths):
        # args:
        #   x_lengths: [B]
        # return:
        #   key_padding_mask: [B, T]
        #       パディングされた部分をTrueにすることで無視するようにする
        max_len = x_lengths.max()
        key_padding_mask = torch.arange(max_len, device=x_lengths.device)[None, :] >= x_lengths[:, None]
        return key_padding_mask
        
        

In [18]:
class Encoder(nn.Module):
    def __init__(self, input_size, subsampled_input_size, num_labels):
        super(Encoder, self).__init__()
        self.input_size = input_size
        self.num_labels = num_labels
        self.conv2d_sub_sampling = Conv2DSubSampling(input_size, subsampled_input_size, 3, 2, 3, 1)
        self.transformer_encoder = TransformerEncoder(subsampled_input_size, 12, 4, 2048, 0.1, True)
    
    def forward(self, x, x_lengths):
        # args:
        #   x: [B, T, input_size]
        #   x_lengths: [B]
        #       padding前のシーケンス長
        subsampled_x, subsampled_x_length = self.conv2d_sub_sampling(x, x_lengths)
        key_padding_mask = self._create_key_padding_mask(subsampled_x_length)
        output = self.transformer_encoder(subsampled_x, subsampled_x_length) # [B, T', subsampled_input_size]
        return output
        

これ以降、各モデルごとに実験用のコードを記述していきます。

In [8]:
from torch import nn, optim
from torch.nn.functional import log_softmax
class RNN(nn.Module):
    def __init__(self, input_length, num_labels):
        # input_lengthは一つのスペクトルの大きさ
        # パディングはバッチ内で数をそろえるため
        super(RNN, self).__init__()
        self.input_length = input_length
        self.num_labels = num_labels
        # time * batch * featureで入力する
        #self.rnn = nn.RNN(input_size=self.input_length, hidden_size=64, num_layers=1, batch_first=True)
        self.rnn = nn.LSTM(input_size=self.input_length, hidden_size=32, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(64, self.num_labels, bias=True)

    def forward(self, padded_x, original_x_lens):
        # x: device済みのpadded_sequence
        packed_padded_x = pack_padded_sequence(padded_x, original_x_lens, batch_first=True, enforce_sorted=False)
        packed_padded_x_rnn, hidden = self.rnn(packed_padded_x, None)
        # unpackした上で全結合層へ
        padded_x_rnn = pad_packed_sequence(packed_padded_x_rnn, batch_first=True, padding_value=0)[0]
        padded_y = self.fc(padded_x_rnn)
        padded_log_prob = log_softmax(padded_y, dim=2)
        
        return padded_log_prob
    

In [9]:
class Model(nn.Module):
    def __init__(self, input_length, subsampled_input_length, num_labels):
        super(Model, self).__init__()
        self.input_length = input_length
        self.num_labels = num_labels
        self.conv2d_sub_sampling = Conv2DSubSampling(input_length, subsampled_input_length, 3, 2, 3, 1)
        self.rnn = RNN(subsampled_input_length, self.num_labels)
    
    def forward(self, padded_spectrogram_dbs):
        sub_sampled_padded_spectrogram_dbs = self.conv2d_sub_sampling(padded_spectrogram_dbs)
        sub_sampled_padded_spectrogram_db_lens = torch.tensor([sub_sampled_padded_spectrogram_dbs.shape[1] for _ in range(sub_sampled_padded_spectrogram_dbs.shape[0])])
        padded_log_prob = self.rnn(sub_sampled_padded_spectrogram_dbs, sub_sampled_padded_spectrogram_db_lens)
        return padded_log_prob, sub_sampled_padded_spectrogram_db_lens

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"This learning will be running on {device}.")

This learning will be running on cuda.


In [11]:
input_length = 40
subsampled_input_length = 32
num_labels = len(dataset.labels)
num_epochs = 40

In [12]:
def ctc_simple_decode(hypotheses_idxs, labels, padding_idx):
    # hypothesis_idxs: tensor(batch, time)
    # labels: np.array(num_labels)

    hypotheses_idxs = hypotheses_idxs.cpu().numpy()
    hypotheses = []
    blank_idx = labels.index("_")
    for hypothesis_idxs in hypotheses_idxs:
        hypothesis = []
        prev_idx = -1
        for idx in hypothesis_idxs:
            if idx == blank_idx:
                continue
            elif idx == prev_idx:
                continue
            elif idx == padding_idx:
                continue
            else:
                hypothesis.append(labels[idx])
                prev_idx = idx
        hypotheses.append("".join(hypothesis))
    return hypotheses

In [15]:
from torch.utils.tensorboard import SummaryWriter
import time

model = Model(input_length, subsampled_input_length, num_labels).to(device)

ctc_loss = nn.CTCLoss(reduction="sum", blank=dataset.label_to_idx["_"])
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
# Adam

writer = SummaryWriter()

for i in range(num_epochs):
    t0 = time.time()
    model.train()
    epoch_loss = 0
    cnt = 0
    for _, (padded_spectrogram_dbs, padded_text_idxs, original_spectrofram_db_lens, original_text_idx_lens) in enumerate(train_dataloader):
        cnt += 1
        optimizer.zero_grad()
        padded_spectrogram_dbs = padded_spectrogram_dbs.to(device)
        padded_text_idxs = padded_text_idxs.to(device)
      
        padded_log_probs, sub_sampled_padded_spectrogram_db_lens = model(padded_spectrogram_dbs)
        loss = ctc_loss(padded_log_probs.transpose(1,0), padded_text_idxs, sub_sampled_padded_spectrogram_db_lens, original_text_idx_lens)
        loss.backward()
        optimizer.step()
        # lossはバッチ内平均ロス
        epoch_loss += (loss.item() / BATCH_SIZE)
    # バッチ内平均ロスの和をイテレーション数で割ることで、一つのデータあたりの平均ロスを求める
    writer.add_scalar("Loss/Training", epoch_loss / cnt, i)

    model.eval()
    with torch.no_grad():
        epoch_test_loss = 0
        cnt = 0
        total_cer = 0
        for _, (padded_spectrogram_dbs, padded_text_idxs, original_spectrofram_db_lens, original_text_idx_lens) in enumerate(test_dataloader):
            cnt += 1
            padded_spectrogram_dbs = padded_spectrogram_dbs.to(device)
            padded_text_idxs = padded_text_idxs.to(device)
            
            padded_log_probs, sub_sampled_padded_spectrogram_db_lens = model(padded_spectrogram_dbs)
            loss = ctc_loss(padded_log_probs.transpose(1,0), padded_text_idxs, sub_sampled_padded_spectrogram_db_lens, original_text_idx_lens)
            epoch_test_loss += loss.item()
            # for CER calculation
            hypotheses_idxs = padded_log_probs.argmax(dim=2) 
            hypotheses = ctc_simple_decode(hypotheses_idxs, dataset.labels, -1)
            teachers = ctc_simple_decode(padded_text_idxs, dataset.labels, -1)
            total_cer += char_error_rate(hypotheses, teachers)

    writer.add_scalar("Loss/Test", epoch_test_loss / cnt, i)
    writer.add_scalar("CER/Test", total_cer / cnt, i)
    t1 = time.time()
    print(f"{i} epoch: {epoch_loss / cnt} loss, {epoch_test_loss / cnt} test loss, CER: {total_cer / cnt}, {t1 - t0} sec")

0 epoch: 110.57820426093207 loss, 95.21940019395616 test loss, CER: 0.8986272811889648, 0.6719908714294434 sec
1 epoch: 108.1518071492513 loss, 94.02442762586806 test loss, CER: 0.9991829991340637, 0.6908049583435059 sec
2 epoch: 105.6210560268826 loss, 90.56393517388238 test loss, CER: 0.9048658013343811, 0.6970508098602295 sec
3 epoch: 98.65273412068684 loss, 81.75259314643012 test loss, CER: 0.4814228415489197, 0.7050962448120117 sec
4 epoch: 81.96647220187717 loss, 59.058189392089844 test loss, CER: 0.15483933687210083, 0.709968090057373 sec
5 epoch: 52.83677122328017 loss, 33.4733698103163 test loss, CER: 0.0781707614660263, 0.7209258079528809 sec
6 epoch: 30.53088665008545 loss, 20.97452820671929 test loss, CER: 0.05267477035522461, 0.7451949119567871 sec
7 epoch: 20.224013063642715 loss, 15.957574844360352 test loss, CER: 0.05020396411418915, 0.7213895320892334 sec
8 epoch: 15.447737587822807 loss, 10.982566939459907 test loss, CER: 0.04708058014512062, 0.7611837387084961 sec
9 

In [16]:
with torch.no_grad():
    total_cer = 0
    cnt = 0
    for _, (padded_spectrogram_dbs,padded_text_idxs, original_spectrofram_db_lens, original_text_idx_lens) in enumerate(test_dataloader):
        # unpacked_log_probs: tensor[batch, padded_time, num_labels]
        padded_spectrogram_dbs = padded_spectrogram_dbs.to(device)
        padded_text_idxs = padded_text_idxs.to(device)
        padded_log_probs, sub_sampled_padded_spectrogram_db_lens = model(padded_spectrogram_dbs)
        hypotheses_idxs = padded_log_probs.argmax(dim=2) 
        hypotheses = ctc_simple_decode(hypotheses_idxs, dataset.labels, -1)
        teachers = ctc_simple_decode(padded_text_idxs, dataset.labels, -1)
        for hypothesis, teacher in zip(hypotheses, teachers):
            print(f"hyp: {hypothesis}")
            print(f"tea: {teacher}")
        total_cer += char_error_rate(hypotheses, teachers)
        cnt += 1
    print(f"CER: {total_cer / cnt}")

hyp: no<space>no<space>yes<space>yes<space>yes<space>yes<space>no<space>no
tea: no<space>no<space>yes<space>yes<space>yes<space>yes<space>no<space>no
hyp: no<space>yes<space>yes<space>yes<space>yes<space>no<space>yes<space>no
tea: no<space>yes<space>yes<space>yes<space>yes<space>no<space>yes<space>no
hyp: no<space>yes<space>no<space>yes<space>yes<space>yes<space>no<space>no
tea: no<space>yes<space>no<space>yes<space>yes<space>yes<space>no<space>no
hyp: no<space>yes<space>yes<space>yes<space>no<space>no<space>no<space>no
tea: no<space>yes<space>yes<space>yes<space>no<space>no<space>no<space>no
hyp: yes<space>yes<space>no<space>yes<space>no<space>yes<space>yes<space>no
tea: yes<space>yes<space>no<space>yes<space>no<space>yes<space>yes<space>no
hyp: yes<space>yes<space>no<space>no<space>no<space>yes<space>yes<space>yes
tea: yes<space>yes<space>no<space>no<space>no<space>yes<space>yes<space>yes
hyp: yes<space>yes<space>yes<space>no<space>no<space>yes<space>no<space>yes
tea: yes<space>yes<s