In [1]:
from pyspark.sql import SparkSession

# (8 cores, 16gb per machine) x 5 = 40 cores

# DataFrame / SQL entry point (the "new" API), connected to the standalone master.
spark_session = (
    SparkSession.builder
    .master("spark://192.168.1.153:7077")
    .appName("martin_luther_king")
    # Dynamic allocation is switched on together with the shuffle service setting.
    .config("spark.dynamicAllocation.enabled", True)
    .config("spark.shuffle.service.enabled", True)
    # Idle-executor timeout used by dynamic allocation.
    .config("spark.dynamicAllocation.executorIdleTimeout", "30s")
    .config("spark.executor.cores", 4)
    .getOrCreate()
)

# RDD entry point (the "old" API), taken from the same session.
spark_context = spark_session.sparkContext

In [2]:
# 1: Read the parking-citations CSV from HDFS with the "header" option set,
#    cache the resulting DataFrame, and call show() to check that it loaded.
data_frame = (
    spark_session.read
    .option("header", "true")
    .csv('hdfs://192.168.1.153:9000/parking-citations.csv')
    .cache()
)

data_frame.show()


+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|Ticket number|         Issue Date|Issue time|Meter Id|Marked Time|RP State Plate|Plate Expiry Date| VIN|Make|Body Style|Color|            Location|Route|Agency|Violation code|Violation Description|Fine amount| Latitude|Longitude|
+-------------+-------------------+----------+--------+-----------+--------------+-----------------+----+----+----------+-----+--------------------+-----+------+--------------+---------------------+-----------+---------+---------+
|   1103341116|2015-12-21T00:00:00|      1251|    null|       null|            CA|           200304|null|HOND|        PA|   GY|     13147 WELBY WAY|01521|     1|        4000A1|   NO EVIDENCE OF REG|         50|    99999|    99999|
|   1103700150|2015-12-21T00:00:00|      1435|    null|       null|         

In [3]:
# 2: Print the schema of the loaded DataFrame (column names, types, nullability).
data_frame.printSchema()

root
 |-- Ticket number: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Issue time: string (nullable = true)
 |-- Meter Id: string (nullable = true)
 |-- Marked Time: string (nullable = true)
 |-- RP State Plate: string (nullable = true)
 |-- Plate Expiry Date: string (nullable = true)
 |-- VIN: string (nullable = true)
 |-- Make: string (nullable = true)
 |-- Body Style: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Route: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Violation code: string (nullable = true)
 |-- Violation Description: string (nullable = true)
 |-- Fine amount: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)



In [4]:
# 3: Count the number of rows in the DataFrame loaded from the CSV file.
data_frame.count()

9257460

In [5]:
# 4: Count the number of partitions in the DataFrame's underlying RDD.
data_frame.rdd.getNumPartitions()

10

In [13]:
# 5: Remove the VIN, Latitude and Longitude columns, then list what remains.
unwanted_columns = ("VIN", "Latitude", "Longitude")
drop_vin_lat_long = data_frame.drop(*unwanted_columns)
drop_vin_lat_long.columns

['Ticket number',
 'Issue Date',
 'Issue time',
 'Meter Id',
 'Marked Time',
 'RP State Plate',
 'Plate Expiry Date',
 'Make',
 'Body Style',
 'Color',
 'Location',
 'Route',
 'Agency',
 'Violation code',
 'Violation Description',
 'Fine amount']

In [34]:
# 6: find the maximum fine amount, how many fines have this amount?

# "Fine amount" is a string column in the loaded schema; cast it to float so
# the maximum is computed numerically rather than lexicographically.
new_type_df = drop_vin_lat_long.withColumn(
    "Fine amount", drop_vin_lat_long["Fine amount"].cast("float")
)

# Maximum fine amount: a single-row aggregate, so only one value is brought
# back to the driver. It is None when the column holds no non-null values.
max_fine_amount = new_type_df.agg({"Fine amount": "max"}).first()[0]

# Number of occurrences of the maximum value. The column name contains a
# space, so it must be accessed with brackets (attribute access cannot name
# it). Counting happens on the cluster; no rows are collected to the driver.
if max_fine_amount is None:
    max_fine_count = 0
else:
    max_fine_count = new_type_df.filter(
        new_type_df["Fine amount"] == max_fine_amount
    ).count()

# Display both answers as the cell output: (maximum fine, number of such fines).
max_fine_amount, max_fine_count


In [None]:
# 7: show the top 20 most frequent vehicle makes and their frequencies



In [None]:
# 8: 