In [1]:
from operator import add
import re
from collections import OrderedDict
from operator import itemgetter 
import itertools
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Cluster sizing note: (8 cores, 16gb per machine) x 5 = 40 cores

# New API: build (or reuse) the SparkSession used throughout this notebook.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.1.153:7077")
    .appName("linneaeriksson_A2-partB")
    # Dynamic allocation is enabled together with the shuffle-service setting;
    # idle executors time out after 30s.
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

# Old API (RDD): the SparkContext that belongs to the session above.
spark_context = spark_session.sparkContext
# NOTE(review): DEBUG is a very verbose log level -- confirm it is intended.
spark_context.setLogLevel("DEBUG")

In [2]:
# part 1: load the parking-citations CSV from HDFS and preview it.
# The first line of the file is treated as the header row, and the resulting
# DataFrame is marked for caching because later cells reuse it.
citations_path = "hdfs://192.168.1.153:9000/parking-citations.csv"
citations = (
    spark_session.read
    .option("header", "true")
    .csv(citations_path)
    .cache()
)
citations.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|         

In [3]:
# part 2: print the column names and column types of the loaded DataFrame
citations.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [4]:
# part 3: count the rows of the loaded DataFrame and report the total
total_rows = citations.count()
print("Total number of rows in CSV: " + str(total_rows))

Total number of rows in CSV: 9257460


In [5]:
# part 4: report how many partitions the DataFrame's underlying RDD has
partition_count = citations.rdd.getNumPartitions()
print("Total number of partitions in RDD: " + str(partition_count))

Total number of partitions in RDD: 10


In [6]:
# part 5: remove the VIN, Latitude and Longitude columns and show what is left
unused_columns = ("VIN", "Latitude", "Longitude")
citations_dropped = citations.drop(*unused_columns)
citations_dropped.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)



In [7]:
# part 6: add a float version of "Fine amount" and count citations per amount.
# "Fine amount" is a string column, so it is cast into the new column
# "Fine_amount_float"; na.fill(0) then replaces missing numeric values with 0.
fine_as_float = citations_dropped['Fine amount'].cast("float")
citations_fine_amount_conv2float = (
    citations_dropped
    .withColumn("Fine_amount_float", fine_as_float)
    .na.fill(0)
)

# Register the DataFrame for SQL access. createOrReplaceTempView returns None,
# so its result is deliberately not bound to a name.
citations_fine_amount_conv2float.createOrReplaceTempView("citations_fine")

# Number of citations per fine amount, highest fine first.
# Adjacent string literals are joined by Python into one SQL statement.
spark_session.sql(
    "SELECT Fine_amount_float, count(Fine_amount_float) as Number_of_Fines "
    "FROM citations_fine "
    "GROUP BY Fine_amount_float "
    "ORDER BY Fine_amount_float DESC"
).show()

+-----------------+---------------+
|Fine_amount_float|Number_of_Fines|
+-----------------+---------------+
|            505.0|              6|
|            363.0|          63366|
|            353.0|             15|
|            345.0|             40|
|            330.0|              1|
|            293.0|          10401|
|            255.0|             30|
|            163.0|         106748|
|            155.0|              1|
|            143.0|            373|
|            133.0|           9185|
|            128.0|            338|
|            123.0|              3|
|            113.0|              2|
|            105.0|            729|
|            103.0|           7401|
|             98.0|            333|
|             93.0|        1097437|
|             88.0|            158|
|             85.0|              5|
+-----------------+---------------+
only showing top 20 rows



In [8]:
# part 7: per vehicle make, the number of citations and that number divided by
# the total row count of the "citations_fine" view, most frequent make first.
make_frequency_query = (
    "SELECT Make, count(Make) AS Nr_of_Vehicles, count(Make)/(SELECT count(*) FROM citations_fine) as FREQ "
    "FROM citations_fine "
    "GROUP BY Make "
    "ORDER BY FREQ DESC"
)
spark_session.sql(make_frequency_query).show()

+----+--------------+--------------------+
|Make|Nr_of_Vehicles|                FREQ|
+----+--------------+--------------------+
|TOYT|       1531949| 0.16548264858827366|
|HOND|       1043276| 0.11269570702979002|
|FORD|        807498| 0.08722673389893124|
|NISS|        662097|  0.0715203738390444|
|CHEV|        631413| 0.06820585776228037|
| BMW|        422916| 0.04568380527704143|
|MERZ|        376830| 0.04070554990245705|
|VOLK|        316002| 0.03413484908387398|
|HYUN|        285286|0.030816876335409495|
|DODG|        271590| 0.02933742084761911|
|LEXS|        263269|0.028438578184512814|
| KIA|        217795|0.023526431656199432|
|JEEP|        214965|0.023220732252691344|
|AUDI|        179718|   0.019413316395642|
|MAZD|        169811|0.018343152441382408|
|OTHR|        154376|0.016675848450870973|
| GMC|        132788|0.014343891304958379|
|INFI|        120340| 0.01299924601348534|
|CHRY|        120317|0.012996761530700646|
|ACUR|        111265|0.012018955523437314|
+----+-----

In [10]:
# part 8
# Lookup table from two-letter colour abbreviations to full colour names.
# It is built once at module level rather than inside the function, because
# the function is wrapped as a UDF and therefore called once per row.
_COLOR_NAMES = {
    'AL': 'Aluminum', 'AM': 'Amber', 'BG': 'Beige', 'BK': 'Black',
    'BL': 'Blue', 'BN': 'Brown', 'BR': 'Brown', 'BZ': 'Bronze',
    'CH': 'Charcoal', 'DK': 'Dark', 'GD': 'Gold', 'GO': 'Gold',
    'GN': 'Green', 'GY': 'Gray', 'GT': 'Granite', 'IV': 'Ivory',
    'LT': 'Light', 'OL': 'Olive', 'OR': 'Orange', 'MR': 'Maroon',
    'PK': 'Pink', 'RD': 'Red', 'RE': 'Red', 'SI': 'Silver', 'SL': 'Silver',
    'SM': 'Smoke', 'TN': 'Tan', 'VT': 'Violet', 'WT': 'White',
    'WH': 'White', 'YL': 'Yellow', 'YE': 'Yellow', 'UN': 'Unknown',
}


def expand_color_abbr(col):
    """Return the full colour name for a two-letter colour abbreviation.

    Args:
        col: The abbreviation to expand (e.g. ``'GY'``). May be ``None``.

    Returns:
        The full colour name when ``col`` is a known abbreviation; otherwise
        ``col`` itself, unchanged (this includes ``None`` and unknown codes).
    """
    # dict.get with the input as default covers both the "known" and the
    # "pass through unchanged" case in a single lookup.
    return _COLOR_NAMES.get(col, col)

# Wrap the plain Python function as a Spark UDF returning strings, then use it
# to derive the "Color_Long" column from the abbreviated "Color" column.
udf_expand_color_abbr = udf(expand_color_abbr, returnType=StringType())
color_long = citations_fine_amount_conv2float.withColumn(
    "Color_Long",
    udf_expand_color_abbr("Color"),
)


In [11]:
# part 9: number of citations per (expanded) colour for vehicles of make TOYT.
# Register the DataFrame for SQL access. createOrReplaceTempView returns None,
# so its result is deliberately not bound to a name.
color_long.createOrReplaceTempView("citations_color")

# Keep the query result as a DataFrame so it stays usable after being shown;
# chaining .show() onto the assignment would bind None instead.
toyota_color = spark_session.sql(
    "SELECT Color_Long, count(Color_Long) AS Nr_of_Toyotas "
    "FROM citations_color "
    "WHERE Make='TOYT' "
    "GROUP BY Color_Long "
    "ORDER BY count(Color_Long) DESC"
)
toyota_color.show()

+----------+-------------+
|Color_Long|Nr_of_Toyotas|
+----------+-------------+
|      Gray|       346822|
|     White|       304620|
|     Black|       252199|
|    Silver|       248685|
|      Blue|       128051|
|       Red|        84175|
|     Green|        57627|
|      Gold|        30154|
|    Maroon|        19882|
|       Tan|        17006|
|     Beige|        11572|
|        OT|        10805|
|     Brown|         8466|
|    Yellow|         3413|
|        PR|         3010|
|    Orange|         2527|
|   Unknown|         1343|
|        TU|         1077|
|        CO|          423|
|      Pink|           89|
+----------+-------------+
only showing top 20 rows



In [12]:
# Release the cluster resources held by this notebook. Stopping through the
# session stops the underlying SparkContext as well, so no session object is
# left wrapping an already-stopped context.
spark_session.stop()