# NYC Parking Tickets: An Exploratory Analysis

New York City is a thriving metropolis. Just like most other metros its size, one of the biggest problems its citizens face is parking. The classic combination of a huge number of cars and cramped geography leads to a huge number of parking tickets. 

In an attempt to scientifically analyse this phenomenon, the NYC Police Department has collected data for parking tickets. Of these, the data files for multiple years are publicly available. it is required to perform some exploratory analysis on a part of this data. Spark will allow us to analyse the full files at high speeds as opposed to taking a series of random samples that will approximate the population. For the scope of this analysis, we will analyse the parking tickets over the year 2017.

The purpose of this case study is to conduct an exploratory data analysis that will help you understand the data. The questions given below will guide your analysis.




In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"]="/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] +"/py4j-0.10.6-src.zip")
sys.path.insert(0, os.environ["PYLIB"] +"/pyspark.zip")

In [2]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('demo').master("local").getOrCreate()
spark

In [4]:
sc = spark.sparkContext

In [None]:
sc

In [None]:
## reading the data

#For loading data from HDFS
#nyc = spark.read.format('csv').options(header='true',inferschema='true').load("/nycdata.csv")

#For loading data from S3
s3path = 's3a://upgrad-data/Parking_Violation_Tickets.csv'
nyc = spark.read.format("csv").option("header", "true").load(s3path)

nyc


In [None]:
## inspecting few lines of data

nyc.show(5)

In [None]:
## checking the schema for the dataframe

nyc.schema

In [None]:
## printing the schema

nyc.printSchema()

In [None]:
## checking overall count

nyc.count()

In [None]:
## checking distinct count

nyc.distinct().count()

It shows that there are no duplicates in the file and the dataframe.

In [None]:
## extracting the data for 2017

from pyspark.sql.functions import year

nyc2017 = nyc.filter(year(nyc['Issue Date']) == 2017)
nyc2017.show(5)

In [None]:
nyc2017.count()

The above count shows the total number of records for year 2017.

In [None]:
## extracting data appropriately and creating new columns

from pyspark.sql.functions import col, when
from pyspark.sql.functions import unix_timestamp

## extacting hours, minutes and AM/PM values from Violation Time column
nyc2017 = nyc2017.withColumn("Violation Hour", col("Violation Time").substr(1,2).cast("int"))
nyc2017 = nyc2017.withColumn("Violation Minutes", col("Violation Time").substr(3,2).cast("int"))
nyc2017 = nyc2017.withColumn("Violation ampm", col("Violation Time").substr(5,1))

## converting AM / PM time to absolute hours ranging from 1-24
## here we are adding 12 to pm values provided the time is not between 12 and 13 (12:00 pm to 1:00 pm)
nyc2017 = nyc2017.withColumn("Violation Hour",when(col("Violation Hour")==12,when(col("Violation ampm")=='A',0).otherwise(12)) \
                             .otherwise(when(col("Violation ampm")=='P', col("Violation Hour")+12) \
                                        .otherwise(col("Violation Hour"))))

## creating the unix timestamp using date and time wchich can be used later
nyc2017 = nyc2017.withColumn("Long Time", unix_timestamp(nyc2017["Issue Date"])+ \
                             (nyc2017["Violation Hour"]*3600) + (nyc2017["Violation Minutes"]*60))

## converting Issue Date columns from timestamp to date to use it in future as date
nyc2017 = nyc2017.withColumn("Issue Date", col("Issue Date").cast("date"))

## after extracting the information the these columns are not required and hence dropped
nyc2017 = nyc2017.drop("Violation Time","Violation ampm")

## printing the schema now
nyc2017.printSchema()

### Data validation and cleaning

In [None]:
## checking max min Violation hours we computed in dataframe

from pyspark.sql.functions import min, max

nyc2017.select(max("Violation Hour"), min("Violation Hour")).show()

In [None]:
## checking count of incorrect Violation hours which are greater than or equal to 24

nyc2017.filter(col("Violation Hour")>=24).count()

In [None]:
## checking the count of records before dropping eror records

nyc2017.count()

In [None]:
## removing incorrect values of Violation Hour (which are quiet less in number than overall count) from dataframe

nyc2017 = nyc2017.filter(col("Violation Hour")<24)
nyc2017.show(5)

In [None]:
## checking the record counts after deleting  error records for Violation Hour

nyc2017.count()

In [None]:
## checking max min Violation hours we computed in dataframe

nyc2017.select(max("Violation Minutes"), min("Violation Minutes")).show()

In [None]:
## checking for null values count

from pyspark.sql.functions import isnan, count

nyc2017.select([count(when(col(c).isNull(), c)).alias(c) for c in nyc2017.columns]).show()

In [None]:
## dropping the null values (their count is very less)

nyc2017 = nyc2017.na.drop()
nyc2017.count()

In [None]:
## getting the count for the 'NA' values 

nyc2017.select([count(when(col(c)=='NA', c)).alias(c) for c in nyc2017.columns]).show()

## Examine the data

### Q1 Find the total number of tickets for the year.

In [None]:
## checking the count for tickets (we already know that records are distinct)
## after initial data analysis and removing null data records

nyc2017.count()

Total number of tickets for 2017 are 5431834. This number is obtained after some (very less) number of records were removed due to error in data due to null values. Initially the number of records were 5431918.

### Q2 Find out the number of unique states from where the cars that got parking tickets came. 

There is a numeric entry '99' in the column, which should be corrected. Replace it with the state having the maximum entries. Provide the number of unique states again.

In [None]:
## extracting distinct Registration State

from pyspark.sql.functions import countDistinct

nyc2017.select(countDistinct(col("Registration State"))).collect()

In [None]:
## checking ticket counts across states

nyc2017_ticket_across_states = nyc2017.select("Registration State").groupBy("Registration State") \
                                    .count().sort("count",ascending=False)

nyc2017_ticket_across_states.collect()

From the above command we observed that the Registration State 99 which is errored contain 16054 records.

In [None]:
## replacing the Registration state having '99' value with 'NY' state (state with max entries)

nyc2017 = nyc2017.withColumn("Registration State",when(nyc2017["Registration State"]=='99','NY') \
                             .otherwise(nyc2017["Registration State"]))

## again extracting distinct Registration State
nyc2017.select(countDistinct(col("Registration State"))).collect()

Initial number of distinct states obtained were 65 of which one was errored(with value 0). The error values were replaced with most frequent state 'NY' and then the count of distinct states was reduced to 64.

#### Let's plot ticket counts across registration states to check the distribution now after data correction

In [None]:
## To plot data first we need to create a data frame with Registration State and count
import pandas as pd
nyc2017_ticket_across_states = nyc2017.select("Registration State").groupBy("Registration State") \
                                    .count().sort("count",ascending=False)
pdDF = nyc2017_ticket_across_states.toPandas()
pdDF.head()

### Q3 Display the top 20 stateswith the most no of ticketsalong with their ticket count .

In [None]:
import matplotlib.pyplot as plt
pdDF.plot(x='Registration State',y='count',kind='bar',figsize=(20, 8),legend=None)
plt.yscale('log')
plt.xlabel('Registration State')
plt.ylabel('Count')
plt.title('Ticket counts across registration states')
plt.show()

#NY>NJ>PA>CT>FL>IN>MAVA>MD>NC>TX>IL>GA>AZ>OH>CA>ME>SC>MN>OK  are top 20 states
#Values also displayed

## Aggregation tasks

###  Q1 How often does each violation code occur? Display the frequency of the top five violation codes.

In [None]:
## register dataframe to temp table

nyc2017.createOrReplaceTempView("NYC2017")

In [None]:
## extracting the most frequent violation code along with their annual count

nyc2017_violation_code_count = spark.sql("SELECT \
                                                 `Violation Code`, \
                                                 count(*) as frequency_per_year \
                                            FROM \
                                                 NYC2017 \
                                            GROUP BY \
                                                 `Violation Code`\
                                             ORDER BY \
                                                 frequency_per_year DESC")


## displaying the top 5 violation codes

nyc2017_violation_code_count.show(5)

#### Plot to see frequency of top 5 violation code across year

In [None]:
## To plot data first we need to create a data frame 
pdDF = nyc2017_violation_code_count.toPandas()

## plot to show distribution of ticket counts across different states
plt.clf()
pdDF[0:5].plot(x='Violation Code',y='frequency_per_year',kind='bar',figsize=(20, 8),legend=None)
plt.yscale('log')
plt.xlabel('Violation Code')
plt.ylabel('frequency')
plt.title('Violation Code frequency')
plt.show()

### Q2 How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? 

In [None]:
## extracting the most frequent Vehicle Body Type along with annual count

nyc2017_vehicle_body_type_count = spark.sql("SELECT \
                                                 `Vehicle Body Type`, \
                                                 count(*) as frequency_per_year \
                                            FROM \
                                                 NYC2017 \
                                            GROUP BY \
                                                 `Vehicle Body Type`\
                                             ORDER BY \
                                                 frequency_per_year DESC")


## displaying the top 5 Vehicle Body Type

nyc2017_vehicle_body_type_count.show(5)

Top five Vehicle Body Type are SUBN, 4DSD, VAN, DELV, SDN.

In [None]:
## To plot data first we need to create a data frame 
pdDF = nyc2017_vehicle_body_type_count.toPandas()

pdDF.count()

In [None]:
## plot to show distribution of ticket counts across top 5 vehicle body
plt.clf()
pdDF[0:5].plot(x='Vehicle Body Type',y='frequency_per_year',kind='bar',figsize=(20, 8),legend=None)
plt.yscale('log')
plt.xlabel('Vehicle Body Type')
plt.ylabel('frequency')
plt.title('Vehicle Body Type frequency')
plt.show()

In [None]:
## extracting the most frequent Vehicle Make along with annual count

nyc2017_vehicle_make_count = spark.sql("SELECT \
                                                 `Vehicle Make`, \
                                                 count(*) as frequency_per_year \
                                            FROM \
                                                 NYC2017 \
                                            GROUP BY \
                                                 `Vehicle Make`\
                                             ORDER BY \
                                                 frequency_per_year DESC")


## displaying the top 5 Vehicle Make

nyc2017_vehicle_make_count.show(5)

Top five Vehicle Make are FORD, TOYOT, HONDA, NISSA, CHEVR. Their frequencies per year are mentioned above.

In [None]:
## To plot data first we need to create a data frame 
pdDF = nyc2017_vehicle_make_count.toPandas()
## plot to show distribution of ticket counts across top 5 Vehicle Make
plt.clf()
pdDF[0:5].plot(x='Vehicle Make',y='frequency_per_year',kind='bar',figsize=(20, 8),legend=None)
plt.yscale('log')
plt.xlabel('Vehicle Make')
plt.ylabel('frequency')
plt.title('Vehicle Make frequency')
plt.show()

### Q3 Let’s try and find some seasonality in this data:

- First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season.

- Then, find the three most common violations for each of these seasons.

In [None]:
from pyspark.sql.functions import month

nyc2017 = nyc2017.withColumn("Issue Month", month(col("Issue Date")))


## creating bucket borders as seasons across 12 months

seasonBucketBorders = [1,4,7,10,13]
seasonBucket = Bucketizer().setSplits(seasonBucketBorders).setInputCol("Issue Month") \
                    .setOutputCol("Season Bucket")
nyc2017 = seasonBucket.transform(nyc2017)

nyc2017 = nyc2017.withColumn("season", when(col("Season Bucket")==0,"WINTER") \
                                         .otherwise(when(col("Season Bucket")==1,"SPRING") \
                                         .otherwise(when(col("Season Bucket")==2,"SUMMER") \
                                         .otherwise("AUTUMN"))))

nyc2017.show(5)

The seasons have been assigned based on the month values for Issue date. For months 1-3, season is WINTER, for months 4-6 season is SPRING, for months 7-9 season is SUMMER and for months 10-12 season is AUTUMN.

In [None]:
## refresh dataframe to temp table so as to take up new column created in dataframe

nyc2017.createOrReplaceTempView("NYC2017")

In [None]:
## frequencies across seasons

nyc2017_season_ticket_count = spark.sql("SELECT \
                                                 season, \
                                                 count(1) as frequency_per_year \
                                            FROM \
                                                 NYC2017 \
                                            GROUP BY \
                                                 season\
                                             ORDER BY \
                                                 frequency_per_year DESC")

nyc2017_season_ticket_count.show()

The frequencies for the tickets across each season are mentioned above.

In [None]:
## To plot data first we need to create a data frame 
pdDF = nyc2017_season_ticket_count.toPandas()
## plot to show distribution of ticket counts across different states
plt.clf()
pdDF.plot(x='season',y='frequency_per_year',kind='bar',figsize=(15, 8),legend=None)
plt.xlabel('season')
plt.ylabel('frequency')
plt.yscale('log')
plt.title('season frequency')
plt.show()

### THEN,find the threemost common violations for each of these seasons.

In [None]:
## evaulating top 3 violation code across all the seasons

## first grouping the data based on season and Violation Code

nyc2017_season_violation_code_group = spark.sql("SELECT \
                                                     season, \
                                                     `Violation Code`, \
                                                     count(*) as frequency_per_year \
                                            FROM \
                                                     NYC2017 \
                                            GROUP BY \
                                                     season,`Violation Code`")

## creating a sql temp view

nyc2017_season_violation_code_group.createOrReplaceTempView("NYC2017_SEASON_VIOLATION_CODE_GROUP")

                                                
## finally evauluating top3 violation code for each bucket

nyc2017_season_violation_code_top3 = spark.sql("SELECT \
                                                        season,\
                                                        `Violation Code`, \
                                                        frequency_per_year,\
                                                        rank_top3\
                                                FROM ( \
                                                        SELECT \
                                                                season,\
                                                                `Violation Code`, \
                                                                frequency_per_year, \
                                                                dense_rank() OVER(PARTITION BY season \
                                                                        ORDER BY frequency_per_year DESC) AS rank_top3  \
                                                        FROM \
                                                                NYC2017_SEASON_VIOLATION_CODE_GROUP \
                                                    )  \
                                                WHERE rank_top3 <= 3")
                                    
nyc2017_season_violation_code_top3.show()

Top 3 validation codes across each season are mentioned above.

In [None]:
pdDF = nyc2017_season_violation_code_top3.toPandas()
import seaborn as sns
plt.figure(figsize=(12, 8))
ax = sns.barplot(x='Violation Code', y='frequency_per_year',hue='season', data=pdDF)
ax.set_ylabel("Frequency", fontsize=18);
ax.set_title('Top 3 violation code for each season',fontsize=15)
ax.set_xlabel("violation code", fontsize=18);
plt.legend(loc='center left', bbox_to_anchor=(1.0, 0.5))
plt.yscale('log')
plt.show()

In [None]:
Season Violation Code Frequency/Year Rank
WINTER 21              373,862      1
WINTER 36              348,240      2
WINTER 38              286,999      3
SPRING 21              393,866      1
SPRING 36              314,525      2
SPRING 38              255,064      3
SUMMER 21                  228      1
SUMMER 46                  219      2
SUMMER 40                  109      3
AUTUMN 46                  219      1
AUTUMN 40                  121      2
AUTUMN 21                  100      3

### Q4 The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:

- Find the total occurrences of the three most common violation codes.

- Then, visit the website:

http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page

- It lists the fines associated with different violation codes. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.

- Using this information, find the total amount collected for the three violation codes with the maximum tickets. State the code that has the highest total collection.

- What can you intuitively infer from these findings?

In [None]:
## Total occurances of top 3 violation codes

## top 3 violation codes are 21, 36, 38

nyc2017_top3_violation_count = nyc2017.filter(col("Violation Code").isin(21,36,38)).groupBy(col("Violation Code")).count()
nyc2017_top3_violation_count.show()

##### For the top 3 violation codes avrage fine are as below :

- Average fine for Violation code 21 : (65+45)/2  = 55 `$`
- Average fine for Violation code 36 : (50+50)/2  = 50 `$`
- Average fine for Violation code 38 : (65+35)/2  = 50 `$`

In [None]:
## now first we need to enter above information into a new column in dataframe

## average fine amount for violation code 36 and 38 is 50 and average amount for violation code 21 is 55

nyc2017_top3_violation_amount = nyc2017_top3_violation_count.withColumn("Average fine per violation",\
                                                                        when(col("Violation Code")==21,55)
                                                                        .otherwise(50))
nyc2017_top3_violation_amount.show()

In [None]:
## now evaluating the total amount collected

from pyspark.sql.functions import sum

## to get the amount collected for each violation code we need to multiply the average amount collected with the 
## total conunt for each violation code and then we need to to sum up all the product values obtained

nyc2017_total_revenue_top3_violation_code = nyc2017_top3_violation_amount.select(sum(col("count")*\
                                                                                      col("Average fine per violation")))
nyc2017_total_revenue_top3_violation_code.show()