In [1]:
from pyspark.sql import SparkSession

spark_session = SparkSession\
        .builder\
        .master("local[1]") \
        .appName("Kolonskopi")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .getOrCreate()

# Old API (RDD)
spark_context = spark_session.sparkContext

### Question A1

In [2]:
en_tF = spark_context.textFile('hdfs://192.168.1.153:9000/europarl/europarl-v7.sv-en.en')

en_lc = en_tF.count()
print(f"Number of lines in English document {en_lc}\n")

sv_tF = spark_context.textFile('hdfs://192.168.1.153:9000/europarl/europarl-v7.sv-en.sv')

sv_lc = sv_tF.count()
print(f"Number of lines in Swedish document {sv_lc}\n")

if en_lc == sv_lc:
    print(f"There are equal number of lines in the two documents\n")

print(f"Number of partitions in files:\n EN:\t{en_tF.getNumPartitions()}\n SV:\t{sv_tF.getNumPartitions()}")



Number of lines in English document 1862234

Number of lines in Swedish document 1862234

There are equal number of lines in the two documents

Number of partitions in files:
 EN:	2
 SV:	3


### Question A2 

In [3]:
en_low_tF = en_tF.map(lambda line: line.lower().split(' ')) #split lines on spaces 
sv_low_tF = sv_tF.map(lambda line: line.lower().split(' '))

print(f"The first 10 objects in English and then Swedish file:\n")
print(en_low_tF.take(10))
print(sv_low_tF.take(10))

en_low_lc = en_low_tF.count()
sv_low_lc = sv_low_tF.count()

if en_low_lc == sv_low_lc:
    print("After pre-processing the files have the same number of lines")

The first 10 objects in English and then Swedish file:

[['resumption', 'of', 'the', 'session'], ['i', 'declare', 'resumed', 'the', 'session', 'of', 'the', 'european', 'parliament', 'adjourned', 'on', 'friday', '17', 'december', '1999,', 'and', 'i', 'would', 'like', 'once', 'again', 'to', 'wish', 'you', 'a', 'happy', 'new', 'year', 'in', 'the', 'hope', 'that', 'you', 'enjoyed', 'a', 'pleasant', 'festive', 'period.'], ['although,', 'as', 'you', 'will', 'have', 'seen,', 'the', 'dreaded', "'millennium", "bug'", 'failed', 'to', 'materialise,', 'still', 'the', 'people', 'in', 'a', 'number', 'of', 'countries', 'suffered', 'a', 'series', 'of', 'natural', 'disasters', 'that', 'truly', 'were', 'dreadful.'], ['you', 'have', 'requested', 'a', 'debate', 'on', 'this', 'subject', 'in', 'the', 'course', 'of', 'the', 'next', 'few', 'days,', 'during', 'this', 'part-session.'], ['in', 'the', 'meantime,', 'i', 'should', 'like', 'to', 'observe', 'a', "minute'", 's', 'silence,', 'as', 'a', 'number', 'of', 

### Question A3

In [4]:
en_words = en_low_tF.flatMap(lambda line: line)
en_word_counts = en_words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x+y).sortBy(lambda word_count: -word_count[1]).take(10)

sv_words = sv_low_tF.flatMap(lambda line: line)
sv_word_counts = sv_words\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x+y)\
    .sortBy(lambda word_count: -word_count[1])\
    .take(10) #tuplify word with count, aggregate word counts, sort by descending occurances.


print(f"Most commonly used words in English carpus, and their count:\n {en_word_counts}\n\n"
    f"Most commonly used words in Swedish carpus, and their count:\n {sv_word_counts}\n\n"
    f"Seems reasonable that articles, conjunctions, determiners and prepositions are most ocuring in texts."
     )

Most commonly used words in English carpus, and their count:
 [('the', 3498375), ('of', 1659758), ('to', 1539760), ('and', 1288401), ('in', 1085993), ('that', 797516), ('a', 773522), ('is', 758050), ('for', 534242), ('we', 522849)]

Most commonly used words in Swedish carpus, and their count:
 [('att', 1706293), ('och', 1344830), ('i', 1050774), ('det', 924866), ('som', 913276), ('för', 908680), ('av', 738068), ('är', 694381), ('en', 620310), ('vi', 539797)]

Seems reasonable that articles, conjunctions, determiners and prepositions are most ocuring in texts.


### Question A4

In [19]:
indx_en_low_tF = en_low_tF\
    .zipWithIndex()\
    .map(lambda line: (line[1], line[0])) #index lines, swap key and value
    
indx_sv_low_tF = sv_low_tF\
    .zipWithIndex()\
    .map(lambda line: (line[1], line[0]))

#Inner join keys, filter for equal length lines, randomly selected number of words to start with 
j_sv_en_indx = indx_en_low_tF\
    .join(indx_sv_low_tF)\
    .filter(lambda line: len(line[1][0]) == len(line[1][1]))\
    .filter(lambda line: len(line[1][0]) < 10)\
    .filter(lambda line: len(line[1][0]) > 0)

word_pairs = j_sv_en_indx\
    .flatMap(lambda word_pair: list(zip(word_pair[1][0], word_pair[1][1])))\
    .map(lambda pair: (pair, 1))\
    .reduceByKey(lambda x, y: x+y)\
    .sortBy(lambda x: -x[1])\
    .take(20)


In [21]:
word_pairs

[(('is', 'är'), 10040),
 (('we', 'vi'), 5530),
 (('i', 'jag'), 5020),
 (('this', 'detta'), 3252),
 (('closed.', 'avslutad.'), 2964),
 (('and', 'och'), 2917),
 (('a', 'en'), 2888),
 (('it', 'det'), 2866),
 (('that', 'det'), 2806),
 (('not', 'inte'), 2650),
 (('(applause)', '(applåder)'), 2548),
 (('', '.'), 2223),
 (('.', '.'), 2143),
 (('have', 'har'), 1967),
 (('in', 'i'), 1932),
 (('will', 'att'), 1920),
 (('a', 'ett'), 1872),
 (('are', 'är'), 1789),
 (('the', 'omröstningen'), 1778),
 (('vote', 'kommer'), 1727)]

After mixing around a bit, I found that a line length of 1-9 words gave reasonable translations.

In [None]:
spark_context.stop()

### Question B1

In [100]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql import UDFRegistration


sqlContext = SparkSession\
        .builder\
        .master("local[1]") \
        .appName("Kolonskopi")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .getOrCreate()


In [101]:
data = sqlContext\
    .read.option("header","true")\
    .csv("hdfs://192.168.1.153:9000/parking-citations.csv")\
    .cache()

In [147]:
data.show()

print(f"The schema for the data frame is the following:\n")
data.printSchema()

num_rows_df = data.count()
num_part_df = data.rdd.getNumPartitions()
print(f"\n\nThe number of rows in the table is: {num_rows_df}\n"
    f"The number of partitions of the RDD underlying the table: {num_part_df}\n")

data.drop('VIN')
data.drop('Latitude')
data.drop('Longitude')

data = data.withColumnRenamed("Fine amount", "fine_amount") # Remove cancerous white spaces

max_fine = data\
    .agg({"fine_amount": "max"})\
    .collect()[0]

rows_max_fine = data\
    .filter(data.fine_amount == max_fine['max(fine_amount)'])\
    .count()
    
print(f"The maximum fine reported was {max_fine['max(fine_amount)']}\n"
      f"with a total of {rows_max_fine} rows matching such fine amount\n")

make_freq = data\
    .groupby('Make')\
    .count()

print(f"The most reported makes are as follows:")
make_freq.orderBy(make_freq['count'].desc()).show(20)


def color_exp(short):
    """Extends color if it exists in the register"""
    COLORS = {
    'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black',
    'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze',
    'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold',
    'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory',
    'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
    'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
    'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White',
    'WH':'White', 'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
    }

    try:
        COLORS[short]
    except KeyError:
        return short
    return COLORS[short]
color_udf = F.udf(lambda x: color_exp(x), StringType())
data = data.withColumn("color long", color_udf(data.Color))

toyotas = data.filter(data.Make == 'TOYT')
toy_freq_colors = toyotas.groupBy('color long').count()

print(f"The most common color of reported Toyotas was:")
toy_freq_colors.orderBy(toy_freq_colors['count'].desc()).show(1)

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+----------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|fine_amount| Latitude|Longitude|color long|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+----------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|      Gray|
|   1103700150|2015-12-21T00:00: