In [None]:
%pylab inline
%load_ext autoreload
%load_ext tensorboard
%autoreload
import IPython
from IPython.display import Audio
import tensorboard as tb
import os
# 数値演算
import numpy as np
import torch
from torch import nn
# 音声波形の読み込み
from scipy.io import wavfile
# 音声分析
# import pyworld
# 音声分析、可視化
import librosa
import librosa.display

In [None]:
# 語彙の定義
characters = "abcdefghijklmnopqrstuvwxyz!'(),-.:;? "
# その他特殊記号
extra_symbols = [
    "^",  # 文の先頭を表す特殊記号 <SOS>
    "$",  # 文の末尾を表す特殊記号 <EOS>
]
_pad = "~"

# NOTE: パディングを 0 番目に配置
symbols = [_pad] + extra_symbols + list(characters)

# 文字列⇔数値の相互変換のための辞書
_symbol_to_id = {s: i for i, s in enumerate(symbols)}
_id_to_symbol = {i: s for i, s in enumerate(symbols)}

In [None]:
for i,s in enumerate(symbols):
    print(f"i = {i}")
    print(f"s = {s}")
    print("")

In [None]:
def text_to_sequence(text):
    # 簡易のため、大文字と小文字を区別せず、全ての大文字を小文字に変換
    text = text.lower()

    # <SOS>
    seq = [_symbol_to_id["^"]]

    # 本文
    seq += [_symbol_to_id[s] for s in text]

    # <EOS>
    seq.append(_symbol_to_id["$"])

    return seq


def sequence_to_text(seq):
    return [_id_to_symbol[s] for s in seq]

In [None]:
seq = text_to_sequence("Hello!")
print(f"文字列から数値列への変換: {seq}")
print(f"数値列から文字列への逆変換: {sequence_to_text(seq)}")


In [None]:
class SimplestEncoder(nn.Module):
    def __init__(self, num_vocab=40, embed_dim=256):
        super().__init__()
        self.embed = nn.Embedding(num_vocab, embed_dim, padding_idx=0)

    def forward(self, seqs):
        return self.embed(seqs)


In [None]:
SimplestEncoder()

In [None]:
def get_dummy_input():
    # バッチサイズに 2 を想定して、適当な文字列を作成
    seqs = [
        text_to_sequence("What is your favorite language?"),
        # text_to_sequence("Hello world."),
    ]
    in_lens = torch.tensor([len(x) for x in seqs], dtype=torch.long)
    max_len = max(len(x) for x in seqs)
    # seqs = torch.stack([torch.from_numpy(pad_1d(seq, max_len)) for seq in seqs])
    seqs = torch.tensor(seqs)

    return seqs, in_lens

In [None]:
encoder = SimplestEncoder(num_vocab=40, embed_dim=256)
seqs, in_lens = get_dummy_input()
encoder_outs = encoder(seqs)
print(f"入力のサイズ: {tuple(seqs.shape)}")
print(f"出力のサイズ: {tuple(encoder_outs.shape)}")

In [None]:
embedding = nn.Embedding(10, 2)
input = torch.tensor([[1,2,4,5], [4,3,2,9]]).long()
print(embedding(input))
print(embedding(input).shape)

In [None]:
seq = text_to_sequence("What is your favorite language?")
seq = torch.tensor(seq)
encoder = SimplestEncoder(num_vocab=40, embed_dim=256)
embed = encoder(seq)
print(embed)
print(embed.shape)

In [None]:
class ConvEncoder(nn.Module):
    def __init__(
        self,
        num_vocab=40,
        embed_dim=256,
        conv_layers=3,
        conv_channels=256,
        conv_kernel_size=5,
    ):
        super().__init__()
        # 文字埋め込み
        self.embed = nn.Embedding(num_vocab, embed_dim, padding_idx=0)

        # 1次元畳み込みの重ね合わせ：局所的な依存関係のモデル化
        self.convs = nn.ModuleList()
        for layer in range(conv_layers):
            in_channels = embed_dim if layer == 0 else conv_channels
            self.convs += [
                nn.Conv1d(
                    in_channels,
                    conv_channels,
                    conv_kernel_size,
                    padding=(conv_kernel_size - 1) // 2,
                    bias=False,
                ),
                nn.BatchNorm1d(conv_channels),
                nn.ReLU(),
                nn.Dropout(0.5),
            ]
        self.convs = nn.Sequential(*self.convs)

    def forward(self, seqs):
        emb = self.embed(seqs)
        # 1 次元畳み込みと embedding では、入力のサイズが異なるので注意
        out = self.convs(emb.transpose(1, 2)).transpose(1, 2)
        return out

In [None]:
seqs, in_lens = get_dummy_input()

num_vocab = 40
embed_dim = 256
conv_layers = 3
conv_channels = 256
conv_kernel_size = 5

embed = nn.Embedding(num_vocab, embed_dim, padding_idx=0)
embed_out = embed(seqs)
print(f"embed = {embed_out.shape}")

convs = nn.ModuleList()
for layer in range(conv_layers):
    in_channels = embed_dim if layer == 0 else conv_channels
    convs += [
        nn.Conv1d(
            in_channels,
            conv_channels,
            conv_kernel_size,
            padding=(conv_kernel_size - 1) // 2,
            bias=False,
        ),
        nn.BatchNorm1d(conv_channels),
        nn.ReLU(),
        nn.Dropout(0.5)
    ]
convs = nn.Sequential(*convs)   

# print(convs)
out = convs(embed_out.transpose(1,2))
print(f"out = {out.shape}")
out = out.transpose(1,2)
print(f"out_T = {out.shape}")

In [None]:
encoder = ConvEncoder(num_vocab=40, embed_dim=256)
seqs, in_lens = get_dummy_input()
encoder_outs = encoder(seqs)
print(f"入力のサイズ: {tuple(seqs.shape)}")
print(f"出力のサイズ: {tuple(encoder_outs.shape)}")

# エンコーダ

In [None]:
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

class Encoder(ConvEncoder):
    def __init__(
        self,
        num_vocab=40,
        embed_dim=512,
        hidden_dim=512,
        conv_layers=5,
        conv_channels=512,
        conv_kernel_size=7,
    ):
        # ConvEncoderの引数を継承した後に，Encoderで初期化される。Encoderの引数の値を使用して構築される。
        super().__init__(
            num_vocab, embed_dim, conv_layers, conv_channels, conv_kernel_size
        )
        # 双方向 LSTM による長期依存関係のモデル化
        self.blstm = nn.LSTM(
            conv_channels, hidden_dim // 2, 1, batch_first=True, bidirectional=True
        )

    def forward(self, seqs, in_lens):
        emb = self.embed(seqs)
        # 1 次元畳み込みと embedding では、入力のサイズ が異なるので注意
        out = self.convs(emb.transpose(1, 2)).transpose(1, 2)

        # 双方向 LSTM の計算
        out = pack_padded_sequence(out, in_lens, batch_first=True)
        out, _ = self.blstm(out)
        out, _ = pad_packed_sequence(out, batch_first=True)
        return out

In [None]:
conv = ConvEncoder(num_vocab=40, embed_dim=256)
encoder = Encoder(num_vocab=40, embed_dim=256)
# 継承前の元のやつ
print(conv)

In [None]:
# 継承した後。畳み込み層の数やカーネルサイズが更新されている。
print(encoder)

In [None]:
encoder = Encoder(num_vocab=40, embed_dim=256)
seqs, in_lens = get_dummy_input()
in_lens, indices = torch.sort(in_lens, dim=0, descending=True)
seqs = seqs[indices]

encoder_outs = encoder(seqs, in_lens)
print(f"入力のサイズ: {tuple(seqs.shape)}")
print(f"出力のサイズ: {tuple(encoder_outs.shape)}")

In [None]:
from torch.nn import functional as F

# 書籍中の数式に沿って、わかりやすさを重視した実装
class BahdanauAttention(nn.Module):
    def __init__(self, encoder_dim=512, decoder_dim=1024, hidden_dim=128):
        super().__init__()
        self.V = nn.Linear(encoder_dim, hidden_dim)
        self.W = nn.Linear(decoder_dim, hidden_dim, bias=False)
        # NOTE: 本書の数式通りに実装するなら bias=False ですが、実用上は bias=True としても問題ありません
        self.w = nn.Linear(hidden_dim, 1)

    def forward(self, encoder_out, decoder_state, mask=None):
        # 式 (9.11) の計算
        erg = self.w(
            torch.tanh(self.W(decoder_state).unsqueeze(1) + self.V(encoder_outs))
        ).squeeze(-1)

        if mask is not None:
            erg.masked_fill_(mask, -float("inf"))

        attention_weights = F.softmax(erg, dim=1)

        # エンコーダ出力の長さ方向に対して重み付き和を取ります
        attention_context = torch.sum(
            encoder_outs * attention_weights.unsqueeze(-1), dim=1
        )

        return attention_context, attention_weights

In [None]:
encoder_outs = torch.rand(2, 33, 512)
decoder_input = torch.rand(2, 1024)
attention = BahdanauAttention()
attention_context, attention_weights = attention(encoder_outs, decoder_input)

print(f"エンコーダの出力のサイズ: {tuple(encoder_outs.shape)}")
print(f"デコーダの隠れ状態のサイズ: {tuple(decoder_input.shape)}")
print(f"コンテキストベクトルのサイズ: {tuple(attention_context.shape)}")
print(f"アテンション重みのサイズ: {tuple(attention_weights.shape)}")

In [None]:
encoder_outs = torch.rand(2, 33, 512)
decoder_input = torch.rand(2, 1024)

encoder_dim=512
decoder_dim=1024
hidden_dim=128

V = nn.Linear(encoder_dim, hidden_dim)
W = nn.Linear(decoder_dim, hidden_dim, bias=False)
w = nn.Linear(hidden_dim, 1)

erg = w(
    torch.tanh(W(decoder_input).unsqueeze(1) + V(encoder_outs))
)
erg_sq = erg.squeeze(-1)

attention_weights = F.softmax(erg_sq, dim=1)

attention_context = torch.sum(
    encoder_outs * attention_weights.unsqueeze(-1), dim=1
)

test = encoder_outs * attention_weights.unsqueeze(-1)

print(f"W(decoder_input).unsqueeze(1).shape = {W(decoder_input).unsqueeze(1).shape}")
print(f"V(encoder_outs).shape = {V(encoder_outs).shape}")
print(f"erg.shape = {erg.shape}")
print(f"erg_sq.shape = {erg_sq.shape}")
print(f"attention_weights.shape = {attention_weights.shape}")
print(f"encoder_outs = {encoder_outs.shape}")
print(f"attention_weights = {attention_weights.shape}")
print(f"attention_weights.unsq = {attention_weights.unsqueeze(-1).shape}")
print(f"pre_sum = {test.shape}")
print(attention_context.shape)

In [None]:
x = torch.rand(2,5,1)
y = torch.rand(2,5,33)
z = x * y
# sum = torch.sum(x * y, dim=1)
a = torch.tensor([[1,2], [3,4]])
b = torch.tensor([[1,2], [3,4]])
c = a * b
print(a)
print(c)

# unsqueezeでサイズ1の次元を追加することで，アダマール積が求められる。
# 本当はサイズが合ってないと要素積はできないけど，行列×定数ができるのと同じルールで計算できちゃう。
a = torch.arange(0, 20).view(2,2,-1)
b = torch.arange(0, 4).view(2,2).unsqueeze(-1)
print("")
print(f"a.shape = {a.shape}")
print(f"b.shape = {b.shape}")
print(a)
print(b)
c = a * b
print(f"c.shape = {c.shape}")
print(c)

In [None]:
a = torch.arange(4).view(2,-1)
# b = torch.ones(4).view(2,-1) * 4
b = 4
c = a * b
print(c)
d = a + b
print(d)

In [None]:
# squeezeは次元の中でサイズ1のところを消去する。引数で次元を選択。
x = torch.zeros(2,3,1,4,1)
print(x.shape)
print(x.squeeze().shape)
print(x.squeeze(-1).shape)
print(x.unsqueeze(-1).shape)
print(x.squeeze().unsqueeze(-1).shape)

In [None]:
def make_pad_mask(lengths, maxlen=None):
    """Make mask for padding frames

    Args:
        lengths (list): list of lengths
        maxlen (int, optional): maximum length. If None, use max value of lengths.

    Returns:
        torch.ByteTensor: mask
    """
    if not isinstance(lengths, list):
        lengths = lengths.tolist()
    bs = int(len(lengths))
    if maxlen is None:
        maxlen = int(max(lengths))

    seq_range = torch.arange(0, maxlen, dtype=torch.int64)
    seq_range_expand = seq_range.unsqueeze(0).expand(bs, maxlen)
    seq_length_expand = seq_range_expand.new(lengths).unsqueeze(-1)
    mask = seq_range_expand >= seq_length_expand

    return mask

# 注意機構（内容依存と位置依存のハイブリッド）

In [None]:
class LocationSensitiveAttention(nn.Module):
    def __init__(
        self,
        encoder_dim=512,
        decoder_dim=1024,
        hidden_dim=128,
        conv_channels=32,
        conv_kernel_size=31,
    ):
        super().__init__()
        self.V = nn.Linear(encoder_dim, hidden_dim)
        self.W = nn.Linear(decoder_dim, hidden_dim, bias=False)
        self.U = nn.Linear(conv_channels, hidden_dim, bias=False)
        self.F = nn.Conv1d(
            1,
            conv_channels,
            conv_kernel_size,
            padding=(conv_kernel_size - 1) // 2,
            bias=False,
        )
        # NOTE: 本書の数式通りに実装するなら bias=False ですが、実用上は bias=True としても問題ありません
        self.w = nn.Linear(hidden_dim, 1)

    def forward(self, encoder_outs, src_lens, decoder_state, att_prev, mask=None):
        # アテンション重みを一様分布で初期化
        if att_prev is None:
            att_prev = 1.0 - make_pad_mask(src_lens).to(
                device=decoder_state.device, dtype=decoder_state.dtype
            )
            att_prev = att_prev / src_lens.unsqueeze(-1).to(encoder_outs.device)

        # (B x T_enc) -> (B x 1 x T_enc) -> (B x conv_channels x T_enc) ->
        # (B x T_enc x conv_channels)
        f = self.F(att_prev.unsqueeze(1)).transpose(1, 2)

        # 式 (9.13) の計算
        erg = self.w(
            torch.tanh(
                self.W(decoder_state).unsqueeze(1) + self.V(encoder_outs) + self.U(f)
            )
        ).squeeze(-1)

        if mask is not None:
            erg.masked_fill_(mask, -float("inf"))

        attention_weights = F.softmax(erg, dim=1)

        # エンコーダ出力の長さ方向に対して重み付き和を取ります
        attention_context = torch.sum(
            encoder_outs * attention_weights.unsqueeze(-1), dim=1
        )

        return attention_context, attention_weights

In [179]:
from ttslearn.util import make_pad_mask

mask =  make_pad_mask(in_lens).to(encoder_outs.device)
attention = LocationSensitiveAttention()

decoder_input = torch.ones(len(seqs), 1024)

attention_context, attention_weights = attention(encoder_outs, in_lens, decoder_input, None, mask)

print(f"エンコーダの出力のサイズ: {tuple(encoder_outs.shape)}")
print(f"デコーダの隠れ状態のサイズ: {tuple(decoder_input.shape)}")
print(f"コンテキストベクトルのサイズ: {tuple(attention_context.shape)}")
print(f"アテンション重みのサイズ: {tuple(attention_weights.shape)}")

エンコーダの出力のサイズ: (2, 33, 512)
デコーダの隠れ状態のサイズ: (1, 1024)
コンテキストベクトルのサイズ: (2, 512)
アテンション重みのサイズ: (2, 33)


# Prenet

In [None]:
class Prenet(nn.Module):
    def __init__(self, in_dim, layers=2, hidden_dim=256, dropout=0.5):
        super().__init__()
        self.dropout = dropout
        prenet = nn.ModuleList()
        for layer in range(layers):
            prenet += [
                nn.Linear(in_dim if layer == 0 else hidden_dim, hidden_dim),
                nn.ReLU(),
            ]
        self.prenet = nn.Sequential(*prenet)

    def forward(self, x):
        for layer in self.prenet:
            # 学習時、推論時の両方で Dropout を適用します
            x = F.dropout(layer(x), self.dropout, training=True)
        return x

In [180]:
decoder_input = torch.ones(len(seqs), 80)

prenet = Prenet(decoder_input.shape[1])
out = prenet(decoder_input)
print(f"seqs = {seqs.shape}")
print(f"デコーダの入力のサイズ: {tuple(decoder_input.shape)}")
print(f"Pre-Net の出力のサイズ: {tuple(out.shape)}")

seqs = torch.Size([1, 33])
デコーダの入力のサイズ: (1, 80)
Pre-Net の出力のサイズ: (1, 256)


In [None]:
class ZoneOutCell(nn.Module):
    def __init__(self, cell, zoneout=0.1):
        super().__init__()
        self.cell = cell
        self.hidden_size = cell.hidden_size
        self.zoneout = zoneout

    def forward(self, inputs, hidden):
        next_hidden = self.cell(inputs, hidden)
        next_hidden = self._zoneout(hidden, next_hidden, self.zoneout)
        return next_hidden

    def _zoneout(self, h, next_h, prob):
        h_0, c_0 = h
        h_1, c_1 = next_h
        h_1 = self._apply_zoneout(h_0, h_1, prob)
        c_1 = self._apply_zoneout(c_0, c_1, prob)
        return h_1, c_1

    def _apply_zoneout(self, h, next_h, prob):
        if self.training:
            mask = h.new(*h.size()).bernoulli_(prob)
            return mask * h + (1 - mask) * next_h
        else:
            return prob * h + (1 - prob) * next_h


# デコーダ

In [215]:
# from ttslearn.tacotron.decoder import ZoneOutCell

class Decoder(nn.Module):
    def __init__(
        self,
        encoder_hidden_dim=512,
        out_dim=80,
        layers=2,
        hidden_dim=1024,
        prenet_layers=2,
        prenet_hidden_dim=256,
        prenet_dropout=0.5,
        zoneout=0.1,
        reduction_factor=1,
        attention_hidden_dim=128,
        attention_conv_channels=32,
        attention_conv_kernel_size=31,
    ):
        super().__init__()
        self.out_dim = out_dim

        # 注意機構
        self.attention = LocationSensitiveAttention(
            encoder_hidden_dim,
            hidden_dim,
            attention_hidden_dim,
            attention_conv_channels,
            attention_conv_kernel_size,
        )
        self.reduction_factor = reduction_factor

        # Prenet
        self.prenet = Prenet(out_dim, prenet_layers, prenet_hidden_dim, prenet_dropout)

        # 片方向LSTM
        self.lstm = nn.ModuleList()
        for layer in range(layers):
            lstm = nn.LSTMCell(
                encoder_hidden_dim + prenet_hidden_dim if layer == 0 else hidden_dim,
                hidden_dim,
            )
            lstm = ZoneOutCell(lstm, zoneout)
            self.lstm += [lstm]

        # 出力への projection 層
        proj_in_dim = encoder_hidden_dim + hidden_dim
        self.feat_out = nn.Linear(proj_in_dim, out_dim * reduction_factor, bias=False)
        self.prob_out = nn.Linear(proj_in_dim, reduction_factor)

    def _zero_state(self, hs):
        init_hs = hs.new_zeros(hs.size(0), self.lstm[0].hidden_size)
        return init_hs

    def forward(self, encoder_outs, in_lens, decoder_targets=None):
        is_inference = decoder_targets is None

        # Reduction factor に基づくフレーム数の調整
        # (B, Lmax, out_dim) ->  (B, Lmax/r, out_dim)
        if self.reduction_factor > 1 and not is_inference:
            decoder_targets = decoder_targets[
                :, self.reduction_factor - 1 :: self.reduction_factor
            ]

        # デコーダの系列長を保持
        # 推論時は、エンコーダの系列長から経験的に上限を定める
        if is_inference:
            max_decoder_time_steps = int(encoder_outs.shape[1] * 10.0)
        else:
            max_decoder_time_steps = decoder_targets.shape[1]

        # ゼロパディングされた部分に対するマスク
        mask = make_pad_mask(in_lens).to(encoder_outs.device)

        # LSTM の状態をゼロで初期化
        h_list, c_list = [], []
        for _ in range(len(self.lstm)):
            h_list.append(self._zero_state(encoder_outs))
            c_list.append(self._zero_state(encoder_outs))

        # デコーダの最初の入力
        go_frame = encoder_outs.new_zeros(encoder_outs.size(0), self.out_dim)
        prev_out = go_frame

        # 1つ前の時刻のアテンション重み
        prev_att_w = None

        # メインループ
        outs, logits, att_ws = [], [], []
        t = 0
        while True:
            # コンテキストベクトル、アテンション重みの計算
            # h_list
            att_c, att_w = self.attention(
                encoder_outs, in_lens, h_list[0], prev_att_w, mask
            )

            # Pre-Net
            prenet_out = self.prenet(prev_out)

            # LSTM
            # hが出力，cがセルの状態
            # xsはコンテキストベクトルとprenetの出力の結合
            xs = torch.cat([att_c, prenet_out], dim=1)
            h_list[0], c_list[0] = self.lstm[0](xs, (h_list[0], c_list[0]))
            for i in range(1, len(self.lstm)):
                h_list[i], c_list[i] = self.lstm[i](
                    h_list[i - 1], (h_list[i], c_list[i])
                )
            # 出力の計算
            hcs = torch.cat([h_list[-1], att_c], dim=1)
            outs.append(self.feat_out(hcs).view(encoder_outs.size(0), self.out_dim, -1))
            logits.append(self.prob_out(hcs))
            att_ws.append(att_w)

            # 次の時刻のデコーダの入力を更新
            if is_inference:
                prev_out = outs[-1][:, :, -1]  # (1, out_dim)
            else:
                # Teacher forcing
                prev_out = decoder_targets[:, t, :]

            # 累積アテンション重み
            prev_att_w = att_w if prev_att_w is None else prev_att_w + att_w

            t += 1
            # 停止条件のチェック
            if t >= max_decoder_time_steps:
                break
            if is_inference and (torch.sigmoid(logits[-1]) >= 0.5).any():
                break

        # # 各時刻の出力を結合
        logits = torch.cat(logits, dim=1)  # (B, Lmax)
        outs = torch.cat(outs, dim=2)  # (B, out_dim, Lmax)
        att_ws = torch.stack(att_ws, dim=1)  # (B, Lmax, Tmax)

        if self.reduction_factor > 1:
            outs = outs.view(outs.size(0), self.out_dim, -1)  # (B, out_dim, Lmax)

        return outs, logits, att_ws

In [None]:
Decoder()

In [216]:
decoder_targets = torch.ones(encoder_outs.shape[0], 120, 80)
print(f"decoder_targets = {decoder_targets.shape}")
decoder = Decoder(encoder_outs.shape[-1], 80)

# Teaccher forcing: decoder_targets (教師データ) を与える
with torch.no_grad():
    outs, logits, att_ws = decoder(encoder_outs, in_lens, decoder_targets)

print(f"デコーダの入力のサイズ: {tuple(decoder_input.shape)}")
print(f"デコーダの出力のサイズ: {tuple(outs.shape)}")
print(f"stop token (logits) のサイズ: {tuple(logits.shape)}")
print(f"アテンション重みのサイズ: {tuple(att_ws.shape)}")

decoder_targets = torch.Size([2, 120, 80])
デコーダの入力のサイズ: (1, 80)
デコーダの出力のサイズ: (2, 80, 120)
stop token (logits) のサイズ: (2, 120)
アテンション重みのサイズ: (2, 120, 33)


## デコーダの確認

In [161]:
encoder_hidden_dim = 512
out_dim=80
layers=2
hidden_dim=1024
prenet_layers=2
prenet_hidden_dim=256
prenet_dropout=0.5
zoneout=0.1
reduction_factor=1
attention_hidden_dim=128
attention_conv_channels=32
attention_conv_kernel_size=31

attention = LocationSensitiveAttention(
            encoder_hidden_dim,
            hidden_dim,
            attention_hidden_dim,
            attention_conv_channels,
            attention_conv_kernel_size,
        )

prenet = Prenet(out_dim, prenet_layers, prenet_hidden_dim, prenet_dropout) 

"""
convs = nn.ModuleList()
for layer in range(conv_layers):
    in_channels = embed_dim if layer == 0 else conv_channels
    convs += [
        nn.Conv1d(
            in_channels,
            conv_channels,
            conv_kernel_size,
            padding=(conv_kernel_size - 1) // 2,
            bias=False,
        ),
        nn.BatchNorm1d(conv_channels),
        nn.ReLU(),
        nn.Dropout(0.5)
    ]
convs = nn.Sequential(*convs)
"""

lstm = nn.ModuleList()
for layer in range(layers):
    lstm_c = nn.LSTMCell(
        encoder_hidden_dim + prenet_hidden_dim if layer == 0 else hidden_dim,
        hidden_dim,
    )
    # print(lstm_c)
    lstm_c = ZoneOutCell(lstm_c, zoneout)
    # print(lstm_c)
    lstm_c = [lstm_c]
    # print(lstm_c)
    lstm += lstm_c
    # print(lstm)
    # print("")

# 出力への projection 層
proj_in_dim = encoder_hidden_dim + hidden_dim
feat_out = nn.Linear(proj_in_dim, out_dim * reduction_factor, bias=False)
prob_out = nn.Linear(proj_in_dim, reduction_factor)

def _zero_state(hs):
        init_hs = hs.new_zeros(hs.size(0), lstm[0].hidden_size)
        return init_hs

In [181]:
print(lstm[0].hidden_size)
print(len(lstm))

1024
2


In [170]:
lstm

ModuleList(
  (0): ZoneOutCell(
    (cell): LSTMCell(768, 1024)
  )
  (1): ZoneOutCell(
    (cell): LSTMCell(1024, 1024)
  )
)

In [178]:
encoder_outs = encoder_outs
in_lens = in_lens
decoder_targets = None
print(f"encoder_outs = {encoder_outs.shape}")

# decoder_targets = Noneのとき，is_inference = True
is_inference = decoder_targets is None
# print(f"is_inference = {is_inference}")

# Reduction factor に基づくフレーム数の調整
# (B, Lmax, out_dim) ->  (B, Lmax/r, out_dim)
# is_inference = Trueでないときに実行
if reduction_factor > 1 and not is_inference:
    decoder_targets = decoder_targets[
        :, reduction_factor - 1 :: reduction_factor
    ]

# デコーダの系列長を保持
# 推論時は、エンコーダの系列長から経験的に上限を定める
if is_inference:
    max_decoder_time_steps = int(encoder_outs.shape[1] * 10.0)
else:
    max_decoder_time_steps = decoder_targets.shape[1]

# ゼロパディングされた部分に対するマスク
mask = make_pad_mask(in_lens).to(encoder_outs.device)

# LSTM の状態をゼロで初期化
# encoder_outs.size[0], lstm[0].hidden_size
h_list, c_list = [], []
for _ in range(len(lstm)):
    h_list.append(_zero_state(encoder_outs))
    c_list.append(_zero_state(encoder_outs))
print(f"h_list = {h_list}")
print(f"c_list = {len(c_list)}")
print(f"_zero_state = {_zero_state(encoder_outs).shape}")

# デコーダの最初の入力
go_frame = encoder_outs.new_zeros(encoder_outs.size(0), out_dim)
print(f"go_frame = {go_frame.shape}")
prev_out = go_frame

# 1つ前の時刻のアテンション重み
prev_att_w = None

# メインループ
outs, logits, att_ws = [], [], []
t = 0
while True:
    # コンテキストベクトル、アテンション重みの計算
    # h_list
    att_c, att_w = attention(
        encoder_outs, in_lens, h_list[0], prev_att_w, mask
    )

    # Pre-Net
    prenet_out = prenet(prev_out)

    # LSTM
    # hが出力，cがセルの状態
    # xsはコンテキストベクトルとprenetの出力の結合
    xs = torch.cat([att_c, prenet_out], dim=1)
    h_list[0], c_list[0] = lstm[0](xs, (h_list[0], c_list[0]))
    for i in range(1, len(lstm)):
        h_list[i], c_list[i] = lstm[i](
            h_list[i - 1], (h_list[i], c_list[i])
        )
    # 出力の計算
    hcs = torch.cat([h_list[-1], att_c], dim=1)
    outs.append(feat_out(hcs).view(encoder_outs.size(0), out_dim, -1))
    logits.append(prob_out(hcs))
    att_ws.append(att_w)

    # 次の時刻のデコーダの入力を更新（prenetの入力）
    if is_inference:
        prev_out = outs[-1][:, :, -1]  # (1, out_dim)
    else:
        # Teacher forcing
        # 学習時は自身の出力を回帰的に参照するのではなく，正しいデータを参照する。
        # 学習時（特に初期）は間違った出力を出すことが予想され，それを参照していては学習が進まない。
        # そのため，正解データを次の出力のために利用して学習していく。
        prev_out = decoder_targets[:, t, :]

    # 累積アテンション重み
    prev_att_w = att_w if prev_att_w is None else prev_att_w + att_w

    t += 1
    # 停止条件のチェック
    if t >= max_decoder_time_steps:
        break
    if is_inference and (torch.sigmoid(logits[-1]) >= 0.5).any():
        break

# 各時刻の出力を結合
logits = torch.cat(logits, dim=1)  # (B, Lmax)
outs = torch.cat(outs, dim=2)  # (B, out_dim, Lmax)
att_ws = torch.stack(att_ws, dim=1)  # (B, Lmax, Tmax)

if reduction_factor > 1:
    outs = outs.view(outs.size(0), out_dim, -1)  # (B, out_dim, Lmax)


encoder_outs = torch.Size([2, 33, 512])
h_list = tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])
c_list = 2
_zero_state = torch.Size([2, 1024])
go_frame = torch.Size([2, 80])


In [None]:
x = torch.rand(3,100,10)
x = torch.arange(3*8*1).view(3,8,1)
reduction_factor = 4
x_r = x[:, reduction_factor-1 :: reduction_factor, :]
x_ = x[:, 1::4, :]
print(f"x = {x}")

In [None]:
print(x_r.shape)
print(f"x_r = {x_r}")
# print(x_.shape)
# print(f"x_ = {x_}")

In [None]:
x = torch.randn(2, 3, 4)
x_0 = torch.cat((x, x, x), 0)
x_1 = torch.cat((x, x, x), 1)
x_2 = torch.cat((x, x, x), 2)

cat_dim = [0, 1, 2]

for i in cat_dim:
    print(f"x_{i} = {torch.cat((x, x, x), i).shape}")

In [None]:
x = torch.tensor((), dtype=float)
x.new_zeros((2,3))


In [182]:
for i in range(1, len(lstm)):
    print(i)

1


In [198]:
# ゼロパディングされた部分に対するマスク
mask = make_pad_mask(in_lens).to(encoder_outs.device)

print(f"encoder_outs = {encoder_outs.shape}")

# LSTM の状態をゼロで初期化
# encoder_outs.size[0], lstm[0].hidden_size
h_list, c_list = [], []
for _ in range(len(lstm)):
    h_list.append(_zero_state(encoder_outs))
    c_list.append(_zero_state(encoder_outs))
print(f"h_list = {h_list}")
print(f"c_list = {len(c_list)}")
print(f"_zero_state = {_zero_state(encoder_outs).shape}")

# デコーダの最初の入力
go_frame = encoder_outs.new_zeros(encoder_outs.size(0), out_dim)
print(f"go_frame = {go_frame.shape}")
prev_out = go_frame

# 1つ前の時刻のアテンション重み
prev_att_w = None

# メインループ
outs, logits, att_ws = [], [], []
t = 0

# コンテキストベクトル、アテンション重みの計算
# h_list
att_c, att_w = attention(
    encoder_outs, in_lens, h_list[0], prev_att_w, mask
)
print(f"att_c = {att_c.shape}")
print(f"att_w = {att_w.shape}")

# Pre-Net
prenet_out = prenet(prev_out)
print(f"prinet_out = {prenet_out.shape}")

# LSTM
# hが出力，cがセルの状態
# xsはコンテキストベクトルとprenetの出力の結合
xs = torch.cat([att_c, prenet_out], dim=1)
print(f"xs = {xs.shape}")
print("")

h_list[0], c_list[0] = lstm[0](xs, (h_list[0], c_list[0]))
# print(f"h_list = {h_list} \nc_list = {c_list}")

for i in range(1, len(lstm)):
    h_list[i], c_list[i] = lstm[i](
        h_list[i - 1], (h_list[i], c_list[i])
    )
    # print(f"h_list = {h_list} \nc_list = {c_list}")

# 出力の計算
hcs = torch.cat([h_list[-1], att_c], dim=1)
print(f"hcs = {hcs.shape}")

outs.append(feat_out(hcs).view(encoder_outs.size(0), out_dim, -1))
print(f"feat_out(hcs) = {feat_out(hcs).shape}")
print(f"feat_out(hcs)_transposed = {feat_out(hcs).view(encoder_outs.size(0), out_dim, -1).shape}")
# print(f"outs = {len(outs)}")

logits.append(prob_out(hcs))
print(f"prob_out = {prob_out(hcs).shape}")

att_ws.append(att_w)

encoder_outs = torch.Size([2, 33, 512])
h_list = [tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]]), tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])]
c_list = 2
_zero_state = torch.Size([2, 1024])
go_frame = torch.Size([2, 80])
att_c = torch.Size([2, 512])
att_w = torch.Size([2, 33])
prinet_out = torch.Size([2, 256])
xs = torch.Size([2, 768])

hcs = torch.Size([2, 1536])
feat_out(hcs) = torch.Size([2, 80])
feat_out(hcs)_transposed = torch.Size([2, 80, 1])
prob_out = torch.Size([2, 1])


In [209]:
print(outs[-1][:, :, -1].shape)
print(outs[-1].shape)

torch.Size([2, 80])
torch.Size([2, 80, 1])


In [210]:
x = [[1, 2, 3], [4, 5, 6]]
x[-1][-1]

6

# Postnet

In [217]:
class Postnet(nn.Module):
    def __init__(
        self,
        in_dim=80,
        layers=5,
        channels=512,
        kernel_size=5,
        dropout=0.5,
    ):
        super().__init__()
        postnet = nn.ModuleList()
        for layer in range(layers):
            in_channels = in_dim if layer == 0 else channels
            out_channels = in_dim if layer == layers - 1 else channels
            postnet += [
                nn.Conv1d(
                    in_channels,
                    out_channels,
                    kernel_size,
                    stride=1,
                    padding=(kernel_size - 1) // 2,
                    bias=False,
                ),
                nn.BatchNorm1d(out_channels),
            ]
            if layer != layers - 1:
                postnet += [nn.Tanh()]
            postnet += [nn.Dropout(dropout)]
        self.postnet = nn.Sequential(*postnet)

    def forward(self, xs):
        return self.postnet(xs)


In [218]:
Postnet()

Postnet(
  (postnet): Sequential(
    (0): Conv1d(80, 512, kernel_size=(5,), stride=(1,), padding=(2,), bias=False)
    (1): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): Tanh()
    (3): Dropout(p=0.5, inplace=False)
    (4): Conv1d(512, 512, kernel_size=(5,), stride=(1,), padding=(2,), bias=False)
    (5): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): Tanh()
    (7): Dropout(p=0.5, inplace=False)
    (8): Conv1d(512, 512, kernel_size=(5,), stride=(1,), padding=(2,), bias=False)
    (9): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (10): Tanh()
    (11): Dropout(p=0.5, inplace=False)
    (12): Conv1d(512, 512, kernel_size=(5,), stride=(1,), padding=(2,), bias=False)
    (13): BatchNorm1d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (14): Tanh()
    (15): Dropout(p=0.5, inplace=False)
    (16): Conv1d(512, 80, kernel_size=(5,), strid