In [21]:
from pyspark.sql import SparkSession

# (8 cores, 16gb per machine) x 5 = 40 cores

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.87:7077") \
        .appName("reuben_sajith_rajkumar_A3")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

In [22]:
from operator import add

In [23]:
lines_en = spark_context.newAPIHadoopFile(
    'hdfs://192.168.2.87:9000/europarl/europarl-v7.sv-en.en',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
)\
.cache() # Keep this RDD in memory!

In [24]:
lines_sv = spark_context.newAPIHadoopFile(
    'hdfs://192.168.2.87:9000/europarl/europarl-v7.sv-en.sv',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text',
)\
.cache() # Keep this RDD in memory!

In [25]:
lines_en.count()

1862234

In [26]:
lines_sv.count()

1862234

In [27]:
lines_en.getNumPartitions()

2

In [28]:
lines_sv.getNumPartitions()

3

In [29]:
def preprocess(rdd):
    return(rdd)\
    .map(lambda words: words[1].lower())\
    .map(lambda lines: lines.split(' '))

In [30]:
lines_en_pp = preprocess(lines_en)
lines_en_pp.take(10)

[['resumption', 'of', 'the', 'session'],
 ['i',
  'declare',
  'resumed',
  'the',
  'session',
  'of',
  'the',
  'european',
  'parliament',
  'adjourned',
  'on',
  'friday',
  '17',
  'december',
  '1999,',
  'and',
  'i',
  'would',
  'like',
  'once',
  'again',
  'to',
  'wish',
  'you',
  'a',
  'happy',
  'new',
  'year',
  'in',
  'the',
  'hope',
  'that',
  'you',
  'enjoyed',
  'a',
  'pleasant',
  'festive',
  'period.'],
 ['although,',
  'as',
  'you',
  'will',
  'have',
  'seen,',
  'the',
  'dreaded',
  "'millennium",
  "bug'",
  'failed',
  'to',
  'materialise,',
  'still',
  'the',
  'people',
  'in',
  'a',
  'number',
  'of',
  'countries',
  'suffered',
  'a',
  'series',
  'of',
  'natural',
  'disasters',
  'that',
  'truly',
  'were',
  'dreadful.'],
 ['you',
  'have',
  'requested',
  'a',
  'debate',
  'on',
  'this',
  'subject',
  'in',
  'the',
  'course',
  'of',
  'the',
  'next',
  'few',
  'days,',
  'during',
  'this',
  'part-session.'],
 ['in',
  

In [31]:
lines_sv_pp = preprocess(lines_sv)
lines_sv_pp.take(10)

[['återupptagande', 'av', 'sessionen'],
 ['jag',
  'förklarar',
  'europaparlamentets',
  'session',
  'återupptagen',
  'efter',
  'avbrottet',
  'den',
  '17',
  'december.',
  'jag',
  'vill',
  'på',
  'nytt',
  'önska',
  'er',
  'ett',
  'gott',
  'nytt',
  'år',
  'och',
  'jag',
  'hoppas',
  'att',
  'ni',
  'haft',
  'en',
  'trevlig',
  'semester.'],
 ['som',
  'ni',
  'kunnat',
  'konstatera',
  'ägde',
  '"den',
  'stora',
  'år',
  '2000-buggen"',
  'aldrig',
  'rum.',
  'däremot',
  'har',
  'invånarna',
  'i',
  'ett',
  'antal',
  'av',
  'våra',
  'medlemsländer',
  'drabbats',
  'av',
  'naturkatastrofer',
  'som',
  'verkligen',
  'varit',
  'förskräckliga.'],
 ['ni',
  'har',
  'begärt',
  'en',
  'debatt',
  'i',
  'ämnet',
  'under',
  'sammanträdesperiodens',
  'kommande',
  'dagar.'],
 ['till',
  'dess',
  'vill',
  'jag',
  'att',
  'vi,',
  'som',
  'ett',
  'antal',
  'kolleger',
  'begärt,',
  'håller',
  'en',
  'tyst',
  'minut',
  'för',
  'offren',
  'f

In [32]:
lines_en_pp.count()

1862234

In [33]:
lines_sv_pp.count()

1862234

In [34]:
def frequent(rdd):
    return(rdd)\
    .flatMap(lambda w: w)\
    .map(lambda w: (w,1))

In [35]:
rdd1 = frequent(lines_en_pp)
lines_en_freq = rdd1.reduceByKey(add)
print(lines_en_freq.takeOrdered(10, key=lambda x: -x[1]))

[('the', 3498375), ('of', 1659758), ('to', 1539760), ('and', 1288401), ('in', 1085993), ('that', 797516), ('a', 773522), ('is', 758050), ('for', 534242), ('we', 522849)]


In [36]:
rdd2 = frequent(lines_sv_pp)
lines_sv_freq = rdd2.reduceByKey(add)
print(lines_sv_freq.takeOrdered(10, key=lambda x: -x[1]))

[('att', 1706293), ('och', 1344830), ('i', 1050774), ('det', 924866), ('som', 913276), ('för', 908680), ('av', 738068), ('är', 694381), ('en', 620310), ('vi', 539797)]


In [37]:
lines_en_index = lines_en_pp.zipWithIndex()\
                            .map(lambda x: (x[1],x[0]))

In [38]:
lines_sv_index = lines_sv_pp.zipWithIndex()\
                            .map(lambda x: (x[1],x[0]))

In [42]:
rdd_join = lines_en_index.join(lines_sv_index)

[(968120,
  (['a',
    'number',
    'of',
    'members',
    'of',
    'the',
    'european',
    'parliament,',
    'amongst',
    'others',
    'mr',
    'schulz,',
    'mr',
    'meyer-pleite,',
    'mr',
    'liese,',
    'mr',
    'dos',
    'santos',
    'and',
    'several',
    'others,',
    'regretfully',
    'i',
    'cannot',
    'mention',
    'them',
    'all,',
    'have',
    'mentioned',
    'the',
    'cuban',
    'issue.'],
   ['ett',
    'antal',
    'europaparlamentsledamöter,',
    'bland',
    'andra',
    'martin',
    'schulz,',
    'willy',
    'meyer-pleite,',
    'peter',
    'liese,',
    'manuel',
    'antónio',
    'dos',
    'santos',
    'och',
    'flera',
    'andra,',
    'jag',
    'kan',
    'tyvärr',
    'inte',
    'nämna',
    'dem',
    'alla,',
    'har',
    'tagit',
    'upp',
    'frågan',
    'om',
    'kuba.']))]

In [46]:
rdd_filter1 = rdd_join.filter(lambda x: x[1][0]!=" ")\
                  .filter(lambda x: x[1][1]!=" ")
rdd_filter2 = rdd_filter1.filter(lambda x: len(x[1][0])==len(x[1][1]))
rdd_filter2.take(10)


[(185385,
  (['true,',
    'we',
    'have',
    'plans',
    'to',
    'set',
    'up',
    'a',
    'european',
    'platform',
    'to',
    'test',
    'the',
    'vaccine,',
    'even',
    'if',
    'there',
    'is',
    'already',
    'a',
    'fully',
    'developed',
    'african',
    'platform.'],
   ['jo,',
    'vi',
    'har',
    'planer',
    'på',
    'att',
    'upprätta',
    'en',
    'europeisk',
    'bas',
    'för',
    'att',
    'utprova',
    'vaccinet',
    'även',
    'om',
    'det',
    'redan',
    'finns',
    'en',
    'fullt',
    'utbyggd',
    'afrikansk',
    'bas.'])),
 (389965,
  (['and',
    'they',
    'must',
    'also',
    'be',
    'respected',
    'in',
    'this',
    'instance.'],
   ['och', 'de', 'bör', 'respekteras', 'även', 'i', 'det', 'här', 'fallet.'])),
 (673780,
  (['illegal',
    'immigration',
    'and',
    'potential',
    'threats',
    'to',
    'security',
    'posed',
    'by',
    'criminal',
    'activities',
    'must',


In [56]:
en = rdd_filter2.flatMap(lambda x: x[1][0])
sv = rdd_filter2.flatMap(lambda x: x[1][1])

join = en.zip(sv)
join.take(10)

maps = join.map(lambda x: (x,1))
maps.take(10)

reducer = maps.reduceByKey(add)
print(reducer.takeOrdered(60, key=lambda x: -x[1]))


[(('and', 'och'), 35328), (('is', 'är'), 27856), (('i', 'jag'), 27155), (('we', 'vi'), 26032), (('in', 'i'), 18605), (('to', 'att'), 18594), (('that', 'att'), 16472), (('a', 'en'), 15455), (('it', 'det'), 14683), (('of', 'av'), 13001), (('the', 'den'), 12603), (('this', 'detta'), 11423), (('not', 'inte'), 11141), (('the', 'det'), 10754), (('for', 'för'), 10247), (('the', 'de'), 10243), (('the', 'att'), 9390), (('a', 'ett'), 9381), (('the', 'i'), 8759), (('have', 'har'), 8179), (('to', 'till'), 7153), (('that', 'det'), 6740), (('the', 'av'), 6159), (('mr', 'herr'), 6046), (('with', 'med'), 5891), (('are', 'är'), 5879), (('to', 'för'), 5750), (('will', 'att'), 5683), (('there', 'det'), 5612), (('this', 'denna'), 5495), (('will', 'kommer'), 5423), (('the', 'för'), 5313), (('has', 'har'), 5268), (('the', 'och'), 5116), (('as', 'som'), 4947), (('must', 'måste'), 4934), (('this', 'det'), 4827), (('that', 'som'), 4562), (('the', 'som'), 4300), (('\xa0\xa0', '\xa0\xa0'), 4164), (('but', 'men')

In [57]:
# release the cores for another application!
spark_context.stop()