In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from operator import add


# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.113:7077") \
        .appName("mitra-parta")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .config("spark.cores.max",4)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()
# Old API (RDD)
spark_context = spark_session.sparkContext
spark_context.setLogLevel("INFO")

In [2]:
# A 
#     : Read English transcripts , count the numbers of lines 

lines1= spark_context.textFile("hdfs://192.168.2.113:9000/europarl/europarl-v7.et-en.en")#  engligh
print(lines1.take(5))

lines1.count()

["Action taken on Parliament's resolutions: see Minutes", 'Documents received: see Minutes', 'Written statements (Rule 116): see Minutes', 'Texts of agreements forwarded by the Council: see Minutes', 'Membership of Parliament: see Minutes']


651746

In [3]:
# A1 
#     : Read the Estonian transcripts , count teh numbers of lines 

lines2 = spark_context.textFile("hdfs://192.168.2.113:9000/europarl/europarl-v7.et-en.et")#  Estonian
print(lines2.take(5))


lines2.count()

['Parlamendi resolutsioonide vastuvõtmisele järgnev tegevus (vt protokoll)', 'Esitatud dokumendid (vaata protokolli)', 'Kirjalikud deklaratsioonid (kodukorra artikkel 116) (vaata protokolli)', 'Nõukogu saadetud kokkulepete tekstid (vaata protokolli)', 'Parlamendi koosseis (vaata protokolli)']


651746

In [4]:
# Verifythatthelinecountsarethesameforthetwolanguages.
#  ???????????????????????????????????????????????????????

In [5]:


words = lines1.map(lambda line: line.split(' '))
word_counts = words.map(lambda w: len(w))
total_words = word_counts.reduce(add)
print(f'total words in English= {total_words}') 

words = lines2.map(lambda line: line.split(' '))
word_counts = words.map(lambda w: len(w))
total_words = word_counts.reduce(add)
print(f'total words in Estonian= {total_words}') 

total words in English= 15687988
total words in Estonian= 11214592


In [6]:
# count the number of partitions 
print (lines1.getNumPartitions())# number of partition for english
print (lines2.getNumPartitions())# number of partition for Estonian


2
2


In [7]:
# A2  Pre-process
def func_convert(lines):
    lines=lines.flatMap(lambda line: line.lower().split(' '))
    return lines

lines1_1=func_convert(lines1)
print(lines1_1.take(50))
print ('********************************************************************************************************************************************************************')
lines2_1=func_convert(lines2)
print(lines2_1.take(20))

['action', 'taken', 'on', "parliament's", 'resolutions:', 'see', 'minutes', 'documents', 'received:', 'see', 'minutes', 'written', 'statements', '(rule', '116):', 'see', 'minutes', 'texts', 'of', 'agreements', 'forwarded', 'by', 'the', 'council:', 'see', 'minutes', 'membership', 'of', 'parliament:', 'see', 'minutes', 'membership', 'of', 'committees', 'and', 'delegations:', 'see', 'minutes', 'future', 'action', 'in', 'the', 'field', 'of', 'patents', '(motions', 'for', 'resolutions', 'tabled):', 'see']
********************************************************************************************************************************************************************
['parlamendi', 'resolutsioonide', 'vastuvõtmisele', 'järgnev', 'tegevus', '(vt', 'protokoll)', 'esitatud', 'dokumendid', '(vaata', 'protokolli)', 'kirjalikud', 'deklaratsioonid', '(kodukorra', 'artikkel', '116)', '(vaata', 'protokolli)', 'nõukogu', 'saadetud']


In [8]:
# A2  Pre-process
def func_convert_listtolist(lines):
    lines=lines.map(lambda line: line.lower().split(' '))
    return lines
lines1_1_L=func_convert_listtolist(lines1)
print(lines1_1.take(50))
print ('********************************************************************************************************************************************************************')
lines2_1_L=func_convert_listtolist(lines2)
print(lines2_1.take(20))

['action', 'taken', 'on', "parliament's", 'resolutions:', 'see', 'minutes', 'documents', 'received:', 'see', 'minutes', 'written', 'statements', '(rule', '116):', 'see', 'minutes', 'texts', 'of', 'agreements', 'forwarded', 'by', 'the', 'council:', 'see', 'minutes', 'membership', 'of', 'parliament:', 'see', 'minutes', 'membership', 'of', 'committees', 'and', 'delegations:', 'see', 'minutes', 'future', 'action', 'in', 'the', 'field', 'of', 'patents', '(motions', 'for', 'resolutions', 'tabled):', 'see']
********************************************************************************************************************************************************************
['parlamendi', 'resolutsioonide', 'vastuvõtmisele', 'järgnev', 'tegevus', '(vt', 'protokoll)', 'esitatud', 'dokumendid', '(vaata', 'protokolli)', 'kirjalikud', 'deklaratsioonid', '(kodukorra', 'artikkel', '116)', '(vaata', 'protokolli)', 'nõukogu', 'saadetud']


In [9]:
print(lines1_1.count())
print(lines2_1.count())

15687988
11214592


In [10]:

mapper = lines1_1.map(lambda w: w.strip())\
                .map(lambda w: (w,1))


top_ten = mapper.reduceByKey(add)

print(top_ten.takeOrdered(10, key=lambda x: -x[1]))

[('the', 1188331), ('of', 573311), ('to', 519165), ('and', 462953), ('in', 379602), ('a', 262705), ('that', 260069), ('is', 254183), ('for', 191702), ('we', 177479)]


In [13]:
#  A3 top common words in Estonian
mapper = lines2_1.map(lambda w: w.strip())\
                .map(lambda w: (w,1))


top_ten = mapper.reduceByKey(add)

print(top_ten.takeOrdered(10, key=lambda x: -x[1]))

[('ja', 367414), ('on', 334744), ('et', 236033), ('euroopa', 140456), ('ei', 116118), ('ning', 100243), ('see', 88746), ('mis', 87777), ('kui', 79788), ('-', 69239)]


#  A4
lines1_zip=lines1_1.zipWithIndex()
lines1_zip.take(50)

In [15]:
# step 1  for english 
lines1_zip=lines1_1_L.zipWithIndex() 
print(lines1_zip.take(10))

print ('********************************************************************************************************************************************************************')
# step 1 for Estonian 
lines2_zip=lines2_1_L.zipWithIndex() # 
print (lines2_zip.take(10))

[(['action', 'taken', 'on', "parliament's", 'resolutions:', 'see', 'minutes'], 0), (['documents', 'received:', 'see', 'minutes'], 1), (['written', 'statements', '(rule', '116):', 'see', 'minutes'], 2), (['texts', 'of', 'agreements', 'forwarded', 'by', 'the', 'council:', 'see', 'minutes'], 3), (['membership', 'of', 'parliament:', 'see', 'minutes'], 4), (['membership', 'of', 'committees', 'and', 'delegations:', 'see', 'minutes'], 5), (['future', 'action', 'in', 'the', 'field', 'of', 'patents', '(motions', 'for', 'resolutions', 'tabled):', 'see', 'minutes'], 6), (['agenda', 'for', 'next', 'sitting:', 'see', 'minutes'], 7), (['closure', 'of', 'sitting'], 8), (['(the', 'sitting', 'was', 'closed', 'at', '11.55', 'p.m.)'], 9)]
********************************************************************************************************************************************************************
[(['parlamendi', 'resolutsioonide', 'vastuvõtmisele', 'järgnev', 'tegevus', '(vt', 'protokoll)'], 0), (['

In [16]:
# step 2 
def swap (tuple):
    line, index= tuple 
    return (index, line)

# for english 
lines1_zip_swap= lines1_zip.map(swap)
print (lines1_zip_swap.take(20))

print ('********************************************************************************************************************************************************************')

# for Estonian 
lines2_zip_swap= lines2_zip.map(swap)
print (lines2_zip_swap.take(20))


[(0, ['action', 'taken', 'on', "parliament's", 'resolutions:', 'see', 'minutes']), (1, ['documents', 'received:', 'see', 'minutes']), (2, ['written', 'statements', '(rule', '116):', 'see', 'minutes']), (3, ['texts', 'of', 'agreements', 'forwarded', 'by', 'the', 'council:', 'see', 'minutes']), (4, ['membership', 'of', 'parliament:', 'see', 'minutes']), (5, ['membership', 'of', 'committees', 'and', 'delegations:', 'see', 'minutes']), (6, ['future', 'action', 'in', 'the', 'field', 'of', 'patents', '(motions', 'for', 'resolutions', 'tabled):', 'see', 'minutes']), (7, ['agenda', 'for', 'next', 'sitting:', 'see', 'minutes']), (8, ['closure', 'of', 'sitting']), (9, ['(the', 'sitting', 'was', 'closed', 'at', '11.55', 'p.m.)']), (10, ['opening', 'of', 'the', 'sitting']), (11, ['(the', 'sitting', 'was', 'opened', 'at', '9', 'a.m.)']), (12, ['documents', 'received:', 'see', 'minutes']), (13, ['approval', 'of', 'minutes', 'of', 'previous', 'sitting:', 'see', 'minutes']), (14, ['membership', 'of', 

In [17]:
# step 2 second formula for english 
lines1_zip_swap_2= lines1_zip.map(lambda x: (x[1],x[0]))
print (lines1_zip_swap_2.take(5))

print('********************************************************************************************************************************************************************')
# step 2 second formula for Estonian 
lines2_zip_swap_2 = lines2_zip.map(lambda x: (x[1],x[0]))
print (lines2_zip_swap_2.take(5))

[(0, ['action', 'taken', 'on', "parliament's", 'resolutions:', 'see', 'minutes']), (1, ['documents', 'received:', 'see', 'minutes']), (2, ['written', 'statements', '(rule', '116):', 'see', 'minutes']), (3, ['texts', 'of', 'agreements', 'forwarded', 'by', 'the', 'council:', 'see', 'minutes']), (4, ['membership', 'of', 'parliament:', 'see', 'minutes'])]
********************************************************************************************************************************************************************
[(0, ['parlamendi', 'resolutsioonide', 'vastuvõtmisele', 'järgnev', 'tegevus', '(vt', 'protokoll)']), (1, ['esitatud', 'dokumendid', '(vaata', 'protokolli)']), (2, ['kirjalikud', 'deklaratsioonid', '(kodukorra', 'artikkel', '116)', '(vaata', 'protokolli)']), (3, ['nõukogu', 'saadetud', 'kokkulepete', 'tekstid', '(vaata', 'protokolli)']), (4, ['parlamendi', 'koosseis', '(vaata', 'protokolli)'])]


In [18]:
# step 3 
pairs_matching_lines= lines1_zip_swap.join(lines2_zip_swap)
print (pairs_matching_lines.take(2))
print ('**************')
print (pairs_matching_lines.count())

[(1588, (['membership', 'of', 'committees', 'and', 'delegations:', 'see', 'minutes'], ['parlamendi', 'komisjonide', 'ja', 'delegatsioonide', 'koosseis', '(vt', 'protokolli)'])), (3340, (['on', 'behalf', 'of', 'the', 'uen', 'group.', '-', 'mr', 'president,', 'the', 'first', '10', 'years', 'of', 'the', 'internal', 'market', 'added', 'eur', '877', 'billion', 'to', 'the', 'eu', 'economy', 'and', 'it', 'helped', 'to', 'create', 'over', '2.5', 'million', 'jobs,', 'so', 'i', 'do', 'not', 'think', 'anybody', 'could', 'argue', 'against', 'the', 'idea', 'that', 'the', 'internal', 'market', 'has', 'definitely', 'worked.'], ['fraktsiooni', 'uen', 'nimel.', '-', 'hr', 'juhataja,', 'esimese', '10', 'aastaga', 'lisas', 'ühtne', 'turg', 'eli', 'majandusele', '877', 'miljardit', 'eurot', 'ja', 'aitas', 'luua', 'üle', '2,5', 'miljoni', 'uue', 'töökoha,', 'seega', 'ei', 'saa', 'minu', 'arvates', 'mitte', 'keegi', 'vaielda', 'selle', 'vastu,', 'et', 'ühtne', 'turg', 'toimib.']))]
**************
651746


In [19]:
# step 4 
missing_sentences= pairs_matching_lines.filter(lambda x: (x[1][0][0] != '' and x[1][1][0]!= ''))
print (missing_sentences.count())
print (missing_sentences.take(5))

649594
[(6424, (['they', 'do', 'not', 'substantially', 'improve', 'the', 'report.'], ['need', 'ei', 'muuda', 'raportit', 'oluliselt', 'paremaks.'])), (16736, (['and,', 'commissioner,', 'you', 'probably', 'did', 'not', 'have', 'the', 'opportunity', 'to', 'land', 'and', 'stay', 'in', 'kuwait', 'in', '1991', 'when', '300', 'oilfields', 'were', 'burning.'], ['ja,', 'volinik,', 'teil', 'ilmselt', 'ei', 'olnud', 'võimalust', 'maanduda', 'ja', 'peatuda', '1991.', 'aastal', 'kuveidis,', 'kui', 'põles', '300', 'naftavälja.'])), (31840, (['in', 'order', 'to', 'safeguard', 'energy', 'supplies', 'in', 'future', 'too,', 'the', 'eu', 'member', 'states', 'have', 'relied', 'on', 'more', 'intensive', 'cooperation', 'in', 'this', 'sector', 'since', 'the', '1990s.'], ['selleks,', 'et', 'kindlustada', 'energiaga', 'varustatus', 'ka', 'tulevikus,', 'on', 'eli', 'liikmesriigid', 'alates', '1990ndatest', 'tuginenud', 'selles', 'sektoris', 'intensiivsemale', 'koostööle.'])), (40988, (['i', 'am', 'pleased', 't

In [24]:
# step 5 
sentences_with_small_words= missing_sentences.filter(lambda x: (len(x[1][0]) < 10))
print(sentences_with_small_words.count())
print (sentences_with_small_words.take(10))

79173
[(1588, (['membership', 'of', 'committees', 'and', 'delegations:', 'see', 'minutes'], ['parlamendi', 'komisjonide', 'ja', 'delegatsioonide', 'koosseis', '(vt', 'protokolli)'])), (3964, (['-', 'joint', 'motion', 'for', 'a', 'resolution:', 'natural', 'disasters'], ['-', 'resolutsiooni', 'ühine', 'ettepanek:', 'loodusõnnetused'])), (23744, (['nevertheless,', 'we', 'feel', 'that', 'this', 'does', 'not', 'prevent', 'cooperation.'], ['sellegipoolest', 'me', 'tunneme,', 'et', 'see', 'ei', 'välista', 'koostööd.'])), (39384, (['now', 'it', 'is', 'about', 'the', 'practical', 'work', 'for', 'tomorrow.'], ['järgnema', 'peab', 'praktiline', 'tegevus.'])), (41284, (['next', 'time', 'this', 'figure', 'will', 'probably', 'fall', 'below', '40%.'], ['järgmisel', 'korral', 'langeb', 'see', 'arv', 'tõenäoliselt', 'alla', '40%.'])), (46076, (['written', 'statements', '(rule', '142)'], ['kirjalikud', 'avaldused', '(kodukorra', 'artikkel', '142)'])), (54488, (['we', 'should', 'support', 'quality', 'and

In [25]:
# step 6 pairsofsentenceswiththesamenumberofwordsineachsentence.

sentences_with_equal_num_of_words= sentences_with_small_words.filter(lambda x: ( len(x[1][0]) == len(x[1][1])))
print(sentences_with_equal_num_of_words.count())
print (sentences_with_equal_num_of_words.take(10))

20050
[(14140, (['-', 'report:', 'mayer'], ['-', 'raport:', 'mayer'])), (45036, (['indeed!'], ['tõepoolest!'])), (123500, (['these', 'are', 'just', 'two', 'examples.'], ['need', 'on', 'vaid', 'kaks', 'näidet.'])), (162824, (['(uproar)'], ['(sumin)'])), (200772, (['we', 'clearly', 'need', 'to', 'do', 'more.'], ['me', 'peame', 'ilmselt', 'veel', 'midagi', 'tegema.'])), (239540, (['what', 'was', 'achieved', 'is', 'important.'], ['see,', 'mis', 'saavutati,', 'on', 'oluline.'])), (241440, (['(applause)'], ['(aplaus)'])), (244868, (['(applause)'], ['(aplaus)'])), (280748, (['are', 'there', 'any', 'comments?'], ['kas', 'kellelgi', 'on', 'kommentaare?'])), (315088, (['i', 'share', 'the', 'view', 'of', 'commissioner', 'barroso.'], ['olen', 'siinkohal', 'josé', 'manuel', 'barrosoga', 'ühel', 'arvamusel.']))]


In [30]:
# step 7  For each sentence pair,map so that you pair each(inorder)word in the two sentences.We no longer need the line numbers.(hint:usepython’sbuiltinzip()function)

pair_words =sentences_with_equal_num_of_words.map(lambda x: (x[1][0],x[1][1]))
#print (pair_words.take(10))

pair_words_in_two = pair_words.flatMap(lambda x: list(zip(x[0],x[1])))
print (pair_words_in_two.take(10))


[('such', 'sellised'), ('divergences', 'lahknevused'), ('are', 'ei'), ('no', 'ole'), ('longer', 'samuti'), ('acceptable', 'enam'), ('either.', 'vastuvõetavad.'), ('we', 'me'), ('cannot', 'ei'), ('go', 'saa,')]


In [31]:
# step 8   Use reduce to count the number of occurrences of the word-translation-pairs.

word_counts_mapper= pair_words_in_two.map(lambda w: (w,1))

#print (word_counts_mapper.take(3))
word_counts_reducer= word_counts_mapper.reduceByKey(add)

print (word_counts_reducer.take(10))



[(('6.', '6.'), 168), (('(applause)', '(aplaus)'), 1738), (('and', 'ja'), 664), (('gentlemen,', 'härrad,'), 27), (('outdated.', 'aegunud.'), 1), (('these', 'oma'), 2), (('11.', '11.'), 104), (('received:', 'dokumendid'), 220), (('it', 'see'), 416), (('100%', '100%'), 1)]


In [32]:
# step 9 Print some of the most frequently occurring pairs of words.

print(word_counts_reducer.takeOrdered(60, key=lambda x: -x[1]))


[(('is', 'on'), 1749), (('(applause)', '(aplaus)'), 1738), (('this', 'see'), 863), (('we', 'me'), 719), (('that', 'see'), 686), (('and', 'ja'), 664), (('see', '(vt'), 603), (('minutes', 'protokoll)'), 594), (('thank', 'tänan'), 590), (('i', 'ma'), 532), (('-', '-'), 435), (('are', 'on'), 417), (('it', 'see'), 416), (('of', 'hääletuse'), 382), (('explanations', 'selgitused'), 376), (('vote', 'kohta'), 370), (('2.', '2.'), 355), (('1.', '1.'), 352), (('3.', '3.'), 332), (('not', 'ole'), 302), (('written', 'kirjalikud'), 292), (('is', 'ei'), 277), (('have', 'on'), 257), (('our', 'meie'), 248), (('very', 'väga'), 248), (('documents', 'esitatud'), 226), (('received:', 'dokumendid'), 220), (('4.', '4.'), 214), (('mr', 'härra'), 209), (('has', 'on'), 207), (('(', '('), 187), (('5.', '5.'), 187), (('we', 'meil'), 180), (('report:', 'raport:'), 174), (('you.', 'teid.'), 172), (('you,', 'teid,'), 171), (('6.', '6.'), 168), (('that', 'et'), 155), (('7.', '7.'), 154), (('you', 'te'), 153), (('they

In [33]:
spark_context.stop()