### Assignment 3, Naeim Rashidfarokhi, Data Engineering I 2022

#### Part A - Working with the RDD API

In [11]:
# Importing needed libraries to work with Spark through Python API
from pyspark.sql import SparkSession
from pyspark import SparkConf

In [12]:
# This boolean is to run the code easier both locally or on server
server_mode = True

In [13]:
if server_mode:
    #New API
    spark_session = SparkSession\
            .builder\
            .master("spark://192.168.2.119:7077") \
            .appName("Naeim_Rashidfarokhi_A3_A")\
            .config("spark.dynamicAllocation.enabled", True)\
            .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
            .config("spark.shuffle.service.enabled", False)\
            .config("spark.dynamicAllocation.executorIdleTimeout","300s")\
            .config("spark.executor.cores",4)\
            .config("spark.driver.port",9998)\
            .config("spark.blockManager.port",10005)\
            .getOrCreate()

    # Old API (RDD)
    spark_context = spark_session.sparkContext
    # spark_context.setLogLevel("INFO")
    spark_context.setLogLevel("ERROR")

else:
    # local version, deactivate later!
    spark_session = SparkSession.builder.appName('A3_A').getOrCreate()

In [14]:
spark_session

In [15]:
# To access RDD, we need an object of sparkContext (work with the old API)
sc = spark_session.sparkContext

## Question A.1
#### A.1.1 & A.1.2 Read the English and Swedish transcripts with Spark, and count the number of lines.

In [16]:
if server_mode:
    rdd_EN = sc.textFile('hdfs://192.168.2.119:9000/europarl/europarl-v7.sv-en.en')
    rdd_SW = sc.textFile('hdfs://192.168.2.119:9000/europarl/europarl-v7.sv-en.sv')  
else:
    # to read data with the old API
    rdd_EN = sc.textFile("/home/naeim/Desktop/DE1/europarl-v7.sv-en.en")
    rdd_SW = sc.textFile("/home/naeim/Desktop/DE1/europarl-v7.sv-en.sv")

type(rdd_EN), type(rdd_SW)

(pyspark.rdd.RDD, pyspark.rdd.RDD)

In [17]:
# to get some insight about loaded text
rdd_EN.take(2)

                                                                                

['Resumption of the session',
 'I declare resumed the session of the European Parliament adjourned on Friday 17 December 1999, and I would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.']

In [18]:
rdd_SW.take(2)

['Återupptagande av sessionen',
 'Jag förklarar Europaparlamentets session återupptagen efter avbrottet den 17 december. Jag vill på nytt önska er ett gott nytt år och jag hoppas att ni haft en trevlig semester.']

In [19]:
nbr_lines_EN = rdd_EN.count()
nbr_lines_SW = rdd_SW.count()
print(f"The number of rows in Enlgish file:{nbr_lines_EN}")
print(f"The number of rows in Swedish file:{nbr_lines_SW}")



The number of rows in Enlgish file:1862234
The number of rows in Swedish file:1862234


                                                                                

#### A.1.3 Verify that the line counts are the same for the two languages.

In [20]:
def check_line_counts(a, b):
    if a == b:
        print("Similar line counts in both documents!")
    else:
        print("Different line counts in both documents!")

check_line_counts(nbr_lines_EN, nbr_lines_SW)

Similar line counts in both documents!


#### A.1.4 Count the number of partitions.

In [21]:
rdd_EN.getNumPartitions(), rdd_SW.getNumPartitions()

(2, 3)

## Question A.2
#### A.2.1 Pre-process the text from both RDDs by doing the following

In [22]:
import re # to keep only alpha-numeric characters
def lower_the_tokens(lines):
    words = lines.map(lambda line: \
        re.sub("[^åäöÅÄÖa-zA-Z0-9\s]+","", line).lower().split(' '))
    return words

In [23]:
rdd_EN_pre = lower_the_tokens(rdd_EN)
rdd_SW_pre = lower_the_tokens(rdd_SW)
type(rdd_EN_pre), type(rdd_SW_pre)

(pyspark.rdd.PipelinedRDD, pyspark.rdd.PipelinedRDD)

#### A.2.2 Inspect 10 entries from each of your RDDs to verify your pre-processing.

In [24]:
print(rdd_EN_pre.take(10), rdd_SW_pre.take(10))

[['resumption', 'of', 'the', 'session'], ['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period'], ['although', 'as', 'you', 'will', 'have', 'seen', 'the', 'dreaded', 'millennium', 'bug', 'failed', 'to', 'materialise', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful'], ['you', 'have', 'requested', 'a', 'debate', 'on', 'this', 'subject', 'in', 'the', 'course', 'of', 'the', 'next', 'few', 'days', 'during', 'this', 'partsession'], ['in', 'the', 'meantime', 'i', 'should', 'like', 'to', 'observe', 'a', 'minute', 's', 'silence', 'as', 'a', 'number', 'of', 'members', 'have', 'requested', 'on', 'behalf', 'of', 'all', 'the', 'v

#### A.2.3 Verify that the line counts still match after the pre-processing.

In [25]:
nbr_lines_EN_pre = rdd_EN_pre.count()
nbr_lines_SW_pre = rdd_SW_pre.count()
print(f"The number of rows in Enlgish file:{nbr_lines_EN_pre}")
print(f"The number of rows in Swedish file:{nbr_lines_SW_pre}")
check_line_counts(nbr_lines_EN_pre, nbr_lines_SW_pre)



The number of rows in Enlgish file:1862234
The number of rows in Swedish file:1862234
Similar line counts in both documents!


                                                                                

## Question 1.A.3
#### A.3.1 Use Spark to compute the 10 most frequently according words in the English language corpus. Repeat for the other language.

In [26]:
def get_frequent_words(lines, counts=True, nbr=10):
    
    # mapping the words in pairs
    words_flatten =  lines.flatMap(lambda word: word)
    words_pairs =  words_flatten.map(lambda x: (x,1))
    
    # reducing pairs
    if counts:
        #return RDD of word counts
        word_counts = words_pairs.reduceByKey(lambda a, b : a + b)
        # find the highest frequencies
        word_counts = word_counts.map(lambda x: (x[1], x[0])).sortByKey(False)
        print(word_counts.take(nbr))
        
    else: 
        # countByKey() returns dictionary
        word_counts = words_pairs.countByKey()
        word_counts = sorted(word_counts, key = word_counts.get, reverse = True)
        print(word_counts[:nbr])
        
    return word_counts

In [27]:
rdd_EN_pairs =  get_frequent_words(rdd_EN_pre, counts=True, nbr=10)
rdd_SW_pairs =  get_frequent_words(rdd_SW_pre, counts=True, nbr=10)

                                                                                

[(3505085, 'the'), (1662002, 'of'), (1543739, 'to'), (1318342, 'and'), (1088891, 'in'), (839072, 'that'), (774941, 'is'), (774540, 'a'), (538191, 'for'), (526480, 'we')]


                                                                                

[(1709936, 'att'), (1350369, 'och'), (1054249, 'i'), (952985, 'det'), (917580, 'som'), (915079, 'för'), (740724, 'av'), (701840, 'är'), (636827, 'en'), (546068, 'vi')]


#### A.3.2 Verify that your results are reasonable.

In [28]:
# looking at the results above, the conuts do not exatly match word by word
# but comparing the most 10 frequent words in both lists, they are reasonable!
words_list_EN =  get_frequent_words(rdd_EN_pre, counts=False, nbr=10)
words_list_SW =  get_frequent_words(rdd_SW_pre, counts=False, nbr=10)

                                                                                

['the', 'of', 'to', 'and', 'in', 'that', 'is', 'a', 'for', 'we']


                                                                                

['att', 'och', 'i', 'det', 'som', 'för', 'av', 'är', 'en', 'vi']


In [29]:
print(f"The total number of unique words in English corpus:{len(words_list_EN)}")
print(f"The total number of unique words in Swedish corpus:{len(words_list_SW)}")

The total number of unique words in English corpus:127332
The total number of unique words in Swedish corpus:343473


## Question A.4
#### A.4.1 Use this parallel corpus to mine some translations in the form of word pairs, for the two languages. Do this by pairing words found on short lines with the same number of words respectively. We (incorrectly) assume the words stay in the same order when translated.

##### Step1: Key the lines by their line number (hint: ZipWithIndex())

In [30]:
en1 = rdd_EN_pre.coalesce(1).zipWithIndex()
sv1 = rdd_SW_pre.coalesce(1).zipWithIndex()
# en1 = rdd_EN_pre.repartition(9).zipWithIndex()
# sv1 = rdd_SW_pre.repartition(9).zipWithIndex()

In [31]:
print(en1.take(3), sv1.take(3))

[(['resumption', 'of', 'the', 'session'], 0), (['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period'], 1), (['although', 'as', 'you', 'will', 'have', 'seen', 'the', 'dreaded', 'millennium', 'bug', 'failed', 'to', 'materialise', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful'], 2)] [(['återupptagande', 'av', 'sessionen'], 0), (['jag', 'förklarar', 'europaparlamentets', 'session', 'återupptagen', 'efter', 'avbrottet', 'den', '17', 'december', 'jag', 'vill', 'på', 'nytt', 'önska', 'er', 'ett', 'gott', 'nytt', 'år', 'och', 'jag', 'hoppas', 'att', 'ni', 'haft', 'en', 'trevlig', 'semester'], 1), (['som', 'ni', 'kunnat', '

##### Step2: Swap the key and value - so that the line number is the key

In [32]:
en2 = en1.map(lambda x: (x[1], x[0]))
sv2 = sv1.map(lambda x: (x[1], x[0]))

In [33]:
print(en2.take(3), sv2.take(3))

[(0, ['resumption', 'of', 'the', 'session']), (1, ['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period']), (2, ['although', 'as', 'you', 'will', 'have', 'seen', 'the', 'dreaded', 'millennium', 'bug', 'failed', 'to', 'materialise', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful'])] [(0, ['återupptagande', 'av', 'sessionen']), (1, ['jag', 'förklarar', 'europaparlamentets', 'session', 'återupptagen', 'efter', 'avbrottet', 'den', '17', 'december', 'jag', 'vill', 'på', 'nytt', 'önska', 'er', 'ett', 'gott', 'nytt', 'år', 'och', 'jag', 'hoppas', 'att', 'ni', 'haft', 'en', 'trevlig', 'semester']), (2, ['som', 'ni', 'kunnat'

##### Step3: Join the two RDDs together according to the line number key, so you have pairs of matching lines

In [34]:
# read the difference of join and union here:
# https://stackoverflow.com/questions/34249247/difference-between-join-and-union-followed-by-groupbykey-in-spark
en_sv3 = en2.union(sv2)
en_sv3 = en_sv3.groupByKey().mapValues(list)

In [35]:
print(en_sv3.take(3))

[Stage 34:>                                                         (0 + 1) / 1]

[(996, [['i', 'would', 'emphasise', 'that', 'this', 'is', 'going', 'to', 'be', 'extremely', 'important', 'for', 'our', 'future', 'for', 'europe', 's', 'economy', 'and', 'above', 'all', 'for', 'the', 'protection', 'of', 'our', 'wealth', 'and', 'technological', 'development', 'in', 'europe', 'in', 'general'], ['jag', 'hävdar', 'bestämt', 'att', 'detta', 'kommer', 'att', 'vara', 'av', 'yttersta', 'vikt', 'för', 'vår', 'framtid', 'för', 'den', 'europeiska', 'ekonomin', 'och', 'framför', 'allt', 'för', 'att', 'skydda', 'vårt', 'välstånd', 'och', 'den', 'allmänna', 'tekniska', 'utvecklingen', 'i', 'europa']]), (2160, [['i', 'have', 'referred', 'to', 'a', 'series', 'of', 'issues', 'which', 'i', 'will', 'lay', 'out', 'in', 'more', 'detail', 'if', 'you', 'wish', 'in', 'an', 'appearance', 'before', 'a', 'special', 'committee', 'on', 'this', 'matter', 'or', 'when', 'i', 'present', 'the', 'actual', 'initiatives', 'in', 'the', 'coming', 'months'], ['mina', 'damer', 'och', 'herrar', 'jag', 'har', 't

                                                                                

##### Step4: Filter to exclude line pairs that have an empty/missing “corresponding” sentence

In [36]:
en_sv4 = en_sv3.filter(lambda x: (len(x[1][0])!=0) and (x[1][0]!=''))

In [37]:
print(en_sv4.take(1))

[Stage 36:>                                                         (0 + 1) / 1]

[(140, [['the', 'report', 'looks', 'at', 'the', 'issue', 'of', 'harmonising', 'the', 'examination', 'requirements', 'for', 'safety', 'advisors', 'working', 'in', 'the', 'areas', 'of', 'transportation', 'of', 'dangerous', 'goods', 'by', 'road', 'rail', 'and', 'inland', 'waterway'], ['han', 'behandlar', 'frågan', 'om', 'harmonisering', 'av', 'examineringskraven', 'för', 'säkerhetsrådgivare', 'för', 'transport', 'av', 'farligt', 'gods', 'på', 'väg', 'järnväg', 'och', 'inre', 'vattenvägar']])]


                                                                                

##### Step 5: Filter to leave only pairs of sentences with a small number of words per sentence, this should give a more reliable translation (you can experiment).

In [38]:
limit = 10
en_sv5 = en_sv4.filter(lambda x: (len(x[1][0]) <= limit) or (len(x[1][1]) <= limit))

In [39]:
print(en_sv5.take(1))

[Stage 38:>                                                         (0 + 1) / 1]

[(4412, [['mr', 'president', 'i', 'simply', 'wanted', 'to', 'pass', 'on', 'some', 'news'], ['herr', 'talman']])]


                                                                                

##### Step 6: Filter to leave only pairs of sentences with the same number of words in each sentence.

In [40]:
en_sv6 = en_sv5.filter(lambda x: len(x[1][0]) == len(x[1][1]))

In [41]:
print(en_sv6.take(1))

[Stage 40:>                                                         (0 + 1) / 1]

[(6298, [['the', 'common', 'position', 'is', 'a', 'reasonable', 'compromise'], ['den', 'gemensamma', 'ståndpunkten', 'är', 'en', 'rimlig', 'kompromiss']])]


                                                                                

##### Step 7: For each sentence pair, map so that you pair each (in order) word in the two sentences. We no longer need the line numbers. (hint: use python’s built in zip() function)

In [42]:
en_sv7 = en_sv6.map(lambda x: list(zip(*x[1]))).flatMap(lambda l: l)

In [43]:
print(en_sv7.take(20))

[Stage 42:>                                                         (0 + 1) / 1]

[('that', 'det'), ('is', 'är'), ('a', 'en'), ('disgraceful', 'skymflig'), ('attitude', 'inställning'), ('it', 'det'), ('is', 'är'), ('not', 'inte'), ('a', 'ett'), ('directive', 'direktiv'), ('or', 'eller'), ('a', 'en'), ('regulation', 'förordning'), ('', ''), ('it', 'i'), ('defines', 'förslaget'), ('tougher', 'fastställs'), ('enforcement', 'hårdare'), ('measures', 'motåtgärder'), ('the', 'omröstningen')]


                                                                                

##### Step 8: Use reduce to count the number of occurrences of the word-translation-pairs.

In [44]:
en_sv8 = en_sv7.map(lambda x: (x,1))
en_sv8 = en_sv8.reduceByKey(lambda a, b : a + b)
en_sv8 = en_sv8.map(lambda x: (x[1], x[0])).sortByKey(False)

                                                                                

##### Step 9: Print some of the most frequently occurring pairs of words

In [45]:
print(en_sv8.take(50))

[(11343, ('is', 'är')), (6612, ('we', 'vi')), (6143, ('i', 'jag')), (5769, ('', '')), (4498, ('this', 'detta')), (3941, ('and', 'och')), (3707, ('it', 'det')), (3500, ('a', 'en')), (3469, ('that', 'det')), (3314, ('applause', 'applåder')), (3273, ('not', 'inte')), (2972, ('closed', 'avslutad')), (2531, ('in', 'i')), (2350, ('have', 'har')), (2246, ('a', 'ett')), (2218, ('will', 'att')), (2113, ('are', 'är')), (2033, ('this', 'det')), (2009, ('the', 'omröstningen')), (1970, ('for', 'för')), (1955, ('vote', 'kommer')), (1948, ('there', 'det')), (1896, ('the', 'den')), (1859, ('to', 'att')), (1852, ('place', 'rum')), (1821, ('take', 'äga')), (1721, ('\xa0\xa0', '\xa0\xa0')), (1686, ('question', 'fråga')), (1644, ('of', 'av')), (1564, ('the', 'det')), (1552, ('must', 'måste')), (1517, ('very', 'mycket')), (1449, ('that', 'detta')), (1373, ('the', 'jag')), (1334, ('has', 'har')), (1331, ('is', 'debatten')), (1327, ('no', 'nr')), (1320, ('debate', 'förklarar')), (1317, ('that', 'att')), (127

In [46]:
sc.stop()
print("Spark closed!")

Spark closed!
