### connect to spark cluster

In [1]:
from pyspark import RDD
from pyspark.sql import SparkSession
from operator import add

spark_session = SparkSession.builder\
        .master("spark://192.168.2.250:7077") \
        .appName("PatrickHennig_Lab3")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled",True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores", 2)\
        .config("spark.driver.port",9999)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

# RDD API
spark_context = spark_session.sparkContext

# too verbose..
spark_context.setLogLevel("INFO")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/02/14 15:06:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
spark_context.setLogLevel("WARN")

spark RDD documentation https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

## Question A.1

A.1.1 Read the English transcripts with Spark, and count the number of lines.

A.1.2 Do the same with the other language (so that you have a separate lineage of RDDs for
each).

A.1.3 Verify that the line counts are the same for the two languages.

A.1.4 Count the number of partitions.

In [3]:
# .textFile() reads into RDD containing individual lines

# A.1.1
lines_de = spark_context.textFile("hdfs://192.168.2.250:9000/europarl/europarl-v7.de-en.de")
print("lines in the german version:",lines_de.count())

# A.1.2
lines_en = spark_context.textFile("hdfs://192.168.2.250:9000/europarl/europarl-v7.de-en.en")
print("lines in the english version:",lines_en.count())

# A.1.3
assert lines_de.count()==lines_en.count(),f"number of lines differ between english and german (en:{lines_en.count()}, de:{lines_de.count()})"

# A.1.4
print("num partitions in the german set:",lines_de.getNumPartitions())
print("num partitions in the english set:",lines_en.getNumPartitions())

                                                                                

lines in the german version: 1920209


                                                                                

lines in the english version: 1920209




num partitions in the german set: 3
num partitions in the english set: 3


                                                                                

# Question A.2 #
    
A.2.1 Pre-process the text from both RDDs by doing the following:
 - Lowercase the text
 - Tokenize the text (split on space)
   
Hint: define a function to run in your driver application to avoid writing this code twice.

A.2.2 Inspect 10 entries from each of your RDDs to verify your pre-processing.

A.2.3 Verify that the line counts still match after the pre-processing.

In [8]:
# A.2.1
def tok_lower(text:RDD)->RDD:
    return text.map(lambda l:l.lower()).map(lambda l:l.split(" "))

lines_de_lt=tok_lower(lines_de)
lines_en_lt=tok_lower(lines_en)

print(f"num lines in the german version: {lines_de_lt.count()}")
print(f"num lines in the english version: {lines_en_lt.count()}")

# A.2.2
print(lines_de_lt.take(10))
print(lines_en_lt.take(10))

# A.2.3
assert lines_de_lt.count()==lines_en_lt.count(), \
    f"number of lines differ between english and german after pre-processing" \
    f"(en:{lines_en.count()}, de:{lines_de.count()})"

                                                                                

num words in the german version: 1920209


                                                                                

num words in the english version: 1920209


                                                                                

[['wiederaufnahme', 'der', 'sitzungsperiode'], ['ich', 'erkläre', 'die', 'am', 'freitag,', 'dem', '17.', 'dezember', 'unterbrochene', 'sitzungsperiode', 'des', 'europäischen', 'parlaments', 'für', 'wiederaufgenommen,', 'wünsche', 'ihnen', 'nochmals', 'alles', 'gute', 'zum', 'jahreswechsel', 'und', 'hoffe,', 'daß', 'sie', 'schöne', 'ferien', 'hatten.'], ['wie', 'sie', 'feststellen', 'konnten,', 'ist', 'der', 'gefürchtete', '"millenium-bug', '"', 'nicht', 'eingetreten.', 'doch', 'sind', 'bürger', 'einiger', 'unserer', 'mitgliedstaaten', 'opfer', 'von', 'schrecklichen', 'naturkatastrophen', 'geworden.'], ['im', 'parlament', 'besteht', 'der', 'wunsch', 'nach', 'einer', 'aussprache', 'im', 'verlauf', 'dieser', 'sitzungsperiode', 'in', 'den', 'nächsten', 'tagen.'], ['heute', 'möchte', 'ich', 'sie', 'bitten', '-', 'das', 'ist', 'auch', 'der', 'wunsch', 'einiger', 'kolleginnen', 'und', 'kollegen', '-,', 'allen', 'opfern', 'der', 'stürme,', 'insbesondere', 'in', 'den', 'verschiedenen', 'ländern

                                                                                

[['resumption', 'of', 'the', 'session'], ['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999,', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period.'], ['although,', 'as', 'you', 'will', 'have', 'seen,', 'the', 'dreaded', "'millennium", "bug'", 'failed', 'to', 'materialise,', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful.'], ['you', 'have', 'requested', 'a', 'debate', 'on', 'this', 'subject', 'in', 'the', 'course', 'of', 'the', 'next', 'few', 'days,', 'during', 'this', 'part-session.'], ['in', 'the', 'meantime,', 'i', 'should', 'like', 'to', 'observe', 'a', "minute'", 's', 'silence,', 'as', 'a', 'number', 'of', 'members', 'have', 'requested,', 'on', 'behalf', 'of', '

                                                                                

## Question A.3
A.3.1 Use Spark to compute the 10 most frequently according words in the English language
corpus. Repeat for the other language.
A.3.2 Verify that your results are reasonable.

In [33]:
def get_word_count(rdd:RDD)->RDD:
    """
    rdd must consist of lines already split into words

    1) flatten sentences (which are already split into lists of words) into one huge list of words
    2) remove zero length strings
    3) group by word (i.e. create one list of all occurences of each word)
    4) map each list of words using the len function, effectively counting the number of occurences of this word
    5) sort in descending order so that the first word is the most common word
    """
    return rdd \
        .flatMap(lambda x:x) \
        .filter(lambda x:len(x)>0) \
        .groupBy(lambda x:x) \
        .mapValues(len) \
        .sortBy(lambda x:x[1],ascending=False)

In [22]:
de_word_freq=get_word_count(lines_de_lt)
en_word_freq=get_word_count(lines_en_lt)

                                                                                

In [23]:
de_word_freq_sample=de_word_freq.take(10)
en_word_freq_sample=en_word_freq.take(10)

                                                                                

In [27]:
print(f"10 most used german words: {', '.join([w[0] for w in de_word_freq_sample])}")
print(f"10 most used german words: {', '.join([w[0] for w in en_word_freq_sample])}")
# results seem reasonable

10 most used german words: die, der, und, in, zu, den, wir, für, ich, das
10 most used german words: the, of, to, and, in, that, a, is, for, we


## Question A.4
A.4.1 Use this parallel corpus to mine some translations in the form of word pairs, for the two
languages. Do this by pairing words found on short lines with the same number of words
respectively. We (incorrectly) assume the words stay in the same order when translated.
Follow this approach. Work with the pair of RDDs you created in question A.2.
Hint: make a new pair of RDDs for each step, sv_1, en_1, sv_2, en_2, ...
4
1. Key the lines by their line number (hint: ZipWithIndex()).
2. Swap the key and value - so that the line number is the key.
3. Join the two RDDs together according to the line number key, so you have pairs of
matching lines.
4. Filter to exclude line pairs that have an empty/missing “corresponding” sentence.
5. Filter to leave only pairs of sentences with a small number of words per sentence,
this should give a more reliable translation (you can experiment).
6. Filter to leave only pairs of sentences with the same number of words in each
sentence.
7. For each sentence pair, map so that you pair each (in order) word in the two
sentences. We no longer need the line numbers. (hint: use python’s built in zip()
function)
8. Use reduce to count the number of occurrences of the word-translation-pairs.
9. Print some of the most frequently occurring pairs of words.
Do your translations seem reasonable? Use a dictionary to check a few (don’t worry, you
won’t be marked down for incorrect translations).

In [49]:
def infer_translation(rdd_0:RDD,rdd_1:RDD)->RDD:
    # 1.
    rdd_0=rdd_0.zipWithIndex()
    rdd_1=rdd_1.zipWithIndex()
    # 2.
    rdd_0=rdd_0.map(lambda x:(x[1],x[0]))
    rdd_1=rdd_1.map(lambda x:(x[1],x[0]))
    # 3.
    rdd=rdd_0.join(rdd_1)
    # 4.
    rdd=rdd.filter(lambda x:len(x[1][0])>0 and len(x[1][1])>0)
    # 5.
    max_sentence_length=20
    rdd=rdd.filter(lambda x:len(x[1][0])<=max_sentence_length and len(x[1][1])<=max_sentence_length)
    # 6.
    rdd=rdd.filter(lambda x:len(x[1][0])==len(x[1][1]))
    # 7.
    rdd=rdd.map(lambda x:tuple(zip(*x[1])))
    # 8.
    # instruction say to use reduce, but I don't know how reduce would be the natural way to solve this
    rdd=rdd \
        .flatMap(lambda x:x) \
        .groupBy(lambda x:x) \
        .mapValues(len) \
        .sortBy(lambda x:x[1],ascending=False)
    return rdd

# 9. (view the results)
word_translations_de_en=infer_translation(lines_de_lt, lines_en_lt)

                                                                                

In [56]:
print(
    "top n most common word pairings:",
    *[
        f"de : {w_de} =?= {w_en} : en ({wp_cnt})"
        for ((w_de,w_en),wp_cnt)
        in word_translations_de_en.take(20)
    ],
    sep="\n"
)
# these are all sensible (even though one of them is just " ", which in fact does also mean " " in german)

top n most common word pairings:
de : die =?= the : en (24647)
de : ist =?= is : en (18050)
de : ich =?= i : en (14071)
de : wir =?= we : en (13066)
de : und =?= and : en (12336)
de : der =?= the : en (12156)
de : der =?= of : en (5292)
de : herr =?= mr : en (5284)
de : in =?= in : en (5149)
de : aussprache =?= debate : en (4750)
de : es =?= it : en (4450)
de : nicht =?= not : en (4406)
de : das =?= the : en (4365)
de : geschlossen. =?= closed. : en (4340)
de : das =?= that : en (4186)
de : dass =?= that : en (4120)
de : eine =?= a : en (3717)
de :    =?=    : en (3599)
de : für =?= for : en (3558)
de : ein =?= a : en (3435)


In [None]:
spark_session.stop()