In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from operator import add

# New API
spark_session = SparkSession.builder\
        .master("spark://192.168.2.70:7077")\
        .appName("Jinglin_PartA")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout", "30s")\
        .config("spark.cores.max", 4)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

spark_context.setLogLevel("ERROR")

sc = SparkContext

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/22 11:58:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# QUESTION A.1 A1.1 Read the English transcripts, and count the number of lines
corpus_en = spark_context.textFile('hdfs://192.168.2.70:9000/europarl/europarl-v7.sv-en.en')
en_count = corpus_en.count()
print(f'There are {en_count} lines in the English transcripts in total.')

[Stage 0:>                                                          (0 + 2) / 2]

There are 1862234 lines in the English transcripts in total.


                                                                                

In [3]:
# QUESTION A.1 A1.2 Do the same with the other language
corpus_sv = spark_context.textFile('hdfs://192.168.2.70:9000/europarl/europarl-v7.sv-en.sv')
sv_count = corpus_sv.count()
print(f'There are {sv_count} lines in Swedish in total.')



There are 1862234 lines in Swedish in total.


                                                                                

In [4]:
# QUESTION A.1 A1.3 Verify that the line counts are the same for the two languages.
if en_count == sv_count:
    print("The line counts are the same for the two languages")
else:
    print("The line counts are different")

The line counts are the same for the two languages


In [5]:
# QUESTION A.1 A1.4 Count the number of partitions
partitions_en = corpus_en.getNumPartitions()
partitions_sv = corpus_sv.getNumPartitions()
print("The partitions of English version: ", partitions_en)
print("The partitions of Swedish version: ", partitions_sv)

The partitions of English version:  2
The partitions of Swedish version:  3


In [6]:
# QUESTION A.2 A2.1 Pre-process the text from both RDDs by doing the following:
# Lowercase the text
def to_lower(rdd):
    return rdd.map(lambda x: x.lower())
# rdd.map return a new RDD by applying a function to each element of this RDD
corpus_enlower = to_lower(corpus_en)
corpus_svlower = to_lower(corpus_sv)

In [7]:
# Tokenize the text(split on space)
def split(corpus):
    return corpus.map(lambda x: x.split(" "))

corpus_enlowsp = split(corpus_enlower)
corpus_svlowsp = split(corpus_svlower)

In [8]:
# A2.2 Inspect 10 entries from each of your RDDs to verify pre-processing
corpus_enlowsp.take(10)
corpus_svlowsp.take(10)

                                                                                

[['återupptagande', 'av', 'sessionen'],
 ['jag',
  'förklarar',
  'europaparlamentets',
  'session',
  'återupptagen',
  'efter',
  'avbrottet',
  'den',
  '17',
  'december.',
  'jag',
  'vill',
  'på',
  'nytt',
  'önska',
  'er',
  'ett',
  'gott',
  'nytt',
  'år',
  'och',
  'jag',
  'hoppas',
  'att',
  'ni',
  'haft',
  'en',
  'trevlig',
  'semester.'],
 ['som',
  'ni',
  'kunnat',
  'konstatera',
  'ägde',
  '"den',
  'stora',
  'år',
  '2000-buggen"',
  'aldrig',
  'rum.',
  'däremot',
  'har',
  'invånarna',
  'i',
  'ett',
  'antal',
  'av',
  'våra',
  'medlemsländer',
  'drabbats',
  'av',
  'naturkatastrofer',
  'som',
  'verkligen',
  'varit',
  'förskräckliga.'],
 ['ni',
  'har',
  'begärt',
  'en',
  'debatt',
  'i',
  'ämnet',
  'under',
  'sammanträdesperiodens',
  'kommande',
  'dagar.'],
 ['till',
  'dess',
  'vill',
  'jag',
  'att',
  'vi,',
  'som',
  'ett',
  'antal',
  'kolleger',
  'begärt,',
  'håller',
  'en',
  'tyst',
  'minut',
  'för',
  'offren',
  'f

In [9]:
# Verify that the line counts still match after the pre-processing
if corpus_enlowsp.count() == corpus_svlowsp.count():
    print("It seems ok.")
else:
    print("The lines are not match now.")



It seems ok.


                                                                                

In [10]:
# QUESTION A.3 A3.1 Use Spark to compute the 10 most frequently according words in the English language corpus.
corpus_enflat = corpus_enlowsp.flatMap(lambda x: x)
corpus_en_p = corpus_enflat.map(lambda x: (x,1))
# corpus_wk = corpus_en_p.groupByKey()
corpus_wordcount = corpus_en_p.reduceByKey(add)
corpus_sort = corpus_wordcount.map(lambda pair: (pair[1], pair[0]))
corpus_sort.sortByKey(False).take(10)

                                                                                

[(3498375, 'the'),
 (1659758, 'of'),
 (1539760, 'to'),
 (1288401, 'and'),
 (1085993, 'in'),
 (797516, 'that'),
 (773522, 'a'),
 (758050, 'is'),
 (534242, 'for'),
 (522849, 'we')]

In [11]:
# QUESTION A.3 A3.1 Repeat for swedish
corpus_svflat = corpus_svlowsp.flatMap(lambda x: x)
corpus_sv_p = corpus_svflat.map(lambda x: (x,1))
corpus_svwordcount = corpus_sv_p.reduceByKey(add)
corpus_svsort = corpus_svwordcount.map(lambda pair: (pair[1], pair[0]))
corpus_svsort.sortByKey(False).take(10)

                                                                                

[(1706293, 'att'),
 (1344830, 'och'),
 (1050774, 'i'),
 (924866, 'det'),
 (913276, 'som'),
 (908680, 'för'),
 (738068, 'av'),
 (694381, 'är'),
 (620310, 'en'),
 (539797, 'vi')]

In [12]:
# A3.2 Verify that results are reasonable.
# Based on the information from internet, these words are the common words. Seems reasonable.

In [13]:
corpus_en_zip = corpus_enlowsp.zipWithIndex()
corpus_sv_zip = corpus_svlowsp.zipWithIndex()
# swap the key and value so that the line number is the key.
en_sw = corpus_en_zip.map(lambda pair: (pair[1], pair[0]))
sv_sw = corpus_sv_zip.map(lambda pair: (pair[1], pair[0]))
# join the two RDDs together
corpus_join = en_sw.join(sv_sw)
corpus_join.take(1)
# filter_empty = corpus_join.filter(lambda pair: pair.isEmpty()==True)
# filter_empty.count()

                                                                                

[(4780,
  (['i',
    'want',
    'to',
    'know',
    'whether',
    'this',
    'also',
    'applies',
    'in',
    'the',
    'document',
    'which',
    'the',
    'commission',
    'has',
    'presented',
    'now.'],
   ['jag',
    'vill',
    'veta',
    'om',
    'detta',
    'också',
    'gäller',
    'i',
    'det',
    'dokument',
    'som',
    'kommissionen',
    'har',
    'presenterat',
    'nu.']))]

In [14]:
# filter the empty/missing pairs
filter_corpus = corpus_join.filter(lambda pair: len(pair[1][0])>0 and len(pair[1][1])>0)
filter_corpus.count()

                                                                                

1862234

In [15]:
# filter to leave only pairs of sentences with a small number of words in each sentence.
filter_short = filter_corpus.filter(lambda pair: len(pair[1][0])<10 or len(pair[1][1])<10)
filter_sameshort = filter_short.filter(lambda pair: len(pair[1][0])==len(pair[1][1]))
filter_sameshort.count()

                                                                                

76296

In [18]:
# For each sentence pair, map so that pair each(in order) word in the two sentences.
def split_sentence(sentence):
    return sentence.split()

shortpairs = filter_sameshort.map(lambda pair: pair[1])
wordpairs = shortpairs.map(lambda pair: list(zip(pair[0],pair[1]))).flatMap(lambda pairs: pairs)

wordpairs_p = wordpairs.map(lambda pair: (pair, 1))
wordpairs_count = wordpairs_p.reduceByKey(lambda a, b: a + b)
pairsort = wordpairs_count.map(lambda pair: (pair[1], pair[0]))
pairsort.sortByKey(False).take(10)


                                                                                

[(10040, ('is', 'är')),
 (5530, ('we', 'vi')),
 (5020, ('i', 'jag')),
 (3252, ('this', 'detta')),
 (2964, ('closed.', 'avslutad.')),
 (2917, ('and', 'och')),
 (2888, ('a', 'en')),
 (2866, ('it', 'det')),
 (2806, ('that', 'det')),
 (2650, ('not', 'inte'))]

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 48920)
Traceback (most recent call last):
  File "/usr/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 262, in handle
    poll(accum_updates)
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 235, in poll
    if func():
  File "/home/ubuntu/.local/lib/python3.10/site-packages/pyspark/accumulators.py", line 239, in accum_updates
    num_updates = re

In [None]:
# Check the result with dictionary:
# is => är correct
# we => vi correct
# i => jag correct
# this => datta correct
# closed => avsluted incorrect
# and => och correct
# a => en correct
# it => det correct
# that => det correct
# not => inte correct