In [1]:
import os

# Step from the notebook's folder up to its parent directory (presumably the
# project root, so that project modules such as `layers` can be imported below).
# Running this cell again moves up one more level each time.
cwd_old = os.getcwd()
os.chdir(os.pardir)
cwd_new = os.getcwd()
print("The working directory is moved from {} to {}.".format(cwd_old, cwd_new))

The working directory is moved from /data2/sungjaecho/Projects/tacotron2/dev_ipynb to /data2/sungjaecho/Projects/tacotron2.


# 1. Implementing Gaussian Attention Network

## 1.1. Location Sensitive Attention Network

In [3]:
import numpy as np
import torch
import torch.nn.functional as F
from torch import nn
from torch.distributions.normal import Normal

from layers import ConvNorm, LinearNorm
# NOTE(review): the attention classes below also use `LocationLayer`, which no
# visible cell imports or defines — confirm which project module provides it.

In [4]:
class Attention(nn.Module):
    """Location-sensitive additive attention over encoder outputs.

    Each call scores every encoder position from three summed terms (the
    projected query, the already-projected memory, and features computed from
    the previous alignments), turns the scores into weights with a softmax
    over the encoder axis, and returns the weighted sum of the encoder outputs.

    NOTE(review): `LocationLayer` is referenced in `__init__` but no visible
    cell imports or defines it — confirm it is in scope before instantiating.
    """

    def __init__(self, attention_rnn_dim, embedding_dim, attention_dim,
                 attention_location_n_filters, attention_location_kernel_size):
        """
        PARAMS
        ------
        attention_rnn_dim: input width of the query projection
        embedding_dim: input width of the memory projection
        attention_dim: width shared by all projected terms
        attention_location_n_filters: forwarded to LocationLayer
        attention_location_kernel_size: forwarded to LocationLayer
        """
        super(Attention, self).__init__()
        self.query_layer = LinearNorm(attention_rnn_dim, attention_dim,
                                      bias=False, w_init_gain='tanh')
        # Not called inside this class; presumably the caller uses it to build
        # `processed_memory` once per utterance — confirm against the decoder.
        self.memory_layer = LinearNorm(embedding_dim, attention_dim, bias=False,
                                       w_init_gain='tanh')
        # Collapses each attention_dim feature vector to one scalar energy.
        self.v = LinearNorm(attention_dim, 1, bias=False)
        self.location_layer = LocationLayer(attention_location_n_filters,
                                            attention_location_kernel_size,
                                            attention_dim)
        # Masked energies become -inf so the softmax gives them zero weight.
        self.score_mask_value = -float("inf")

    def get_alignment_energies(self, query, processed_memory,
                               attention_weights_cat):
        """
        PARAMS
        ------
        query: attention rnn output (B, attention_rnn_dim)
        processed_memory: processed encoder outputs (B, T_in, attention_dim)
        attention_weights_cat: cumulative and prev. att weights (B, 2, max_time)

        RETURNS
        -------
        alignment: unnormalised energies (B, max_time)
        """
        # (B, 1, attention_dim): the singleton axis broadcasts over encoder steps.
        processed_query = self.query_layer(query.unsqueeze(1))
        processed_attention_weights = self.location_layer(attention_weights_cat)
        energies = self.v(torch.tanh(
            processed_query + processed_attention_weights + processed_memory))

        # Drop the trailing size-1 axis produced by `v`.
        return energies.squeeze(-1)

    def forward(self, attention_hidden_state, memory, processed_memory,
                attention_weights_cat, mask):
        """
        PARAMS
        ------
        attention_hidden_state: attention rnn last output
        memory: encoder outputs
        processed_memory: processed encoder outputs
        attention_weights_cat: previous and cumulative attention weights
        mask: boolean mask, True at padded positions to exclude, or None

        RETURNS
        -------
        attention_context: weighted sum of `memory` over the encoder axis
        attention_weights: softmax-normalised weights (B, max_time)
        """
        alignment = self.get_alignment_energies(
            attention_hidden_state, processed_memory, attention_weights_cat)

        if mask is not None:
            # Out-of-place fill: same values as the former
            # `alignment.data.masked_fill_`, but recorded by autograd instead
            # of mutating the tensor behind its back.
            alignment = alignment.masked_fill(mask, self.score_mask_value)

        # `torch.softmax` is the same op as `F.softmax` and needs no extra import.
        attention_weights = torch.softmax(alignment, dim=1)
        attention_context = torch.bmm(attention_weights.unsqueeze(1), memory)
        attention_context = attention_context.squeeze(1)

        return attention_context, attention_weights

## 1.2. Gaussian Monotonic Attention Network

In [6]:
class MontonicAttention(nn.Module):
    """Monotonic attention that places a discretised Gaussian over encoder steps.

    On every call the module predicts a strictly positive advance of the
    Gaussian mean and a log-variance, adds the advance to the mean remembered
    from the previous call, and uses the resulting bell-shaped weights to
    average the encoder outputs.

    The running mean is kept on the module between calls, so call
    `reset_state()` before decoding a new sequence or batch.

    The class name keeps its original spelling so existing references work.

    NOTE(review): `LocationLayer` is referenced in `__init__` but no visible
    cell imports or defines it — confirm it is in scope before instantiating.
    """

    def __init__(self, attention_rnn_dim, embedding_dim, attention_dim,
                 attention_location_n_filters, attention_location_kernel_size):
        """
        PARAMS
        ------
        attention_rnn_dim: input width of the query projection
        embedding_dim: input width of the memory projection
        attention_dim: width shared by all projected terms
        attention_location_n_filters: forwarded to LocationLayer
        attention_location_kernel_size: forwarded to LocationLayer
        """
        super(MontonicAttention, self).__init__()
        self.query_layer = LinearNorm(attention_rnn_dim, attention_dim,
                                      bias=False, w_init_gain='tanh')
        # Not called inside this class; presumably the caller uses it to build
        # `processed_memory` — confirm against the decoder.
        self.memory_layer = LinearNorm(embedding_dim, attention_dim, bias=False,
                                       w_init_gain='tanh')
        # Ten units, each squashed into (0, 1) by a sigmoid in `forward`; their
        # sum is the per-call advance of the mean, i.e. a value in (0, 10).
        self.mean_layer = LinearNorm(attention_dim, 10, bias=False, w_init_gain='sigmoid')
        self.logvar_layer = LinearNorm(attention_dim, 1, bias=False, w_init_gain='linear')

        self.location_layer = LocationLayer(attention_location_n_filters,
                                            attention_location_kernel_size,
                                            attention_dim)
        # Weights (not energies) are masked here, so padded steps get weight 0.
        self.score_mask_value = 0

        self.reset_state()

    def reset_state(self):
        """Forget the running mean / log-variance kept from previous calls."""
        # Running Gaussian mean per batch item, shape (B); None until first call.
        self.prev_means = None
        # Last output of `logvar_layer` (log-variances, shape (B, 1)); it is
        # stored for inspection only and never read inside this class.
        self.prev_vars = None

    def normal_pdf(self, batch_txt_length, means, stds):
        '''
        Probability mass of a Gaussian on unit-width bins centred on the
        integer positions 0 .. batch_txt_length - 1, renormalised per row.

        PARAMS
        -----
        batch_txt_length: int.
        means: torch.Tensor.
        - size: [batch_size]
        stds: torch.Tensor. Must be strictly positive.
        - size: [batch_size]

        RETURNS
        -----
        p: torch.Tensor.
        - size: [batch_size, batch_txt_length]
        - each row sums to 1, or is all zeros when the Gaussian lies so far
          outside the position range that every bin underflows to 0.
        '''
        enc_steps = batch_txt_length

        # Build the position grid on the device/dtype of the inputs instead of
        # forcing CUDA, so the method also runs on CPU or on another GPU.
        positions = torch.arange(
            enc_steps, dtype=means.dtype, device=means.device).unsqueeze(0)

        # (B, 1) parameters broadcast against the (1, enc_steps) grid.
        gaussian = Normal(means.unsqueeze(1), stds.unsqueeze(1))
        p = gaussian.cdf(positions + 0.5) - gaussian.cdf(positions - 0.5)

        # p_sum is a normalizing factor to make the sum across the encoding dimension 1.
        # The clamp only matters when the whole row is 0: it turns 0/0 = NaN into 0.
        p_sum = p.sum(dim=1, keepdim=True).clamp_min(torch.finfo(p.dtype).tiny)
        p = p / p_sum

        return p

    def forward(self, attention_hidden_state, memory, processed_memory,
                attention_weights_cat, mask):
        """
        PARAMS
        ------
        attention_hidden_state: attention rnn last output (B, attention_rnn_dim)
        memory: encoder outputs (B, T_in, embedding_dim)
        processed_memory: processed encoder outputs (B, T_in, attention_dim)
        attention_weights_cat: previous and cumulative attention weights (B, 2, max_time)
        mask: boolean mask, True at padded positions to exclude, or None

        RETURNS
        -------
        attention_context: weighted sum of `memory` over the encoder axis
        attention_weights: Gaussian weights (B, T_in), zero where masked
        """

        processed_query = self.query_layer(attention_hidden_state.unsqueeze(1))
        processed_attention_weights = self.location_layer(attention_weights_cat)
        pred_features = torch.tanh(processed_query + processed_attention_weights + processed_memory)
        # pred_features.size == (B, T_in, attention_dim)

        # Average pooling across the second dimension
        # avgpooled.size == (B, attention_dim)
        avgpooled = pred_features.mean(dim=1)

        # mean_increments.size == (B, 10); torch.sigmoid replaces the
        # deprecated F.sigmoid (and `F` is not imported in this notebook).
        mean_increments = torch.sigmoid(self.mean_layer(avgpooled))
        # mean_increment.size == (B)
        mean_increment = mean_increments.sum(dim=-1)
        if self.prev_means is None:
            # zeros_like already matches the device of mean_increment.
            self.prev_means = torch.zeros_like(mean_increment)
        # Increments are positive, so the mean can only move forward.
        means = self.prev_means + mean_increment
        self.prev_means = means

        # logvars.size == (B, 1); stds.size == (B)
        logvars = self.logvar_layer(avgpooled)
        stds = logvars.squeeze(-1).exp().sqrt()
        self.prev_vars = logvars

        batch_txt_length = memory.size(1)
        attention_weights = self.normal_pdf(batch_txt_length, means, stds)

        if mask is not None:
            # Out-of-place fill: same forward values as the former
            # `.data.masked_fill_`, but visible to autograd so masked
            # positions receive no gradient.
            # NOTE(review): masking happens after normalisation, so rows with
            # padded positions may sum to less than 1 — confirm this is intended.
            attention_weights = attention_weights.masked_fill(
                mask, self.score_mask_value)

        attention_context = torch.bmm(attention_weights.unsqueeze(1), memory)
        attention_context = attention_context.squeeze(1)

        return attention_context, attention_weights

In [4]:
import torch
from torch.distributions.normal import Normal
import numpy as np

In [165]:
# Toy sizes for the shape experiments in the cells below.
B = 32          # size of the leading (batch) axis
enc_steps = 10  # size of the second (encoder position) axis

In [166]:
# One toy mean per batch row: 0, 1, ..., B - 1 as a float tensor.
means = torch.arange(B, dtype=torch.get_default_dtype())

In [150]:
# One toy standard deviation per batch row. Normal requires a strictly
# positive scale, so the ramp runs 1 .. B rather than starting at 0.
stds = torch.Tensor(np.arange(1, B + 1))

In [167]:
# Shape check before broadcasting: one entry per batch row.
means.shape

torch.Size([32])

In [171]:
# Repeat each row's mean across the encoder axis -> (B, enc_steps).
# Rebinds `means`, so this cell expects the 1-D tensor and is run once.
means = means[:, None].expand(-1, enc_steps)

In [172]:
# Shape check after broadcasting.
means.shape

torch.Size([32, 10])

In [173]:
# Shape check before broadcasting: one entry per batch row.
stds.shape

torch.Size([32])

In [174]:
# Repeat each row's standard deviation across the encoder axis -> (B, enc_steps).
# Rebinds `stds`, so this cell expects the 1-D tensor and is run once.
stds = stds[:, None].expand(-1, enc_steps)

In [175]:
# Shape check after broadcasting.
stds.shape

torch.Size([32, 10])

In [176]:
# Grid of encoder positions 0 .. enc_steps - 1, shared by every batch row.
x = torch.Tensor(np.arange(enc_steps)).unsqueeze(0).expand(B, enc_steps)

In [178]:
# Shape check of the position grid.
x.shape

torch.Size([32, 10])

In [179]:
# Peek at the first two rows: every row holds the same positions.
x[:2]

tensor([[0., 1., 2., 3., 4., 5., 6., 7., 8., 9.],
        [0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]])

In [181]:
# All three operands must share one shape before building the distribution.
means.shape, stds.shape, x.shape

(torch.Size([32, 10]), torch.Size([32, 10]), torch.Size([32, 10]))

In [183]:
# Probability mass of the unit-width bin centred on each position:
# CDF at the bin's upper edge minus CDF at its lower edge.
p = Normal(means, stds).cdf(x + 0.5).sub(Normal(means, stds).cdf(x - 0.5))

In [184]:
# Shape check of the per-position masses.
p.shape

torch.Size([32, 10])

In [185]:
# Row totals, repeated along the encoder axis so they line up with `p`.
p_sum = p.sum(dim=1, keepdim=True).expand_as(p)

In [186]:
# Divide by the row totals. Rebinds `p`, so run once per fresh `p` / `p_sum`.
p = p.div(p_sum)

In [198]:
# Fresh 1-D inputs for re-running the whole computation as a single cell.
batch_txt_length = 10
means = torch.Tensor(np.arange(B))
# Normal requires a strictly positive scale, so the ramp runs 1 .. B
# rather than starting at 0.
stds = torch.Tensor(np.arange(1, B + 1))

In [199]:
# Single-cell, CPU-only rehearsal of the discretised-Gaussian computation.
batch_size, enc_steps = means.size(0), batch_txt_length

# Broadcast the per-row parameters and the position grid to (batch_size, enc_steps).
means = means[:, None].expand(batch_size, enc_steps)
stds = stds[:, None].expand(batch_size, enc_steps)
x = torch.Tensor(np.arange(enc_steps)).unsqueeze(0).expand(batch_size, enc_steps)

# Mass of the unit-width bin centred on every encoder position.
p = Normal(means, stds).cdf(x + 0.5).sub(Normal(means, stds).cdf(x - 0.5))
# p_sum is a normalizing factor to make the sum across the encoding dimension 1.
p_sum = p.sum(dim=1, keepdim=True).expand_as(p)
p = p.div(p_sum)

In [202]:
# Insert a singleton middle axis so `p` can be the left operand of bmm.
p[:, None, :].shape

torch.Size([32, 1, 10])

In [205]:
# Stand-in encoder outputs: all ones, 512 features per position.
memory = torch.ones((batch_size, batch_txt_length, 512))
memory.shape

torch.Size([32, 10, 512])

In [207]:
# Weighted sum over the encoder axis; a singleton middle axis remains.
p.unsqueeze(1).bmm(memory).shape

torch.Size([32, 1, 512])

In [209]:
# Same product with the singleton axis dropped: one context vector per row.
p.unsqueeze(1).bmm(memory).squeeze(1).shape

torch.Size([32, 512])