In [1]:
from pyspark.sql import SparkSession

spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.87:7077") \
        .appName("yuen_ting_cheung")\
        .config("spark.dynamicAllocation.enabled", True)\
        .config("spark.shuffle.service.enabled", True)\
        .config("spark.dynamicAllocation.executorIdleTimeout","30s")\
        .config("spark.executor.cores",4)\
        .getOrCreate()
        
spark_context = spark_session.sparkContext

In [3]:
# B.1 Load the CSV file from HDFS, and call show() to verify the data is loaded correctly.
data_frame = spark_session.read\
    .option("header", "true")\
    .csv('hdfs://192.168.2.87:9000/parking-citations.csv')\
    .cache()
    
data_frame.show()

+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|Ticket number|          Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|Agency Description|Color Description|Body Style Description|
+-------------+--------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+------------------+-----------------+----------------------+
|   1103341116|2015-12-21T00:00:...|      1251|    null|       null|            CA|           200304|null|HOND|        PA|  

In [6]:
# B.2 Count the number of partitions in the underlying RDD.
data_frame.rdd.getNumPartitions()

11

In [7]:
# B.3 Print the schema for the DataFrame.
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Agency Description: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [8]:
# B.4 Count the number of rows in the CSV file.
data_frame.count()

9881842

In [9]:
# B.5 Drop the columns Agency Description, Agency, and Route.
data_frame = data_frame.drop("Agency Description","Agency","Route")
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)



In [16]:
# B.6 Find the mean fine amount (you need to convert the column to a float).
from pyspark.sql.functions import col
data_frame=data_frame.withColumn('Fine amount',col("Fine amount").cast("float"))
data_frame.printSchema()

data_frame.agg({'Fine amount': 'mean'}).show()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: float (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Color Description: string (nullable = true)
 |-- Body Style Description: string (nullable = true)

+----------------+
|avg(Fine amount)|
+----------------+
|70.1855354220642|
+----------------+



In [24]:
# B.7 Show the top 10 most frequent vehicle makes, and their frequencies.
df_b7 = data_frame.groupBy("Make").agg({'Make': 'count'}).orderBy('count(Make)', ascending=False)
df_b7.take(10)

[Row(Make='TOYT', count(Make)=1633266),
 Row(Make='HOND', count(Make)=1113834),
 Row(Make='FORD', count(Make)=860828),
 Row(Make='NISS', count(Make)=709250),
 Row(Make='CHEV', count(Make)=674422),
 Row(Make='BMW', count(Make)=450909),
 Row(Make='MERZ', count(Make)=402126),
 Row(Make='VOLK', count(Make)=335618),
 Row(Make='HYUN', count(Make)=304934),
 Row(Make='DODG', count(Make)=290979)]

In [31]:
# B.8 Let’s expand some abbreviations in the color column. Create a User Defined Function to
# create a new column, ‘color long’, mapping the original colors to their corresponding values
# in the dictionary below. If there is no key matching the original color, use the original color.
COLORS = {
'AL':'Aluminum', 'AM':'Amber', 'BG':'Beige', 'BK':'Black',
'BL':'Blue', 'BN':'Brown', 'BR':'Brown', 'BZ':'Bronze',
'CH':'Charcoal', 'DK':'Dark', 'GD':'Gold', 'GO':'Gold',
'GN':'Green', 'GY':'Gray', 'GT':'Granite', 'IV':'Ivory',
'LT':'Light', 'OL':'Olive', 'OR':'Orange', 'MR':'Maroon',
'PK':'Pink', 'RD':'Red', 'RE':'Red', 'SI':'Silver', 'SL':'Silver',
'SM':'Smoke', 'TN':'Tan', 'VT':'Violet', 'WT':'White',
'WH':'White', 'YL':'Yellow', 'YE':'Yellow', 'UN':'Unknown'
}

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def to_long_color(color_code):
  return COLORS.get(color_code, color_code)

udf_to_long_color = udf(to_long_color, StringType())

data_frame_with_long_color = data_frame.withColumn("long color", udf_to_long_color("color"))

data_frame_with_long_color.select('Ticket number','color', 'long color').show()

+-------------+-----+----------+
|Ticket number|color|long color|
+-------------+-----+----------+
|   1103341116|   GY|      Gray|
|   1103700150|   WH|     White|
|   1104803000|   BK|     Black|
|   1104820732|   WH|     White|
|   1105461453|   BK|     Black|
|   1106226590|   GY|      Gray|
|   1106500452|   BL|      Blue|
|   1106500463|   BK|     Black|
|   1106506402|   BR|     Brown|
|   1106506413|   SI|    Silver|
|   1106506424|   WH|     White|
|   1106506435|   GO|      Gold|
|   1106506446|   BK|     Black|
|   1106549754|   BK|     Black|
|   1107179581|   BK|     Black|
|   1107179592|   BK|     Black|
|   1107179603|   WH|     White|
|   1107539823| null|      null|
|   1107539834|   BK|     Black|
|   1107780811|   BK|     Black|
+-------------+-----+----------+
only showing top 20 rows



In [36]:
# B.9 Using this new column, what’s the?
df_b9 = data_frame_with_long_color.filter(data_frame_with_long_color["Make"] == "HOND")\
    .groupBy("long color")\
    .agg({'long color': 'count'})\
    .orderBy('count(long color)', ascending=False)
df_b9.take(1)
# So the most frequent colour value for Hondas (HOND) is Gray

[Row(long color='Gray', count(long color)=266135)]

In [37]:
# release the cores for another application!
spark_context.stop()