# PySpark - Analyzing NYC Parking Data

## Section 1. Examine Data
### Q1. Find the total number of tickets for the year.

In [1]:
# Obtain the Spark session for this notebook; getOrCreate() returns an
# already-running session when one exists instead of starting a new one.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .getOrCreate()
)

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
4,application_1647323775098_0005,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# Location of the parking-ticket CSV on S3
s3path = 's3a://upgrad-data/Parking_Violation_Tickets.csv'

# Read the CSV, taking column names from its header row, then inspect the schema.
# No schema is supplied or inferred, so every column is loaded as a string.
tickets = spark.read.csv(s3path, header=True)
tickets.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: string (nullable = true)
 |-- Street Code2: string (nullable = true)
 |-- Street Code3: string (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Violation Location: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Issuer Code: string (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation County: str

In [3]:
# Peek at the first rows of 'Issue Date' to check whether the data contains
# tickets from years other than 2017
issue_dates = tickets.select('Issue Date')
issue_dates.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+
|Issue Date|
+----------+
|07/10/2016|
|07/08/2016|
|08/23/2016|
|06/14/2017|
|11/21/2016|
|06/13/2017|
|08/03/2016|
|12/21/2016|
|11/21/2016|
|10/05/2016|
|01/11/2017|
|09/27/2016|
|10/27/2016|
|09/30/2016|
|02/04/2017|
|07/07/2016|
|09/24/2016|
|01/26/2017|
|04/30/2017|
|02/03/2017|
+----------+
only showing top 20 rows

In [4]:
# import sql functions
# NOTE(review): the wildcard import is what puts year, to_date, col, when, desc,
# month and countDistinct in scope for the cells below. It also brings in
# functions named like Python builtins (e.g. sum, max), so prefer the DataFrame
# methods over bare calls to those names in this notebook.
from pyspark.sql.functions import *

# Keep only the tickets whose 'Issue Date' (formatted MM/dd/yyyy) falls in 2017
issue_year = year(to_date(col('Issue Date'), "MM/dd/yyyy"))
tickets_df = tickets.where(issue_year == 2017)

# Verify the output
tickets_df.select('Issue Date').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+
|Issue Date|
+----------+
|06/14/2017|
|06/13/2017|
|01/11/2017|
|02/04/2017|
|01/26/2017|
|04/30/2017|
|02/03/2017|
|05/27/2017|
|05/31/2017|
|05/26/2017|
|05/19/2017|
|06/09/2017|
|01/20/2017|
|04/13/2017|
|01/05/2017|
|02/18/2017|
|06/14/2017|
|01/29/2017|
|01/25/2017|
|06/12/2017|
+----------+
only showing top 20 rows

In [5]:
# Solution: Total number of tickets for the year
# (tickets_df already holds only the rows issued in 2017)
total_tickets_2017 = tickets_df.count()
total_tickets_2017

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

5431918

### Q2. Find the number of unique states in which the ticketed cars were registered.

In [6]:
# Solution: Number of unique states
# Count the distinct 'Registration State' values over the whole DataFrame.
unique_states = countDistinct(col('Registration State')).alias("Unique States")
tickets_df.agg(unique_states).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+
|Unique States|
+-------------+
|           65|
+-------------+

### Q3. There is a numeric entry '99' in the column, which should be corrected. Replace it with the state having the maximum entries. Provide the number of unique states again.

In [7]:
# Find the registration state with the most tickets:
# count rows per state, order by that count (largest first), take the top row.
state_counts = tickets_df.groupBy('Registration State').count()
top_state_row = state_counts.orderBy(desc('count')).select('Registration State').first()
reg_state = top_state_row['Registration State']
reg_state

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'NY'

In [8]:
# Replace the placeholder '99' with the state having the maximum entries
# (held in reg_state); every other value is kept as it is.
registration_state = col('Registration State')
tickets_df = tickets_df.withColumn(
    'Registration State',
    when(registration_state == "99", reg_state).otherwise(registration_state)
)

# Find number of unique states again.
tickets_df.select(countDistinct('Registration State').alias("Unique States")).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+
|Unique States|
+-------------+
|           64|
+-------------+

### Q4. Display the top 20 states with the most number of tickets along with their ticket count.

In [9]:
# Solution: every Summons Number belongs to exactly one Registration State, so
# counting rows per Registration State gives the number of tickets per state.
state_ticket_counts = tickets_df.groupBy('Registration State').count()
state_ticket_counts.orderBy(desc('count')).show(20)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|4290006|
|                NJ| 475825|
|                PA| 140286|
|                CT|  70403|
|                FL|  69468|
|                IN|  45525|
|                MA|  38941|
|                VA|  34367|
|                MD|  30213|
|                NC|  27152|
|                TX|  18827|
|                IL|  18666|
|                GA|  17537|
|                AZ|  12379|
|                OH|  12281|
|                CA|  12153|
|                ME|  10806|
|                SC|  10395|
|                MN|  10083|
|                OK|   9088|
+------------------+-------+
only showing top 20 rows

## Section 2. Aggregation tasks
### Q1. How often does each violation code occur? Display the frequency of the top five violation codes.

In [10]:
# Solution: how often each violation code occurs, five most frequent shown
code_frequency = (
    tickets_df
    .groupBy('Violation Code')
    .count()
    .withColumnRenamed('count', 'frequency')
)
code_frequency.orderBy(desc('frequency')).show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+---------+
|Violation Code|frequency|
+--------------+---------+
|            21|   768087|
|            36|   662765|
|            38|   542079|
|            14|   476664|
|            20|   319646|
+--------------+---------+
only showing top 5 rows

### Q2. How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? Find the top 5 for both.

In [11]:
#Solution: Top 5 for Vehicle Body Type
# Tickets per body type, ordered from most to fewest.
body_type_counts = tickets_df.groupBy('Vehicle Body Type').count()
body_type_counts.orderBy(desc('count')).show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+-------+
|Vehicle Body Type|  count|
+-----------------+-------+
|             SUBN|1883954|
|             4DSD|1547312|
|              VAN| 724029|
|             DELV| 358984|
|              SDN| 194197|
+-----------------+-------+
only showing top 5 rows

In [12]:
#Solution: Top 5 for Vehicle Make
# Tickets per vehicle make, ordered from most to fewest.
vehicle_make_counts = tickets_df.groupBy('Vehicle Make').count()
vehicle_make_counts.orderBy(desc('count')).show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+------+
|Vehicle Make| count|
+------------+------+
|        FORD|636844|
|       TOYOT|605291|
|       HONDA|538884|
|       NISSA|462017|
|       CHEVR|356032|
+------------+------+
only showing top 5 rows

### Q3. Let’s try and find some seasonality in this data:
#### Step 1. First, divide the year into 4 seasons, and find the frequencies of tickets for each season.

In [13]:
# Derive a Season label from the month of 'Issue Date' (MM/dd/yyyy):
#   months 1-3 -> Winter, 4-6 -> Spring, 7-9 -> Summer, 10-12 -> Autumn
issue_month = month(to_date(col('Issue Date'), "MM/dd/yyyy"))

# Conditions are checked top-down, so each threshold only sees months that
# did not match an earlier (higher) one.
season_label = (
    when(col('Month') > 9, "Autumn")
    .when(col('Month') > 6, "Summer")
    .when(col('Month') > 3, "Spring")
    .otherwise("Winter")
)

# "Month" is only a helper column; it is dropped once Season has been assigned.
tickets_df = (
    tickets_df
    .withColumn("Month", issue_month)
    .withColumn("Season", season_label)
    .drop("Month")
)

# show to verify data
tickets_df.select('Issue Date', 'Season').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------+
|Issue Date|Season|
+----------+------+
|06/14/2017|Spring|
|06/13/2017|Spring|
|01/11/2017|Winter|
|02/04/2017|Winter|
|01/26/2017|Winter|
|04/30/2017|Spring|
|02/03/2017|Winter|
|05/27/2017|Spring|
|05/31/2017|Spring|
|05/26/2017|Spring|
|05/19/2017|Spring|
|06/09/2017|Spring|
|01/20/2017|Winter|
|04/13/2017|Spring|
|01/05/2017|Winter|
|02/18/2017|Winter|
|06/14/2017|Spring|
|01/29/2017|Winter|
|01/25/2017|Winter|
|06/12/2017|Spring|
+----------+------+
only showing top 20 rows

#### Step 2. Then, find the three most common violations for each of these seasons.

In [14]:
# Solution: three most common violation codes for each season.
#
# The (Season, Violation Code) counts are aggregated once and cached, so the four
# per-season reports below reuse that small table instead of each re-running the
# aggregation over the full ticket data.
season_code_counts = tickets_df.groupBy('Season', 'Violation Code').count().cache()

# One report per season, in the order Winter, Spring, Summer, Autumn
for season_name in ("Winter", "Spring", "Summer", "Autumn"):
    season_code_counts.filter(col('Season') == season_name).sort(desc('count')).show(3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------+--------------+------+
|Season|Violation Code| count|
+------+--------------+------+
|Winter|            21|373874|
|Winter|            36|348240|
|Winter|            38|287000|
+------+--------------+------+
only showing top 3 rows

+------+--------------+------+
|Season|Violation Code| count|
+------+--------------+------+
|Spring|            21|393885|
|Spring|            36|314525|
|Spring|            38|255064|
+------+--------------+------+
only showing top 3 rows

+------+--------------+-----+
|Season|Violation Code|count|
+------+--------------+-----+
|Summer|            21|  228|
|Summer|            46|  219|
|Summer|            40|  109|
+------+--------------+-----+
only showing top 3 rows

+------+--------------+-----+
|Season|Violation Code|count|
+------+--------------+-----+
|Autumn|            46|  219|
|Autumn|            40|  121|
|Autumn|            21|  100|
+------+--------------+-----+
only showing top 3 rows

### Q4. The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:
#### Step 1. Find the total occurrences of the three most common violation codes.

In [15]:
# Ticket counts for the three most frequent violation codes. They are kept in
# their own DataFrame because the revenue calculations in the next steps need
# these counts.
code_counts = tickets_df.groupBy('Violation Code').count()
amount_collection = code_counts.orderBy(desc('count')).limit(3)

# Solution: Top 3 violation codes
amount_collection.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+------+
|Violation Code| count|
+--------------+------+
|            21|768087|
|            36|662765|
|            38|542079|
+--------------+------+

#### Step 2. Then, visit the website: http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page It lists the fines associated with different violation codes. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.
#### Step 3. Using this information, find the total amount collected for each of the three violation codes with the maximum tickets. State the code that has the highest total collection (only based on the top 3 tickets).

In [16]:
# Add a Collection column: total amount collected for each of the top 3
# violation codes, computed as ticket count x fine per ticket.
# Violation Code 21 - $65
# Violation Code 36 - $50
# Violation Code 38 - $50 (avg.)
# NOTE(review): only code 38 is marked as an average here - confirm against the
# fine schedule that $65 for code 21 is also the average of the two zones.
violation_code = col('Violation Code')
ticket_count = col('count')

# Codes other than 21/36/38 match no branch and would get a null Collection.
collection_amount = (
    when(violation_code == 21, ticket_count * 65)
    .when(violation_code == 36, ticket_count * 50)
    .when(violation_code == 38, ticket_count * 50)
)
amount_collection = amount_collection.withColumn('Collection', collection_amount)

# Solution: Total amount collected for each of the top 3 violations
amount_collection.show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+------+----------+
|Violation Code| count|Collection|
+--------------+------+----------+
|            21|768087|  49925655|
|            36|662765|  33138250|
|            38|542079|  27103950|
+--------------+------+----------+

In [17]:
#Solution: Violation code that has highest total collection
# First fetch the largest Collection value, then show the violation code of the
# row(s) whose Collection equals it.
highest_collection = amount_collection.groupBy().max('Collection').first()['max(Collection)']
top_code = col('Violation Code').alias('Violation Code with highest total collection')
amount_collection.select(top_code).where(col('Collection') == highest_collection).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------------------------------+
|Violation Code with highest total collection|
+--------------------------------------------+
|                                          21|
+--------------------------------------------+

#### Step 4. Find the top 3 states that have the highest ticket revenue based on the top 3 violation codes alone.

In [18]:
#Solution: ticket revenue per state from violation codes 21, 36 and 38; show top 3
violation_code = col('Violation Code')
ticket_count = col('count')

# Revenue for a (state, code) pair = ticket count x fine per ticket for that code
revenue_per_code = (
    when(violation_code == 21, ticket_count * 65)
    .when(violation_code == 36, ticket_count * 50)
    .when(violation_code == 38, ticket_count * 50)
)

# Count tickets per (state, violation code), restricted to the three codes
state_code_counts = (
    tickets_df
    .filter(violation_code.isin(21, 36, 38))
    .groupBy('Registration State', 'Violation Code')
    .count()
)

# Sum the per-code revenue for each state and list the three largest totals
state_revenue = (
    state_code_counts
    .withColumn("Revenue", revenue_per_code)
    .groupBy('Registration State')
    .sum('Revenue')
)
state_revenue.sort(desc('sum(Revenue)')).show(3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+------------+
|Registration State|sum(Revenue)|
+------------------+------------+
|                NY|    87901590|
|                NJ|     7368020|
|                PA|     3413995|
+------------------+------------+
only showing top 3 rows

#### Step 5. What can you intuitively infer from these findings?

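1. A lot of vehicles registered in NJ and PA visit NYC: after NY, these two states have the highest ticket counts and the highest ticket revenue.
2. Violation code 21 is the most frequent in every season except Autumn, where code 46 leads.
3. The top three violation codes (21, 36, 38) occur in the same order during Winter and Spring.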