In [234]:
from pyspark.sql import SparkSession

# Cluster capacity: (8 cores, 16gb per machine) x 5 = 40 cores

# SparkSession: entry point for the DataFrame/SQL ("new") API.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.2.87:7077")
    .appName("Parvez_A3_PA")
    # Executors are allocated dynamically and paired with the external shuffle
    # service (see Spark's "Dynamic Resource Allocation" docs); idle executors
    # are released after 30s.
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    # Each executor requests 6 cores.
    .config("spark.executor.cores", 6)
    .getOrCreate()
)

# SparkContext: entry point for the RDD ("old") API used throughout this notebook.
spark_context = spark_session.sparkContext

In [235]:
# English side of the Finnish-English Europarl corpus, one element per line.
en_lines = spark_context.textFile(
    "hdfs://192.168.2.87:9000/europarl/europarl-v7.fi-en.en"
)
print(en_lines.take(10))

['Resumption of the session', 'I declare resumed the session of the European Parliament adjourned on Friday 17 December 1999, and I would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.', "Although, as you will have seen, the dreaded 'millennium bug' failed to materialise, still the people in a number of countries suffered a series of natural disasters that truly were dreadful.", 'You have requested a debate on this subject in the course of the next few days, during this part-session.', "In the meantime, I should like to observe a minute' s silence, as a number of Members have requested, on behalf of all the victims concerned, particularly those of the terrible storms, in the various countries of the European Union.", "Please rise, then, for this minute' s silence.", "(The House rose and observed a minute' s silence)", 'Madam President, on a point of order.', 'You will be aware from the press and television that there have been a num

In [184]:
# Finnish side of the same parallel corpus, one element per line.
fi_lines = spark_context.textFile(
    "hdfs://192.168.2.87:9000/europarl/europarl-v7.fi-en.fi"
)

In [185]:
# Number of lines in the English file; the later line-number join assumes the Finnish file has the same count.
en_lines.count()

1924942

In [186]:
# Number of lines in the Finnish file; it must equal the English count for index-based pairing to line up.
fi_lines.count()

1924942

In [187]:
def tokenize(lines):
    """Split one line of text into whitespace-separated tokens.

    Despite its name, ``lines`` is a single string (one RDD element).
    Punctuation is not stripped, so tokens such as "1999," or "period."
    keep their trailing characters. An empty or all-whitespace string
    yields an empty list.
    """
    tokens = lines.split(None)
    return tokens

In [188]:
# Number of partitions backing the English RDD.
en_lines.getNumPartitions()

3

In [189]:
# Number of partitions backing the Finnish RDD.
fi_lines.getNumPartitions()

3

In [190]:
def lower_case(lines):
    """Return one line of text (a single RDD element) converted to lowercase."""
    lowered = lines.lower()
    return lowered
    

In [191]:
# Lowercase every English line.
en_lines1 = en_lines.map(lower_case)

In [192]:
# Lowercase every Finnish line.
fi_lines1 = fi_lines.map(lower_case)

In [193]:
# Turn each lowercased English line into a list of whitespace-separated tokens.
en_lines2 = en_lines1.map(tokenize)

In [194]:
# Preview the first 10 lowercased English lines.
en_lines1.take(10)

['resumption of the session',
 'i declare resumed the session of the european parliament adjourned on friday 17 december 1999, and i would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.',
 "although, as you will have seen, the dreaded 'millennium bug' failed to materialise, still the people in a number of countries suffered a series of natural disasters that truly were dreadful.",
 'you have requested a debate on this subject in the course of the next few days, during this part-session.',
 "in the meantime, i should like to observe a minute' s silence, as a number of members have requested, on behalf of all the victims concerned, particularly those of the terrible storms, in the various countries of the european union.",
 "please rise, then, for this minute' s silence.",
 "(the house rose and observed a minute' s silence)",
 'madam president, on a point of order.',
 'you will be aware from the press and television that there have be

In [195]:
# Preview the first 10 tokenized English lines.
en_lines2.take(10)

[['resumption', 'of', 'the', 'session'],
 ['i',
  'declare',
  'resumed',
  'the',
  'session',
  'of',
  'the',
  'european',
  'parliament',
  'adjourned',
  'on',
  'friday',
  '17',
  'december',
  '1999,',
  'and',
  'i',
  'would',
  'like',
  'once',
  'again',
  'to',
  'wish',
  'you',
  'a',
  'happy',
  'new',
  'year',
  'in',
  'the',
  'hope',
  'that',
  'you',
  'enjoyed',
  'a',
  'pleasant',
  'festive',
  'period.'],
 ['although,',
  'as',
  'you',
  'will',
  'have',
  'seen,',
  'the',
  'dreaded',
  "'millennium",
  "bug'",
  'failed',
  'to',
  'materialise,',
  'still',
  'the',
  'people',
  'in',
  'a',
  'number',
  'of',
  'countries',
  'suffered',
  'a',
  'series',
  'of',
  'natural',
  'disasters',
  'that',
  'truly',
  'were',
  'dreadful.'],
 ['you',
  'have',
  'requested',
  'a',
  'debate',
  'on',
  'this',
  'subject',
  'in',
  'the',
  'course',
  'of',
  'the',
  'next',
  'few',
  'days,',
  'during',
  'this',
  'part-session.'],
 ['in',
  

In [196]:
# Turn each lowercased Finnish line into a list of whitespace-separated tokens.
fi_lines2 = fi_lines1.map(tokenize)

In [197]:
# Preview the first 10 tokenized Finnish lines.
fi_lines2.take(10)

[['istuntokauden', 'uudelleenavaaminen'],
 ['julistan',
  'perjantaina',
  'joulukuun',
  '17.',
  'päivänä',
  'keskeytetyn',
  'euroopan',
  'parlamentin',
  'istunnon',
  'avatuksi',
  'ja',
  'esitän',
  'vielä',
  'kerran',
  'vilpittömän',
  'toiveeni',
  'siitä,',
  'että',
  'teillä',
  'olisi',
  'ollut',
  'oikein',
  'mukava',
  'joululoma.'],
 ['kuten',
  'olette',
  'varmaan',
  'saattaneet',
  'huomata,',
  'vuodenvaihteeseen',
  '2000',
  'povattuja',
  'suuria',
  'tietokoneongelmia',
  'ei',
  'ilmennytkään.',
  'sen',
  'sijaan',
  'todella',
  'kauheat',
  'luonnonkatastrofit',
  'koettelivat',
  'kansalaisia',
  'joissakin',
  'unionimme',
  'maissa.'],
 ['te',
  'olette',
  'esittäneet',
  'toiveen,',
  'että',
  'tästä',
  'asiasta',
  'keskusteltaisiin',
  'lähipäivinä',
  'tämän',
  'istuntojakson',
  'aikana.'],
 ['sillä',
  'välin',
  'toivoisin,',
  'kuten',
  'useampi',
  'kollega',
  'on',
  'minulle',
  'esittänytkin,',
  'että',
  'viettäisimme',
  'minuu

In [198]:
# Tokenizing is a one-to-one map, so this should equal the raw English line count.
en_lines2.count()


1924942

In [199]:
# Tokenizing is a one-to-one map, so this should equal the raw Finnish line count.
fi_lines2.count()

1924942

In [200]:
from operator import add

In [201]:
def f(x):
    """Identity function: return ``x`` unchanged.

    NOTE(review): not referenced by any visible cell; the flatMap calls
    below use the equivalent ``lambda w: w`` instead.
    """
    return x

In [202]:
# English word frequencies: flatten the per-line token lists, emit (word, 1),
# then sum the ones per distinct word.
freq_words_en = (
    en_lines2
    .flatMap(lambda tokens: tokens)
    .map(lambda word: (word, 1))
    .reduceByKey(add)
)

In [203]:
# 10 most frequent English words (negated count => descending order).
print(freq_words_en.takeOrdered(10, key=lambda pair: -pair[1]))

[('the', 3631999), ('of', 1724330), ('to', 1600201), ('and', 1339315), ('in', 1127314), ('that', 830159), ('a', 804214), ('is', 785453), ('for', 553758), ('we', 548119)]


In [204]:
# Finnish word frequencies: flatten the per-line token lists, emit (word, 1),
# then sum the ones per distinct word.
freq_words_fi = (
    fi_lines2
    .flatMap(lambda tokens: tokens)
    .map(lambda word: (word, 1))
    .reduceByKey(add)
)

In [205]:
# 10 most frequent Finnish words (negated count => descending order).
print(freq_words_fi.takeOrdered(10, key=lambda pair: -pair[1]))

[('ja', 1249993), ('on', 1035987), ('että', 619663), ('euroopan', 257606), ('ei', 246274), ('myös', 178767), ('ovat', 161871), ('se', 152858), ('arvoisa', 149626), ('ole', 134745)]


In [206]:
# Attach a 0-based line number to every tokenized sentence: (tokens, index).
en_1, fi_1 = en_lines2.zipWithIndex(), fi_lines2.zipWithIndex()

In [207]:
# Inspect the first two (tokens, line_index) pairs.
en_1.take(2)

[(['resumption', 'of', 'the', 'session'], 0),
 (['i',
   'declare',
   'resumed',
   'the',
   'session',
   'of',
   'the',
   'european',
   'parliament',
   'adjourned',
   'on',
   'friday',
   '17',
   'december',
   '1999,',
   'and',
   'i',
   'would',
   'like',
   'once',
   'again',
   'to',
   'wish',
   'you',
   'a',
   'happy',
   'new',
   'year',
   'in',
   'the',
   'hope',
   'that',
   'you',
   'enjoyed',
   'a',
   'pleasant',
   'festive',
   'period.'],
  1)]

In [208]:
# Reverse each (tokens, index) tuple into (index, tokens) so the line number
# becomes the key for the join below.
en_2 = en_1.map(lambda pair: pair[::-1])
fi_2 = fi_1.map(lambda pair: pair[::-1])

In [209]:
# Inspect the first two (line_index, tokens) pairs on the Finnish side.
fi_2.take(2)

[(0, ['istuntokauden', 'uudelleenavaaminen']),
 (1,
  ['julistan',
   'perjantaina',
   'joulukuun',
   '17.',
   'päivänä',
   'keskeytetyn',
   'euroopan',
   'parlamentin',
   'istunnon',
   'avatuksi',
   'ja',
   'esitän',
   'vielä',
   'kerran',
   'vilpittömän',
   'toiveeni',
   'siitä,',
   'että',
   'teillä',
   'olisi',
   'ollut',
   'oikein',
   'mukava',
   'joululoma.'])]

In [222]:
# Join on line number -> (line_index, (english_tokens, finnish_tokens)).
joined_lines = en_2.join(fi_2)

In [223]:
# First 10 joined records in natural tuple order, i.e. by line number.
joined_lines.takeOrdered(10)

[(0,
  (['resumption', 'of', 'the', 'session'],
   ['istuntokauden', 'uudelleenavaaminen'])),
 (1,
  (['i',
    'declare',
    'resumed',
    'the',
    'session',
    'of',
    'the',
    'european',
    'parliament',
    'adjourned',
    'on',
    'friday',
    '17',
    'december',
    '1999,',
    'and',
    'i',
    'would',
    'like',
    'once',
    'again',
    'to',
    'wish',
    'you',
    'a',
    'happy',
    'new',
    'year',
    'in',
    'the',
    'hope',
    'that',
    'you',
    'enjoyed',
    'a',
    'pleasant',
    'festive',
    'period.'],
   ['julistan',
    'perjantaina',
    'joulukuun',
    '17.',
    'päivänä',
    'keskeytetyn',
    'euroopan',
    'parlamentin',
    'istunnon',
    'avatuksi',
    'ja',
    'esitän',
    'vielä',
    'kerran',
    'vilpittömän',
    'toiveeni',
    'siitä,',
    'että',
    'teillä',
    'olisi',
    'ollut',
    'oikein',
    'mukava',
    'joululoma.'])),
 (2,
  (['although,',
    'as',
    'you',
    'will',
    'h

In [224]:
# Drop pairs where either side tokenized to an empty list (blank line in one file).
joined_line1 = joined_lines.filter(lambda rec: bool(rec[1][0]) and bool(rec[1][1]))

In [225]:
# Pairs remaining after dropping empty sentences.
joined_line1.count()

1918494

In [226]:
# Keep a sentence pair if EITHER side has at least MIN_WORDS tokens; this drops
# very short lines (e.g. the 4-token first English line).
# NOTE(review): the word pairing below aligns tokens purely by position, and the
# sampled output already shows misaligned pairs for 8-token sentences
# (('has', 'tässä'), ...). If the goal is reliable word alignment, an upper
# bound on BOTH sides (len <= MAX_WORDS) is probably what was intended - confirm.
MIN_WORDS = 5

# min_words is bound as a default argument so the value is frozen now. The
# filter is lazy, so a plain global lookup would see any later reassignment
# of MIN_WORDS when an action finally runs.
joined_line2 = joined_line1.filter(
    lambda rec, min_words=MIN_WORDS: len(rec[1][0]) >= min_words or len(rec[1][1]) >= min_words
)

In [227]:
# Pairs remaining after the minimum-length filter.
joined_line2.count()

1870107

In [228]:
# Keep only pairs whose English and Finnish sentences have the same token
# count, so a positional zip pairs every word on both sides.
joined_line3 = joined_line2.filter(lambda rec: len(rec[1][0]) == len(rec[1][1]))

In [229]:
# Pairs remaining after requiring equal token counts.
joined_line3.count()

41868

In [230]:
# Pair the i-th English token with the i-th Finnish token of each sentence pair
# and drop the line number. rec[1] is the (english_tokens, finnish_tokens)
# tuple, so zip(*rec[1]) is zip(english_tokens, finnish_tokens).
mapped_words = joined_line3.map(lambda rec: list(zip(*rec[1])))

In [231]:
# Inspect the word pairs produced for the first two surviving sentence pairs.
mapped_words.take(2)

[[('why', 'miksei'),
  ('has', 'tässä'),
  ('this', 'asiassa'),
  ('been', 'ole'),
  ('paralysed', 'tapahtunut'),
  ('for', 'mitään'),
  ('ten', 'kymmeneen'),
  ('years?', 'vuoteen?')],
 [('in', 'uskallan'),
  ('fact', 'sitä'),
  ('this', 'paitsi'),
  ('process', 'sanoa,'),
  ('is', 'että'),
  ('already', '"eurooppa'),
  ('under', 'merkitsee'),
  ('way.', 'rauhaa".')]]

In [232]:
# Count how often each (english_word, finnish_word) pair occurs across all
# aligned sentence pairs.
final_result = (
    mapped_words
    .flatMap(lambda word_pairs: word_pairs)
    .map(lambda word_pair: (word_pair, 1))
    .reduceByKey(add)
)
        

In [233]:
# 10 most frequent word-translation pairs (negated count => descending order).
print(final_result.takeOrdered(10, key=lambda entry: -entry[1]))

[(('and', 'ja'), 4268), (('is', 'on'), 3274), (('this', 'tämä'), 1473), (('president,', 'puhemies,'), 1362), (('mr', 'arvoisa'), 1290), (('we', 'meidän'), 1204), (('that', 'että'), 1122), (('must', 'on'), 1076), (('not', 'ole'), 1007), (('it', 'se'), 987)]
