In [1]:
from pyspark.sql import SparkSession
import re
from operator import add

# Cluster: (8 cores, 16gb per machine) x 5 = 40 cores
SPARK_MASTER = "spark://192.168.2.87:7077"
HDFS_EUROPARL_DIR = "hdfs://192.168.2.87:9000/europarl"

# New API
# Dynamic allocation releases executors idle for more than 30 s; the external
# shuffle service is enabled alongside it because dynamic allocation relies on it.
spark_session = (
    SparkSession.builder
    .master(SPARK_MASTER)
    .appName("mei_wu_part-1")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

# Old API (RDD)
spark_context = spark_session.sparkContext


def load_text_rdd(path):
    """Read a text file from HDFS as a cached RDD of (byte_offset, line) pairs.

    Uses Hadoop's ``TextInputFormat``, whose key is the line's byte offset in
    the file (``LongWritable``) and whose value is the line text (``Text``).
    The RDD is cached because both corpora are reused by many cells below.
    """
    return spark_context.newAPIHadoopFile(
        path,
        'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'org.apache.hadoop.io.Text',
    ).cache()  # Keep this RDD in memory!


rdd_en = load_text_rdd(f"{HDFS_EUROPARL_DIR}/europarl-v7.sv-en.en")
rdd_sv = load_text_rdd(f"{HDFS_EUROPARL_DIR}/europarl-v7.sv-en.sv")

In [28]:
def rdd_checks(en, sv):
    """Run the A.1 sanity checks on the English and Swedish corpora.

    Parameters
    ----------
    en, sv : RDD
        The English and Swedish corpora (anything exposing ``count()`` and
        ``getNumPartitions()``).

    Returns
    -------
    list of (str, str)
        ``(task_id, message)`` pairs for tasks A.1.1-A.1.4, in that order.
    """
    en_count = en.count()  # A.1.1 Counts the lines in English
    sv_count = sv.count()  # A.1.2 Counts the lines in Swedish

    # A.1.4 Count number of partitions; each language must be read from its own RDD.
    en_part = en.getNumPartitions()
    sv_part = sv.getNumPartitions()

    out_message = {"A.1-2": f"Number of lines, English: {en_count} and Swedish: {sv_count} ",
                   "A.1.3": f"Verify the lines are of same length in both languages: {en_count == sv_count}",
                   "A.1.4": f"Number of Partitions, English: {en_part} and Swedish: {sv_part} "}

    return list(out_message.items())

rdd_checks(en=rdd_en, sv=rdd_sv)  # A.1 sanity checks on both corpora

[('A.1-2', 'Number of lines, English: 1862234 and Swedish: 1862234 '),
 ('A.1.3', 'Verify the lines are of same length in both languages: True'),
 ('A.1.4', 'Number of Partitions, English: 2 and Swedish: 2 ')]

In [29]:
# A.2.1
def lowerNsplit(w):
    """Lower-case a line of text and split it on single spaces.

    ``str.split(' ')`` (not ``str.split()``) is used, so every line yields at
    least one token: an empty line becomes ``['']`` and runs of spaces produce
    empty-string tokens. Because no line is dropped, line counts survive the
    tokenisation step.
    """
    lowered = w.lower()
    return lowered.split(' ')

# A.2.2 One (byte_offset, word) record per word; the key still identifies the source line.
words_en = rdd_en.flatMapValues(f=lowerNsplit)
words_en.take(10)  # peek at the first few tokens

[(0, 'resumption'),
 (0, 'of'),
 (0, 'the'),
 (0, 'session'),
 (26, 'i'),
 (26, 'declare'),
 (26, 'resumed'),
 (26, 'the'),
 (26, 'session'),
 (26, 'of')]

In [30]:
# A.2.2 Same tokenisation for the Swedish corpus: one (byte_offset, word) record per word.
words_sv = rdd_sv.flatMapValues(f=lowerNsplit)
words_sv.take(10)  # peek at the first few tokens

[(0, 'återupptagande'),
 (0, 'av'),
 (0, 'sessionen'),
 (29, 'jag'),
 (29, 'förklarar'),
 (29, 'europaparlamentets'),
 (29, 'session'),
 (29, 'återupptagen'),
 (29, 'efter'),
 (29, 'avbrottet')]

In [9]:
# A.2.3 Verify that the line counts still match after the pre-processing.
# lowerNsplit never returns an empty list, so every input line contributes at
# least one word under its (unique) byte-offset key: the number of distinct
# keys in a word RDD is that corpus's number of lines after pre-processing.
count_en = words_en.keys().distinct().count()
count_sv = words_sv.keys().distinct().count()

# Compare the languages with each other AND with the raw (pre-processing) line counts.
count_en == count_sv == rdd_en.count() == rdd_sv.count()

True

In [31]:
# A.3.1 Use Spark to compute the 10 most frequently occurring words in the
# English language corpus. Repeat for the other language.
def word_frequencies(words):
    """Count occurrences of each word in an RDD of (byte_offset, word) pairs.

    ``lowerNsplit`` splits on single spaces, so blank lines and repeated spaces
    yield empty-string tokens; those are not words and are dropped before
    counting.

    Returns an RDD of (word, count) pairs.
    """
    return (
        words.values()
        .filter(bool)  # drop '' tokens
        .map(lambda word: (word, 1))
        .reduceByKey(add)
    )


result_en = word_frequencies(words_en)
result_sv = word_frequencies(words_sv)

# A.3.2 Verify that your results are reasonable.
result_en.takeOrdered(10, key=lambda x: -x[1])

[('the', 3498375),
 ('of', 1659758),
 ('to', 1539760),
 ('and', 1288401),
 ('in', 1085993),
 ('that', 797516),
 ('a', 773522),
 ('is', 758050),
 ('for', 534242),
 ('we', 522849)]

In [32]:
# A.3.2 Sanity-check the Swedish frequencies: the ten most common (word, count) pairs.
result_sv.takeOrdered(num=10, key=lambda word_count: -word_count[1])

[('att', 1706293),
 ('och', 1344830),
 ('i', 1050774),
 ('det', 924866),
 ('som', 913276),
 ('för', 908680),
 ('av', 738068),
 ('är', 694381),
 ('en', 620310),
 ('vi', 539797)]

In [33]:
# A.4
def words_with_length(lines):
    """Map (byte_offset, line) records to (byte_offset, (words, number_of_words)).

    Tokenises with ``lowerNsplit``, the same function used to build
    ``words_en``/``words_sv``. Working per line with ``mapValues`` keeps each
    line's words in their original order, which the positional zip of A.4
    depends on. It also avoids a shuffle and the per-word list concatenation
    that regrouping the flattened word RDD with ``combineByKey`` required.
    """
    return (
        lines.mapValues(lowerNsplit)
        .mapValues(lambda words: (words, len(words)))
    )


en = words_with_length(rdd_en)
sv = words_with_length(rdd_sv)

In [38]:
# A.4 Re-key both corpora by line number: byte offsets differ between the two
# files, so the i-th English line is matched to the i-th Swedish line instead.
def by_line_number(per_line):
    """Sort records by byte offset and replace that key with a 0-based line index."""
    in_file_order = per_line.sortBy(lambda record: record[0])
    indexed = in_file_order.map(lambda record: record[1]).zipWithIndex()
    return indexed.map(lambda value_index: (value_index[1], value_index[0]))


ready_en = by_line_number(en)
ready_sv = by_line_number(sv)

joined = ready_en.join(ready_sv)

In [37]:
# A.4 Most frequent (English word, Swedish word) pairs, taken position by
# position from aligned sentence pairs that are short (< 10 words) and have
# the same number of words on both sides.
def is_short_equal_length(pair):
    """True for ((en_words, en_len), (sv_words, sv_len)) with 0 < en_len == sv_len < 10."""
    (_, en_len), (_, sv_len) = pair
    return 0 < en_len == sv_len < 10


(joined.values()
       .filter(is_short_equal_length)
       .flatMap(lambda pair: zip(pair[0][0], pair[1][0]))
       # lowerNsplit turns a blank line into [''] (length 1) and repeated
       # spaces into '' tokens, so the length check alone cannot exclude them;
       # drop any pair where either side is an empty token.
       .filter(lambda words: all(words))
       .map(lambda words: (words, 1))
       .reduceByKey(add)
       .takeOrdered(10, key=lambda x: -x[1]))

[(('is', 'är'), 10040),
 (('we', 'vi'), 5530),
 (('i', 'jag'), 5020),
 (('this', 'detta'), 3252),
 (('closed.', 'avslutad.'), 2964),
 (('and', 'och'), 2917),
 (('a', 'en'), 2888),
 (('it', 'det'), 2866),
 (('that', 'det'), 2806),
 (('not', 'inte'), 2650)]

In [39]:
# Shut down the Spark session created in the first cell once all tasks are done.
spark_session.stop()