In [24]:
# libraries to be imported
from pyspark.sql import SparkSession
from collections import Counter 

In [2]:
# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.87:7077")\
        .appName("Uvais_Section_A_Session")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .getOrCreate()
        
# Old API (RDD)
spark_context = spark_session.sparkContext

### ---------------- Question A.1 -----------------

In [3]:
# Question A1.1
# reading English file from hadoop
rdd1 = spark_context.newAPIHadoopFile(
    'hdfs://192.168.2.87:9000/europarl/europarl-v7.sv-en.en',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text'
)

print("Example lines to check language: "+str(rdd1.take(2)))
print("Number of lines in europarl-v7.sv-en.en: "+str(rdd1.count()))

Example lines to check language: [(0, 'Resumption of the session'), (26, 'I declare resumed the session of the European Parliament adjourned on Friday 17 December 1999, and I would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.')]
Number of lines in europarl-v7.sv-en.en: 1862234


In [4]:
# Question A1.2
# reading Swedish file from hadoop
rdd2 = spark_context.newAPIHadoopFile(
    'hdfs://192.168.2.87:9000/europarl/europarl-v7.sv-en.sv',
    'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'org.apache.hadoop.io.Text'
)

print("Example lines to check language: "+str(rdd2.take(2)))
print("Number of lines in europarl-v7.sv-en.sv: "+str(rdd2.count()))

Example lines to check language: [(0, 'Återupptagande av sessionen'), (29, 'Jag förklarar Europaparlamentets session återupptagen efter avbrottet den 17 december. Jag vill på nytt önska er ett gott nytt år och jag hoppas att ni haft en trevlig semester.')]
Number of lines in europarl-v7.sv-en.sv: 1862234


In [5]:
# Question A.1.3
# Checking the number of lines are same
print("Check if both documents  have same number of line: "+str(rdd1.count()==rdd2.count()))

Check if both documents  have same number of line: True


In [6]:
# Question A.1.4
# Checking the number of partitions in RDD's
print("No of partions present in RDD1: "+str(rdd1.getNumPartitions()))
print("No of partions present in RDD2: "+str(rdd2.getNumPartitions()))

No of partions present in RDD1: 2
No of partions present in RDD2: 3


### ---------------- Question A.2 -----------------

In [7]:
# Question A2.1
# converting to lowercase and tokenizing
def lowerandtokenize(rdd):
    rdd_tokenized = rdd.map(lambda eachtuple: eachtuple[1].lower())\
            .map(lambda eachSentence: eachSentence.split(' '))
    return rdd_tokenized
rdd1_Tokenized = lowerandtokenize(rdd1)
rdd2_Tokenized = lowerandtokenize(rdd2)
# print the output from function
print("Sample of RDD1: "+str(rdd1_Tokenized.take(2)))
print("\nSample of RDD2: "+str(rdd2_Tokenized.take(2)))

Sample of RDD1: [['resumption', 'of', 'the', 'session'], ['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999,', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period.']]

Sample of RDD2: [['återupptagande', 'av', 'sessionen'], ['jag', 'förklarar', 'europaparlamentets', 'session', 'återupptagen', 'efter', 'avbrottet', 'den', '17', 'december.', 'jag', 'vill', 'på', 'nytt', 'önska', 'er', 'ett', 'gott', 'nytt', 'år', 'och', 'jag', 'hoppas', 'att', 'ni', 'haft', 'en', 'trevlig', 'semester.']]


In [8]:
# Question A2.2
# converting to lowercase and tokenizing
print("10 RDD1 Sentence After Tokenization: \n"+\
      str(rdd1_Tokenized.take(10)))
print("\n10 RDD2 Sentence After Tokenization: \n"+\
      str(rdd2_Tokenized.take(10)))

10 RDD1 Sentence After Tokenization: 
[['resumption', 'of', 'the', 'session'], ['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999,', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period.'], ['although,', 'as', 'you', 'will', 'have', 'seen,', 'the', 'dreaded', "'millennium", "bug'", 'failed', 'to', 'materialise,', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful.'], ['you', 'have', 'requested', 'a', 'debate', 'on', 'this', 'subject', 'in', 'the', 'course', 'of', 'the', 'next', 'few', 'days,', 'during', 'this', 'part-session.'], ['in', 'the', 'meantime,', 'i', 'should', 'like', 'to', 'observe', 'a', "minute'", 's', 'silence,', 'as', 'a', 'number', 'of', 'members', 'have',

In [9]:
# Question A2.3
# counting sentence in Each RDD
print("Sentence in RDD1: "+str(rdd1_Tokenized.count()))
print("Sentence in RDD2: "+str(rdd2_Tokenized.count()))

Sentence in RDD1: 1862234
Sentence in RDD2: 1862234


### ---------------- Question A.3 -----------------

In [10]:
# Question A3.1
# 1. converting to lowercase and tokenizing to words
# 2. mapping to key, value pair
# 3. reducing to count the number of word occurance
# Finally list of no of times each word occur
# English Doc
rdd1_Word_Frequency = rdd1.map(lambda eachtuple: eachtuple[1].lower())\
        .flatMap(lambda eachSentence: (eachSentence.split(' ')))\
        .map(lambda eachWord: (eachWord,1))\
        .reduceByKey(lambda value1,value2: value1+value2)\
        .sortBy(lambda Pair: Pair[1],False)

# top 10 Frequently occuring word 
rdd1_Top_10_words = rdd1_Word_Frequency.take(10)
rdd1_Top_10_words

[('the', 3498375),
 ('of', 1659758),
 ('to', 1539760),
 ('and', 1288401),
 ('in', 1085993),
 ('that', 797516),
 ('a', 773522),
 ('is', 758050),
 ('for', 534242),
 ('we', 522849)]

In [11]:
# Swedish Doc
rdd2_Word_Frequency = rdd2.map(lambda eachtuple: eachtuple[1].lower())\
        .flatMap(lambda eachSentence: (eachSentence.split(' ')))\
        .map(lambda eachWord: (eachWord,1))\
        .reduceByKey(lambda value1,value2: value1+value2)\
        .sortBy(lambda Pair: Pair[1],False)

# top 10 Frequently occuring word 
rdd2_Top_10_words = rdd2_Word_Frequency.take(10)
rdd2_Top_10_words

[('att', 1706293),
 ('och', 1344830),
 ('i', 1050774),
 ('det', 924866),
 ('som', 913276),
 ('för', 908680),
 ('av', 738068),
 ('är', 694381),
 ('en', 620310),
 ('vi', 539797)]

In [12]:
# Question A.3.2
# Checking whether the above count are reasonable
print("Total no of word in RDD1: "+str(rdd1_Word_Frequency.values().sum()))
print("Average no of word in RDD1: "+str(rdd1_Word_Frequency.values().sum()/rdd1_Word_Frequency.count()))
print("\nTotal no of word in RDD2: "+str(rdd2_Word_Frequency.values().sum()))
print("Average no of word in RDD2: "+str(rdd2_Word_Frequency.values().sum()/rdd1_Word_Frequency.count()))

# the most occuring words should be verbs...
# ...like the, a, is which are always frequently...
# ...used to build sentences.
# even in my answers :)

Total no of word in RDD1: 45778381
Average no of word in RDD1: 166.48802938555815

Total no of word in RDD2: 41604741
Average no of word in RDD2: 151.3092248104304


### ---------------- Question A.4 -----------------

In [13]:
# Question A.4.1.1
# Assigning index to line list
en_1 = rdd1_Tokenized.zipWithIndex()
sv_1 = rdd2_Tokenized.zipWithIndex()

print("rdd1 after adding index: "+str(en_1.take(2)))
print("rdd2 after adding index: "+str(sv_1.take(2)))

rdd1 after adding index: [(['resumption', 'of', 'the', 'session'], 0), (['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999,', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period.'], 1)]
rdd2 after adding index: [(['återupptagande', 'av', 'sessionen'], 0), (['jag', 'förklarar', 'europaparlamentets', 'session', 'återupptagen', 'efter', 'avbrottet', 'den', '17', 'december.', 'jag', 'vill', 'på', 'nytt', 'önska', 'er', 'ett', 'gott', 'nytt', 'år', 'och', 'jag', 'hoppas', 'att', 'ni', 'haft', 'en', 'trevlig', 'semester.'], 1)]


In [14]:
# Question A.4.1.2
# Swapping Keys and Values
en_2 = en_1.map(lambda tuple1: (tuple1[1], tuple1[0]))
sv_2 = sv_1.map(lambda tuple1: (tuple1[1], tuple1[0]))

print("rdd1 after adding index: "+str(en_2.take(2)))
print("rdd2 after adding index: "+str(sv_2.take(2)))

rdd1 after adding index: [(0, ['resumption', 'of', 'the', 'session']), (1, ['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999,', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period.'])]
rdd2 after adding index: [(0, ['återupptagande', 'av', 'sessionen']), (1, ['jag', 'förklarar', 'europaparlamentets', 'session', 'återupptagen', 'efter', 'avbrottet', 'den', '17', 'december.', 'jag', 'vill', 'på', 'nytt', 'önska', 'er', 'ett', 'gott', 'nytt', 'år', 'och', 'jag', 'hoppas', 'att', 'ni', 'haft', 'en', 'trevlig', 'semester.'])]


In [15]:
# Question A.4.1.3
# Joining rdd based on key 
en_sv_3 = en_2.join(sv_2)

print("rdd after joining: "+str(en_sv_3.take(2)))

rdd after joining: [(83890, (['the', 'saddest', 'case', 'of', 'all', 'is', 'a', 'school', 'leaver', 'with', 'no', 'qualifications', 'to', 'his', 'credit', 'descending', 'into', 'hopelessness.'], ['det', 'sorgligaste', 'fallet', 'av', 'dem', 'alla', 'är', 'en', 'avgångselev', 'utan', 'några', 'erhållna', 'kvalifikationer', 'som', 'sjunker', 'ned', 'i', 'hopplöshet.'])), (94660, (['while', 'we', 'welcome', 'the', 'adoption', 'of', 'the', 'legislation', 'regarding', 'the', 'khmer', 'rouge', 'tribunal,', 'both', 'by', 'the', 'national', 'assembly', 'and', 'by', 'the', 'senate,', 'it', 'should', 'be', 'borne', 'in', 'mind', 'that', 'we', 'are', 'still', 'awaiting', 'the', 'official', 'statement', 'from', 'the', 'united', 'nations', 'on', 'the', 'draft', 'text.'], ['vi', 'välkomnar', 'antagandet', 'av', 'lagstiftningen', 'om', 'röda', 'khmertribunalen,', 'både', 'i', 'nationalförsamlingen', 'och', 'i', 'senaten.', 'men', 'vi', 'bör', 'komma', 'ihåg', 'att', 'vi', 'fortfarande', 'väntar', 'på

In [16]:
# Question A.4.1.4
# Filter empty or null sentence
en_sv_4 = en_sv_3.filter(lambda x: x[1][0] is not None)\
        .filter(lambda x: x[1][1] is not None)\
        .filter(lambda x: x[1][0] is not '')\
        .filter(lambda x: x[1][1] is not '')
        
print("rdd after removing empty and null: "+str(en_sv_4.take(2)))

rdd after removing empty and null: [(83890, (['the', 'saddest', 'case', 'of', 'all', 'is', 'a', 'school', 'leaver', 'with', 'no', 'qualifications', 'to', 'his', 'credit', 'descending', 'into', 'hopelessness.'], ['det', 'sorgligaste', 'fallet', 'av', 'dem', 'alla', 'är', 'en', 'avgångselev', 'utan', 'några', 'erhållna', 'kvalifikationer', 'som', 'sjunker', 'ned', 'i', 'hopplöshet.'])), (94660, (['while', 'we', 'welcome', 'the', 'adoption', 'of', 'the', 'legislation', 'regarding', 'the', 'khmer', 'rouge', 'tribunal,', 'both', 'by', 'the', 'national', 'assembly', 'and', 'by', 'the', 'senate,', 'it', 'should', 'be', 'borne', 'in', 'mind', 'that', 'we', 'are', 'still', 'awaiting', 'the', 'official', 'statement', 'from', 'the', 'united', 'nations', 'on', 'the', 'draft', 'text.'], ['vi', 'välkomnar', 'antagandet', 'av', 'lagstiftningen', 'om', 'röda', 'khmertribunalen,', 'både', 'i', 'nationalförsamlingen', 'och', 'i', 'senaten.', 'men', 'vi', 'bör', 'komma', 'ihåg', 'att', 'vi', 'fortfarande

In [17]:
# Question A.4.1.5
# Filter sentence with more than 10 words
en_sv_5 = en_sv_4.filter(lambda x: len(x[1][0]) < 10 and len(x[1][1]) < 10 )
print("rdd after filtering sentence with less than 6 words: "+str(en_sv_5.take(10)))

rdd after filtering sentence with less than 6 words: [(292930, (['my', 'next', 'question', 'concerns', 'the', 'food', 'safety', 'authority.'], ['nästa', 'fråga', 'gäller', 'myndigheten', 'för', 'livsmedelssäkerhet.'])), (402305, (['this', 'can', 'lead', 'to', 'people', 'being', 'deported.'], ['det', 'kan', 'innebära', 'att', 'folk', 'utvisas.'])), (421220, (['this', 'evidence', 'should', 'involve', 'fishermen', 'as', 'well', 'as', 'scientists.'], ['såväl', 'yrkesfiskare', 'som', 'forskare', 'bör', 'delta', 'i', 'denna', 'kartläggning.'])), (528870, (['\xa0\xa0', 'mr\xa0president,', 'there', 'is', 'some', 'confusion', 'here.'], ['\xa0\xa0', '–', 'herr', 'talman!', 'det', 'råder', 'viss', 'förvirring', 'här.'])), (69965, (['question', 'no', '33', 'by', '(h-0573/00):'], ['fråga', 'nr', '33', 'från', '(h-0573/00):'])), (389975, (['this', 'is', 'a', 'false', 'distinction.'], ['detta', 'är', 'en', 'falsk', 'skiljelinje.'])), (429180, (['clearly,', 'no', 'country', 'or', 'sector', 'is', 'risk

In [18]:
# Question A.4.1.6
# Filter sentence with equal no of words
en_sv_6 = en_sv_5.filter(lambda x: len(x[1][0]) == len(x[1][1]))
print("rdd after filtering pair wit equal no of words: "+str(en_sv_6.take(10)))

rdd after filtering pair wit equal no of words: [(421220, (['this', 'evidence', 'should', 'involve', 'fishermen', 'as', 'well', 'as', 'scientists.'], ['såväl', 'yrkesfiskare', 'som', 'forskare', 'bör', 'delta', 'i', 'denna', 'kartläggning.'])), (69965, (['question', 'no', '33', 'by', '(h-0573/00):'], ['fråga', 'nr', '33', 'från', '(h-0573/00):'])), (389975, (['this', 'is', 'a', 'false', 'distinction.'], ['detta', 'är', 'en', 'falsk', 'skiljelinje.'])), (429180, (['clearly,', 'no', 'country', 'or', 'sector', 'is', 'risk-free.'], ['uppenbarligen', 'är', 'inget', 'land', 'eller', 'område', 'riskfritt.'])), (724035, (['secondly,', 'the', 'situation', 'calls', 'for', 'greater', 'transparency.'], ['för', 'det', 'andra', 'kräver', 'situationen', 'större', 'öppenhet.'])), (357520, (['our', 'committee', 'can', 'make', 'proposals', 'but', 'not', 'set', 'parameters.'], ['utskottet', 'kan', 'endast', 'komma', 'med', 'förslag,', 'inte', 'ge', 'instruktioner.'])), (672750, (['we', 'have', 'no', 'alt

In [19]:
# Question A.4.1.7
# Mapping to generate a list of translation
en_sv_7 = en_sv_6.map(lambda pair: list(zip(pair[1][0],pair[1][1])) )
en_sv_7.take(20)

[[('this', 'såväl'),
  ('evidence', 'yrkesfiskare'),
  ('should', 'som'),
  ('involve', 'forskare'),
  ('fishermen', 'bör'),
  ('as', 'delta'),
  ('well', 'i'),
  ('as', 'denna'),
  ('scientists.', 'kartläggning.')],
 [('question', 'fråga'),
  ('no', 'nr'),
  ('33', '33'),
  ('by', 'från'),
  ('(h-0573/00):', '(h-0573/00):')],
 [('this', 'detta'),
  ('is', 'är'),
  ('a', 'en'),
  ('false', 'falsk'),
  ('distinction.', 'skiljelinje.')],
 [('clearly,', 'uppenbarligen'),
  ('no', 'är'),
  ('country', 'inget'),
  ('or', 'land'),
  ('sector', 'eller'),
  ('is', 'område'),
  ('risk-free.', 'riskfritt.')],
 [('secondly,', 'för'),
  ('the', 'det'),
  ('situation', 'andra'),
  ('calls', 'kräver'),
  ('for', 'situationen'),
  ('greater', 'större'),
  ('transparency.', 'öppenhet.')],
 [('our', 'utskottet'),
  ('committee', 'kan'),
  ('can', 'endast'),
  ('make', 'komma'),
  ('proposals', 'med'),
  ('but', 'förslag,'),
  ('not', 'inte'),
  ('set', 'ge'),
  ('parameters.', 'instruktioner.')],
 [('w

In [20]:
# Question A.4.1.8
# Generate frequency for the pair
en_sv_8 = en_sv_7.countByKey()
en_sv_8

defaultdict(int,
            {('exactly', 'i'): 1,
             ('the', 'jag'): 1344,
             ('no.', 'nej.'): 112,
             ('i', 'därför'): 94,
             ('\xa0\xa0', '\xa0\xa0'): 1618,
             ('madam', 'fru'): 108,
             ('10.', '10.'): 128,
             ('this', 'det'): 1307,
             ('these', 'detta'): 107,
             ('i', 'tack'): 34,
             ('.', '.'): 1276,
             ('question', 'fråga'): 1291,
             ('', '.'): 2223,
             ('that', 'sådan'): 4,
             ('that', 'det'): 2357,
             ('you', 'ni'): 441,
             ('6.', '6.'): 180,
             ('this', 'detta'): 2077,
             ('we', 'vi'): 4171,
             ('president.', 'talmannen.'): 57,
             ('i', 'jag'): 4346,
             ('there', 'det'): 1145,
             ('that', 'detta'): 838,
             ('waste', 'avfall'): 2,
             ('nobody', 'ingen'): 33,
             ('mr', 'herr'): 423,
             ('that,', 'det'): 23,
             ('t

In [26]:
# Question A.4.1.9
# Printing top 20 frequently ocurring
print("Top 20 frquently occuring translation pair")
Counter(en_sv_8).most_common(20) 

Top 20 frquently occuring translation pair


[(('i', 'jag'), 4346),
 (('we', 'vi'), 4171),
 (('(applause)', '(applåder)'), 2548),
 (('that', 'det'), 2357),
 (('it', 'det'), 2313),
 (('', '.'), 2223),
 (('this', 'detta'), 2077),
 (('the', 'omröstningen'), 1737),
 (('\xa0\xa0', '\xa0\xa0'), 1618),
 (('the', 'jag'), 1344),
 (('this', 'det'), 1307),
 (('question', 'fråga'), 1291),
 (('.', '.'), 1276),
 (('the', 'debatten'), 1196),
 (('there', 'det'), 1145),
 (('written', 'skriftliga'), 847),
 (('that', 'detta'), 838),
 (('they', 'de'), 696),
 (('the', 'den'), 652),
 (('let', 'låt'), 632)]

In [27]:
# release the cores for another application!
spark_context.stop()