In [77]:
from pyspark.sql import SparkSession

#Starting the sparks session

spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.87:7077") \
        .appName("Jad_Ali_Daoud_A3A")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .getOrCreate()
spark_context = spark_session.sparkContext

In [78]:
#Get the number of lines in the english transcript
linesEn = spark_context.textFile("hdfs://192.168.2.87:9000/europarl/europarl-v7.ro-en.en")
print("Lines in english transcript : {}".format(linesEn.count()))

#Get the number of lines in the romanian transcript
linesRo = spark_context.textFile("hdfs://192.168.2.87:9000/europarl/europarl-v7.ro-en.ro")
print("Lines in romanian transcript : {}".format(linesRo.count()))

Lines in english transcript : 399375
Lines in romanian transcript : 399375


In [79]:
#Get the number of partitions
print("Number of partitions in english transcript : {}".format(linesEn.getNumPartitions()))
print("Number of partitions in romanian transcript : {}".format(linesRo.getNumPartitions()))
print("Total number of partitions of both transcript : {}".format(linesEn.getNumPartitions()+linesRo.getNumPartitions()))


Number of partitions in english transcript : 2
Number of partitions in romanian transcript : 2
Total number of partitions of both transcript : 4


In [80]:
#Function that takes an RDD and returns the lower case words of each line
def splitLower(rdd):
    
    return rdd.map(lambda word : word.lower())\
              .map(lambda line: line.split(' '))
              
              


In [81]:
#Few samples of Lowercase english transcript words
print(splitLower(linesEn).take(10))
#Few samples of Lowercase romanian transcript words
print(splitLower(linesRo).take(10))


[['membership', 'of', 'parliament:', 'see', 'minutes'], ['approval', 'of', 'minutes', 'of', 'previous', 'sitting:', 'see', 'minutes'], ['membership', 'of', 'parliament:', 'see', 'minutes'], ['verification', 'of', 'credentials:', 'see', 'minutes'], ['documents', 'received:', 'see', 'minutes'], ['written', 'statements', 'and', 'oral', 'questions', '(tabling):', 'see', 'minutes'], ['petitions:', 'see', 'minutes'], ['texts', 'of', 'agreements', 'forwarded', 'by', 'the', 'council:', 'see', 'minutes'], ['action', 'taken', 'on', "parliament's", 'resolutions:', 'see', 'minutes'], ['agenda', 'for', 'next', 'sitting:', 'see', 'minutes']]
[['componenţa', 'parlamentului:', 'a', 'se', 'vedea', 'procesul-verbal'], ['aprobarea', 'procesului-verbal', 'al', 'şedinţei', 'precedente:', 'a', 'se', 'vedea', 'procesul-verbal'], ['componenţa', 'parlamentului:', 'a', 'se', 'vedea', 'procesul-verbal'], ['verificarea', 'prerogativelor:', 'a', 'se', 'vedea', 'procesul-verbal'], ['depunere', 'de', 'documente:', '

In [82]:
#Get the number of lines in the english transcript after using splitLower()
print("Lines in english transcript after using splitLower() : {}".format(splitLower(linesEn).count()))
#Get the number of lines in the romanian transcript after using splitLower()
print("Lines in romanian transcript after using splitLower() : {}".format(splitLower(linesRo).count()))

Lines in english transcript after using splitLower() : 399375
Lines in romanian transcript after using splitLower() : 399375


In [83]:
from operator import add

#Get the first ten most occuring words in the English transcript
print(splitLower(linesEn).flatMap(lambda w: w)\
                   .map(lambda w: (w,1))\
                   .reduceByKey(add)\
                   .sortBy(lambda x: -x[1])\
                   .take(10))

#Get the first ten most occuring words in the Romanian transcript
print(splitLower(linesRo).flatMap(lambda w: w)\
                   .map(lambda w: (w,1))\
                   .reduceByKey(add)\
                   .sortBy(lambda x: -x[1])\
                   .take(10))
                    

[('the', 740292), ('of', 357378), ('to', 322434), ('and', 288989), ('in', 235625), ('a', 162975), ('that', 159878), ('is', 155300), ('for', 121107), ('we', 108204)]
[('de', 454842), ('în', 342724), ('a', 221238), ('să', 205834), ('şi', 184005), ('pentru', 135231), ('la', 131109), ('care', 129987), ('și', 118273), ('că', 105950)]


In [84]:

#1.Key the lines by their line number (hint: ZipWithIndex()). 
#2. Swap the key and value - so that the line number is the key. 
Index_En=splitLower(linesEn).zipWithIndex()
Index_En=Index_En.map(lambda w: (w[1],w[0]))

Index_Ro=splitLower(linesRo).zipWithIndex()
Index_Ro=Index_Ro.map(lambda w: (w[1],w[0]))

print(Index_Ro.take(5))
print(Index_En.take(5))

    


[(0, ['componenţa', 'parlamentului:', 'a', 'se', 'vedea', 'procesul-verbal']), (1, ['aprobarea', 'procesului-verbal', 'al', 'şedinţei', 'precedente:', 'a', 'se', 'vedea', 'procesul-verbal']), (2, ['componenţa', 'parlamentului:', 'a', 'se', 'vedea', 'procesul-verbal']), (3, ['verificarea', 'prerogativelor:', 'a', 'se', 'vedea', 'procesul-verbal']), (4, ['depunere', 'de', 'documente:', 'a', 'se', 'vedea', 'procesul-verbal'])]
[(0, ['membership', 'of', 'parliament:', 'see', 'minutes']), (1, ['approval', 'of', 'minutes', 'of', 'previous', 'sitting:', 'see', 'minutes']), (2, ['membership', 'of', 'parliament:', 'see', 'minutes']), (3, ['verification', 'of', 'credentials:', 'see', 'minutes']), (4, ['documents', 'received:', 'see', 'minutes'])]


In [86]:
#A3. Join the two RDDs together according to the line number key, so you have pairs of lines with the same line number. 
Index_Comb=Index_Ro.join(Index_En).reduceByKey(lambda x,y:x+y)                                  
print(Index_Comb.takeOrdered(1))

[(0, (['componenţa', 'parlamentului:', 'a', 'se', 'vedea', 'procesul-verbal'], ['membership', 'of', 'parliament:', 'see', 'minutes']))]


In [97]:
#4. Filter to exclude line pairs that have an empty/missing “corresponding” sentence. 
#5. Filter to leave only pairs of sentences with a small number of words per sentence, this should give a more reliable translation (you can experiment). 
#6. Filter to leave only pairs of sentences with the same number of words in each sentence.          
Index_Comb=Index_Ro.join(Index_En).reduceByKey(lambda x,y:x+y)\
                                  .filter(lambda x: len(x[1][0]) == len(x[1][1]))\
                                  .filter(lambda x: len(x[1][0])<7)\
                                  .filter(lambda x: '[]' not in x)
    
print(Index_Comb.take(10))


[(72, (['(the', 'sitting', 'was', 'opened', 'at', '09.05)'], ['(the', 'sitting', 'was', 'opened', 'at', '09.05)'])), (84, (['proiect', 'de', 'buget', 'rectificativ', 'nr.1/2007', '(vot)'], ['draft', 'amending', 'budget', 'no', '1/2007', '(vote)'])), (140, (['3.'], ['3.'])), (144, (['6.'], ['6.'])), (156, (['(the', 'sitting', 'closed', 'at', '23.15)'], ['(the', 'sitting', 'closed', 'at', '23.15)'])), (160, (['1.'], ['1.'])), (164, (['3.'], ['3.'])), (168, (['5.'], ['5.'])), (172, (['8.'], ['8.'])), (176, (['12.'], ['12.']))]


In [98]:
#7. For each sentence pair, map to give a list of word pairs (in order) from the two sentences. We no longer need the line numbers. (hint: use python’s built in zip() function
index_comb_final=Index_Comb.map(lambda x: list(zip(x[1][0],x[1][1])))
index_comb_final.take(1)

[[('(the', '(the'),
  ('sitting', 'sitting'),
  ('was', 'was'),
  ('opened', 'opened'),
  ('at', 'at'),
  ('09.05)', '09.05)')]]

In [99]:
#8. Use reduce to count the number of occurrences of the word-translation-pairs.
index6=index_comb_final.map(lambda w: (w[0],1))\
                        .filter(lambda x: x[0][0])\
                       .reduceByKey(add)
                        

In [100]:
#9. Print some of the most frequently occurring pairs of words. 
print(index6.sortBy(lambda x: -x[1])\
                   .take(20))

[(('(aplauze)', '(applause)'), 892), (('vă', 'thank'), 549), (('dezbaterea', 'the'), 508), (('-', '-'), 345), (('2.', '2.'), 293), (('1.', '1.'), 290), (('3.', '3.'), 269), (('4.', '4.'), 182), (('5.', '5.'), 159), (('6.', '6.'), 143), (('7.', '7.'), 131), (('8.', '8.'), 118), (('9.', '9.'), 101), (('10.', '10.'), 97), (('aceasta', 'this'), 96), (('ce', 'what'), 92), (('11.', '11.'), 86), (('am', 'i'), 83), (('trebuie', 'we'), 83), (('12.', '12.'), 82)]


In [101]:
# release the cores for another application!
spark_context.stop()