## Empirical translation with Spark ##
Use the parallel corpus of European Parliament transcripts to compute statistics (word counts) and to find translations between English and another language (Czech is used here). Download the parallel corpus from http://www.statmt.org/europarl/

Create the Spark session

In [1]:
from pyspark.sql import SparkSession

# New API
spark_session = SparkSession\
        .builder\
        .master("local") \
        .appName("assignment2")\
        .getOrCreate()

# Old API (RDD)
sc = spark_session.sparkContext

### A.1 ###

Read the English file and count its lines

In [2]:
rdd_en = sc.textFile('europarl-v7.cs-en.en')\
.cache() # Keep this RDD in memory!
rdd_en.count()
#rdd_en.first()

646605

Read the Czech file and count its lines

In [3]:
rdd_cs = sc.textFile('europarl-v7.cs-en.cs')\
.cache() # Keep this RDD in memory!
rdd_cs.count()

646605

Examine first line

In [4]:
rdd_en.first()

"Action taken on Parliament's resolutions: see Minutes"

Check how many partitions are in use

In [5]:
rdd_en.getNumPartitions()

3

### A.2 ###

Filter out any lines beginning with "<", since these are markup. Convert the remaining lines to lower case and split them into 'words' on whitespace and punctuation

In [6]:
import re
# Raw string so that \W reaches the regex engine unchanged
rdd_en = rdd_en.filter(lambda line: line[:1]!='<')\
.map(lambda line: re.split(r'\W+', line.lower()))
rdd_en.count()

646605

The count is unchanged, so there are no markup lines. This was also checked with the Linux command: cat europarl-v7.cs-en.en | grep "^<"

Inspect first line to check pre-processing

In [7]:
rdd_en.first()

['action', 'taken', 'on', 'parliament', 's', 'resolutions', 'see', 'minutes']
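
The tokenization step can also be tried outside Spark. The following is a minimal sketch in plain Python, assuming the same regular expression as above; the helper name `tokenize` and the second sample sentence are illustrative assumptions and not part of the notebook. It shows why empty strings appear when a line starts or ends with punctuation.

```python
import re

def tokenize(line):
    """Lower-case a line and split it on runs of non-word characters."""
    return re.split(r'\W+', line.lower())

# First line of the English file, as shown above
print(tokenize("Action taken on Parliament's resolutions: see Minutes"))

# Hypothetical line that starts and ends with punctuation:
# the split yields an empty string at each end
print(tokenize("(The sitting was closed at 11.55 p.m.)"))
```

In Python 3, `\W` is Unicode-aware for `str` patterns, which is why Czech words with diacritics stay in one piece.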

Same for Czech

In [8]:
# Raw string so that \W reaches the regex engine unchanged
rdd_cs = rdd_cs.filter(lambda line: line[:1]!='<')\
.map(lambda line: re.split(r'\W+', line.lower()))
rdd_cs.count()

646605

In [9]:
rdd_cs.first()

['následný',
 'postup',
 'na',
 'základě',
 'usnesení',
 'parlamentu',
 'viz',
 'zápis']

Inspect first 100 lines to check that preprocessing seems to have the desired effect

In [10]:
rdd_en.take(100)

[['action', 'taken', 'on', 'parliament', 's', 'resolutions', 'see', 'minutes'],
 ['documents', 'received', 'see', 'minutes'],
 ['written', 'statements', 'rule', '116', 'see', 'minutes'],
 ['texts',
  'of',
  'agreements',
  'forwarded',
  'by',
  'the',
  'council',
  'see',
  'minutes'],
 ['membership', 'of', 'parliament', 'see', 'minutes'],
 ['membership', 'of', 'committees', 'and', 'delegations', 'see', 'minutes'],
 ['future',
  'action',
  'in',
  'the',
  'field',
  'of',
  'patents',
  'motions',
  'for',
  'resolutions',
  'tabled',
  'see',
  'minutes'],
 ['agenda', 'for', 'next', 'sitting', 'see', 'minutes'],
 ['closure', 'of', 'sitting'],
 ['', 'the', 'sitting', 'was', 'closed', 'at', '11', '55', 'p', 'm', ''],
 ['opening', 'of', 'the', 'sitting'],
 ['', 'the', 'sitting', 'was', 'opened', 'at', '9', 'a', 'm', ''],
 ['documents', 'received', 'see', 'minutes'],
 ['approval', 'of', 'minutes', 'of', 'previous', 'sitting', 'see', 'minutes'],
 ['membership', 'of', 'committees', 'an

In [11]:
rdd_cs.take(100)

[['následný',
  'postup',
  'na',
  'základě',
  'usnesení',
  'parlamentu',
  'viz',
  'zápis'],
 ['předložení', 'dokumentů', 'viz', 'zápis'],
 ['písemná',
  'prohlášení',
  'článek',
  '116',
  'jednacího',
  'řádu',
  'viz',
  'zápis'],
 ['texty', 'smluv', 'dodané', 'radou', 'viz', 'zápis'],
 ['složení', 'parlamentu', 'viz', 'zápis'],
 ['členství', 've', 'výborech', 'a', 'delegacích', 'viz', 'zápis'],
 ['budoucí',
  'akce',
  'v',
  'oblasti',
  'patentů',
  'předložené',
  'návrhy',
  'usnesení',
  'viz',
  'zápis'],
 ['pořad', 'jednání', 'příštího', 'zasedání', 'viz', 'zápis'],
 ['ukončení', 'zasedání'],
 ['', 'la', 'seduta', 'è', 'tolta', 'alle', '23', '55', ''],
 ['zahájení', 'zasedání'],
 ['', 'συνεδρίαση', 'αρχίζει', 'στις', '9', 'π', 'μ', ''],
 ['předložení', 'dokumentů', 'viz', 'zápis'],
 ['schválení', 'zápisu', 'z', 'předchozího', 'zasedání', 'viz', 'zápis'],
 ['členství', 've', 'výborech', 'a', 'delegacích', 'viz', 'zápis'],
 ['1', ''],
 ['dohoda',
  'es',
  'bulharsko',
 

### A.3 ###

Flatten the tokenized lines into individual words using flatMap, filter out empty strings (these arise where a line starts or ends with punctuation, such as brackets or a final full stop), then map-reduce into word counts and show the top 10 by count.

In [12]:
wordcounts = rdd_en\
.flatMap(lambda words: words)\
.filter(lambda word: word != '')\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)
wordcounts.takeOrdered(10, key=lambda x: -x[1])

[('the', 1186954),
 ('of', 572139),
 ('to', 519242),
 ('and', 472757),
 ('in', 381971),
 ('that', 272178),
 ('a', 262539),
 ('is', 258271),
 ('for', 192522),
 ('we', 177781)]

Repeat for the Czech file

In [13]:
wordcounts_cs = rdd_cs\
.flatMap(lambda words: words)\
.filter(lambda word: word != '')\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)
wordcounts_cs.takeOrdered(10, key=lambda x: -x[1])

[('a', 485513),
 ('v', 293134),
 ('se', 224287),
 ('na', 200226),
 ('je', 195307),
 ('že', 189908),
 ('o', 128539),
 ('pro', 112098),
 ('s', 93584),
 ('to', 92817)]
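
For readers who want to follow the word count logic without a Spark session, here is a rough plain-Python equivalent. It is only a sketch: `tokenized_lines` is made-up sample data in the same shape as `rdd_en`, and `count_words` is a hypothetical helper. Each step is annotated with the RDD operation it stands in for.

```python
from collections import Counter
from itertools import chain

# Hypothetical sample data, shaped like the tokenized lines in rdd_en
tokenized_lines = [
    ['', 'the', 'sitting', 'was', 'closed', ''],
    ['opening', 'of', 'the', 'sitting'],
    ['closure', 'of', 'sitting'],
]

def count_words(lines):
    """Count non-empty words across all tokenized lines."""
    words = chain.from_iterable(lines)        # flatMap(lambda words: words)
    words = (w for w in words if w != '')     # filter(lambda word: word != '')
    return Counter(words)                     # map to (word, 1) + reduceByKey(a + b)

counts = count_words(tokenized_lines)

# Comparable to takeOrdered(2, key=lambda x: -x[1])
top = sorted(counts.items(), key=lambda kv: -kv[1])[:2]
print(top)
```

The difference in Spark is that `reduceByKey` combines the partial counts within each partition before shuffling, so the full list of words never has to sit on one machine.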

### A.4 ###


rdd_en and rdd_cs still contain the tokenized lines. Key each line by its line number and join the English and Czech versions on that key. The join keeps only lines that have a counterpart in the other file; the filter is meant to drop lines that are blank in either version (none in this corpus)

In [27]:
rdd_en_indexed = rdd_en.zipWithIndex()
rdd_cs_indexed = rdd_cs.zipWithIndex()
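# zipWithIndex yields (line, index); swap to (index, line) so the index is the join key.
# join() is an inner join, so lines without a counterpart are dropped.
# Note: re.split() always returns at least [''], so the len() checks below never
# reject a line; empty tokens are removed later, when the word pairs are built.
joined = rdd_en_indexed.map(lambda line: (line[1], line[0]))\
.join(rdd_cs_indexed.map(lambda line: (line[1], line[0])))\
.filter(lambda line: len(line) == 2 and len(line[1][0])>0 and len(line[1][1])>0 )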
joined.take(5)

[(0,
  (['action',
    'taken',
    'on',
    'parliament',
    's',
    'resolutions',
    'see',
    'minutes'],
   ['následný',
    'postup',
    'na',
    'základě',
    'usnesení',
    'parlamentu',
    'viz',
    'zápis'])),
 (262146,
  (['we',
    'now',
    'come',
    'to',
    'our',
    'rapporteur',
    'mrs',
    'riera',
    'madurell',
    ''],
   ['slovo',
    'nyní',
    'dáme',
    'naší',
    'zpravodajce',
    'paní',
    'riera',
    'madurellové',
    ''])),
 (524292,
  (['finally',
    'i',
    'will',
    'convey',
    'to',
    'vice',
    'president',
    'reding',
    'the',
    'points',
    'that',
    'have',
    'been',
    'made',
    'here',
    'this',
    'evening',
    'including',
    'the',
    'question',
    'raised',
    'by',
    'one',
    'member',
    'in',
    'relation',
    'to',
    'which',
    'an',
    'answer',
    'is',
    'expected',
    ''],
   ['a',
    'konečně',
    'předám',
    'místopředsedkyni',
    'redingové',
    'vaše'
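
The index-and-join step can be sketched in plain Python as follows. This is an illustration under assumptions, not the Spark implementation: the helpers `zip_with_index` and `join_by_index` and the sample lines are hypothetical, and dictionaries stand in for the keyed RDDs.

```python
def zip_with_index(lines):
    """Pair every line with its position, like RDD.zipWithIndex()."""
    return [(line, i) for i, line in enumerate(lines)]

def join_by_index(en_lines, cs_lines):
    """Inner join of two lists of lines on the line number."""
    en = {i: line for line, i in zip_with_index(en_lines)}
    cs = {i: line for line, i in zip_with_index(cs_lines)}
    # Only keys present on both sides survive, as in an inner join
    return {i: (en[i], cs[i]) for i in en.keys() & cs.keys()}

# Hypothetical sample: the English side has one line more than the Czech side
en_lines = [['documents', 'received', 'see', 'minutes'],
            ['closure', 'of', 'sitting'],
            ['extra', 'line']]
cs_lines = [['předložení', 'dokumentů', 'viz', 'zápis'],
            ['ukončení', 'zasedání']]

joined_sample = join_by_index(en_lines, cs_lines)
print(sorted(joined_sample))
```

As in the Spark output above, the result of a join is not ordered by key, so the sketch sorts the keys before printing them.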

Keep only short sentences (fewer than 10 tokens) whose English and Czech versions have the same number of tokens

In [50]:
short_sentences = joined.filter(lambda line: len(line[1][0]) < 10)\
.filter(lambda line: len(line[1][0])==len(line[1][1]))
short_sentences.count()

19970

Now we assume that every word in the Czech data is a direct translation of the word at the same position in the English data. Map them into a set of pairs (sentence structure is no longer needed, hence flatMap), then discard blanks and numbers.

In [51]:
pairs = short_sentences.flatMap(lambda line: zip(line[1][0], line[1][1]))\
.filter(lambda pair: pair[0] != '' and pair[1] != '')\
.filter(lambda pair: not pair[0].replace('.','',1).isdigit() and not pair[1].replace('.','',1).isdigit())
pairs.take(10)

[('action', 'následný'),
 ('taken', 'postup'),
 ('on', 'na'),
 ('parliament', 'základě'),
 ('s', 'usnesení'),
 ('resolutions', 'parlamentu'),
 ('see', 'viz'),
 ('minutes', 'zápis'),
 ('documents', 'předložení'),
 ('received', 'dokumentů')]

In [52]:
pairs.count()

78542
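
The positional pairing can be illustrated on a single sentence pair. The sketch below is plain Python under assumptions: `word_pairs` is a hypothetical helper that combines the length filter and the pair filter from the cells above, and it uses a bare `isdigit()` check because tokens produced by splitting on `\W+` cannot contain a '.'.

```python
def word_pairs(en_tokens, cs_tokens, max_len=10):
    """Pair tokens by position if the sentence is short and both sides have equal length."""
    if len(en_tokens) >= max_len or len(en_tokens) != len(cs_tokens):
        return []
    return [
        (en, cs)
        for en, cs in zip(en_tokens, cs_tokens)
        if en != '' and cs != ''                      # discard blanks
        and not en.isdigit() and not cs.isdigit()     # discard numbers
    ]

# Equal length: every position yields a pair
print(word_pairs(['documents', 'received', 'see', 'minutes'],
                 ['předložení', 'dokumentů', 'viz', 'zápis']))

# Different lengths (6 vs 8 tokens): the sentence is skipped
print(word_pairs(['written', 'statements', 'rule', '116', 'see', 'minutes'],
                 ['písemná', 'prohlášení', 'článek', '116',
                  'jednacího', 'řádu', 'viz', 'zápis']))
```

The first example also shows the limit of the assumption: 'documents' is paired with 'předložení' and 'received' with 'dokumentů' because Czech word order differs, exactly as in the Spark output above. Only frequent pairs can be expected to be meaningful.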

Then map-reduce these pairs to find the most common ones

In [53]:
translations = pairs.map(lambda line: (line, 1))\
.reduceByKey(lambda a, b: a+b)

In [54]:
translations.takeOrdered(20, key=lambda x: -x[1])

[(('applause', 'potlesk'), 1851),
 (('is', 'je'), 1449),
 (('written', 'písemná'), 821),
 (('rule', 'článek'), 800),
 (('statements', 'prohlášení'), 796),
 (('see', 'viz'), 748),
 (('minutes', 'zápis'), 742),
 (('and', 'a'), 698),
 (('that', 'to'), 620),
 (('thank', 'děkuji'), 618),
 (('you', 'vám'), 605),
 (('this', 'to'), 541),
 (('is', 'to'), 360),
 (('in', 'v'), 298),
 (('mr', 'pane'), 296),
 (('are', 'jsou'), 264),
 (('why', 'proč'), 247),
 (('vote', 'hlasování'), 246),
 (('debate', 'rozprava'), 224),
 (('president', 'předsedající'), 222)]

In [55]:
translations.count()

33547
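
One possible way to read the resulting pair counts is to group them by English word and rank the Czech candidates by frequency. The following plain-Python sketch assumes a small made-up list of pairs; `candidates` is a hypothetical helper and this grouping step is not part of the notebook above.

```python
from collections import Counter, defaultdict

# Hypothetical word pairs, shaped like the elements of the pairs RDD
sample_pairs = [('is', 'je'), ('is', 'je'), ('is', 'to'),
                ('this', 'to'), ('see', 'viz')]

# Same effect as map(lambda p: (p, 1)).reduceByKey(lambda a, b: a + b)
pair_counts = Counter(sample_pairs)

def candidates(pair_counts):
    """Group pair counts by English word, most frequent Czech candidate first."""
    by_word = defaultdict(list)
    for (en, cs), n in pair_counts.items():
        by_word[en].append((cs, n))
    return {en: sorted(cands, key=lambda c: -c[1])
            for en, cands in by_word.items()}

best = candidates(pair_counts)
print(best['is'])
```

In the real output above, 'is' pairs with both 'je' (1449 times) and 'to' (360 times), so ranking candidates per word makes the more likely translation stand out.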