In [1]:
# Remove a previously unpacked Spark directory (presumably left over from an earlier run).
# NOTE(review): this path (spark-3.3.2-bin-hadoop2) does not match the spark-3.1.1-bin-hadoop3.2
# archive that the next cell downloads and unpacks — confirm whether it is stale.
!rm -rf /content/spark-3.3.2-bin-hadoop2

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [3]:
# List the downloaded Spark archive and the contents of the unpacked directory.
!ls /content/sp*

/content/spark-3.1.1-bin-hadoop3.2.tgz

/content/spark-3.1.1-bin-hadoop3.2:
bin   data	jars	    LICENSE   NOTICE  R		 RELEASE  yarn
conf  examples	kubernetes  licenses  python  README.md  sbin


In [4]:
import os

# Tell Spark tooling where the Java 8 JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [5]:
# Confirm that SPARK_HOME is visible to shell commands launched from the notebook.
!echo $SPARK_HOME

/content/spark-3.1.1-bin-hadoop3.2


In [6]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m1.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285388 sha256=bb56f8d6c8563cf12815f6610d71142da46181c16c09d667a1f784d9b4ced5a8
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [7]:
# Print the version of the Python interpreter found on the shell PATH.
!python -V

Python 3.10.12


In [8]:
import findspark
findspark.init()  # locate Spark through SPARK_HOME before pyspark is imported
from pyspark.sql import SparkSession

# Reuse the active session if one exists, otherwise start a new one with default settings.
spark = SparkSession.builder.getOrCreate()

# Property used to format output tables better
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [9]:
# Show which Spark version the session is actually running.
spark.version

'3.1.1'

In [10]:
pip install nltk



In [11]:
# Download the comtrans corpus as the machine translation dataset
import nltk
nltk.download('comtrans')  # stores the corpus in the local nltk_data directory
from nltk.corpus import comtrans  # corpus reader used below to load the aligned sentences

[nltk_data] Downloading package comtrans to /root/nltk_data...


In [12]:
# Load the English-French aligned sentences from the comtrans corpus (33,334 entries)
als = comtrans.aligned_sents("alignment-en-fr.txt")

# WORD_COUNT is the number of word slots (time steps) per sentence.
# VECTOR_LEN is the exclusive upper bound on sentence length used when filtering,
# i.e. sentences with VECTOR_LEN or more words are dropped.
WORD_COUNT = 5
VECTOR_LEN = WORD_COUNT + 1

print(len(als))

33334


In [13]:
# For every sentence in als build a triple of the form
# (int sentence_num, list<str> english_words, list<str> french_words).
# enumerate replaces the hand-maintained counter, which used to leak a module-level `i`.
sentence_triples = [(sentence_num, entry.words, entry.mots) for sentence_num, entry in enumerate(als)]

# Convert the triples to an RDD and keep only the pairs whose English AND French sides both have
# fewer than VECTOR_LEN words (single pass instead of two chained filters).
dataset = spark.sparkContext.parallelize(sentence_triples).filter(
    lambda x: len(x[1]) < VECTOR_LEN and len(x[2]) < VECTOR_LEN)
dataset.take(20)

[(0,
  ['Resumption', 'of', 'the', 'session'],
  ['Reprise', 'de', 'la', 'session']),
 (55, ['Agenda'], ['Ordre', 'des', 'travaux']),
 (82, ['Thank', 'you', 'very', 'much', '.'], ['Merci', '.']),
 (162,
  ['The', 'debate', 'is', 'closed', '.'],
  ['Le', 'débat', 'est', 'clos', '.']),
 (210,
  ['The', 'debate', 'is', 'closed', '.'],
  ['Le', 'débat', 'est', 'clos', '.']),
 (387,
  ['The', 'debate', 'is', 'closed', '.'],
  ['Le', 'débat', 'est', 'clos', '.']),
 (500,
  ['The', 'debate', 'is', 'closed', '.'],
  ['Le', 'débat', 'est', 'clos', '.']),
 (505,
  ['Are', 'there', 'any', 'comments', '?'],
  ['Y', 'a-t-il', 'des', 'observations', '?']),
 (529, ['Thank', 'you', 'very', 'much', '.'], ['Merci', 'beaucoup', '.']),
 (883, ['With', 'what', 'aim', '?'], ['Pour', 'quoi', 'faire', '?']),
 (991, ['Why', '?'], ['Pourquoi', '?']),
 (993, ['No', '.'], ['Non', '.']),
 (1220, ['VOTE'], ['VOTES']),
 (1241,
  ['EXPLANATIONS', 'OF', 'VOTE-', 'Own', 'resources'],
  ['Explications', 'de', 'vote-', '

In [14]:
# Sanity check: the longest remaining English and French sentences must not exceed the word limit
X_length_max = dataset.map(lambda triple: len(triple[1])).reduce(max)
Y_length_max = dataset.map(lambda triple: len(triple[2])).reduce(max)

print(X_length_max)
print(Y_length_max)

5
5


In [15]:
# Convert dataset from an RDD to a DataFrame with one row per sentence pair:
# index (sentence number), English (list of words), French (list of words)
df = dataset.toDF(["index", "English", "French"])
display(df)

index,English,French
0,"[Resumption, of, ...","[Reprise, de, la,..."
55,[Agenda],"[Ordre, des, trav..."
82,"[Thank, you, very...","[Merci, .]"
162,"[The, debate, is,...","[Le, débat, est, ..."
210,"[The, debate, is,...","[Le, débat, est, ..."
387,"[The, debate, is,...","[Le, débat, est, ..."
500,"[The, debate, is,...","[Le, débat, est, ..."
505,"[Are, there, any,...","[Y, a-t-il, des, ..."
529,"[Thank, you, very...","[Merci, beaucoup, .]"
883,"[With, what, aim, ?]","[Pour, quoi, fair..."


In [16]:
# Length of every word vector fed to and produced by the network.
# Despite the Word2Vec import below, words are one-hot encoded in this notebook: 528 is the size of the
# French one-hot vectors shown in the output further down, and the shorter English vectors are
# zero-padded up to the same length.
FEATURE_COUNT = 528

In [17]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Word2Vec
from pyspark.sql.functions import col, lit
from pyspark.ml.linalg import DenseVector
import numpy

def deconstruct(x):
    """Split one sentence record into per-word records.

    x is indexable as (sentence_index, words). Returns a list with one
    (sentence_index, word_position, lower-cased word) tuple per word, in order.
    """
    sentence_index, words = x[0], x[1]
    return [(sentence_index, position, word.lower()) for position, word in enumerate(words)]

def removeIndices(x):
  """Drop the word positions from a (key, [(position, vector), ...]) record.

  Returns (key, [vector, ...]) with the vectors in their existing order.
  """
  vectors_only = [pair[1] for pair in x[1]]
  return (x[0], vectors_only)

def fillEmpties(x):
  """Pad a sentence's list of word vectors with zero vectors up to VECTOR_LEN - 1 entries.

  x is a (key, [vector, ...]) record. Returns (key, padded_list) where padded_list is a NEW list:
  the caller's list is no longer extended in place. Records that already hold VECTOR_LEN - 1
  or more vectors are returned with their vectors unchanged.
  """
  missing = VECTOR_LEN - 1 - len(x[1])  # zero or negative means nothing to add (empty range)
  padding = [DenseVector(numpy.zeros(FEATURE_COUNT)) for _ in range(missing)]
  return (x[0], list(x[1]) + padding)

def convertToNumpy(x):
  """Turn a (key, [vector, ...]) record into (key, ndarray).

  Each vector contributes a float column of shape (FEATURE_COUNT, 1) built from its first
  FEATURE_COUNT components; the columns are stacked along a new leading axis.
  """
  columns = [
    numpy.array([vec[i] for i in range(FEATURE_COUNT)], dtype=float).reshape(FEATURE_COUNT, 1)
    for vec in x[1]
  ]
  return (x[0], numpy.array(columns))

# Form a df where each row contains a French word. Index is the sentence index within the corpus, wordIndex is word's index within the sentence, and French contains the French word.
french_vocabulary = df.select(['index', 'French']).rdd.flatMap(deconstruct).toDF(['index', 'wordIndex', 'French'])

# Map every (lower-cased) word to a numeric index, then to a sparse one-hot vector.
# The displayed vectors have size 528, which is what FEATURE_COUNT is set to.
indexer = StringIndexer(inputCol="French", outputCol="frenchIndexed")
encoder = OneHotEncoder(inputCol="frenchIndexed", outputCol="frenchVector")
french_vocabulary = indexer.fit(french_vocabulary).transform(french_vocabulary)
french_vocabulary = encoder.fit(french_vocabulary).transform(french_vocabulary)

# test with small iterations, different hyperparameters

# Convert the df into an rdd where each element is a French sentence represented as a key-value pair of the form (int sentence_index, list word_vectors).
# Steps: wrap each word as a one-element list of (wordIndex, vector), concatenate the lists per sentence,
# sort each sentence's list by wordIndex, then convert every sparse vector to a DenseVector.
french_vectors = french_vocabulary.select(['index', 'wordIndex', 'frenchVector'])\
  .rdd.map(lambda x: (x[0], [(x[1], x[2])]))\
  .reduceByKey(lambda x, y: x + y)\
  .map(lambda x: (x[0], sorted(list(x[1]), key=lambda x: x[0], reverse=False)))\
  .map(lambda x: (x[0], [(i[0], DenseVector(i[1])) for i in x[1]]))

# NOTE(review): this prints a full dense vector per word and produces very long output.
print(french_vectors.take(1))

# Strip the word positions, pad short sentences with zero vectors, and convert to numpy arrays
french_vectors = french_vectors.map(removeIndices).map(fillEmpties).map(convertToNumpy)

# Number of distinct French words in the filtered dataset
french_vocabulary_size = french_vocabulary.select('French').distinct().count()

print(french_vocabulary_size)
display(french_vocabulary)
print(french_vectors.take(1))
# Number of word slots in the first sentence after padding
len(french_vectors.take(1)[0][1])
#french_vocabulary.take(1)[0][4]

[(0, [(0, DenseVector([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.

index,wordIndex,French,frenchIndexed,frenchVector
0,0,reprise,31.0,"(528,[31],[1.0])"
0,1,de,9.0,"(528,[9],[1.0])"
0,2,la,8.0,"(528,[8],[1.0])"
0,3,session,17.0,"(528,[17],[1.0])"
55,0,ordre,41.0,"(528,[41],[1.0])"
55,1,des,12.0,"(528,[12],[1.0])"
55,2,travaux,103.0,"(528,[103],[1.0])"
82,0,merci,50.0,"(528,[50],[1.0])"
82,1,.,0.0,"(528,[0],[1.0])"
162,0,le,2.0,"(528,[2],[1.0])"


[(0, array([[[0.],
        [0.],
        [0.],
        ...,
        [0.],
        [0.],
        [0.]],

       [[0.],
        [0.],
        [0.],
        ...,
        [0.],
        [0.],
        [0.]],

       [[0.],
        [0.],
        [0.],
        ...,
        [0.],
        [0.],
        [0.]],

       [[0.],
        [0.],
        [0.],
        ...,
        [0.],
        [0.],
        [0.]],

       [[0.],
        [0.],
        [0.],
        ...,
        [0.],
        [0.],
        [0.]]]))]


5

In [18]:
# Goes through the same steps and converts the english sentences into a paired rdd with elements of the form (int sentence_index, list word_vectors)

english_vocabulary = df.select(['index', 'English']).rdd.flatMap(deconstruct).toDF(['index', 'wordIndex', 'English'])

# Map every (lower-cased) word to a numeric index, then to a sparse one-hot vector
indexer = StringIndexer(inputCol="English", outputCol="englishIndex")
encoder = OneHotEncoder(inputCol="englishIndex", outputCol="englishVector")
english_vocabulary = indexer.fit(english_vocabulary).transform(english_vocabulary)
english_vocabulary = encoder.fit(english_vocabulary).transform(english_vocabulary)

# Group the word vectors per sentence, sort them by word position, and zero-pad every one-hot vector
# on the right up to FEATURE_COUNT components. The pad width is computed from the actual vector
# length instead of the former hard-coded 60, so it stays correct if the vocabulary size changes
# (numpy.pad raises if a vector is already longer than FEATURE_COUNT).
english_vectors = english_vocabulary.select(['index', 'wordIndex', 'englishVector'])\
  .rdd\
  .map(lambda x: (x[0], [(x[1], x[2])]))\
  .reduceByKey(lambda x, y: x + y)\
  .map(lambda x: (x[0], sorted(list(x[1]), key=lambda x: x[0], reverse=False)))\
  .map(lambda x: (x[0], [(i[0], i[1].toArray()) for i in x[1]]))\
  .map(lambda x: (x[0], [(i[0], numpy.pad(i[1], (0, FEATURE_COUNT - len(i[1])), 'constant', constant_values=(0))) for i in x[1]]))\
  .map(lambda x: (x[0], [(i[0], DenseVector(i[1])) for i in x[1]]))

# Strip the word positions, pad short sentences with zero vectors, and convert to numpy arrays
english_vectors = english_vectors.map(removeIndices).map(fillEmpties).map(convertToNumpy)

# Number of distinct English words in the filtered dataset
english_vocabulary_size = english_vocabulary.select('English').distinct().count()

print(english_vocabulary_size)
display(english_vocabulary)
english_vectors.take(1)

469


index,wordIndex,English,englishIndex,englishVector
0,0,resumption,33.0,"(468,[33],[1.0])"
0,1,of,12.0,"(468,[12],[1.0])"
0,2,the,1.0,"(468,[1],[1.0])"
0,3,session,20.0,"(468,[20],[1.0])"
55,0,agenda,38.0,"(468,[38],[1.0])"
82,0,thank,46.0,"(468,[46],[1.0])"
82,1,you,30.0,"(468,[30],[1.0])"
82,2,very,36.0,"(468,[36],[1.0])"
82,3,much,82.0,"(468,[82],[1.0])"
82,4,.,0.0,"(468,[0],[1.0])"


[(0,
  array([[[0.],
          [0.],
          [0.],
          ...,
          [0.],
          [0.],
          [0.]],
  
         [[0.],
          [0.],
          [0.],
          ...,
          [0.],
          [0.],
          [0.]],
  
         [[0.],
          [1.],
          [0.],
          ...,
          [0.],
          [0.],
          [0.]],
  
         [[0.],
          [0.],
          [0.],
          ...,
          [0.],
          [0.],
          [0.]],
  
         [[0.],
          [0.],
          [0.],
          ...,
          [0.],
          [0.],
          [0.]]]))]

In [19]:
# Number of units in the RNN hidden state (sizes the U, V, W weight matrices and the hidden bias b)
HIDDEN_NODES = 100

In [20]:
# Weights are drawn uniformly from [-1/sqrt(n), 1/sqrt(n)], where n is the size of the layer feeding them.
_input_bound = numpy.sqrt(1./FEATURE_COUNT)
_hidden_bound = numpy.sqrt(1./HIDDEN_NODES)

U = numpy.random.uniform(-_input_bound, _input_bound, (HIDDEN_NODES, FEATURE_COUNT))  # input -> hidden
V = numpy.random.uniform(-_hidden_bound, _hidden_bound, (FEATURE_COUNT, HIDDEN_NODES))  # hidden -> output
W = numpy.random.uniform(-_hidden_bound, _hidden_bound, (HIDDEN_NODES, HIDDEN_NODES))  # hidden -> hidden
b = numpy.zeros((HIDDEN_NODES, 1)) # bias for hidden layer
c = numpy.zeros((FEATURE_COUNT, 1)) # bias for output

In [21]:
# Functions in this cell are based on the work of Javaid Nabi found here:
# https://github.com/javaidnabi31/RNN-from-scratch/blob/master/RNN_char_text%20generator.ipynb

def softmax(x):
  """Return the softmax of x, normalised over ALL of its elements.

  The maximum is subtracted before exponentiating so large inputs do not overflow.
  """
  shifted = x - numpy.max(x)
  exponentials = numpy.exp(shifted)
  return exponentials / numpy.sum(exponentials)

def forward(x):
  """Run the RNN forward over one sentence pair.

  x is (input_sequence, target_sequence); only x[0] is read here, one (FEATURE_COUNT, 1)
  column per time step. Uses the module-level parameters U, W, V, b, c.

  Returns (x[0], x[1], hidden_states, probabilities) where both dicts are keyed by time step;
  hidden_states additionally holds the all-zero initial state under key -1.
  """
  hidden_states = {-1: numpy.zeros((HIDDEN_NODES, 1))}
  logits, probabilities = {}, {}
  for step in range(WORD_COUNT):
    pre_activation = numpy.dot(U, x[0][step]) + numpy.dot(W, hidden_states[step - 1]) + b
    hidden_states[step] = numpy.tanh(pre_activation)  # hidden state
    logits[step] = numpy.dot(V, hidden_states[step]) + c  # unnormalised log probs
    probabilities[step] = softmax(logits[step])
  return (x[0], x[1], hidden_states, probabilities)

def backward(x): # french sentence, english sentence, hs, ycap
  """Backpropagate through time for one sentence pair.

  x is the tuple returned by forward: (inputs, targets, hs, ycap).

  Returns (inputs, targets, hs, ycap, dU, dV, dW, db, dc). The d* arrays are accumulated from
  (target - prediction), so they already point in the descent direction: the caller ADDS
  learning_rate * d* to the parameters. Each array is clipped to [-5, 5].
  """
  inputs, targets, hs, ycap = x[0], x[1], x[2], x[3]
  dU, dW, dV = numpy.zeros_like(U), numpy.zeros_like(W), numpy.zeros_like(V)
  db, dc = numpy.zeros_like(b), numpy.zeros_like(c)
  dhnext = numpy.zeros_like(hs[0])
  for t in reversed(range(WORD_COUNT)):
    # error signal at the output for this time step
    dy = targets[t] - ycap[t]
    # output layer: weights and bias
    dV += numpy.dot(dy, hs[t].T)
    dc += dy  # was `dc += dc`, which left the output-bias gradient at zero forever
    # dh includes gradient from two sides, next cell and current output
    dh = numpy.dot(V.T, dy) + dhnext
    # backprop through tanh non-linearity
    dhrec = (1 - hs[t] * hs[t]) * dh
    db += dhrec
    # input and recurrent weights (hs[-1] is the zero initial state when t == 0)
    dU += numpy.dot(dhrec, inputs[t].T)
    dW += numpy.dot(dhrec, hs[t - 1].T)
    # gradient handed to the previous time step
    dhnext = numpy.dot(W.T, dhrec)
  # clip to mitigate exploding gradients
  for dparam in [dU, dW, dV, db, dc]:
    numpy.clip(dparam, -5, 5, out=dparam)
  return (inputs, targets, hs, ycap, dU, dV, dW, db, dc)

def loss(x):
  """Mean squared error between a target sequence and a predicted sequence.

  x is (targets, predictions); both are indexed by time step and each step holds at least
  FEATURE_COUNT components. For every one of the WORD_COUNT steps the squared differences of
  the first FEATURE_COUNT components are averaged, and the per-step means are averaged again.

  Computed with numpy only, so it no longer depends on `math` being imported in a later cell
  and no longer converts 1-element arrays to scalars one component at a time.
  Returns a Python float.
  """
  step_means = []
  for t in range(WORD_COUNT):
    target = numpy.asarray(x[0][t], dtype=float)[:FEATURE_COUNT]
    predicted = numpy.asarray(x[1][t], dtype=float)[:FEATURE_COUNT]
    step_means.append(float(numpy.sum(numpy.square(target - predicted))) / FEATURE_COUNT)
  return sum(step_means) / WORD_COUNT

In [22]:
def cosine_sim(vector1, vector2):
  """Cosine similarity: dot product of the two vectors divided by the product of their norms."""
  numerator = numpy.dot(vector1, vector2)
  denominator = numpy.linalg.norm(vector1) * numpy.linalg.norm(vector2)
  return numerator / denominator

def french_sim(word_vector):
  """Look up the French word whose frenchIndexed value equals the argmax of word_vector.

  Returns the first matching Row of french_vocabulary (single field 'French'); raises IndexError
  when no row matches. An all-zero vector has argmax 0 and therefore maps to index 0.
  """
  index = numpy.argmax(word_vector)
  condition = "frenchIndexed == " + str(index)
  # Every row with the same index carries the same word, so fetch one row only
  # instead of collecting every occurrence to the driver.
  result = french_vocabulary.select(['French']).where(condition).limit(1).collect()
  return result[0]

def english_sim(word_vector):
  """Look up the English word whose englishIndex value equals the argmax of word_vector.

  Returns the first matching Row of english_vocabulary (single field 'English'); raises IndexError
  when no row matches. An all-zero vector has argmax 0 and therefore maps to index 0.
  """
  index = numpy.argmax(word_vector)
  condition = "englishIndex == " + str(index)
  # Every row with the same index carries the same word, so fetch one row only
  # instead of collecting every occurrence to the driver.
  result = english_vocabulary.select(['English']).where(condition).limit(1).collect()
  return result[0]

In [23]:
import math

def add_tuple_gradients(gradient_tuple1, gradient_tuple2) :
  """Add two gradient tuples position by position.

  Only the first five entries of each tuple are used; the result is a 5-tuple of their sums.
  """
  return tuple(gradient_tuple1[k] + gradient_tuple2[k] for k in range(5))

def _gradients_and_loss(record):
  """Reduce one (key, backward-output) record to ((dU, dV, dW, db, dc), sentence_loss)."""
  values = record[1]
  return (values[4:9], loss((values[1], values[3])))


def _add_gradients_and_loss(left, right):
  """Combine two ((gradients), loss) pairs by summing the gradients and the losses."""
  return (add_tuple_gradients(left[0], right[0]), left[1] + right[1])


def _print_sample_translations(processed_sample):
  """Print the source words, reference words and predicted words of each monitored sentence."""
  for sample in processed_sample:
    print("".join(str(french_sim(word)[0]) + " " for word in sample[0]))
    print("".join(english_sim(word)[0] + " " for word in sample[1]))
    print("".join(english_sim(sample[2][t])[0] + " " for t in range(WORD_COUNT)))
  print()


def Gradient_Descent_Training(iterations, learning_rate=0.001, batch_size=16):
  """Train the RNN parameters with batch gradient steps computed on Spark.

  Parameters:
    iterations: number of parameter updates to perform.
    learning_rate: factor applied to the summed batch gradients.
    batch_size: number of sentence keys sampled (without replacement) per iteration.

  Side effects: updates the module-level U, V, W, b, c in place and prints progress; every
  10th iteration it also prints the translations of two sentence pairs chosen at the start.

  Returns the list of smoothed loss values, one per iteration.
  """
  global U, V, W, b, c

  iteration_losses = []

  smooth_loss = -numpy.log(1.0/FEATURE_COUNT)*FEATURE_COUNT

  # Two fixed sentence pairs used to monitor progress during training
  sample_keys = set(french_vectors.keys().takeSample(False, 2))
  english_sample = english_vectors.filter(lambda x: x[0] in sample_keys)
  french_sample = french_vectors.filter(lambda x: x[0] in sample_keys)
  sample_rdd = french_sample.join(english_sample)

  # Run gradient descent for the given number of iterations
  for i in range(iterations) :
    # Choose a random batch of English/French sentences from the dataset
    # (a set gives constant-time membership tests inside the filters)
    selected_sentences = set(french_vectors.keys().takeSample(False, batch_size))
    english_batch_rdd = english_vectors.filter(lambda x: x[0] in selected_sentences)
    french_batch_rdd = french_vectors.filter(lambda x: x[0] in selected_sentences)

    # Join the english and french rdds so they provide training input in the form of (french_sentence, english sentence)
    input_rdd = french_batch_rdd.join(english_batch_rdd)

    # Run the forward and backward pass of backpropagation using MapReduce to calculate the gradients
    french_gradients_rdd = input_rdd.mapValues(forward).mapValues(backward)

    # One Spark job yields both the summed gradients and the summed loss. Previously six separate
    # actions each re-ran forward/backward, and the later ones saw parameters that had already
    # been updated earlier in the same iteration.
    gradient_sums, loss_total = french_gradients_rdd.map(_gradients_and_loss).reduce(_add_gradients_and_loss)

    # Average loss
    loss_val = loss_total / batch_size

    # The gradients already point in the descent direction (see backward), hence the additions
    U += learning_rate * gradient_sums[0]
    V += learning_rate * gradient_sums[1]
    W += learning_rate * gradient_sums[2]
    b += learning_rate * gradient_sums[3]
    c += learning_rate * gradient_sums[4]

    # Exponential moving average of the loss
    smooth_loss = smooth_loss*0.999 + loss_val*0.001

    print("Iteration " + str(i))
    print("Average Smooth Loss " + str(smooth_loss))
    iteration_losses.append(smooth_loss)

    if i % 10 == 0:
      processed_sample = sample_rdd.mapValues(forward).map(lambda x: (x[1][0], x[1][1], x[1][3])).collect()
      _print_sample_translations(processed_sample)

  # The caller assigns the result; without this return it received None
  return iteration_losses

# Run 100 training iterations; each batch samples 95% of the sentence pairs (rounded down)
iteration_losses = Gradient_Descent_Training(iterations=100, batch_size=math.floor(english_vectors.count() * 0.95))

Iteration 0
Average Smooth Loss 3306.772756369247
merci beaucoup pour cela . 
thank you for that . 
. . . . . 
le débat est clos . 
the debate is closed . 
. . is . . 

Iteration 1
Average Smooth Loss 3303.465985015973
Iteration 2
Average Smooth Loss 3300.1625204342395
Iteration 3
Average Smooth Loss 3296.862359288255
Iteration 4
Average Smooth Loss 3293.565498296989
Iteration 5
Average Smooth Loss 3290.2719341883007
Iteration 6
Average Smooth Loss 3286.9816641303414
Iteration 7
Average Smooth Loss 3283.694684382083
Iteration 8
Average Smooth Loss 3280.4109912830104
Iteration 9
Average Smooth Loss 3277.130582405802
Iteration 10
Average Smooth Loss 3273.853454059301
merci beaucoup pour cela . 
thank you for that . 
the the is debate is 
le débat est clos . 
the debate is closed . 
the the is debate is 

Iteration 11
Average Smooth Loss 3270.5796029602425
Iteration 12
Average Smooth Loss 3267.309025776384
Iteration 13
Average Smooth Loss 3264.0417182394904
Iteration 14
Average Smooth Los

KeyboardInterrupt: ignored