In [1]:
from pyspark.sql import SparkSession
from operator import add
from IPython.core.display import display, HTML
from pprint import pprint
import pyspark
from pyspark.sql.functions import udf, count, col 
from pyspark.sql.types import StringType
from pyspark.sql.functions import to_timestamp, dayofyear
import pyspark.sql.functions as f
from pyspark.sql import Window

In [2]:
# Connect to the standalone Spark cluster. Dynamic allocation (backed by the
# external shuffle service) lets idle executors be released after 30 s.
SPARK_MASTER_URL = "spark://192.168.1.153:7077"

spark_session = (
    SparkSession.builder
    .master(SPARK_MASTER_URL)
    .appName("hadoop_punnam_taskB")
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 8)
    .getOrCreate()
)

# Old API (RDD): the SparkContext underlying the session
spark_context = spark_session.sparkContext

In [3]:
# Load the parking-citations CSV from HDFS. Only the header option is set, so
# the first row supplies column names and every column is read as a string.
# cache() is lazy: the data is persisted once an action first scans it.
CITATIONS_CSV = 'hdfs://192.168.1.153:9000/parking-citations.csv'
data_frame = (
    spark_session.read
    .option("header", "true")
    .csv(CITATIONS_CSV)
    .cache()
)

In [4]:
# Spot-check that the data loaded correctly by printing the first rows
data_frame.show()

+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|         

In [5]:
# Show the DataFrame schema; only the header option was set at load time,
# so every column comes back as a nullable string
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [6]:
# Count the rows. This is a full scan and, as the first action on the cached
# DataFrame, presumably also populates that cache
data_frame.count()

9257460

In [7]:
# Number of partitions in the RDD underlying the DataFrame
data_frame.rdd.getNumPartitions()

10

In [8]:
# Drop the VIN and coordinate columns; the rest of the analysis does not use them
unused_columns = ['VIN', 'Latitude', 'Longitude']
new_df = data_frame.drop(*unused_columns)

In [9]:
# Confirm VIN, Latitude and Longitude are gone from the schema
new_df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)



In [10]:
# 'Fine amount' was read as a string; add a float copy named NewFine so that
# fines sort and compare numerically
temp_df = new_df.withColumn('NewFine', f.col('Fine amount').cast('float'))

In [11]:
# Confirm NewFine was added as a float column next to the original string 'Fine amount'
temp_df.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- NewFine: float (nullable = true)



In [12]:
# Ten tickets with the highest numeric fine (ties are returned in no fixed order)
temp_df.orderBy(f.desc('NewFine')).take(10)

[Row(Ticket number='1109909220', Issue Date='2014-12-24T00:00:00', Issue time='125', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201507', Make='NISS', Body Style='PA', Color='BK', Location='617 E GAGE AV', Route='13E3', Agency='1', Violation code='558', Violation Description='8755**', Fine amount='505', NewFine=505.0),
 Row(Ticket number='1109909205', Issue Date='2014-12-24T00:00:00', Issue time='110', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201410', Make='HOND', Body Style=None, Color='BK', Location='6500 S AVALON BLVD', Route='13E3', Agency='1', Violation code='558', Violation Description='8755**', Fine amount='505', NewFine=505.0),
 Row(Ticket number='1109908995', Issue Date='2015-03-07T00:00:00', Issue time='1220', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201510', Make='FORD', Body Style='PA', Color='MA', Location='1203 E 35TH ST', Route='13E3', Agency='1', Violation code='558', Violation 

In [13]:
# Count the tickets whose fine equals the largest fine in the data set.
# A max() aggregation states the intent directly and, unlike
# orderBy(...).take(1)[0], does not raise IndexError on an empty frame
# (max_fine is then None and the count is 0).
max_fine = temp_df.agg(f.max('NewFine')).first()[0]
temp_df.where(f.col('NewFine') == max_fine).count()

6

In [14]:
# Pattern originally adapted from
# https://stackoverflow.com/questions/35218882/find-maximum-row-per-group-in-spark-dataframe
# Count the tickets issued for each vehicle Make (a null Make forms its own group).
cnts = temp_df.groupBy("Make").agg(count("*").alias("cnt"))
# Every Make is already unique after the groupBy above, so grouping again and
# taking max("cnt") returned the same numbers through a redundant aggregation.
# Just expose the count under the `mx` name that later cells sort on.
maxs = cnts.withColumnRenamed("cnt", "mx")

In [15]:
# Twenty makes with the most tickets
maxs.sort(f.desc('mx')).take(20)

[Row(Make='TOYT', mx=1531949),
 Row(Make='HOND', mx=1043276),
 Row(Make='FORD', mx=807498),
 Row(Make='NISS', mx=662097),
 Row(Make='CHEV', mx=631413),
 Row(Make='BMW', mx=422916),
 Row(Make='MERZ', mx=376830),
 Row(Make='VOLK', mx=316002),
 Row(Make='HYUN', mx=285286),
 Row(Make='DODG', mx=271590),
 Row(Make='LEXS', mx=263269),
 Row(Make='KIA', mx=217795),
 Row(Make='JEEP', mx=214965),
 Row(Make='AUDI', mx=179718),
 Row(Make='MAZD', mx=169811),
 Row(Make='OTHR', mx=154376),
 Row(Make='GMC', mx=132788),
 Row(Make='INFI', mx=120340),
 Row(Make='CHRY', mx=120317),
 Row(Make='ACUR', mx=111265)]

In [16]:
# Lookup table from the dataset's two-letter Color codes to full color names.
# Some names have two codes (BN/BR, GD/GO, RD/RE, SI/SL, WT/WH, YL/YE), and
# DK/LT map to shade words ('Dark', 'Light') rather than colors.
COLORS = dict(
    AL='Aluminum', AM='Amber', BG='Beige', BK='Black',
    BL='Blue', BN='Brown', BR='Brown', BZ='Bronze',
    CH='Charcoal', DK='Dark', GD='Gold', GO='Gold',
    GN='Green', GY='Gray', GT='Granite', IV='Ivory',
    LT='Light', OL='Olive', OR='Orange', MR='Maroon',
    PK='Pink', RD='Red', RE='Red', SI='Silver', SL='Silver',
    SM='Smoke', TN='Tan', VT='Violet', WT='White',
    WH='White', YL='Yellow', YE='Yellow', UN='Unknown',
)

In [17]:
# Replacing the color abbreviations with their full names
def replace_color(keys, mapping=None):
    """Expand a color abbreviation to its full name.

    Args:
        keys: One raw value from the 'Color' column (None for a null Spark value).
        mapping: Optional abbreviation -> full-name table. Defaults to the
            module-level COLORS dict; pass another table to reuse the function.

    Returns:
        The full name when ``keys`` is a known abbreviation, otherwise ``keys``
        unchanged (so None and unrecognised codes pass straight through).
    """
    table = COLORS if mapping is None else mapping
    # dict.get with the input as its own fallback replaces the
    # "key in dict.keys()" membership test followed by a second lookup.
    return table.get(keys, keys)
# Wrap replace_color as a Spark UDF producing a string column
replace_color_function = udf(replace_color, returnType=StringType())

In [18]:
# Add a 'color long' column with the expanded color name for each ticket
color_df = temp_df.withColumn('color long', replace_color_function(f.col('Color')))

In [19]:
# Peek at five rows, including the new 'color long' column
color_df.head(5)

[Row(Ticket number='1103341116', Issue Date='2015-12-21T00:00:00', Issue time='1251', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='200304', Make='HOND', Body Style='PA', Color='GY', Location='13147 WELBY WAY', Route='01521', Agency='1', Violation code='4000A1', Violation Description='NO EVIDENCE OF REG', Fine amount='50', NewFine=50.0, color long='Gray'),
 Row(Ticket number='1103700150', Issue Date='2015-12-21T00:00:00', Issue time='1435', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201512', Make='GMC', Body Style='VN', Color='WH', Location='525 S MAIN ST', Route='1C51', Agency='1', Violation code='4000A1', Violation Description='NO EVIDENCE OF REG', Fine amount='50', NewFine=50.0, color long='White'),
 Row(Ticket number='1104803000', Issue Date='2015-12-21T00:00:00', Issue time='2055', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201503', Make='NISS', Body Style='PA', Color='BK', Location='200 WORLD W

In [20]:
# Check that Color codes missing from COLORS pass through unchanged: in the
# saved output the third row (index 2) has Color 'MA', which is not a COLORS key.
# Ties on NewFine are returned in no fixed order, so rows may shift on re-run.
color_df.orderBy(f.desc('NewFine')).take(10)

[Row(Ticket number='1109909220', Issue Date='2014-12-24T00:00:00', Issue time='125', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201507', Make='NISS', Body Style='PA', Color='BK', Location='617 E GAGE AV', Route='13E3', Agency='1', Violation code='558', Violation Description='8755**', Fine amount='505', NewFine=505.0, color long='Black'),
 Row(Ticket number='1109909205', Issue Date='2014-12-24T00:00:00', Issue time='110', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201410', Make='HOND', Body Style=None, Color='BK', Location='6500 S AVALON BLVD', Route='13E3', Agency='1', Violation code='558', Violation Description='8755**', Fine amount='505', NewFine=505.0, color long='Black'),
 Row(Ticket number='1109908995', Issue Date='2015-03-07T00:00:00', Issue time='1220', Meter Id=None, Marked Time=None, RP State Plate='CA', Plate Expiry Date='201510', Make='FORD', Body Style='PA', Color='MA', Location='1203 E 35TH ST', Route='13E3', Agen

In [21]:
# Keep only tickets for the most-ticketed make (TOYT in the output above)
top_make = maxs.orderBy(f.desc('mx')).head(1)[0]['Make']
new_color_df = color_df.where(f.col('Make') == top_make)

In [22]:
# Pattern originally adapted from
# https://stackoverflow.com/questions/35218882/find-maximum-row-per-group-in-spark-dataframe
# Count the top make's tickets per Color code (a null Color forms its own group).
cnts_color = new_color_df.groupBy("Color").agg(count("*").alias("cnt"))
# Every Color is already unique after the groupBy above, so grouping again and
# taking max("cnt") returned the same numbers through a redundant aggregation.
# Just expose the count under the `mx` name that later cells sort on.
maxs_color = cnts_color.withColumnRenamed("cnt", "mx")

In [23]:
# Ten most common Color codes among tickets for the top make (TOYT in this run)
maxs_color.sort(f.desc('mx')).take(10)

[Row(Color='GY', mx=346822),
 Row(Color='WT', mx=304615),
 Row(Color='BK', mx=252199),
 Row(Color='SL', mx=248684),
 Row(Color='BL', mx=128051),
 Row(Color='RD', mx=84174),
 Row(Color='GN', mx=57627),
 Row(Color='GO', mx=30154),
 Row(Color='MR', mx=19882),
 Row(Color='TN', mx=17006)]

In [24]:
# Shut down through the SparkSession rather than only its SparkContext:
# SparkSession.stop() stops the underlying context and also clears the cached
# default/active session, so a later builder.getOrCreate() in this kernel
# creates a fresh session instead of returning one bound to a dead context.
spark_session.stop()