In [61]:
from pyspark.sql import SparkSession
from operator import add

# New API
spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.119:7077") \
        .appName("Jonathan-A.1")\
        .config("spark.executor.cores",2)\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.dynamicAllocation.shuffleTracking.enabled", True)\
        .config("spark.shuffle.service.enabled", False)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.driver.port",9998)\
        .config("spark.blockManager.port",10005)\
        .getOrCreate()

In [2]:
# B.1

df = spark_session.read\
    .option("header", True)\
    .csv('hdfs://192.168.2.119:9000/parking-citations.csv')\
    .cache()

                                                                                

In [3]:
df.show()

                                                                                

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

In [4]:
# B.2

df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [5]:
# B.3 

print("Rows: " + str(df.count()))



Rows: 13077724


                                                                                

In [6]:
# B.4

print("Partitions: " + str(df.rdd.getNumPartitions()))

Partitions: 16


In [48]:
# B.5
df = df.drop('VIN', 'Latitude', 'Longitude')

In [8]:
# B.6
max_fine = df.agg({"Fine amount": "max"}).collect()[0]
print("Maximum fine amount: " + max_fine["max(Fine amount)"])



Maximum fine amount: 98.0


                                                                                

In [40]:
# B.7
make = df.groupBy('Make').count().orderBy('count', ascending=False)
make.take(20)

                                                                                

[Row(Make='TOYT', count=2150768),
 Row(Make='HOND', count=1479996),
 Row(Make='FORD', count=1116235),
 Row(Make='NISS', count=945133),
 Row(Make='CHEV', count=892676),
 Row(Make='BMW', count=603092),
 Row(Make='MERZ', count=543298),
 Row(Make='VOLK', count=432030),
 Row(Make='HYUN', count=404917),
 Row(Make='DODG', count=391686),
 Row(Make='LEXS', count=368420),
 Row(Make='KIA', count=328155),
 Row(Make='JEEP', count=316300),
 Row(Make='AUDI', count=255395),
 Row(Make='MAZD', count=242344),
 Row(Make='OTHR', count=205546),
 Row(Make='GMC', count=184889),
 Row(Make='INFI', count=174315),
 Row(Make='CHRY', count=159948),
 Row(Make='SUBA', count=154640)]

In [73]:
# B.8
from itertools import chain
from pyspark.sql.functions import create_map, lit
from pyspark.sql.functions import coalesce
    

def color_long(colors):
    
    #Map colors to color long
    mapping = create_map([lit(x) for x in chain(*colors.items())])
    cl = df.withColumn('ColorLong', mapping[df['Color']])
    
    #Fill none with old color
    color_long = cl.withColumn("ColorLong", coalesce(cl.ColorLong,cl.Color)) 
    

    return color_long

In [79]:
COLORS = {
    'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black',
    'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze',
    'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold',
    'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory',
    'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
    'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
    'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White', 'WH':'White',
    'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
}

cl = color_long(COLORS)

In [83]:
# B.9

most_freq_color = cl.groupBy('ColorLong')\
                    .count()\
                    .orderBy('count', ascending=False)

In [84]:
most_freq_color.take(5)

                                                                                

[Row(ColorLong='White', count=2922676),
 Row(ColorLong='Black', count=2846765),
 Row(ColorLong='Gray', count=2567571),
 Row(ColorLong='Silver', count=1682040),
 Row(ColorLong='Blue', count=1007674)]

In [None]:
# Most frequent color seems to be white.