<a href="https://colab.research.google.com/github/snigdhagogineni/RNN_Machine_Translation/blob/main/BigdataProject_MachineTranslationUsingRNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Installation and setting up spark (Referred graph frames lab)
!rm -rf spark-3.1.1-bin-hadoop3.2
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q findspark pyspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
import findspark
findspark.init()
from pyspark.sql import SparkSession
#sc carries the spark context
sc = SparkSession.builder \
    .config("spark.jars", "/usr/local/lib/python3.10/dist-packages/pyspark/jars/graphframes-0.8.2-spark3.3.2-s_2.11.jar") \
    .getOrCreate()

sc.conf.set("spark.sql.repl.eagerEval.enabled", True)

  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:

!wget https://raw.githubusercontent.com/snigdhagogineni/big_data/main/dataset.csv -O dataset.csv

--2024-05-08 01:24:19--  https://raw.githubusercontent.com/snigdhagogineni/big_data/main/dataset.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.110.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.111.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8557837 (8.2M) [text/plain]
Saving to: ‘dataset.csv’


2024-05-08 01:24:20 (23.6 MB/s) - ‘dataset.csv’ saved [8557837/8557837]



In [None]:
#Importing the dataset from public hosted github
input_df = sc.read.option("header", "true").option("inferSchema", "true").csv("dataset.csv")

In [None]:
input_df.show(5)

+---+-------+-------+
| id|english|spanish|
+---+-------+-------+
|  0|     go|     ve|
|  1|     go|   vete|
|  2|     go|   vaya|
|  3|     go| vayase|
|  4|     hi|   hola|
+---+-------+-------+
only showing top 5 rows



In [None]:
input_df.count() #There are 120614 records

120614

In [None]:
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import Tokenizer,StringIndexer
#Cleaning the data by removing the special characters.
input_df = input_df.withColumn("english", regexp_replace(input_df["english"], "[^a-z\s]", ""))
input_df = input_df.withColumn("spanish", regexp_replace(input_df["spanish"], "[^a-zñ\s]", ""))


In [None]:
input_df.show(5) #Analyzing the first 5 rows

+---+-------+-------+
| id|english|spanish|
+---+-------+-------+
|  0|     go|     ve|
|  1|     go|   vete|
|  2|     go|   vaya|
|  3|     go| vayase|
|  4|     hi|   hola|
+---+-------+-------+
only showing top 5 rows



In [None]:
#Spanish language has only one extra character apart from english - ñ
st_tok = '<st>' #Signifies start padding and end tokens
pd_tok = '<pad>'
e_tok = '<end>'

en_voc = [st_tok,' ','a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l','m', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x','y', 'z',pd_tok, e_tok]
sp_voc = en_voc+['ñ']

In [None]:
#converting sentences to words and to indexed words using string indexer
from pyspark.sql.functions import col,explode,lit,collect_list
tokenizer = Tokenizer(inputCol="english", outputCol="english_words")
df = tokenizer.transform(input_df)
df = df.select("id","english", explode("english_words").alias("english_words"))
indexer = StringIndexer(inputCol="english_words", outputCol="indices")
indexed_df = indexer.fit(df).transform(df)
indexed_df = indexed_df.withColumn("indices", col("indices").cast("int"))
indexed_df = indexed_df.withColumn("indexed_english_words", col("indices") + lit(1))
english_indexed_df = indexed_df.groupby("english","id").agg(collect_list("indexed_english_words").alias("english_indexed_words"))
english_indexed_df.show(truncate=False)

+---------------------------------------------------------+------+--------------------------------------------------------+
|english                                                  |id    |english_indexed_words                                   |
+---------------------------------------------------------+------+--------------------------------------------------------+
|a baby has delicate skin                                 |39504 |[6, 472, 42, 3901, 1885]                                |
|a bad cold caused the singer to lose his voice           |106381|[6, 247, 273, 1270, 1, 1314, 3, 632, 24, 947]           |
|a bad habit once formed is difficult to get rid of       |112625|[6, 247, 1451, 334, 4199, 7, 352, 3, 60, 1484, 10]      |
|a ball hit her right leg                                 |39505 |[6, 997, 602, 41, 143, 1123]                            |
|a ball hit the back of my head while i was playing soccer|115573|[6, 997, 602, 1, 112, 10, 18, 702, 291, 2, 13, 376, 868]|
|a baseb

In [None]:
#converting sentences to words and to indexed words using string indexer for spanish
tokenizer = Tokenizer(inputCol="spanish", outputCol="spanish_words")
df = tokenizer.transform(input_df)
df = df.select("id","spanish", explode("spanish_words").alias("spanish_words"))
indexer = StringIndexer(inputCol="spanish_words", outputCol="indices")
indexed_df = indexer.fit(df).transform(df)
indexed_df = indexed_df.withColumn("indices", col("indices").cast("int"))
indexed_df = indexed_df.withColumn("indexed_spanish_words", col("indices") + lit(1))
spanish_indexed_df = indexed_df.groupby("spanish","id").agg(collect_list("indexed_spanish_words").alias("spanish_indexed_words"))
spanish_indexed_df.show(truncate=False)

+---------------------------------------------------------------------------------------------------------+------+------------------------------------------------------------------------------------+
|spanish                                                                                                  |id    |spanish_indexed_words                                                               |
+---------------------------------------------------------------------------------------------------------+------+------------------------------------------------------------------------------------+
|a algunas personas les gustan los gatos otras prefieren los perros                                       |99113 |[4, 341, 224, 197, 287, 19, 647, 1121, 5866, 19, 546]                               |
|a algunas personas no les gusta levantarse temprano por la manana                                        |114815|[4, 341, 224, 5, 197, 56, 1690, 314, 14, 7, 77]                                     |


In [None]:
from pyspark.sql import functions as F1
# Count the total unique words in input
total_unique_words_input = english_indexed_df.select("english_indexed_words").distinct().count()

# Calculate the length of the longest sentence in input
length_longest_sentence_input = english_indexed_df.groupBy("id").agg(F1.max(F1.size("english_indexed_words")).alias("max_length")).agg(F1.max("max_length")).collect()[0][0]
# Count the total unique words in output
total_unique_words_output =spanish_indexed_df.select("spanish_indexed_words").distinct().count()

# Calculate the length of the longest sentence in output
length_longest_sentence_output = spanish_indexed_df.groupBy("id").agg(F1.max(F1.size("spanish_indexed_words")).alias("max_length")).agg(F1.max("max_length")).collect()[0][0]
padding_length=max(length_longest_sentence_input,length_longest_sentence_output)
print("Total unique words in input:", total_unique_words_input)
print("Length of longest sentence in input:", length_longest_sentence_input)
print("Total unique words in output:", total_unique_words_output)
print("Length of longest sentence in output:", length_longest_sentence_output)


Total unique words in input: 103989
Length of longest sentence in input: 47
Total unique words in output: 114337
Length of longest sentence in output: 53


In [None]:
#Padding indexed words to make all words of equal length
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType
max_length = padding_length
def pad_sequence(sequence):
    padded_sequence = sequence + [0] * (max_length - len(sequence))
    return padded_sequence

pad_sequence_udf = udf(pad_sequence, ArrayType(IntegerType()))

spanish_padded_df = spanish_indexed_df.withColumn("spanish_padded_words", pad_sequence_udf(spanish_indexed_df["spanish_indexed_words"]))

spanish_padded_df.show(truncate=False)


+---------------------------------------------------------------------------------------------------------+------+------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|spanish                                                                                                  |id    |spanish_indexed_words                                                               |spanish_padded_words                                                                                                                                                                      |
+---------------------------------------------------------------------------------------------------------+------+------------------------------------------------------------------------------------+---------------------------

In [None]:
#Padding indexed words to make all words of equal length

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
max_length = padding_length
def pad_sequence(sequence):
    padded_sequence = sequence + [0] * (max_length - len(sequence))
    return padded_sequence

pad_sequence_udf = udf(pad_sequence, ArrayType(IntegerType()))

english_padded_df = english_indexed_df.withColumn("english_padded_words", pad_sequence_udf(english_indexed_df["english_indexed_words"]))

english_padded_df.show(truncate=False)

+---------------------------------------------------------+------+--------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|english                                                  |id    |english_indexed_words                                   |english_padded_words                                                                                                                                                            |
+---------------------------------------------------------+------+--------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|a baby has delicate skin                                 |39504 |[6, 472, 42, 3901, 1885]       

In [None]:
#Joining english and spanish dataframes together
english_spanish_padded_df=english_padded_df.join(spanish_padded_df,english_padded_df.id==spanish_padded_df.id)

In [None]:
english_spanish_padded_df.show()

+--------------------+------+---------------------+--------------------+--------------------+------+---------------------+--------------------+
|             english|    id|english_indexed_words|english_padded_words|             spanish|    id|spanish_indexed_words|spanish_padded_words|
+--------------------+------+---------------------+--------------------+--------------------+------+---------------------+--------------------+
|a baby has delica...| 39504| [6, 472, 42, 3901...|[6, 472, 42, 3901...|un bebe tiene la ...| 39504| [10, 336, 37, 7, ...|[10, 336, 37, 7, ...|
|a bad cold caused...|106381| [6, 247, 273, 127...|[6, 247, 273, 127...|un serio resfriad...|106381| [10, 875, 1236, 2...|[10, 875, 1236, 2...|
|a bad habit once ...|112625| [6, 247, 1451, 33...|[6, 247, 1451, 33...|es dificil deshac...|112625| [9, 193, 6976, 2,...|[9, 193, 6976, 2,...|
|a ball hit her ri...| 39505| [6, 997, 602, 41,...|[6, 997, 602, 41,...|una pelota le peg...| 39505| [16, 1180, 21, 15...|[16, 1180, 21,

In [None]:
#Creating indices for english and spanish vocab
in_to_sp = {k:v for k,v in enumerate(sp_voc)}
sp_to_in = {v:k for k,v in enumerate(sp_voc)}
in_to_en = {k:v for k,v in enumerate(en_voc)}
en_to_in = {v:k for k,v in enumerate(en_voc)}

In [None]:
en_sent = [row['english'] for row in english_spanish_padded_df.select('english').collect()]
sp_sent = [row['spanish'] for row in english_spanish_padded_df.select('spanish').collect()]


In [None]:
en_sent[:10]

['got it',
 'call us',
 'hi guys',
 'tom left',
 'is it bad',
 'tom moved',
 'count on it',
 'get started',
 'im serious',
 'its normal']

In [None]:
sp_sent[:10]

['entendiste',
 'llamanos',
 'que pasa troncos',
 'tom se fue',
 'es malo',
 'tom se mudo',
 'cuenta con eso',
 'empieza',
 'lo digo en serio',
 'es normal']

In [None]:
#check_tokens verifies validity of a sentence based on vocab
m_seq_l = 200

def check_tokens(s, v):
    for t in list(set(s)):
        if t not in v:
            print(t)
            return False
    return True
#check_size verifies size of a sentence based on m_seq_l

def check_size(s, m_seq_l):
    #print(len(list(s)))
    return len(list(s)) < (m_seq_l - 1)

vs_ind = []
for i in range(len(sp_sent)):
    sp_s, en_s = sp_sent[i], en_sent[i]
    if check_size(sp_s, m_seq_l) \
      and check_size(en_s, m_seq_l) \
      and check_tokens(sp_s, sp_voc):
        vs_ind.append(i)

In [None]:
len(vs_ind) #out of 120614 sentences 120607 are valid

120607

In [None]:
sp_sent = [sp_sent[i] for i in vs_ind]
en_sent = [en_sent[i] for i in vs_ind]

In [None]:
len(sp_sent)

120607

In [None]:
sp_sent[:3]

['entendiste', 'llamanos', 'que pasa troncos']

In [None]:
print(len(sp_sent))
print(len(en_sent))

120607
120607


In [None]:
#Converting valid sentences to spark dataframe
filtered_data = zip(en_sent, sp_sent)
data_rdd = sc.sparkContext.parallelize(filtered_data)
final_data = sc.createDataFrame(data_rdd, ["english", "spanish"])

In [None]:
final_data.show()

+------------+--------------------+
|     english|             spanish|
+------------+--------------------+
|      got it|          entendiste|
|     call us|            llamanos|
|     hi guys|    que pasa troncos|
|    tom left|          tom se fue|
|   is it bad|             es malo|
|   tom moved|         tom se mudo|
| count on it|      cuenta con eso|
| get started|             empieza|
|  im serious|    lo digo en serio|
|  its normal|           es normal|
| they called|            llamaron|
| this is bad|         eso es malo|
| this is ice|       esto es hielo|
| tom giggled|tom solto una risita|
|come with us|  venid con nosotros|
| i cant sing|     no puedo cantar|
| ill be free|          sere libre|
| im a farmer|     soy un granjero|
|is that mine|       es ese el mio|
|please leave|  por favor marchese|
+------------+--------------------+
only showing top 20 rows



In [None]:
#Splitting available dataset to train and test data : 80/20 split
train_ratio = 0.8
test_ratio = 1 - train_ratio
train_df, test_df = final_data.randomSplit([train_ratio, test_ratio], seed=42)
print("Training set size:", train_df.count())
print("Testing set size:", test_df.count())

Training set size: 96445
Testing set size: 24162


In [None]:
train_df.show(5)

+--------------------+--------------------+
|             english|             spanish|
+--------------------+--------------------+
|a baby has delica...|un bebe tiene la ...|
|a baby is sleepin...|un bebe duerme en...|
|a band of robbers...|una banda de ladr...|
|a beam of sunligh...|un rayo de sol at...|
|   a bear killed tom|   un oso mato a tom|
+--------------------+--------------------+
only showing top 5 rows



In [None]:
import numpy as np
import torch
import math
from torch import nn
import torch.nn.functional as F
#getting available processing device
def get_device():
    return torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
#getting scaled dot product of scalars
def sc_d_p(q, k, v, m=None):
    dk1 = q.size()[-1]
    sc = torch.matmul(q, k.transpose(-1, -2)) / math.sqrt(dk1)
    if m is not None:
        sc = sc.permute(1, 0, 2, 3) + m
        sc = sc.permute(1, 0, 2, 3)
    at = F.softmax(sc, dim=-1)
    val = torch.matmul(at, v)
    return val, at
#defining forward pass of encoder layer
class PosEnc(nn.Module):
    def __init__(self, d_m, m_seq_l):
        super().__init__()
        self.m_seq_l, self.d_m = m_seq_l, d_m

    def forward(self):
        ei = torch.arange(0, self.d_m, 2).float()
        denom = torch.pow(10000, ei/self.d_m)
        pos = (torch.arange(self.m_seq_l)
                          .reshape(self.m_seq_l, 1))
        e_P, o_P = torch.sin(pos / denom), torch.cos(pos/ denom)
        s = torch.stack([e_P, o_P], dim=2)
        pe = torch.flatten(s, start_dim=1, end_dim=2)
        return pe
#defining sentence embedding layer
class SentEmb(nn.Module):
    def __init__(self, m_seq_l, d_m, l_to_i, st_tok, e_tok, pd_tok):
        super().__init__()
        self.vocab_size, self.m_seq_l = len(l_to_i), m_seq_l
        self.embedding, self.l_to_i = nn.Embedding(self.vocab_size, d_m), l_to_i
        self.pos_enc, self.dout = PosEnc(d_m, m_seq_l), nn.Dropout(p=0.1)
        self.st_tok, self.e_tok, self.pd_tok = st_tok, e_tok, pd_tok

    def bat_toks(self, b, st_tok, e_tok):
        #generating tokens from sentence
        def get_tokens(se, st_tok, e_tok):
            swi = [self.l_to_i[token] for token in list(se)]
            if st_tok:
                swi.insert(0, self.l_to_i[self.st_tok])
            if e_tok:
                swi.append(self.l_to_i[self.e_tok])
            for _ in range(len(swi), self.m_seq_l):
                swi.append(self.l_to_i[self.pd_tok])
            return torch.tensor(swi)

        toks = []
        for se_num in range(len(b)):
           toks.append( get_tokens(b[se_num], st_tok, e_tok) )
        toks = torch.stack(toks)
        return toks.to(get_device())
    #forward pass of embedding layer
    def forward(self, x1, st_tok, e_tok):
        x1 = self.bat_toks(x1, st_tok, e_tok)
        x1 = self.embedding(x1)
        pos = self.pos_enc().to(get_device())
        x1 = self.dout(x1 + pos)
        return x1

#defining forward pass of multi head attention layer
class MHAttn(nn.Module):
    def __init__(self, d_m, n_h):
        super().__init__()
        self.d_m, self.n_h = d_m, n_h
        self.h_d = d_m // n_h
        self.qkvl = nn.Linear(d_m , 3 * d_m)
        self.ll = nn.Linear(d_m, d_m)

    def forward(self, x, mask):
        b_size, sl, d_m = x.size()
        qkv1 = self.qkvl(x)
        qkv1 = qkv1.reshape(b_size, sl, self.n_h, 3 * self.h_d)
        qkv1 = qkv1.permute(0, 2, 1, 3)
        q1, k1, v1 = qkv1.chunk(3, dim=-1)
        val, at = sc_d_p(q1, k1, v1, mask)
        val = val.permute(0, 2, 1, 3).reshape(b_size, sl, self.n_h * self.h_d)
        o = self.ll(val)
        return o

#forward pass of normalization layer
class LayerNorm(nn.Module):
    def __init__(self, ps, eps=1e-5):
        super().__init__()
        self.ps, self.eps = ps, eps
        self.gamma = nn.Parameter(torch.ones(ps))
        self.beta =  nn.Parameter(torch.zeros(ps))

    def forward(self, vs):
        d = [-(i + 1) for i in range(len(self.ps))]
        m = vs.mean(dim=d, keepdim=True)
        v = ((vs - m) ** 2).mean(dim=d, keepdim=True)
        s = (v+ self.eps).sqrt()
        y = (vs - m) / s
        o= self.gamma * y + self.beta
        return o

#forward pass of feed forward layer
class PosWiseFF(nn.Module):
    def __init__(self, d_m, h, dp=0.1):
        super(PosWiseFF, self).__init__()
        self.li1 = nn.Linear(d_m, h)
        self.li2 = nn.Linear(h, d_m)
        self.relu, self.dout = nn.ReLU(), nn.Dropout(p=dp)

    def forward(self, x1):
        x1 = self.li1(x1)
        x1 = self.relu(x1)
        x1 = self.dout(x1)
        x1 = self.li2(x1)
        return x1

#forward pass of encoder layer
class EncLayer(nn.Module):
    def __init__(self, d_m, ffn_h, n_h, dp):
        super(EncLayer, self).__init__()
        self.at = MHAttn(d_m=d_m, n_h=n_h)
        self.n1, self.dout1 = LayerNorm(ps=[d_m]), nn.Dropout(p=dp)
        self.ffn = PosWiseFF(d_m=d_m, h=ffn_h, dp=dp)
        self.n2, self.dout2 = LayerNorm(ps=[d_m]), nn.Dropout(p=dp)


    def forward(self, y, sam):
        res_y = y.clone()
        y = self.at(y, mask=sam)
        y = self.dout1(y)
        y = self.n1(y + res_y)
        res_y = y.clone()
        y = self.ffn(y)
        y = self.dout2(y)
        y = self.n2(y + res_y)
        return y
#forward pass of sequential encoder layer
class SeqEnc(nn.Sequential):
    def forward(self, *vs):
        y, sam  = vs
        for module in self._modules.values():
            y = module(y, sam)
        return y
#forward pass of encoder
class Enc(nn.Module):
    def __init__(self,d_m,ffn_h,n_h,dp,n_l,m_seq_l,l_to_i,st_tok,e_tok,pd_tok):
        super().__init__()
        self.se_em = SentEmb(m_seq_l, d_m, l_to_i, st_tok, e_tok, pd_tok)
        self.ls = SeqEnc(*[EncLayer(d_m, ffn_h, n_h, dp)
                                      for _ in range(n_l)])

    def forward(self, y, sam, st_tok, e_tok):
        y = self.se_em(y, st_tok, e_tok)
        y= self.ls(y, sam)
        return y

#forward pass of multi head cross attention layer
class MHCAttention(nn.Module):
    def __init__(self, d_m, n_h):
        super().__init__()
        self.d_m, self.n_h = d_m, n_h
        self.h_d = d_m // n_h
        self.kvl = nn.Linear(d_m , 2 * d_m)
        self.q_layer = nn.Linear(d_m , d_m)
        self.ll = nn.Linear(d_m, d_m)

    def forward(self, x1, y1, mask):
        b_size, sl, d_m = x1.size()
        kv1 = self.kvl(x1)
        q1 = self.q_layer(y1)
        kv1 = kv1.reshape(b_size, sl, self.n_h, 2 * self.h_d)
        q1 = q1.reshape(b_size, sl, self.n_h, self.h_d)
        kv1 = kv1.permute(0, 2, 1, 3)
        q1 = q1.permute(0, 2, 1, 3)
        k1, v1 = kv1.chunk(2, dim=-1)
        val, at = sc_d_p(q1, k1, v1, mask)
        val = val.permute(0, 2, 1, 3).reshape(b_size, sl, d_m)
        o = self.ll(val)
        return o

#forward pass of decoder layer
class DecLayer(nn.Module):
    def __init__(self, d_m, ffn_h, n_h, dp):
        super(DecLayer, self).__init__()
        self.sa = MHAttn(d_m=d_m, n_h=n_h)
        self.lnorm1, self.dout1 = LayerNorm(ps=[d_m]), nn.Dropout(p=dp)

        self.ed_at = MHCAttention(d_m=d_m, n_h=n_h)
        self.lnorm2, self.dout2 = LayerNorm(ps=[d_m]), nn.Dropout(p=dp)

        self.ffn = PosWiseFF(d_m=d_m, h=ffn_h, dp=dp)
        self.lnorm3, self.dout3 = LayerNorm(ps=[d_m]), nn.Dropout(p=dp)

    def forward(self, x1, y1, sam, cam):
        _y1 = y1.clone()
        y1 = self.sa(y1, mask=sam)
        y1 = self.dout1(y1)
        y1 = self.lnorm1(y1 + _y1)

        _y1 = y1.clone()
        y1 = self.ed_at(x1, y1, mask=cam)
        y1 = self.dout2(y1)
        y1 = self.lnorm2(y1 + _y1)

        _y1 = y1.clone()
        y1 = self.ffn(y1)
        y1 = self.dout3(y1)
        y1 = self.lnorm3(y1 + _y1)
        return y1

#forward pass of sequential decoder layer
class SeqDec(nn.Sequential):
    def forward(self, *vs):
        x1, y1, sam, cam = vs
        for module in self._modules.values():
            y1 = module(x1, y1, sam, cam)
        return y1
#forward pass of decoder
class Dec(nn.Module):
    def __init__(self,d_m,ffn_h,n_h,dp,n_l,m_seq_l,l_to_i,st_tok,e_tok,pd_tok):
        super().__init__()
        self.se_em = SentEmb(m_seq_l, d_m, l_to_i, st_tok, e_tok, pd_tok)
        self.ls = SeqDec(*[DecLayer(d_m, ffn_h, n_h, dp) for _ in range(n_l)])

    def forward(self, x1, y1, sam, cam, st_tok, e_tok):
        y1 = self.se_em(y1, st_tok, e_tok)
        y1 = self.ls(x1, y1, sam, cam)
        return y1

#Defining neural network architecture based on encoder and decoder
class NeuralNet(nn.Module):
    def __init__(self,d_m,ffn_h,n_h,dp,n_l,m_seq_l,sp_voc_s,en_to_in,sp_to_in,st_tok,e_tok,pd_tok
                ):
        super().__init__()
        self.enc = Enc(d_m, ffn_h, n_h, dp, n_l, m_seq_l, en_to_in, st_tok, e_tok, pd_tok)
        self.dec = Dec(d_m, ffn_h, n_h, dp, n_l, m_seq_l, sp_to_in, st_tok, e_tok, pd_tok)
        self.li, self.device = nn.Linear(d_m, sp_voc_s), torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    def forward(self,x1,y1,enc_sam=None,dec_sam=None,dec_cam=None,enc_st_tok=False,enc_e_tok=False,dec_st_tok=False, dec_e_tok=False):
        x1 = self.enc(x1, enc_sam, st_tok=enc_st_tok, e_tok=enc_e_tok)
        o = self.dec(x1, y1, dec_sam, dec_cam, st_tok=dec_st_tok, e_tok=dec_e_tok)
        o = self.li(o)
        return o


In [None]:
#Defining parameters for neural networks
import torch
d_m = 512
dp = 0.1
ffn_h = 2048
m_seq_l = 200
sp_voc_s = len(sp_voc)
n_h = 8
b_size = 30
n_l = 1
neuralnet = NeuralNet(d_m,ffn_h,n_h,dp,n_l,m_seq_l,sp_voc_s,en_to_in,sp_to_in,st_tok,e_tok,pd_tok)

In [None]:
#helping load the dataset from dataframe
from torch.utils.data import Dataset, DataLoader

class TextDataset(Dataset):
    def __len__(self):
        return len(self.en_sent)
    def __getitem__(self, idx):
        return self.en_sent[idx], self.sp_sent[idx]
    def __init__(self, en_sent, sp_sent):
        self.en_sent = en_sent
        self.sp_sent = sp_sent

In [None]:
en_sent = [row['english'] for row in train_df.select('english').collect()]
sp_sent = [row['spanish'] for row in train_df.select('spanish').collect()]

In [None]:
len(en_sent)

96445

In [None]:
data = TextDataset(en_sent, sp_sent)
train_data = DataLoader(data, b_size)

In [None]:
#defining loss and optimization criteria for network
from torch import nn

criterian = nn.CrossEntropyLoss(ignore_index=sp_to_in[pd_tok],reduction='none')

for p in neuralnet.parameters():
    if p.dim() > 1:nn.init.xavier_uniform_(p)
device = get_device()
optimizer1 = torch.optim.Adam(neuralnet.parameters(), lr=1e-4)


In [None]:
#generating the mask values for encoder and decoder of neural network
least_value = -1e8

def generate_mask_values(eng_b, sp_b):
    sent_count = len(eng_b)
    la_m = torch.full([m_seq_l, m_seq_l] , True)
    la_m = torch.triu(la_m, diagonal=1)
    e_pad_m = torch.full([sent_count, m_seq_l, m_seq_l] , False)
    d_pad_m_sa = torch.full([sent_count, m_seq_l, m_seq_l] , False)
    d_pad_m_ca = torch.full([sent_count, m_seq_l, m_seq_l] , False)

    for i in range(sent_count):
      en_sen_l, sp_sen_l = len(eng_b[i]), len(sp_b[i])
      en_ct_pad_m = np.arange(en_sen_l + 1, m_seq_l)
      sp_ct_pad_m = np.arange(sp_sen_l + 1, m_seq_l)
      e_pad_m[i, :, en_ct_pad_m] = True
      e_pad_m[i, en_ct_pad_m, :] = True
      d_pad_m_sa[i, :, sp_ct_pad_m] = True
      d_pad_m_sa[i, sp_ct_pad_m, :] = True
      d_pad_m_ca[i, :, en_ct_pad_m] = True
      d_pad_m_ca[i, sp_ct_pad_m, :] = True

    enc_sam = torch.where(e_pad_m, least_value, 0)
    dec_sam =  torch.where(la_m + d_pad_m_sa, least_value, 0)
    dec_cam = torch.where(d_pad_m_ca, least_value, 0)
    return enc_sam, dec_sam, dec_cam

In [None]:
#Training the neural network based on epoch_ct parameter
neuralnet.train()
neuralnet.to(device)
total_loss = 0
epoch_ct = 5 #Specifies the no of epochs
train_losses=[]
for epoch in range(epoch_ct):
    print(f"Epoch number {epoch}")
    it_obj = iter(train_data)
    for bnum, bdata in enumerate(it_obj):
        neuralnet.train()
        eng_b, sp_b = bdata
        enc_sam, dec_sam, dec_cam = generate_mask_values(eng_b, sp_b)
        optimizer1.zero_grad()
        pred_sp = neuralnet(eng_b,sp_b,enc_sam.to(device),dec_sam.to(device),dec_cam.to(device),enc_st_tok=False,enc_e_tok=False,dec_st_tok=True,dec_e_tok=True)
        label_data = neuralnet.dec.se_em.bat_toks(sp_b, st_tok=False, e_tok=True)
        data_loss = criterian(pred_sp.view(-1, sp_voc_s).to(device),label_data.view(-1).to(device)).to(device)
        indx_val = torch.where(label_data.view(-1) == sp_to_in[pd_tok], False, True)
        data_loss = data_loss.sum() / indx_val.sum()
        data_loss.backward()
        optimizer1.step()
        train_losses.append(data_loss.item())
        if bnum % 100 == 0:
            print(f"Iter number {bnum} : {data_loss.item()}")
            print(f"source clause: {eng_b[0]}")
            print(f"actual target clause: {sp_b[0]}")
            sp_pred_sen = torch.argmax(pred_sp[0], axis=1)
            sen_pred = ""
            for idx in sp_pred_sen:
              if idx == sp_to_in[e_tok]:
                break
              else:
                sen_pred += in_to_sp[idx.item()]
            print(f"predicted target clause: {sen_pred}")
            neuralnet.eval()
            print("-------------------------------------------")

Epoch number 0
Iter number 0 : 3.898242235183716
source clause: a baby has delicate skin
actual target clause: un bebe tiene la piel sensible
predicted target clause: wwn<pad>i<pad>f<pad><pad>rrrvnrn<pad>rbwnv<pad>n<pad>s<pad><pad>v<pad><pad><pad><pad><pad>vv<pad>aa<pad><pad><pad><pad>avvaiiaaai<pad>aaaaaaa<pad>iaaa<pad><pad><pad><pad><pad>iia<pad>aq<pad><pad><pad><pad><pad><pad>rarrv<pad>avawvwa<pad><pad>wwtv<pad>v<pad>v<pad>w<pad>wn<pad>w<pad>ivwawvvawi<pad>wwww<pad><pad>bawni<pad><pad>nvnnnwwnaaaavniiiin
-------------------------------------------
Iter number 100 : 2.746222734451294
source clause: do you have more than one copy of this key
actual target clause: acaso tienes mas de una copia de esta llave
predicted target clause: eeene  e e a ne  e eenae aneeeeennnenneee
-------------------------------------------
Iter number 200 : 2.726428270339966
source clause: he didnt like to be punished
actual target clause: a el no le gustaba que le castigaran
predicted target clause: e e  e  

In [None]:
#making predictions using the trained model
neuralnet.eval()
def predict(en_sen):
  en_sen = (en_sen,)
  sp_sen = ("",)
  for wc in range(m_seq_l):
    enc_sam, dec_sam, dec_cam= generate_mask_values(en_sen, sp_sen)
    translations = neuralnet(en_sen,sp_sen,enc_sam.to(device),dec_sam.to(device),dec_cam.to(device),enc_st_tok=False,enc_e_tok=False,dec_st_tok=True,dec_e_tok=False)
    token_pd_values = translations[0][wc]
    token_id_values = torch.argmax(token_pd_values).item()
    token_new = in_to_sp[token_id_values]
    sp_sen = (sp_sen[0] + token_new, )
    if token_new == e_tok:
      break
  return sp_sen[0]

In [None]:
#registering a user defined function with spark functions
from pyspark.sql.functions import udf,lower
predict_udf = udf(predict)
#using the predict function to translate test dataset
test_df = test_df.withColumn("predicted_spanish", predict_udf(lower(test_df["english"])))

In [None]:
test_df.show(truncate=False)

+-------------------------------------------------------------------------+----------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|english                                                                  |spanish                                                                           |predicted_spanish                                                                                                                                                                                       |
+-------------------------------------------------------------------------+----------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------

In [None]:
#computing bleu score and wer values using nltk
import nltk
nltk.download('punkt')
from nltk.translate.bleu_score import sentence_bleu
from nltk.metrics import edit_distance
avg_bleu_score=0
avg_wer_value=0
def get_bleu_score(actual, predicted):
    actual_tokens = nltk.word_tokenize(actual.lower())
    predicted_tokens = nltk.word_tokenize(predicted.lower())
    return sentence_bleu(actual_tokens, predicted_tokens[:-1])
def get_wer_value(actual, predicted):
    actual_words = actual.split()
    predicted_words = predicted.split()
    wer_value = edit_distance(actual_words, predicted_words)
    wer_value /= len(actual_words)
    return wer_value


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [None]:
#generating defined metrics based on predictions made by model
bleu_score_gen_udf = udf(get_bleu_score)
wer_value_gen_udf = udf(get_wer_value)
test_df = test_df.withColumn("bleu_score", bleu_score_gen_udf(test_df['spanish'],test_df["predicted_spanish"]))
test_df = test_df.withColumn("wer_value", wer_value_gen_udf(test_df['spanish'],test_df["predicted_spanish"]))

In [None]:
test_df.show()

+--------------------+--------------------+--------------------+--------------------+------------------+
|             english|             spanish|   predicted_spanish|          bleu_score|         wer_value|
+--------------------+--------------------+--------------------+--------------------+------------------+
|a ball flew in th...|una pelota entro ...|tu estas en el es...|                   0|1.1428571428571428|
|a bear will not t...|un oso no tocara ...|tu eres en el est...|                   0|1.6666666666666667|
|a bee flew out of...|una abeja salio v...|tu eres estas en ...|                   0|               1.0|
|a big earthquake ...|hubo un gran terr...|eres estas estas ...|                   0|1.3333333333333333|
|a bird was flying...|un pajaro volaba ...|eres estas en el ...|                   0|1.4285714285714286|
|a blood vessel bu...|un vaso sanguineo...|eres estas esta e...|                   0|1.3333333333333333|
|a boy is taking h...|un chico emplea s...|tu estas est

In [None]:

nltk.download('wordnet')

from nltk.translate.meteor_score import meteor_score

[nltk_data] Downloading package wordnet to /root/nltk_data...


In [None]:
#computing meteor score using nltk
def get_meteor_score(actual, predicted):

  actual_tokens = nltk.word_tokenize(actual.lower())

  predicted_tokens = nltk.word_tokenize(predicted.lower())

  return meteor_score([actual_tokens], predicted_tokens[:-1])

In [None]:
#computing meteor score using model predictions

get_meteor_score_udf = udf(get_meteor_score)

test_df = test_df.withColumn("meteor_score", get_meteor_score_udf(test_df['spanish'],test_df['predicted_spanish']))

In [None]:
test_df.show(5)

+--------------------+--------------------+--------------------+----------+------------------+-------------------+
|             english|             spanish|   predicted_spanish|bleu_score|         wer_value|       meteor_score|
+--------------------+--------------------+--------------------+----------+------------------+-------------------+
|a ball flew in th...|una pelota entro ...|tu estas en el es...|         0|1.1428571428571428|                0.0|
|a bear will not t...|un oso no tocara ...|tu eres en el est...|         0|1.6666666666666667|                0.0|
|a bee flew out of...|una abeja salio v...|tu eres estas en ...|         0|               1.0|                0.0|
|a big earthquake ...|hubo un gran terr...|eres estas estas ...|         0|1.3333333333333333|0.07692307692307693|
|a bird was flying...|un pajaro volaba ...|eres estas en el ...|         0|1.4285714285714286|0.06666666666666667|
+--------------------+--------------------+--------------------+----------+-----