In [2]:
from pyspark.sql import SparkSession
from operator import add

spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.113:7077") \
        .appName("esraa_mohammed_a")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",2)\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

spark_context.setLogLevel("INFO")


#A.1.1 :

In [3]:
elines = spark_context.textFile("hdfs://192.168.2.113:9000/europarl/europarl-v7.fi-en.en")
elines.count()


1924942

#A.1.2:

In [4]:
flines = spark_context.textFile("hdfs://192.168.2.113:9000/europarl/europarl-v7.fi-en.fi")
flines.count()

1924942

#A.1.3 :
total number of lines for both languages are : 1924942

#A.1.4: 3 partitions

In [5]:
print(elines.getNumPartitions())
print(flines.getNumPartitions())

3
3


#A.2.1

In [6]:
def preprocess(lines):
    lower = lines.map(lambda line : line.lower())\
                 .flatMap(lambda s: s.split(' '))\
                 .flatMap(lambda w: w.split('\n'))\
                 .map(lambda w: w.strip())\
                 .map(lambda w: (w,1))\
                 .reduceByKey(add)
    
    return lower

#A.2.2

In [7]:
epre = preprocess(elines)
print(epre.take(5))

[('opinion,', 5910), ('with', 307096), ('atmosphere', 668), ('of', 1724191), ('situation', 29453)]


In [8]:
fpre = preprocess(flines)
print(fpre.take(5))

[('koko', 28238), ('ja', 1249156), ('yksin.', 396), ('aina', 17314), ('asiasta.', 3280)]


#A.2.3

In [9]:
print(elines.count())
print(flines.count())

1924942
1924942


#A.3.1

In [10]:
print(epre.takeOrdered(10, key=lambda x: -x[1]))
print(fpre.takeOrdered(10, key=lambda x: -x[1]))

[('the', 3631865), ('of', 1724191), ('to', 1600135), ('and', 1339070), ('in', 1127220), ('that', 830098), ('a', 803908), ('is', 785410), ('for', 553730), ('we', 548087)]
[('ja', 1249156), ('on', 1035956), ('että', 619655), ('euroopan', 257568), ('ei', 246268), ('myös', 178765), ('ovat', 161869), ('se', 152857), ('arvoisa', 149589), ('ole', 134745)]


#A.3.2 :

Most of the frequently used words in english are articles, pronounce, propositions and verb to be. same in French language

In [11]:
elines.map(lambda line: line.split(' ')).filter(lambda w: len(w) < 7).take(10)

[['Resumption', 'of', 'the', 'session'],
 ['Why', 'are', 'there', 'no', 'fire', 'instructions?'],
 ['Why', 'are', 'no-smoking', 'areas', 'not', 'enforced?'],
 ['Agenda'],
 ['Relating', 'to', 'Wednesday:'],
 ['(Applause', 'from', 'the', 'PSE', 'Group)'],
 ['That', 'was', 'the', 'decision.'],
 ['There', 'is', 'no', 'such', 'document!'],
 ['We', 'have', 'agreed', 'to', 'this.'],
 ['(Applause', 'from', 'the', 'PPE-DE', 'Group)']]

In [12]:
flines.map(lambda line: line.split(' ')).filter(lambda w: len(w) < 7).take(10)

[['Istuntokauden', 'uudelleenavaaminen'],
 ['(Parlamentti', 'vietti', 'seisaallaan', 'minuutin', 'hiljaisuuden.)'],
 ['Arvoisa', 'puhemies,', 'käytän', 'työjärjestyspuheenvuoron.'],
 ['Arvoisa', 'puhemies,', 'käytän', 'työjärjestyspuheenvuoron.'],
 ['Se', 'on', 'tapaus', 'Aleksandr', 'Nikitin.'],
 ['Toivon,', 'että', 'sitä', 'käsitellään', 'myönteisessä', 'hengessä.'],
 ['Hyvä', 'kollega,', 'tarkistamme', 'asian.'],
 ['Vaarana', 'todellakin', 'on', 'tuleva', 'sotilasvallankaappaus.'],
 ['Me', 'emme', 'tiedä,', 'mitä', 'tapahtuu.'],
 ['Käsittelyjärjestys']]

#A.4

In [13]:
en_1 = elines.zipWithIndex()
fr_1 = flines.zipWithIndex()

print(en_1.take(2))
print(fr_1.take(2))

[('Resumption of the session', 0), ('I declare resumed the session of the European Parliament adjourned on Friday 17 December 1999, and I would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.', 1)]
[('Istuntokauden uudelleenavaaminen', 0), ('Julistan perjantaina joulukuun 17. päivänä keskeytetyn Euroopan parlamentin istunnon avatuksi ja esitän vielä kerran vilpittömän toiveeni siitä, että teillä olisi ollut oikein mukava joululoma.', 1)]


In [14]:
en_2 = en_1.map(lambda x: (x[1], x[0]))
fr_2 = fr_1.map(lambda x: (x[1], x[0]))

print(en_2.take(2))
print(fr_2.take(2))

[(0, 'Resumption of the session'), (1, 'I declare resumed the session of the European Parliament adjourned on Friday 17 December 1999, and I would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.')]
[(0, 'Istuntokauden uudelleenavaaminen'), (1, 'Julistan perjantaina joulukuun 17. päivänä keskeytetyn Euroopan parlamentin istunnon avatuksi ja esitän vielä kerran vilpittömän toiveeni siitä, että teillä olisi ollut oikein mukava joululoma.')]


In [15]:
en_3 = en_2.join(fr_2)


In [16]:
en_4 = en_3.filter(lambda x: len(x[1][0]) > 0 and len(x[1][1]) > 0)

In [17]:
en_4.take(2)

[(1707390,
  ('All of these various attempts have focused on three main difficulties facing workers in relation to freedom of movement and supplementary pensions.',
   'Kaikissa näissä eri yrityksissä keskityttiin kolmeen päävaikeuteen, joita työntekijät kohtaavat suhteessa liikkuvuuden vapauteen ja lisäeläkkeisiin.')),
 (1707396,
  ('It restricts itself to securing the principle of equal treatment for migrant workers.',
   'Siinä rajoitutaan turvaamaan siirtotyöläisen yhdenvertaisen kohtelun periaate.'))]

In [18]:
en_5 = en_4.filter(lambda x: len(x[1][0].split(' ')) > 7 )

In [19]:
en_5.take(2)

[(895800,
  ('We must send out a positive message that encourages dialogue and does not conjure up phantoms and devils.',
   'Meidän on lähetettävä myönteinen viesti, jossa kannustetaan vuoropuheluun ja jossa ei manata esiin haamuja ja piruja.')),
 (895812,
  ("We lived in a communist state where people said that freedom should exist, but only for us, and not for our adversaries, in keeping with the famous motto that 'there is no freedom for enemies of freedom'.",
   'Elimme kommunistivaltiossa, jossa sanottiin, että vapaus on välttämätöntä, mutta vain meille eikä vastustajillemme, mukaillen kuuluisaa mottoa "vapauden vihollisilla ei ole vapautta".'))]

In [20]:
en_6 = en_4.filter(lambda x: len(x[1][0].split(' ')) == len(x[1][1].split(' ')))

In [21]:
en_6.take(2)

[(895806,
  ('Freedom must serve a purpose.',
   'Vapauden on palveltava jotakin tarkoitusta.')),
 (895884,
  ('Europe will not remain silent.', 'EU ei jää vaitonaiseksi asiassa.'))]

In [22]:
list1= en_6.map(lambda x: x[1][0].split(' '))
list2= en_6.map(lambda x: x[1][1].split(' '))
en_7 = list1.zip(list2)

In [40]:
l1= en_7.flatMap(lambda x: x[0])
l2 = en_7.flatMap(lambda x: x[1])
en_77 = l1.zip(l2)

en_77.take(10)

[('Thank', 'Paljon'),
 ('you', 'kiitoksia,'),
 ('very', 'arvoisa'),
 ('much,', 'komission'),
 ('Commissioner.', 'jäsen.'),
 ('For', 'Sitä'),
 ('this', 'oli'),
 ('reason,', 'näin'),
 ('modernisation', 'ollen'),
 ('was', 'pakko')]

In [47]:
en_8=en_77.map(lambda x: (x,1)).foldByKey(0, add).filter(lambda x: x[0][0] != '.')


In [45]:
en_8.takeOrdered(10, key=lambda x: -x[1])

[(('is', 'on'), 4793),
 (('\xa0\xa0', '\xa0\xa0'), 4700),
 (('and', 'ja'), 3661),
 (('(Applause)', '(Suosionosoituksia)'), 2291),
 (('This', 'Tämä'), 1251),
 (('President,', 'puhemies,'), 1044),
 (('must', 'on'), 1032),
 (('is', 'ei'), 1025),
 (('not', 'ole'), 978),
 (('that', 'että'), 935)]