In [3]:
from pyspark.sql import SparkSession

# (8 cores, 16gb per machine) x 5 = 40 cores

# New API
spark_session = SparkSession\
        .builder\
        .master("local") \
        .appName("A.2")\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

distFileEN = spark_context.textFile("europarl-v7.sv-en.en")
distFileSV = spark_context.textFile("europarl-v7.sv-en.sv")
TokenEN = distFileEN.pipe("/home/ubuntu/tools/tokenizer.perl -l en")
TokenSV = distFileSV.pipe("/home/ubuntu/tools/tokenizer.perl -l sv")
LowerCaseEN = TokenEN.flatMap(lambda line: line.lower().split("/n"))
LowerCaseSV = TokenSV.flatMap(lambda line: line.lower().split("/n"))

In [4]:
LowerCaseEN.take(5)

['resumption of the session',
 'i declare resumed the session of the european parliament adjourned on friday 17 december 1999 , and i would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period .',
 "although , as you will have seen , the dreaded ' millennium bug ' failed to materialise , still the people in a number of countries suffered a series of natural disasters that truly were dreadful .",
 'you have requested a debate on this subject in the course of the next few days , during this part-session .',
 "in the meantime , i should like to observe a minute ' s silence , as a number of members have requested , on behalf of all the victims concerned , particularly those of the terrible storms , in the various countries of the european union ."]

In [5]:
ENzip = LowerCaseEN.zipWithIndex()
SVzip = LowerCaseSV.zipWithIndex()
#LowerCaseSV.zipWithIndex()
ENzip.take(5)
SVzip.take(5)

[('återupptagande av sessionen', 0),
 ('jag förklarar europaparlamentets session återupptagen efter avbrottet den 17 december . jag vill på nytt önska er ett gott nytt år och jag hoppas att ni haft en trevlig semester .',
  1),
 ('som ni kunnat konstatera ägde " den stora år 2000-buggen " aldrig rum . däremot har invånarna i ett antal av våra medlemsländer drabbats av naturkatastrofer som verkligen varit förskräckliga .',
  2),
 ('ni har begärt en debatt i ämnet under sammanträdesperiodens kommande dagar .',
  3),
 ('till dess vill jag att vi , som ett antal kolleger begärt , håller en tyst minut för offren för bl.a. stormarna i de länder i europeiska unionen som drabbats .',
  4)]

In [6]:
ENInverted = ENzip.map(lambda x: (x[1], x[0]))
SVInverted = SVzip.map(lambda x: (x[1], x[0]))

In [None]:
SVInverted.take(5)
ENInverted.take(5)

In [7]:
Joined = ENInverted.join(SVInverted)
#Joined.take(5)

In [8]:
newJoined = Joined.map(lambda x: (x[0], (x[1][0].split(" "), x[1][1].split(" "))))

In [9]:
newJoined.take(3)

[(0,
  (['resumption', 'of', 'the', 'session'],
   ['återupptagande', 'av', 'sessionen'])),
 (1835010,
  (['i',
    'hope',
    'that',
    'he',
    'will',
    'take',
    'a',
    'little',
    'time',
    'to',
    'address',
    'this',
    'issue',
    'before',
    'we',
    'close',
    '.'],
   ['jag',
    'hoppas',
    'att',
    'han',
    'tar',
    'sig',
    'litet',
    'tid',
    'för',
    'att',
    'gå',
    'in',
    'på',
    'det',
    '.'])),
 (1310724,
  (['they',
    'must',
    'be',
    'able',
    'to',
    'make',
    'an',
    'informed',
    'choice',
    'on',
    'the',
    'basis',
    'of',
    'clear',
    'information',
    '.'],
   ['de',
    'måste',
    'få',
    'möjlighet',
    'att',
    'göra',
    'ett',
    'upplyst',
    'val',
    'med',
    'utgångspunkt',
    'från',
    'tydlig',
    'information',
    '.']))]

In [22]:
#len(x[1][0])!=0
CutEmpty = newJoined.filter(lambda x: (x[1][0])!=None and (x[1][1]!=None))
ShortSen = CutEmpty.filter(lambda x: (len(x[1][0]))<8 and (len(x[1][1]))<8)
KeepSamLength = ShortSen.filter(lambda x: (len(x[1][0]) == len(x[1][1])))
FirstSen = KeepSamLength.flatMap(lambda x: x[1][0])
SecSen = KeepSamLength.flatMap(lambda x: x[1][1])
ZipWords = FirstSen.zip(SecSen)

In [26]:
#KeepSamLength.count()
#45090
CutEmpty.count()

1862234

In [24]:
ZipWords.take(50)

[('we', 'vi'),
 ('must', 'måste'),
 ('act', 'agera'),
 ('.', '.'),
 ('the', 'den'),
 ('second', 'andra'),
 ('is', 'är'),
 ('immigration', 'invandringen'),
 ('.', '.'),
 ('the', 'jag'),
 ('debate', 'förklarar'),
 ('is', 'debatten'),
 ('closed', 'avslutad'),
 ('.', '.'),
 ('(', '('),
 ('1', '1'),
 (')', ')'),
 ('i', 'jag'),
 ('have', 'har'),
 ('high', 'höga'),
 ('hopes', 'förhoppningar'),
 ('for', 'på'),
 ('reach', 'reach-förordningen'),
 ('.', '.'),
 ('documents', 'inkomna'),
 ('received', 'dokument'),
 (':', ':'),
 ('see', 'se'),
 ('minutes', 'protokollet'),
 ('(', '('),
 ('applause', 'applåder'),
 (')', ')'),
 ('', '.'),
 ('this', 'detta'),
 ('is', 'är'),
 ('factually', 'faktiskt'),
 ('untrue', 'oriktigt'),
 ('.', '.'),
 ('is', 'är'),
 ('this', 'detta'),
 ('simply', 'enbart'),
 ('a', 'en'),
 ('political', 'politisk'),
 ('gesture', 'gest'),
 ('?', '?'),
 ('', '.'),
 ('any', 'alla'),
 ('other', 'andra'),
 ('approach', 'tillvägagångssätt'),
 ('would', 'skulle')]

In [31]:
stopwords = [',', ')', '(', '?', '.', ':']
counts = ZipWords.map(lambda x: (x, 1)) \
         .reduceByKey(lambda a, b: a +b) \
         .sortBy(lambda x: x[1], False) \
         .filter(lambda x: x[0][0] not in stopwords and x[0][1] not in stopwords)

In [32]:
counts.take(5)

[(('is', 'är'), 6064),
 (('applause', 'applåder'), 3335),
 (('closed', 'avslutad'), 2994),
 (('we', 'vi'), 2299),
 (('that', 'det'), 2083)]