In [None]:
import tensorflow as tf
import numpy as np
import random

In [None]:
from tensorflow.keras import initializers

In [None]:
class Linear(tf.Module):
  """Affine projection ``y = x @ W (+ b)`` built on raw ``tf.Variable`` objects.

  Args:
    input_dim: Size of the last axis of the input.
    output_dim: Size of the last axis of the output.
    bias: When True (default) a zero-initialised bias of shape
      ``[output_dim]`` is created and added to the product. When False no
      bias variable is created and ``self.bias`` is ``None``.

  Notes:
    * Earlier revisions always created and added the bias, so ``bias=False``
      had no effect.
    * Earlier revisions also passed the result through ``tf.nn.relu``. The
      layer is now purely linear: ``PreNet`` already follows every ``Linear``
      with its own ``ReLU`` (so its output is unchanged), and ``Decoder`` uses
      this layer for the mel/stop projections, which must be able to produce
      negative values.
  """

  def __init__(self, input_dim: int, output_dim: int, bias: bool = True):
    super(Linear, self).__init__()
    # Weights are sampled from a standard normal distribution.
    # (initializers.GlorotUniform() was tried in the original notebook and
    # left disabled.)
    self.weights = tf.Variable(tf.random.normal([input_dim, output_dim]))
    # Honour the flag: allocate the bias only when it was requested.
    self.bias = tf.Variable(tf.zeros([output_dim])) if bias else None

  def __call__(self, x):
    """Applies the projection to the last axis of ``x`` and returns the result."""
    y = tf.matmul(x, self.weights)
    if self.bias is not None:
      y = y + self.bias
    return y

In [None]:
class Linear2(tf.keras.Model):
  """Thin Keras wrapper around a single ``Dense`` projection.

  Args:
    input_dim: Kept for signature compatibility. It is not used: ``Dense``
      infers the input width the first time the layer is called.
    output_dim: Number of output units.
    bias: Whether the ``Dense`` layer has a bias term (default True).

  The previous version assigned ``tf.keras.layers.Input(...)`` to
  ``self.input``. ``input`` is a read-only property on Keras layers/models, so
  that assignment fails, and ``Input`` returns a symbolic tensor rather than a
  callable layer, so ``self.input(x)`` could not work either.
  """

  def __init__(self, input_dim, output_dim, bias: bool = True):
    super(Linear2, self).__init__()
    self.linear = tf.keras.layers.Dense(units=output_dim, use_bias=bias)

  def call(self, x):
    """Projects the last axis of ``x`` to ``output_dim`` units."""
    return self.linear(x)


In [None]:
class ConvBlock(tf.keras.layers.Layer):
  """Conv1D -> BatchNormalization -> activation -> Dropout.

  The block is a Keras ``Layer`` (itself a ``tf.Module``) so that it can be
  called directly as ``block(x)`` and placed inside a ``Sequential``. The
  previous ``tf.Module`` version only defined ``call`` and therefore was not
  callable on its own.

  Args:
    input_dim: Kept for signature compatibility; not used because ``Conv1D``
      infers the number of input channels from its input.
    output_dim: Number of convolution filters (output channels).
    kernel_size: Width of the 1-D convolution kernel.
    padding: Padding mode forwarded to ``Conv1D`` (e.g. ``'same'``/``'valid'``).
    dropout_p: Dropout rate applied after the activation.
    activation: Activation identifier forwarded to
      ``tf.keras.layers.Activation``. Defaults to ``'relu'``, which matches the
      previously hard-coded ``ReLU`` layer.
  """

  def __init__(self,
               input_dim: int,
               output_dim: int,
               kernel_size: int,
               padding: str,
               dropout_p: float = 0.5,
               activation = 'relu') -> None:

    super(ConvBlock, self).__init__()

    self.conv = tf.keras.models.Sequential([
        tf.keras.layers.Conv1D(output_dim, kernel_size=kernel_size, strides=1, padding=padding),
        tf.keras.layers.BatchNormalization(),
        # Honour the ``activation`` argument instead of always using ReLU.
        tf.keras.layers.Activation(activation),
        tf.keras.layers.Dropout(rate=dropout_p),
    ])

  def call(self, inputs, training=None):
    """Runs the block; ``training`` is forwarded to BatchNorm and Dropout."""
    return self.conv(inputs, training=training)

In [None]:
class PreNet(tf.keras.Model):
  """Two stacked ``Linear -> ReLU -> Dropout`` stages.

  Args:
    input_dim: Width of the incoming features (input of the first ``Linear``).
    output_dim: Width produced by both ``Linear`` layers.
    dropout_p: Dropout rate used after each stage.
  """

  def __init__(self, input_dim: int, output_dim: int, dropout_p: float) -> None:
    super(PreNet, self).__init__()

    # The first stage maps input_dim -> output_dim, the second keeps output_dim.
    stages = []
    for in_features in (input_dim, output_dim):
      stages.append(Linear(in_features, output_dim))
      stages.append(tf.keras.layers.ReLU())
      stages.append(tf.keras.layers.Dropout(rate=dropout_p))

    self.fully_connectd_layers = tf.keras.models.Sequential(stages)

  def call(self, input):
    """Feeds ``input`` through the stacked stages and returns the result."""
    projected = self.fully_connectd_layers(input)
    return projected

In [None]:
class Encoder(tf.keras.Model):
  """Text encoder: embedding -> stacked ConvBlocks -> bidirectional LSTM(s).

  Args:
    vocab_size: Number of rows in the embedding table.
    embedding_dim: Embedding width; also the channel count of every ConvBlock.
    encoder_lstm_dim: Units per LSTM direction. The bidirectional output is
      therefore ``2 * encoder_lstm_dim`` wide.
    num_lstm_layers: Number of stacked bidirectional LSTM layers.
    conv_dropout_p: Dropout rate inside each ConvBlock.
    num_conv_layers: Number of ConvBlocks.
    conv_kernel_size: Kernel width of each ConvBlock.

  Fixes relative to the previous version:
    * ``LSTM(num_lstm_layers)`` used the layer count as the number of units and
      returned only the final state. The LSTM now has ``encoder_lstm_dim``
      units, returns the full sequence, and is stacked ``num_lstm_layers``
      times.
    * The ``tf.transpose(..., perm=[1, 2, 0])`` calls were removed. Keras
      ``Conv1D`` and ``LSTM`` both consume ``(batch, time, channels)``, which
      is what ``Embedding`` already produces; the transposes moved the batch
      axis to the end.
    * ConvBlocks use ``'same'`` padding so the time axis keeps its length.
      With ``'valid'`` each block drops ``kernel_size - 1`` steps, which fails
      on inputs shorter than the kernel.
  """

  def __init__(self,
               vocab_size:int,
               embedding_dim: int = 512,
               encoder_lstm_dim: int = 256,
               num_lstm_layers: int = 1,
               conv_dropout_p: float = 0.5,
               num_conv_layers: int = 3,
               conv_kernel_size: int = 5) -> None:

    super(Encoder, self).__init__()
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.convs_layers = tf.keras.models.Sequential([
        ConvBlock(embedding_dim,
                  embedding_dim,
                  kernel_size = conv_kernel_size,
                  padding = 'same',
                  dropout_p = conv_dropout_p)
        for _ in range(num_conv_layers)
    ])
    self.lstm = tf.keras.models.Sequential([
        tf.keras.layers.Bidirectional(
            tf.keras.layers.LSTM(encoder_lstm_dim, return_sequences=True))
        for _ in range(num_lstm_layers)
    ])

  def call(self, inputs, input_lenghts):
    """Encodes integer token ids.

    Args:
      inputs: Integer token ids of shape ``(batch, time)``.
      input_lenghts: Accepted for interface compatibility; currently unused
        (no masking of padded positions is performed).

    Returns:
      Tensor of shape ``(batch, time, 2 * encoder_lstm_dim)``.
    """
    embedded = self.embedding(inputs)        # (batch, time, embedding_dim)
    features = self.convs_layers(embedded)   # time axis preserved by 'same' padding
    return self.lstm(features)



In [None]:
    #self.lstm = tf.keras.Model.Sequential([
    #                                       tf.keras.layers.Input(shape= (embedding_dim, num_lstm_layers)),
    #                                       tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(num_lstm_layers))
    #])

In [None]:
class LocationSensitiveAtention(tf.keras.Model):
  """Location-sensitive additive attention.

  The score for every encoder step is
  ``align_proj(tanh(query_proj(q) + value_proj(v) + location_proj(conv(a)) + bias))``
  where ``a`` holds the previous and cumulative alignments.

  Args:
    lstm_hidden_dim: Width of the query. Kept for signature compatibility; the
      ``Dense`` projections infer their input width on first call.
    embedding_dim: Width of the values. Kept for signature compatibility for
      the same reason.
    attn_dim: Width of the shared attention space.
    location_conv_filter_size: Number of filters of the location convolution.
    location_conv_kernel_size: Kernel width of the location convolution.

  Fixes relative to the previous version:
    * ``call`` had its computation commented out and returned ``None``; it now
      returns ``(context, alignment)``.
    * The location convolution uses ``'same'`` padding so that its output has
      one feature vector per encoder step (required to add it to the projected
      values).
    * The projections are plain ``Dense`` layers with the bias settings noted
      in the original commented-out lines (no bias on query/value/location,
      bias on the final score projection), so the block does not depend on the
      ``Linear``/``Linear2`` helpers.
    * Debug ``print`` calls and a no-op transpose were removed.
  """

  def __init__(self,
               lstm_hidden_dim: int = 1024,
               embedding_dim: int = 512,
               attn_dim: int = 128,
               location_conv_filter_size: int = 32,
               location_conv_kernel_size: int = 31) -> None:

    super(LocationSensitiveAtention, self).__init__()
    self.attn_dim = attn_dim
    self.query_proj = tf.keras.layers.Dense(attn_dim, use_bias=False)
    self.value_proj = tf.keras.layers.Dense(attn_dim, use_bias=False)
    self.align_proj = tf.keras.layers.Dense(1, use_bias=True)

    # Shared additive bias inside the tanh.
    self.bias = tf.Variable(tf.random.uniform(shape= (attn_dim,), minval= -0.1, maxval= 0.1))

    self.location_conv = tf.keras.layers.Conv1D(filters= location_conv_filter_size,
                                                kernel_size = location_conv_kernel_size,
                                                padding = 'same',
                                                use_bias = False
                                                )
    self.location_proj = tf.keras.layers.Dense(attn_dim, use_bias=False)

  def call(self, query, value, last_alignment):
    """Computes the attention context for one decoder step.

    Args:
      query: Decoder state of shape ``(batch, dim)`` or ``(batch, 1, dim)``.
      value: Encoder outputs of shape ``(batch, time, value_dim)``.
      last_alignment: Alignment features of shape ``(batch, time, channels)``
        (channels-last, as expected by ``Conv1D``).

    Returns:
      Tuple ``(context, alignment)`` with shapes ``(batch, value_dim)`` and
      ``(batch, time)``.
    """
    # Accept both a flat query and one that already carries a length-1 time axis.
    if query.shape.rank == 2:
      query = tf.expand_dims(query, 1)                       # (batch, 1, dim)

    location_features = self.location_proj(
        self.location_conv(last_alignment))                  # (batch, time, attn_dim)

    energy = self.align_proj(tf.math.tanh(
        self.query_proj(query)                               # broadcast over time
        + self.value_proj(value)
        + location_features
        + self.bias
    ))                                                       # (batch, time, 1)

    alignment = tf.nn.softmax(tf.squeeze(energy, axis=-1), axis=-1)   # (batch, time)

    # Weighted sum of the values: (batch, 1, time) @ (batch, time, value_dim).
    context = tf.squeeze(tf.matmul(tf.expand_dims(alignment, 1), value), axis=1)

    return context, alignment

    

In [None]:
# Smoke test: run LocationSensitiveAtention once on random tensors.
batch_size, seq_length = 3, 100
query_dim, value_dim, align_dim = 1024, 512, 2

query = tf.random.uniform(
    shape=(batch_size, 1, query_dim), minval=-0.01, maxval=0.01)
value = tf.random.uniform(
    shape=(batch_size, seq_length, value_dim), minval=-0.01, maxval=0.1)
align = tf.random.uniform(
    shape=(batch_size, seq_length, align_dim), minval=-0.01, maxval=0.01)

attention = LocationSensitiveAtention()
output = attention(query, value, align)


In [None]:
class Decoder(tf.keras.Model):
  """Autoregressive mel decoder: PreNet -> attention LSTM -> attention -> decoder LSTM.

  Per step the previous frame goes through the PreNet, is concatenated with
  the previous context and fed to the attention LSTM cell. The attention
  module turns that cell's output into a new context, and the decoder LSTM cell
  consumes ``[attention output, context]``. ``[decoder output, context]`` is
  projected to one mel frame and one stop logit.

  Args:
    num_mel_bins: Width of one mel frame.
    prenet_dim: Output width of the PreNet.
    decoder_lstm_dim: Units of the decoder LSTM cell.
    attn_lstm_dim: Units of the attention LSTM cell.
    embedding_dim: Width of the encoder outputs / attention context.
    attn_dim, location_conv_filter_size, location_conv_kernel_size:
      Forwarded to ``LocationSensitiveAtention``.
    prenet_dropout_p: Dropout rate inside the PreNet.
    attn_dropout_p: Dropout rate on the attention LSTM output.
    decoder_dropout_p: Dropout rate on the decoder LSTM output.
    max_decoding_step: Step limit when decoding without targets.
    stop_threshold: Sigmoid threshold on the stop logit that ends free-running
      decoding.

  Fixes relative to the previous version:
    * ``validate_args`` tested the builtin ``input`` instead of ``inputs``, so
      the "no targets" branch could never run.
    * ``forward_step`` only ran ``lstm[0]``, wrote its result into slot 1 and
      unpacked the Keras ``LSTMCell`` return value (``output, [h, c]``) as if
      it were ``(h, c)``. Both cells are now run and their ``h``/``c`` states
      are tracked separately.
    * The alignment features are stacked channels-last (``(batch, time, 2)``),
      which is the layout ``Conv1D`` consumes.
    * ``call`` read ``decoder_states["mel_outputs"]``; the key produced by
      ``forward_step`` is ``"mel_output"``.
    * The free-running branch fed a rank-2 (or whole-sequence) tensor into
      ``forward_step``, which squeezes axis 1; it now feeds one frame with an
      explicit length-1 time axis, and the stop test reduces over the batch.
    * ``parse_decoder_outputs`` used the invalid permutation ``[0, 0, 1]``;
      per-step tensors are now stacked directly on axis 1.
    * Debug ``print`` calls were removed and ``-> None`` annotations on
      methods that return values were dropped.
  """

  def __init__(self,
               num_mel_bins: int = 80,
               prenet_dim: int = 256,
               decoder_lstm_dim: int = 1024,
               attn_lstm_dim: int = 1024,
               embedding_dim: int = 512,
               attn_dim: int = 128,
               location_conv_filter_size: int = 32,
               location_conv_kernel_size: int = 31,
               prenet_dropout_p: float = 0.5,
               attn_dropout_p: float = 0.1,
               decoder_dropout_p: float = 0.1,
               max_decoding_step: int = 1000,
               stop_threshold: float = 0.5) -> None:

    super(Decoder, self).__init__()
    self.num_mel_bins = num_mel_bins
    self.max_decoding_step = max_decoding_step
    self.decoder_lstm_dim = decoder_lstm_dim
    self.attn_lstm_dim = attn_lstm_dim
    self.embedding_dim = embedding_dim
    self.attn_dropout_p = attn_dropout_p
    self.decoder_dropout_p = decoder_dropout_p
    self.stop_threshold = stop_threshold

    self.prenet = PreNet(self.num_mel_bins, prenet_dim, prenet_dropout_p)
    # lstm[0] is the attention cell, lstm[1] the decoder cell.
    self.lstm = [tf.keras.layers.LSTMCell(attn_lstm_dim),
                 tf.keras.layers.LSTMCell(decoder_lstm_dim)]

    self.attention = LocationSensitiveAtention(
        # The attention query is the attention-LSTM output.
        lstm_hidden_dim = attn_lstm_dim,
        embedding_dim = embedding_dim,
        attn_dim = attn_dim,
        location_conv_filter_size = location_conv_filter_size,
        location_conv_kernel_size = location_conv_kernel_size
    )

    self.mel_generator = Linear(decoder_lstm_dim + embedding_dim, num_mel_bins)
    self.stop_generator = Linear(decoder_lstm_dim + embedding_dim, 1)

  def _init_decoder_states(self, encoder_outputs):
    """Builds the all-zero state dictionary used before the first step.

    ``lstm_outputs`` holds the ``h`` state and ``lstm_hiddens`` the ``c`` state
    of ``[attention cell, decoder cell]``.
    """
    batch_size = encoder_outputs.shape[0]
    seq_length = encoder_outputs.shape[1]

    cell_dims = (self.attn_lstm_dim, self.decoder_lstm_dim)
    lstm_outputs = [tf.zeros(shape = [batch_size, dim]) for dim in cell_dims]
    lstm_hiddens = [tf.zeros(shape = [batch_size, dim]) for dim in cell_dims]

    return {
        "lstm_outputs" : lstm_outputs,
        "lstm_hiddens" : lstm_hiddens,
        "alignment" : tf.zeros(shape= [batch_size, seq_length]),
        "alignment_cum" : tf.zeros(shape= [batch_size, seq_length]),
        "context" : tf.zeros(shape= [batch_size, self.embedding_dim])
    }

  def parse_decoder_outputs(self, mel_outputs: list, stop_outputs: list, alignment: list):
    """Stacks per-step results along a new time axis (axis 1).

    Args:
      mel_outputs: Per-step tensors of shape ``(batch, num_mel_bins)``.
      stop_outputs: Per-step tensors of shape ``(batch, 1)``.
      alignment: Per-step tensors of shape ``(batch, encoder_time)``.

    Returns:
      Dict with ``"mel_outputs"`` ``(batch, steps, num_mel_bins)``,
      ``"stop_outputs"`` ``(batch, steps, 1)`` and ``"alignments"``
      ``(batch, steps, encoder_time)``.
    """
    stop_outputs = tf.stack(stop_outputs, axis=1)
    alignment = tf.stack(alignment, axis=1)

    mel_outputs = tf.stack(mel_outputs, axis=1)
    mel_outputs = tf.reshape(mel_outputs, shape = [mel_outputs.shape[0], -1, self.num_mel_bins])

    return {
        "mel_outputs" : mel_outputs,
        "stop_outputs" : stop_outputs,
        "alignments" : alignment
    }

  def forward_step(self, input_var,
                   encoder_outputs,
                   lstm_outputs,
                   lstm_hiddens,
                   alignment,
                   alignment_cum,
                   context):
    """Runs one decoding step.

    Args:
      input_var: PreNet output for the previous frame, ``(batch, 1, prenet_dim)``.
      encoder_outputs: ``(batch, encoder_time, embedding_dim)``.
      lstm_outputs: ``[attention h, decoder h]`` from the previous step.
      lstm_hiddens: ``[attention c, decoder c]`` from the previous step.
      alignment: Previous attention weights, ``(batch, encoder_time)``.
      alignment_cum: Sum of all previous attention weights, same shape.
      context: Previous attention context, ``(batch, embedding_dim)``.

    Returns:
      Dict with the step's ``mel_output``/``stop_output`` and the updated
      ``alignment``, ``alignment_cum``, ``context``, ``lstm_outputs`` and
      ``lstm_hiddens``.
    """
    input_var = tf.squeeze(input_var, 1)
    attn_input = tf.concat([input_var, context], axis= -1)

    # Keras LSTMCell returns (output, [h, c]); output and h are the same tensor.
    attn_h, (_, attn_c) = self.lstm[0](attn_input, [lstm_outputs[0], lstm_hiddens[0]])
    attn_h = tf.nn.dropout(attn_h, rate= self.attn_dropout_p)

    # Channels-last location features: (batch, encoder_time, 2).
    concated_alignment = tf.stack([alignment, alignment_cum], axis= -1)
    context, alignment = self.attention(attn_h, encoder_outputs, concated_alignment)
    alignment_cum = alignment_cum + alignment

    decoder_input = tf.concat([attn_h, context], axis= -1)
    decoder_h, (_, decoder_c) = self.lstm[1](decoder_input, [lstm_outputs[1], lstm_hiddens[1]])
    decoder_h = tf.nn.dropout(decoder_h, rate= self.decoder_dropout_p)

    output = tf.concat([decoder_h, context], axis= -1)

    mel_output = self.mel_generator(output)
    stop_output = self.stop_generator(output)

    return {
        "mel_output" : mel_output,
        "stop_output" : stop_output,
        "alignment" : alignment,
        "alignment_cum" : alignment_cum,
        "context" : context,
        "lstm_outputs" : [attn_h, decoder_h],
        "lstm_hiddens" : [attn_c, decoder_c]
    }

  def _advance(self, input_var, encoder_outputs, decoder_states):
    """Calls ``forward_step`` with the recurrent entries of ``decoder_states``."""
    return self.forward_step(
        input_var = input_var,
        encoder_outputs = encoder_outputs,
        lstm_outputs = decoder_states["lstm_outputs"],
        lstm_hiddens = decoder_states["lstm_hiddens"],
        alignment = decoder_states["alignment"],
        alignment_cum = decoder_states["alignment_cum"],
        context = decoder_states["context"]
    )

  def call(self,
              encoder_outputs,
              inputs,
              teacher_forcing_ratio: float = 1.0):
    """Decodes a mel spectrogram.

    Args:
      encoder_outputs: ``(batch, encoder_time, embedding_dim)``.
      inputs: Target mel frames ``(batch, steps, num_mel_bins)``, or ``None``
        for pure inference (requires ``teacher_forcing_ratio == 0``).
      teacher_forcing_ratio: Probability of feeding the ground-truth frames
        instead of the model's own predictions for this call.

    Returns:
      The dictionary produced by ``parse_decoder_outputs``.
    """
    mel_outputs, stop_outputs, alignments = list(), list(), list()
    use_teacher_forcing = random.random() < teacher_forcing_ratio
    inputs, max_decoding_step = self.validate_args(encoder_outputs, inputs, teacher_forcing_ratio)
    decoder_states = self._init_decoder_states(encoder_outputs)

    if use_teacher_forcing:
      inputs = self.prenet(inputs)

      for di in range(max_decoding_step):
        input_var = tf.expand_dims(inputs[:, di, :], 1)
        decoder_states = self._advance(input_var, encoder_outputs, decoder_states)

        mel_outputs.append(decoder_states["mel_output"])
        stop_outputs.append(decoder_states["stop_output"])
        alignments.append(decoder_states["alignment"])

    else:
      # Start from the go frame. With targets present ``inputs`` is
      # (batch, steps + 1, mels); without targets it is already one frame.
      input_var = inputs[:, 0, :] if inputs.shape.rank == 3 else inputs

      for di in range(max_decoding_step):
        # forward_step squeezes axis 1, so give the frame a length-1 time axis.
        step_input = tf.expand_dims(self.prenet(input_var), 1)
        decoder_states = self._advance(step_input, encoder_outputs, decoder_states)

        mel_outputs.append(decoder_states["mel_output"])
        stop_outputs.append(decoder_states["stop_output"])
        alignments.append(decoder_states["alignment"])

        # Stop once every sequence in the batch has signalled the end.
        stop_probability = tf.math.sigmoid(decoder_states["stop_output"])
        if tf.reduce_all(stop_probability > self.stop_threshold):
          break

        input_var = decoder_states["mel_output"]

    return self.parse_decoder_outputs(mel_outputs, stop_outputs, alignments)

  def validate_args(self,
                   encoder_outputs,
                   inputs,
                   teacher_forcing_ratio: float = 1.0):
    """Prepares the decoder inputs and the number of steps to run.

    Without targets a single all-zero go frame ``(batch, num_mel_bins)`` is
    returned together with ``self.max_decoding_step``. With targets a zero go
    frame is prepended along the time axis and the step count is the number of
    target frames.

    Raises:
      ValueError: If ``inputs`` is ``None`` while teacher forcing is enabled.
    """
    batch_size = encoder_outputs.shape[0]

    if inputs is None:
      if teacher_forcing_ratio > 0:
        raise ValueError("Teacher forcing has to be disabled")

      inputs = tf.zeros(shape= [batch_size, self.num_mel_bins])
      max_decoding_step = self.max_decoding_step

    else:
      inputs = tf.reshape(inputs, shape = [batch_size, inputs.shape[1], -1])
      go_frame = tf.zeros(shape= [batch_size, 1, self.num_mel_bins], dtype= inputs.dtype)

      inputs = tf.concat([go_frame, inputs], axis= 1)
      max_decoding_step = inputs.shape[1] - 1

    return inputs, max_decoding_step


    




In [None]:
# Smoke test: run the Decoder once with teacher forcing on random tensors.
batch_size, input_seq_length, output_seq_length = 3, 10, 100
encoder_embedding_dim, n_mels = 512, 80

encoder_outputs = tf.random.uniform(
    shape=(batch_size, input_seq_length, encoder_embedding_dim),
    minval=-0.1,
    maxval=0.1)
decoder_inputs = tf.random.uniform(
    shape=(batch_size, output_seq_length, n_mels),
    minval=-0.1,
    maxval=0.1)

decoder = Decoder()
output = decoder(encoder_outputs, decoder_inputs)

In [None]:
class PostNet(tf.keras.Model):
  """Stack of ConvBlocks that predicts a residual for the decoder's mel output.

  Args:
    num_mel_bins: Channels of the input and of the returned residual.
    postnet_dim: Channels of the hidden ConvBlocks.
    num_conv_layers: Total number of ConvBlocks (first + middle + last). At
      least two blocks are always created.
    kernel_size: Kernel width of every ConvBlock.
    dropout_p: Dropout rate inside every ConvBlock.

  Fixes relative to the previous version:
    * ``call`` was indented inside ``__init__`` and therefore was only a local
      function; the model had no ``call`` method.
    * The last block produced ``postnet_dim`` channels, which cannot be added
      to a ``num_mel_bins``-wide mel spectrogram; it now produces
      ``num_mel_bins`` channels.
    * Blocks use ``'same'`` padding so the time axis keeps its length and the
      residual can be added to the input.
    * The blocks are wrapped in a ``Sequential`` (as ``Encoder`` does), so
      ``self.conv_layers`` is now a ``Sequential``; the individual blocks are
      available as ``self.conv_layers.layers``.
    * Per the "must -> tanh" note in the original code, hidden blocks request
      ``'tanh'`` and the final block requests ``'linear'``.
  """

  def __init__(self,
               num_mel_bins: int = 80,
               postnet_dim: int = 512,
               num_conv_layers: int = 3,
               kernel_size: int = 5,
               dropout_p: float = 0.5):

    super(PostNet, self).__init__()

    # (input channels, output channels, activation) for each block, in order.
    layout = [(num_mel_bins, postnet_dim, 'tanh')]
    layout += [(postnet_dim, postnet_dim, 'tanh') for _ in range(num_conv_layers - 2)]
    layout.append((postnet_dim, num_mel_bins, 'linear'))

    self.conv_layers = tf.keras.models.Sequential([
        ConvBlock(
            input_dim = in_channels,
            output_dim = out_channels,
            kernel_size = kernel_size,
            padding = 'same',
            dropout_p = dropout_p,
            activation = activation
        )
        for in_channels, out_channels, activation in layout
    ])

  def call(self, x):
    """Maps ``(batch, time, num_mel_bins)`` to a residual of the same shape."""
    return self.conv_layers(x)

In [None]:
class Tacotron2(tf.keras.Model):
  """Encoder + Decoder + PostNet assembled from a settings object.

  ``args`` must expose the attributes read in ``__init__`` (see the keyword
  mappings below), e.g. an instance of ``DefaultArgument``.
  """

  def __init__(self, args) -> None:
    super(Tacotron2, self).__init__()

    encoder_options = dict(
        vocab_size=args.vocab_size,
        embedding_dim=args.embedding_dim,
        encoder_lstm_dim=args.encoder_lstm_dim,
        num_lstm_layers=args.num_encoder_lstm_layers,
        conv_dropout_p=args.conv_dropout_p,
        num_conv_layers=args.num_encoder_conv_layers,
        conv_kernel_size=args.encoder_conv_kernel_size,
    )
    self.encoder = Encoder(**encoder_options)

    decoder_options = dict(
        num_mel_bins=args.num_mel_bins,
        prenet_dim=args.prenet_dim,
        decoder_lstm_dim=args.decoder_lstm_dim,
        attn_lstm_dim=args.attn_lstm_dim,
        embedding_dim=args.embedding_dim,
        attn_dim=args.attn_dim,
        location_conv_filter_size=args.location_conv_filter_size,
        location_conv_kernel_size=args.location_conv_kernel_size,
        prenet_dropout_p=args.prenet_dropout_p,
        attn_dropout_p=args.attn_dropout_p,
        decoder_dropout_p=args.decoder_dropout_p,
        max_decoding_step=args.max_decoding_step,
        stop_threshold=args.stop_threshold,
    )
    self.decoder = Decoder(**decoder_options)

    postnet_options = dict(
        num_mel_bins=args.num_mel_bins,
        postnet_dim=args.postnet_dim,
        num_conv_layers=args.num_postnet_conv_layers,
        kernel_size=args.postnet_conv_kernel_size,
        dropout_p=args.postnet_dropout_p,
    )
    self.postnet = PostNet(**postnet_options)

  def call(self,
           inputs,
           input_lenghts,
           targets,
           teacher_forcing_ratio: float = 1.0):
    """Runs encoder, decoder and postnet.

    Returns the decoder's output dictionary with the postnet output added to
    its ``"mel_outputs"`` entry.
    """
    encoded = self.encoder(inputs, input_lenghts)
    result = self.decoder(encoded, targets, teacher_forcing_ratio)

    # Add the postnet output to the decoder's mel prediction.
    refinement = self.postnet(result["mel_outputs"])
    result["mel_outputs"] = result["mel_outputs"] + refinement

    return result

In [None]:
class DefaultArgument:
  """Plain settings object holding the default hyper-parameters.

  The attribute names are the ones read by ``Tacotron2.__init__``.
  """

  def __init__(self):
    self._set_encoder_defaults()
    self._set_decoder_defaults()
    self._set_postnet_defaults()

  def _set_encoder_defaults(self):
    """Assigns the encoder arguments."""
    self.vocab_size = 10
    self.embedding_dim = 512
    self.encoder_lstm_dim = 256
    self.num_encoder_lstm_layers = 1
    self.conv_dropout_p = 0.5
    self.num_encoder_conv_layers = 3
    self.encoder_conv_kernel_size = 5

  def _set_decoder_defaults(self):
    """Assigns the decoder arguments."""
    self.num_mel_bins = 80
    self.prenet_dim = 256
    self.decoder_lstm_dim = 1024
    self.attn_lstm_dim = 1024
    self.attn_dim = 128
    self.location_conv_filter_size = 32
    self.location_conv_kernel_size = 31
    self.prenet_dropout_p = 0.5
    self.attn_dropout_p = 0.1
    self.decoder_dropout_p = 0.1
    self.max_decoding_step = 1000
    self.stop_threshold = 0.5

  def _set_postnet_defaults(self):
    """Assigns the postnet arguments."""
    self.postnet_dim = 512
    self.num_postnet_conv_layers = 5
    self.postnet_conv_kernel_size = 5
    self.postnet_dropout_p = 0.5

In [None]:
# Smoke test: build Tacotron2 from the default settings and run it once.
batch_size, seq_length = 3, 3

inputs = tf.constant(
    np.reshape(np.arange(batch_size * seq_length), (batch_size, seq_length)))
input_lengths = tf.constant([3, 3, 2])
targets = tf.random.uniform(
    shape=(batch_size, 100, 80), minval=-0.1, maxval=0.1)

args = DefaultArgument()
model = Tacotron2(args)
output = model(inputs, input_lengths, targets)

In [None]:
import torch


In [None]:
# torch.range is deprecated (it emits a warning and includes the end point).
# torch.arange with an exclusive end of 17 and an explicit float32 dtype
# yields the same 16 values, 1.0 .. 16.0.
a = torch.arange(1, 17, dtype=torch.float32)
# view(4, 4, 1) and view(4, 4, -1) describe the same shape for 16 elements.
torch.equal(a.view(4, 4, 1), a.view(4, 4, -1))

  """Entry point for launching an IPython kernel.


True

In [None]:
# Scratch cell: sample a (3, 100, 80) tensor uniformly from [-0.1, 0.1).
tf.random.uniform(
    shape=(3, 100, 80),
    minval=-0.1,
    maxval=0.1,
)