<a href="https://colab.research.google.com/github/swetha-rana/Assignment_3/blob/main/rnn_seq_to_seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Below cells installs the necessary modules

In [None]:
%%capture
!pip install wandb

Below cell imports the necessary modules

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
import pandas as pd
import wandb
import copy

Below cell downloads the dataset and untars it

In [None]:
!wget https://storage.googleapis.com/gresearch/dakshina/dakshina_dataset_v1.0.tar
!tar -xvf dakshina_dataset_v1.0.tar

Data processing cell

In [14]:
def data_preprocessing(path,ip_token=None,ip_len=None,output_token=None,output_len=None):
  
  ip_transcription= []
  output_transcription= []
  
  df= pd.read_csv(path,names=["1", "2","3"],sep="\t").astype(str)
  if ip_token is None:
      df=df.sample(frac=1)
  for index, row in df.iterrows():
      ip_text=row['2']
      op_text= row['1']
      if ip_text=='</s>' or op_text =='</s>':
        continue
      op_text= "\t" + op_text + "\n"
      ip_transcription.append(ip_text)
      output_transcription.append(op_text)
  
  if ip_token is None:
    ip_token= tf.keras.preprocessing.text.Tokenizer(filters='', char_level=True)
    ip_token.fit_on_texts(ip_transcription)
  input_text= ip_token.texts_to_sequences(ip_transcription)
  input_text= tf.keras.preprocessing.sequence.pad_sequences(input_text,padding='post')

  if output_token is None:
    output_token= tf.keras.preprocessing.text.Tokenizer(filters='', char_level=True)
    output_token.fit_on_texts(output_transcription)

  output_text= output_token.texts_to_sequences(output_transcription)
  output_text= tf.keras.preprocessing.sequence.pad_sequences(output_text,padding='post')

  if ip_len is not None and output_len is not None:
      input_text=tf.concat([input_text,tf.zeros((input_text.shape[0],ip_len-input_text.shape[1]))],axis=1)
      output_text=tf.concat([output_text,tf.zeros((output_text.shape[0],output_len-output_text.shape[1]))],axis=1)

  return ip_transcription,input_text,ip_token,output_transcription,output_text,output_token

In [15]:
train_ip_transcription,train_input_text,train_ip_token,train_output_transcription,train_output_text,train_output_token=data_preprocessing("/content/dakshina_dataset_v1.0/ta/lexicons/ta.translit.sampled.train.tsv")

test_ip_transcription,test_input_text,test_ip_token,test_output_transcription,test_output_text,test_output_token=data_preprocessing("/content/dakshina_dataset_v1.0/ta/lexicons/ta.translit.sampled.test.tsv",train_ip_token,train_input_text.shape[1],train_output_token,train_output_text.shape[1])

val_ip_transcription,val_input_text,val_ip_token,val_output_transcription,val_output_text,val_output_token=data_preprocessing("/content/dakshina_dataset_v1.0/ta/lexicons/ta.translit.sampled.dev.tsv",train_ip_token,train_input_text.shape[1],train_output_token,train_output_text.shape[1])

encoder_tokens = len(train_ip_token.word_index)+1

encoder_seq_length =  train_input_text.shape[1]

decoder_tokens = len(train_output_token.word_index)+1

decoder_seq_length = train_output_text.shape[1]

index_to_char_input = dict((train_ip_token.word_index[key], key) for key in train_ip_token.word_index.keys())

index_to_char_target = dict((train_output_token.word_index[key], key) for key in train_output_token.word_index.keys())

In [16]:
def seq_seq_model(rnn_type,embed_dim,encoder_layers,decoder_layers,dropout):
  
  encoder_inputs = keras.Input(shape=(encoder_seq_length))
  embed = keras.layers.Embedding(encoder_tokens, embed_dim)(encoder_inputs)
  last_encoder=None
  
  if rnn_type=='RNN':
    for i in range(encoder_layers-1):      
      encoder = keras.layers.SimpleRNN(latent_dim, return_sequences=True,dropout=dropout)
      if i==0:
        enc_out = encoder(embed)
      else:
        enc_out = encoder(last_encoder)
      last_encoder=enc_out
    encoder = keras.layers.SimpleRNN(latent_dim, return_state=True,dropout=dropout)
    if encoder_layers == 1:
      encoder_outputs, state = encoder(embed)
    else:
      encoder_outputs, state = encoder(last_encoder)
    encoder_states = [state]  
    decoder_inputs = keras.Input(shape=(decoder_seq_length))
    embed = keras.layers.Embedding(decoder_tokens, embed_dim)(decoder_inputs)
    for i in range(decoder_layers):
      decoder_lstm = keras.layers.SimpleRNN(latent_dim, return_sequences=True, return_state=True,dropout=dropout)
      if i==0:
        decoder_outputs, _= decoder_lstm(embed, initial_state=encoder_states)
      else:  
        decoder_outputs, _ = decoder_lstm(last, initial_state=encoder_states)
      last=decoder_outputs
    decoder_dense = keras.layers.Dense(decoder_tokens, activation="softmax",name='final')
    decoder_outputs = decoder_dense(last)

  elif rnn_type=='GRU':
    for i in range(encoder_layers-1):
      encoder = keras.layers.GRU(latent_dim, return_sequences=True,dropout=dropout)
      if i==0:
        enc_out = encoder(embed)
      else:
        enc_out = encoder(last_encoder)
      last_encoder=enc_out
    encoder = keras.layers.GRU(latent_dim, return_state=True,dropout=dropout)
    if encoder_layers == 1:
      encoder_outputs, state = encoder(embed)
    else:
      encoder_outputs, state = encoder(last_encoder)
    encoder_states = [state]

    decoder_inputs = keras.Input(shape=(decoder_seq_length))
    embed = keras.layers.Embedding(decoder_tokens, embed_dim)(decoder_inputs)  
    
    for i in range(decoder_layers):
      decoder_lstm = keras.layers.GRU(latent_dim, return_sequences=True, return_state=True,dropout=dropout)
      if i==0:
        decoder_outputs, _= decoder_lstm(embed, initial_state=encoder_states)
      else:  
        decoder_outputs, _ = decoder_lstm(last, initial_state=encoder_states)
      last=decoder_outputs
    decoder_dense = keras.layers.Dense(decoder_tokens, activation="softmax",name='final')
    decoder_outputs = decoder_dense(last)

  elif rnn_type=='LSTM':
    for i in range(encoder_layers-1):
      encoder = keras.layers.LSTM(latent_dim, return_sequences=True,dropout=dropout)
      if i==0:
        enc_out = encoder(embed)
      else:
        enc_out = encoder(last_encoder)
      last_encoder=enc_out
    encoder = keras.layers.LSTM(latent_dim, return_state=True,dropout=dropout)
    if encoder_layers == 1:
      encoder_outputs, state_h, state_c = encoder(embed)
    else:
      encoder_outputs, state_h, state_c = encoder(last_encoder)
    encoder_states = [state_h, state_c]

    decoder_inputs = keras.Input(shape=(decoder_seq_length))
    embed = keras.layers.Embedding(decoder_tokens, embed_dim)(decoder_inputs)  

    for i in range(decoder_layers):
      decoder_lstm = keras.layers.LSTM(latent_dim, return_sequences=True, return_state=True,dropout=dropout)
      if i==0:
        decoder_outputs, _, _ = decoder_lstm(embed, initial_state=encoder_states)
      else:  
        decoder_outputs, _, _ = decoder_lstm(last, initial_state=encoder_states)
      last=decoder_outputs
    decoder_dense = keras.layers.Dense(decoder_tokens, activation="softmax",name='final')
    decoder_outputs = decoder_dense(last)

  model = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)
  return model


In [18]:
def inference_(model,encoder_layers,decoder_layers):
    encoder_inputs = model.input[0]  
    if isinstance(model.layers[encoder_layers+3], keras.layers.LSTM):
      encoder_outputs, state_h_enc, state_c_enc = model.layers[encoder_layers+3].output  
      encoder_states = [state_h_enc, state_c_enc]
    elif isinstance(model.layers[encoder_layers+3], keras.layers.GRU):
      encoder_outputs, state = model.layers[encoder_layers+3].output  
      encoder_states = [state]
    elif isinstance(model.layers[encoder_layers+3], keras.layers.RNN):
      encoder_outputs, state = model.layers[encoder_layers+3].output  
      encoder_states = [state]
    encoder_model = keras.Model(encoder_inputs, encoder_states)
    decoder_inputs =  keras.Input(shape=( 1))  

    if isinstance(model.layers[encoder_layers+3], keras.layers.RNN):
      decoder_states_inputs=[]
      decoder_states=[]
      last=None
      for i in range(decoder_layers):
        decoder_state_input = keras.Input(shape=(latent_dim,),name='inp3_'+str(i))
        x = [decoder_state_input]
        decoder_lstm = model.layers[i+encoder_layers+4]
        if i==0:
          decoder_outputs, state = decoder_lstm(
              model.layers[i+encoder_layers+2](decoder_inputs), initial_state=x
          )
        else:
          decoder_outputs, state = decoder_lstm(
              last, initial_state=x 
          )
        last=decoder_outputs
        decoder_states_inputs.append (decoder_state_input)
        decoder_states.append (state)      

    elif isinstance(model.layers[encoder_layers+3], keras.layers.GRU):
      decoder_states_inputs=[]
      decoder_states=[] 
      last=None
      for i in range(decoder_layers):
        decoder_state_input = keras.Input(shape=(latent_dim,),name='inp3_'+str(i))
        x = [decoder_state_input]
        decoder_lstm = model.layers[i+encoder_layers+4]
        if i==0:
          decoder_outputs, state = decoder_lstm(
              model.layers[i+encoder_layers+2](decoder_inputs), initial_state=x
          )
        else:
          decoder_outputs, state = decoder_lstm(
              last, initial_state=x 
          )
        last=decoder_outputs
        decoder_states_inputs.append (decoder_state_input)
        decoder_states.append (state)    
    
    elif isinstance(model.layers[encoder_layers+3], keras.layers.LSTM):
      decoder_states_inputs=[]
      decoder_states=[]
      last=None
      for i in range(decoder_layers):
        decoder_state_input_h = keras.Input(shape=(latent_dim,),name='inp3_'+str(i))
        decoder_state_input_c = keras.Input(shape=(latent_dim,),name='inp4_'+str(i))
        x = [decoder_state_input_h, decoder_state_input_c]
        decoder_lstm = model.layers[i+encoder_layers+4]
        if i==0:
          decoder_outputs, state_h_dec, state_c_dec = decoder_lstm(
              model.layers[i+encoder_layers+2](decoder_inputs), initial_state=x
          )
        else:
          decoder_outputs, state_h_dec, state_c_dec = decoder_lstm(
              last, initial_state=x 
          )
        last=decoder_outputs
        decoder_states_inputs.append (decoder_state_input_h)
        decoder_states_inputs.append (decoder_state_input_c)
        decoder_states.append (state_h_dec)
        decoder_states.append (state_c_dec)
    decoder_dense = model.get_layer('final')
    decoder_outputs = decoder_dense(last)
    decoder_model = keras.Model( [decoder_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states )
    return encoder_model,decoder_model
