## One to one translation of europen languages using spark (Data : https://ec.europa.eu/jrc/en/language-technologies/dcep)

In [1]:
from pyspark.sql import SparkSession
from operator import add

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.113:7077") \
        .appName("blameyben_lecture1_hdfs_example")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","100s")\
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

spark_context.setLogLevel("INFO")

In [2]:
# The same example, this time using map and reduce from the Spark API, and loading the text file from HDFS.

rdd1 = spark_context.textFile("hdfs://192.168.2.113:9000/europarl/europarl-v7.sv-en.en")


rdd2 = spark_context.textFile ("hdfs://192.168.2.113:9000/europarl/europarl-v7.sv-en.sv")


lines = rdd1.count() 
print(f'total lines= {lines}') 

lines = rdd2.count() 
print(f'total lines= {lines}') 

 

total lines= 1862234
total lines= 1862234


In [43]:
rdd1.getNumPartitions()

2

In [4]:
rdd2.getNumPartitions()

3

In [33]:
rdd1.take(10)

['Resumption of the session',
 'I declare resumed the session of the European Parliament adjourned on Friday 17 December 1999, and I would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.',
 "Although, as you will have seen, the dreaded 'millennium bug' failed to materialise, still the people in a number of countries suffered a series of natural disasters that truly were dreadful.",
 'You have requested a debate on this subject in the course of the next few days, during this part-session.',
 "In the meantime, I should like to observe a minute' s silence, as a number of Members have requested, on behalf of all the victims concerned, particularly those of the terrible storms, in the various countries of the European Union.",
 "Please rise, then, for this minute' s silence.",
 "(The House rose and observed a minute' s silence)",
 'Madam President, on a point of order.',
 'You will be aware from the press and television that there have be

In [5]:
def Func(lines):
      lines = lines.lower()
      lines = lines.split()
      return lines
mapred = rdd1.map(Func)

In [26]:
# flatMap() to output multiple elements for each input value, split on space and make each word lowercase
rdd3 = rdd1.flatMap(lambda x: x.lower().split(' '))

rdd6 = rdd2.flatMap(lambda x: x.lower().split(' '))


# Map a tuple and append int 1 for each word in words.txt
rdd4 = rdd3.map(lambda x: (x,1))

rdd8 = rdd6.map(lambda x: (x,1))


lines = rdd3.count() 
print(f'total lines= {lines}')  

lines = rdd6.count() 
print(f'total lines= {lines}')  


total lines= 45778381
total lines= 41604741


In [37]:
rdd3.take(10)


[('resumption', 523),
 ('of', 1659758),
 ('i', 501393),
 ('declare', 1360),
 ('european', 268599),
 ('parliament', 73330),
 ('adjourned', 359),
 ('friday', 560),
 ('17', 1718),
 ('1999,', 1035)]

In [36]:
rdd6.take(10)

['återupptagande',
 'av',
 'sessionen',
 'jag',
 'förklarar',
 'europaparlamentets',
 'session',
 'återupptagen',
 'efter',
 'avbrottet']

## Use Spark to compute the 10 most frequently according words in the English language corpus. Repeat for the other language.

## Verify that your results are reasonable.

In [4]:
# flatMap() to output multiple elements for each input value, split on space and make each word lowercase
rdd3 = rdd1.flatMap(lambda x: x.lower().split(' '))

# Map a tuple and append int 1 for each word in words.txt
rdd4 = rdd3.map(lambda x: (x,1))

# Perform aggregation (sum) all the int values for each unique key)
rdd5 = rdd4.reduceByKey(lambda x, y: x+y)
##lines = rdd3.count() 
rdd5.map(lambda x: (x[1], x[0])).sortByKey(False).take(10)
#print(f'total lines= {lines}') 
#wf_broadcast_dict = spark_context.broadcast(rdd5.collectAsMap())
#frequency = wf_broadcast_dict.value['cat']
# ... the same number of words?

[(3498375, 'the'),
 (1659758, 'of'),
 (1539760, 'to'),
 (1288401, 'and'),
 (1085993, 'in'),
 (797516, 'that'),
 (773522, 'a'),
 (758050, 'is'),
 (534242, 'for'),
 (522849, 'we')]

In [5]:
rdd5.take(5)

[('resumption', 523),
 ('of', 1659758),
 ('i', 501393),
 ('declare', 1360),
 ('european', 268599)]

## Use this parallel corpus to mine some translations in the form of word pairs, for the two languages. Do this by pairing words found on short lines with the same number of words respectively. We (incorrectly) assume the words stay in the same order when translated.

##  Follow this approach. Work with the pair of RDDs here. Hint: make a new pair of RDDs for each step, sv_1, en_1, sv_2, en_2, ...

## 1. Key the lines by their line number (hint: ZipWithIndex()).

In [6]:
en1 = rdd1.zipWithIndex()
en1.take(5)

[('Resumption of the session', 0),
 ('I declare resumed the session of the European Parliament adjourned on Friday 17 December 1999, and I would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.',
  1),
 ("Although, as you will have seen, the dreaded 'millennium bug' failed to materialise, still the people in a number of countries suffered a series of natural disasters that truly were dreadful.",
  2),
 ('You have requested a debate on this subject in the course of the next few days, during this part-session.',
  3),
 ("In the meantime, I should like to observe a minute' s silence, as a number of Members have requested, on behalf of all the victims concerned, particularly those of the terrible storms, in the various countries of the European Union.",
  4)]

In [7]:
sv1 = rdd2.zipWithIndex()
sv1.take(5)

[('Återupptagande av sessionen', 0),
 ('Jag förklarar Europaparlamentets session återupptagen efter avbrottet den 17 december. Jag vill på nytt önska er ett gott nytt år och jag hoppas att ni haft en trevlig semester.',
  1),
 ('Som ni kunnat konstatera ägde "den stora år 2000-buggen" aldrig rum. Däremot har invånarna i ett antal av våra medlemsländer drabbats av naturkatastrofer som verkligen varit förskräckliga.',
  2),
 ('Ni har begärt en debatt i ämnet under sammanträdesperiodens kommande dagar.',
  3),
 ('Till dess vill jag att vi, som ett antal kolleger begärt, håller en tyst minut för offren för bl.a. stormarna i de länder i Europeiska unionen som drabbats.',
  4)]

## 2. Swap the key and value - so that the line number is the key.

In [8]:
rev_en = en1.map(lambda x: (x[1], x[0]))
rev_en.take(5)

[(0, 'Resumption of the session'),
 (1,
  'I declare resumed the session of the European Parliament adjourned on Friday 17 December 1999, and I would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.'),
 (2,
  "Although, as you will have seen, the dreaded 'millennium bug' failed to materialise, still the people in a number of countries suffered a series of natural disasters that truly were dreadful."),
 (3,
  'You have requested a debate on this subject in the course of the next few days, during this part-session.'),
 (4,
  "In the meantime, I should like to observe a minute' s silence, as a number of Members have requested, on behalf of all the victims concerned, particularly those of the terrible storms, in the various countries of the European Union.")]

In [9]:
rev_sv = sv1.map(lambda x: (x[1], x[0]))
rev_sv.take(5)

[(0, 'Återupptagande av sessionen'),
 (1,
  'Jag förklarar Europaparlamentets session återupptagen efter avbrottet den 17 december. Jag vill på nytt önska er ett gott nytt år och jag hoppas att ni haft en trevlig semester.'),
 (2,
  'Som ni kunnat konstatera ägde "den stora år 2000-buggen" aldrig rum. Däremot har invånarna i ett antal av våra medlemsländer drabbats av naturkatastrofer som verkligen varit förskräckliga.'),
 (3,
  'Ni har begärt en debatt i ämnet under sammanträdesperiodens kommande dagar.'),
 (4,
  'Till dess vill jag att vi, som ett antal kolleger begärt, håller en tyst minut för offren för bl.a. stormarna i de länder i Europeiska unionen som drabbats.')]

##  3. Join the two RDDs together according to the line number key, so you have pairs of matching lines.

In [10]:
join=rev_en.join(rev_sv)
join.take(10)

[(0, ('Resumption of the session', 'Återupptagande av sessionen')),
 (5,
  ("Please rise, then, for this minute' s silence.",
   'Jag ber er resa er för en tyst minut.')),
 (10,
  ("Would it be appropriate for you, Madam President, to write a letter to the Sri Lankan President expressing Parliament's regret at his and the other violent deaths in Sri Lanka and urging her to do everything she possibly can to seek a peaceful reconciliation to a very difficult situation?",
   'Skulle det vara möjligt för er, fru talman, att skriva ett brev till den srilankesiska presidenten i vilket parlamentets beklagande uttrycks över hans och de övriga brutala dödsfallen i Sri Lanka och uppmanar henne att göra allt som står i hennes makt för att få en fredlig lösning på en mycket komplicerad situation?')),
 (15,
  ('My question relates to something that will come up on Thursday and which I will then raise again.',
   'Min fråga har att göra med något som kommer att behandlas på torsdag och som jag då ko

##  4. Filter to exclude line pairs that have an empty/missing “corresponding” sentence.

In [11]:
non_empty_lines = join.filter(lambda x: len(x[1][0])>0 and len(x[1][1])>0).cache()

In [12]:
non_empty_lines.take(10)

[(0, ('Resumption of the session', 'Återupptagande av sessionen')),
 (5,
  ("Please rise, then, for this minute' s silence.",
   'Jag ber er resa er för en tyst minut.')),
 (10,
  ("Would it be appropriate for you, Madam President, to write a letter to the Sri Lankan President expressing Parliament's regret at his and the other violent deaths in Sri Lanka and urging her to do everything she possibly can to seek a peaceful reconciliation to a very difficult situation?",
   'Skulle det vara möjligt för er, fru talman, att skriva ett brev till den srilankesiska presidenten i vilket parlamentets beklagande uttrycks över hans och de övriga brutala dödsfallen i Sri Lanka och uppmanar henne att göra allt som står i hennes makt för att få en fredlig lösning på en mycket komplicerad situation?')),
 (15,
  ('My question relates to something that will come up on Thursday and which I will then raise again.',
   'Min fråga har att göra med något som kommer att behandlas på torsdag och som jag då ko

## 5. Filter to leave only pairs of sentences with a small number of words per sentence, this should give a more reliable translation (you can experiment).

In [13]:
non_empty_lines.sortBy(lambda x : len(x[1][1].split(' '))).take(10)
#non_empty_lines.map(lambda x: (x[1], x[0])).sortByKey(True).take(4)
#non_empty_lines.map(lambda x: (x[0], x[1][0], x[1][1], len(x[1][1].split(' ')) )).sortBy(lambda x : x[3]).take(10)
#non_empty_lines.map(lambda x: (len(x[1][1].split(' ')), x[1], x[0])).sortByKey().take(4)

[(50, ('Agenda', 'Arbetsplan')),
 (90, ('Thank you very much.', 'Tack.')),
 (1295, ('Why?', 'Varför?')),
 (4465, ('(Applause)', '(Applåder)')),
 (4725, ('.', '.')),
 (5410, ('Court of First Instance', 'Förstainstansrätten')),
 (6690, ('.', '.')),
 (8585, ('Welcome', 'Välkomsthälsning')),
 (8760, ('.', '.')),
 (9045, ('.', '.'))]

In [14]:
non_empty_lines_no_dots = join.filter(lambda x: len(x[1][0])>0 and len(x[1][1])>0 and x[1][0]!= '.' and x[1][1]!= '.').cache()

In [15]:
non_empty_lines_no_dots.take(10)

[(0, ('Resumption of the session', 'Återupptagande av sessionen')),
 (5,
  ("Please rise, then, for this minute' s silence.",
   'Jag ber er resa er för en tyst minut.')),
 (10,
  ("Would it be appropriate for you, Madam President, to write a letter to the Sri Lankan President expressing Parliament's regret at his and the other violent deaths in Sri Lanka and urging her to do everything she possibly can to seek a peaceful reconciliation to a very difficult situation?",
   'Skulle det vara möjligt för er, fru talman, att skriva ett brev till den srilankesiska presidenten i vilket parlamentets beklagande uttrycks över hans och de övriga brutala dödsfallen i Sri Lanka och uppmanar henne att göra allt som står i hennes makt för att få en fredlig lösning på en mycket komplicerad situation?')),
 (15,
  ('My question relates to something that will come up on Thursday and which I will then raise again.',
   'Min fråga har att göra med något som kommer att behandlas på torsdag och som jag då ko

In [16]:
non_empty_lines_no_dots.sortBy(lambda x : len(x[1][1].split(' '))).take(10)

[(1297, ('No.', 'Nej.')),
 (1557, ('VOTE', 'OMRÖSTNING')),
 (3662, ("'Long-winded' .", 'Långrandigt.')),
 (4567, ('(Applause)', '(Applåder)')),
 (4837, ('Intergovernmental Conference', 'Regeringskonferensen')),
 (5662, ('ALTENER', 'Altener')),
 (7012, ('President.', 'Talmannen.')),
 (11507, ('Welcome', 'Välkomsthälsning')),
 (14877, ('Why not?', 'Varför?')),
 (15472, ('VOTE', 'OMRÖSTNING'))]

## 6. Filter to leave only pairs of sentences with the same number of words in each sentence.


In [17]:
non_empty_lines_equal = non_empty_lines_no_dots.filter(lambda x: len(x[1][0]) == len(x[1][1])).cache()

In [18]:
non_empty_lines_equal.take(10)

[(30,
  ('Now, however, he is to go before the courts once more because the public prosecutor is appealing.',
   'Nu är det emellertid så att han skall åtalas på nytt i och med att allmänne åklagaren överklagar.')),
 (305,
  ('Therefore, we support the establishment of an agricultural and rural development policy which is consistent with the objectives we have set. We want rural areas, at the dawn of the 21st century, to be competitive and multi-functional, both with regard to agriculture and with regard to opening up to the diversity of non-agricultural activities.',
   'Därför förespråkar vi att man fastslår en politik för jordbruket och landsbygdens utveckling som överensstämmer med de mål som vi har satt upp och att landsbygden, i början av 2000-talet, skall vara konkurrenskraftig och mångfunktionell, såväl ur jordbrukssynpunkt som beträffande en öppen attityd gentemot olika verksamheter utanför jordbruket.')),
 (555,
  ('The EU really deserves such a gesture, but differences betwe

In [6]:
#non_empty_lines_equal.flatMap(lambda x: zip(x[0],x[1])).take(3)

## 7. For each sentence pair, map so that you pair each (in order) word in the two sentences. We no longer need the line numbers. (hint: use python’s built in zip() function)
## 8. Use reduce to count the number of occurrences of the word-translation-pairs.

## 9. Print some of the most frequently occurring pairs of words.

In [19]:
val_en = non_empty_lines_equal.map(lambda x: x[1][0])
val_sv = non_empty_lines_equal.map(lambda x: x[1][1])
#y = sc.parallelize(range(1000, 1005))
#non_empty_lines_equal.zip(non_empty_lines_equal).collect()

In [20]:
val_en.take(2)

['Now, however, he is to go before the courts once more because the public prosecutor is appealing.',
 'Therefore, we support the establishment of an agricultural and rural development policy which is consistent with the objectives we have set. We want rural areas, at the dawn of the 21st century, to be competitive and multi-functional, both with regard to agriculture and with regard to opening up to the diversity of non-agricultural activities.']

In [21]:
val_sv.take(2)

['Nu är det emellertid så att han skall åtalas på nytt i och med att allmänne åklagaren överklagar.',
 'Därför förespråkar vi att man fastslår en politik för jordbruket och landsbygdens utveckling som överensstämmer med de mål som vi har satt upp och att landsbygden, i början av 2000-talet, skall vara konkurrenskraftig och mångfunktionell, såväl ur jordbrukssynpunkt som beträffande en öppen attityd gentemot olika verksamheter utanför jordbruket.']

In [31]:
words_en = val_en.map(lambda line: line.split(' '))
words_sv = val_sv.map(lambda line: line.split(' '))
pair_words = words_en.zip(words_sv)
pairs = pair_words.flatMap(lambda x: zip(x[0],x[1]))

In [34]:
pairs.take(10)

[('Now,', 'Nu'),
 ('however,', 'är'),
 ('he', 'det'),
 ('is', 'emellertid'),
 ('to', 'så'),
 ('go', 'att'),
 ('before', 'han'),
 ('the', 'skall'),
 ('courts', 'åtalas'),
 ('once', 'på')]

In [39]:
pairs2 = pairs.reduceByKey(lambda x, y: x+y)
##lines = rdd3.count() 
pairs2.map(lambda x: (x[1], x[0])).sortByKey(False).take(20)

[('…\xa0för', '…\xa0for'),
 ('•••', '•'),
 ('”öppenhet”,', "'control',"),
 ('”öga', 'terror,'),
 ('”Är', 'Ettore'),
 ('”vår', "'our"),
 ('”vänstervriden”', "wing'"),
 ('”vinn-vinn-situation”.', "'win-win"),
 ('”verktygslådan”', "'tool"),
 ('”vart', "'every"),
 ('”varför', "'Why"),
 ('”valuta', "'value"),
 ('”utan', "'without"),
 ('”unga', '‘young'),
 ('”täppa', "'stop-gap"),
 ('”turism', '‘tourism'),
 ('”tragedi”:', "'disaster':"),
 ('”tillfälliga”', "'temporary'"),
 ('”tamtam”-system', "'tom-tom'"),
 ('”svarta', "'black")]

In [None]:
# release the cores for another application!
spark_context.stop()