## NYC Parking Case Study 

The purpose of this case study is to conduct an exploratory data analysis that will help us understand the classic combination of a huge number of cars and cramped geography leading to the biggest problem of New York, that is parking.

In an attempt to scientifically analyse this phenomenon, the NYC Police Department has collected data for parking tickets.

We will perform some exploratory analysis on a part of this data. Spark will allow us to analyse the full files at high speeds as opposed to taking a series of random samples that will approximate the population. For the scope of this analysis, we will analyse the parking tickets over the year 2017.

In [1]:
# A SparkSession is used to create DataFrame, register DataFrame as tables, execute SQL over tables, cache tables, and 
# read parquet files.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NYC Taxi Parking").getOrCreate()

In [2]:
# Reading the data file
nyc_taxi = spark.read.format("csv").option("header", "true").option("inferSchema", "true")\
                             .load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")
nyc_taxi.cache()

DataFrame[Summons Number: bigint, Plate ID: string, Registration State: string, Issue Date: timestamp, Violation Code: int, Vehicle Body Type: string, Vehicle Make: string, Violation Precinct: int, Issuer Precinct: int, Violation Time: string]

In [3]:
# Display top 5 records
nyc_taxi.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|         

In [4]:
# Number of rows in the dataset
nyc_taxi.count()

10803028

In [5]:
# Number of columns in the dataset
len(nyc_taxi.columns)

10

In [6]:
# Since Column Names of the dataframes have spaces in between consecutive words it may lead to typos and coding issues. 
# Therefore, we will eliminate all white spaces and seperate consecutive words with "_"
nyc_taxi = nyc_taxi.toDF(*[col.replace(' ','_') for col in nyc_taxi.columns])
nyc_taxi.columns

['Summons_Number',
 'Plate_ID',
 'Registration_State',
 'Issue_Date',
 'Violation_Code',
 'Vehicle_Body_Type',
 'Vehicle_Make',
 'Violation_Precinct',
 'Issuer_Precinct',
 'Violation_Time']

In [7]:
# Display top five records
nyc_taxi.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons_Number|Plate_ID|Registration_State|         Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Violation_Precinct|Issuer_Precinct|Violation_Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|         

In [8]:
# Check the schema of the dataframe
nyc_taxi.printSchema()

root
 |-- Summons_Number: long (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Issue_Date: timestamp (nullable = true)
 |-- Violation_Code: integer (nullable = true)
 |-- Vehicle_Body_Type: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Violation_Precinct: integer (nullable = true)
 |-- Issuer_Precinct: integer (nullable = true)
 |-- Violation_Time: string (nullable = true)



In [9]:
# register DataFrame as temp table
nyc_taxi.createOrReplaceTempView("dfTable")

### Examining the Data

**1. Find the total number of tickets for the year.**

In [10]:
spark.sql('SELECT COUNT(1) FROM dfTable').show()

+--------+
|count(1)|
+--------+
|10803028|
+--------+



In [11]:
# Check the years in dataset
spark.sql('SELECT EXTRACT(YEAR FROM CAST(Issue_Date as DATE)) as issue_year,count(*) as num_records FROM dfTable group by issue_year order by num_records desc').show()

+----------+-----------+
|issue_year|num_records|
+----------+-----------+
|      2017|    5431918|
|      2016|    5368391|
|      2018|       1057|
|      2019|        472|
|      2015|        419|
|      2000|        185|
|      2014|        120|
|      2012|         87|
|      2013|         70|
|      2027|         50|
|      2010|         48|
|      2026|         24|
|      2020|         22|
|      2011|         22|
|      2021|         22|
|      2007|         18|
|      2030|         12|
|      2006|          8|
|      2028|          8|
|      2025|          6|
+----------+-----------+
only showing top 20 rows



#### Assumption:
Though there are records for year other than 2017, we are going with the assumption that the whole dataset belongs to year 2017. So total number of tickets is 10803028

**2. Find out the number of unique states from where the cars that got parking tickets came.**

In [12]:
# Displaying the registration states that got the tickets
spark.sql('SELECT distinct Registration_State FROM dfTable').show()

+------------------+
|Registration_State|
+------------------+
|                AZ|
|                SC|
|                NS|
|                LA|
|                MN|
|                NJ|
|                MX|
|                DC|
|                OR|
|                99|
|                NT|
|                VA|
|                RI|
|                WY|
|                KY|
|                BC|
|                NH|
|                MI|
|                GV|
|                NV|
+------------------+
only showing top 20 rows



In [13]:
# Finding the total number of distinct registration states from which the cars got the tickets
spark.sql('SELECT COUNT(distinct Registration_State) FROM dfTable').show()

+----------------------------------+
|count(DISTINCT Registration_State)|
+----------------------------------+
|                                67|
+----------------------------------+



In [14]:
# Displaying the number of tickets state-wise in a descending order
spark.sql('SELECT Registration_State,COUNT(1) as num_tickets FROM dfTable group by Registration_State order by num_tickets desc').show()

+------------------+-----------+
|Registration_State|num_tickets|
+------------------+-----------+
|                NY|    8481061|
|                NJ|     925965|
|                PA|     285419|
|                FL|     144556|
|                CT|     141088|
|                MA|      85547|
|                IN|      80749|
|                VA|      72626|
|                MD|      61800|
|                NC|      55806|
|                IL|      37329|
|                GA|      36852|
|                99|      36625|
|                TX|      36516|
|                AZ|      26426|
|                OH|      25302|
|                CA|      24260|
|                SC|      21836|
|                ME|      21574|
|                MN|      18227|
+------------------+-----------+
only showing top 20 rows



It can be observed from the above table that New York(NY) state has the highest number of tickets for faulty car parking of approx 84 Lakhs. But, there is a numeric entry in the registration_state column with state code as 99, which must be erroneous.
We will replace it with the state having the maximum entries i.e NY.

In [15]:
# Replacing the state code 99 with NY
nyc_taxi = spark.sql(
    "SELECT Summons_Number,\
            Plate_ID,\
            CASE WHEN Registration_State = 99 THEN 'NY' ELSE Registration_State END AS  Registration_State,\
            Issue_Date,\
            Violation_Code,\
            Vehicle_Body_Type,\
            Vehicle_Make,\
            Violation_Precinct,\
            Issuer_Precinct,\
            Violation_Time\
    FROM dfTable")

In [16]:
from pyspark.sql.functions import col
nyc_taxi.where(col("Registration_State")=='99').count()

0

In [17]:
nyc_taxi.createOrReplaceTempView("dfTable")

In [18]:
# Displaying the registration states after handling erroneous data
spark.sql('SELECT distinct Registration_State FROM dfTable').show()

+------------------+
|Registration_State|
+------------------+
|                AZ|
|                SC|
|                NS|
|                LA|
|                MN|
|                NJ|
|                MX|
|                DC|
|                OR|
|                NT|
|                VA|
|                RI|
|                KY|
|                WY|
|                BC|
|                NH|
|                MI|
|                GV|
|                NV|
|                QB|
+------------------+
only showing top 20 rows



In [19]:
# Distinct count of registration states after changing state code 99 to NY
spark.sql('SELECT COUNT(distinct Registration_State) FROM dfTable').show()

+----------------------------------+
|count(DISTINCT Registration_State)|
+----------------------------------+
|                                66|
+----------------------------------+



After data manipulation, the count of registration states has decreased by 1 from earlier count of 67 to present count of 66. This is due to the replacement of registartion_state 99 with NY.

### Aggregation tasks

**1. How often does each violation code occur? Display the frequency of the top five violation codes.**

In [20]:
spark.sql('SELECT Violation_Code,count(*) as Frequency_of_Tickets FROM dfTable group by Violation_Code order by Frequency_of_Tickets desc').show(5)

+--------------+--------------------+
|Violation_Code|Frequency_of_Tickets|
+--------------+--------------------+
|            21|             1528588|
|            36|             1400614|
|            38|             1062304|
|            14|              893498|
|            20|              618593|
+--------------+--------------------+
only showing top 5 rows



The highest number of tickets falls under the Violation code 21 which is **Street Cleaning: No parking where parking is not allowed by sign, street marking or traffic control device.**

**2. How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'?**

In [21]:
spark.sql('SELECT Vehicle_Body_Type,count(*) as Frequency_of_Tickets FROM dfTable group by Vehicle_Body_Type order by Frequency_of_Tickets desc').show(5)

+-----------------+--------------------+
|Vehicle_Body_Type|Frequency_of_Tickets|
+-----------------+--------------------+
|             SUBN|             3719802|
|             4DSD|             3082020|
|              VAN|             1411970|
|             DELV|              687330|
|              SDN|              438191|
+-----------------+--------------------+
only showing top 5 rows



The Vehicle Body Type Codes have the following interpretation:

**[1] SUBN-The law defines a suburban as a vehicle that can be used to carry passengers and cargo. Vehicles that
can be registered with the suburban body type include station wagons, sport utility vehicles, hearses and
ambulances.**<br>
**[2] 4DSD-Four-door sedan**<br>
**[3] VAN-Type of road vehicle used for transporting goods or people.**<br>
**[4] DELV-Delivery Truck**<br>
**[5] SDN-Civilian Sedan**<br>

The SUBN body type results in the highest number of tickets.

In [22]:
spark.sql('SELECT Vehicle_Make,count(*) as Frequency_of_Tickets FROM dfTable group by Vehicle_Make order by Frequency_of_Tickets desc').show(5)

+------------+--------------------+
|Vehicle_Make|Frequency_of_Tickets|
+------------+--------------------+
|        FORD|             1280958|
|       TOYOT|             1211451|
|       HONDA|             1079238|
|       NISSA|              918590|
|       CHEVR|              714655|
+------------+--------------------+
only showing top 5 rows



Ford vehicles contribute to the highest frequency of Parking Tickets, followed by Totyota, Honda, Nissan and Chevrolet.

**3. A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following:**

***Violation Precinct (Precinct of the zone where violation occurred)***

In [23]:
spark.sql('SELECT Violation_Precinct,count(*) as Frequency_of_Tickets FROM dfTable group by Violation_Precinct order by Frequency_of_Tickets desc').show(6)

+------------------+--------------------+
|Violation_Precinct|Frequency_of_Tickets|
+------------------+--------------------+
|                 0|             2072400|
|                19|              535671|
|                14|              352450|
|                 1|              331810|
|                18|              306920|
|               114|              296514|
+------------------+--------------------+
only showing top 6 rows



The Precinct named as 0 are erroneous entries. Ignoring that, the Violation Precinct 19 has the highest number of tickets. The top five highest frequency of tickets falls under precincts 19,14,1,18 and 114 respectively.

**Violation Code 19 : Bus Stop: Standing or parking where standing is not allowed by sign, street marking or; traffic control device.**<br>
**Violation Code 14 : General No Standing: Standing or parking where standing is not allowed by sign, street marking or; traffic control device.**<br>
**Violation Code 1 : Failure of an intercity bus to prominently display a copy of an intercity bus permit.**<br>
**Violation Code 18 : Bus Lane: Standing or parking where standing is not allowed by sign, street marking or; traffic control device.**<br>

It can be inferred that the most parking tickets are issued in places like Bus-stop, Bus Lane and No standing areas.

***Issuer Precinct (Precinct that issued the ticket)***

In [24]:
spark.sql('SELECT Issuer_Precinct,count(*) as Frequency_of_Tickets FROM dfTable group by Issuer_Precinct order by Frequency_of_Tickets desc').show(6)

+---------------+--------------------+
|Issuer_Precinct|Frequency_of_Tickets|
+---------------+--------------------+
|              0|             2388479|
|             19|              521513|
|             14|              344977|
|              1|              321170|
|             18|              296553|
|            114|              289950|
+---------------+--------------------+
only showing top 6 rows



The Precinct named as 0 are erroneous entries. Ignoring that, the Issuer Precinct 19 issued the highest number of tickets. The top five highest frequency of tickets are issued by precincts 19,14,1,18 and 114 respectively.

**4. Find the violation code frequencies for three precincts that have issued the most number of tickets. Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts?**

The top 3 ticket Issuing Precincts Codes ignoring precinct 0 as erroneous, are 19, 14 and 1.

In [25]:
# Violation Code Distribution in Issuer Precinct 19
spark.sql('SELECT Issuer_Precinct,Violation_Code,count(*) as Frequency_of_Tickets FROM dfTable \
           WHERE Issuer_Precinct = 19\
           group by Issuer_Precinct,Violation_Code order by Issuer_Precinct,Frequency_of_Tickets desc').show(5)

+---------------+--------------+--------------------+
|Issuer_Precinct|Violation_Code|Frequency_of_Tickets|
+---------------+--------------+--------------------+
|             19|            46|               86390|
|             19|            37|               72437|
|             19|            38|               72344|
|             19|            14|               57563|
|             19|            21|               54700|
+---------------+--------------+--------------------+
only showing top 5 rows



For Issuer Precinct 19,the highest number of tickets has been issued for violation code 46.

In [26]:
# Violation Code Distribution in Issuer Precinct 14
spark.sql('SELECT Issuer_Precinct,Violation_Code,count(*) as Frequency_of_Tickets FROM dfTable \
           WHERE Issuer_Precinct = 14\
           group by Issuer_Precinct,Violation_Code order by Issuer_Precinct,Frequency_of_Tickets desc').show(5)

+---------------+--------------+--------------------+
|Issuer_Precinct|Violation_Code|Frequency_of_Tickets|
+---------------+--------------+--------------------+
|             14|            14|               73837|
|             14|            69|               58026|
|             14|            31|               39857|
|             14|            47|               30540|
|             14|            42|               20663|
+---------------+--------------+--------------------+
only showing top 5 rows



For Issuer Precinct 14,the highest number of tickets has been issued for violation code 14.

In [27]:
# Violation Code Distribution in Issuer Precinct 1
spark.sql('SELECT Issuer_Precinct,Violation_Code,count(*) as Frequency_of_Tickets FROM dfTable \
           WHERE Issuer_Precinct = 1\
           group by Issuer_Precinct,Violation_Code order by Issuer_Precinct,Frequency_of_Tickets desc').show(5)

+---------------+--------------+--------------------+
|Issuer_Precinct|Violation_Code|Frequency_of_Tickets|
+---------------+--------------+--------------------+
|              1|            14|               73522|
|              1|            16|               38937|
|              1|            20|               27841|
|              1|            46|               22534|
|              1|            38|               16989|
+---------------+--------------+--------------------+
only showing top 5 rows



For Issuer Precinct 1,the highest number of tickets has been issued for violation code 14

In [28]:
# Violation Code Distribution in Other Issuer Precincts
spark.sql('SELECT Violation_Code,count(*) as Frequency_of_Tickets FROM dfTable \
           WHERE Issuer_Precinct NOT IN (0,19,14,1)\
           group by Violation_Code order by Frequency_of_Tickets desc').show(5)

+--------------+--------------------+
|Violation_Code|Frequency_of_Tickets|
+--------------+--------------------+
|            21|             1194971|
|            38|              966350|
|            14|              681488|
|            20|              553714|
|            37|              507848|
+--------------+--------------------+
only showing top 5 rows



For other Issuer precinct's, the highest number of tickets has been issued for violation code 21,38 and 14.

We aslo observed that among the top three precincts, precinct 14 and 1 has highest number of tickets issued for violation code 14. So violation code 14 which is General No Standing, can be cited as the most common violation code across different precincts.

**5. Find out the properties of parking violations across different times of the day:**

For this activity first we will be doing some data cleansing operations, like dropping null values if any, extract the hour portion from the violation_time column and divide it into different bins for analysis.

In [29]:
nyc_taxi.printSchema()

root
 |-- Summons_Number: long (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Issue_Date: timestamp (nullable = true)
 |-- Violation_Code: integer (nullable = true)
 |-- Vehicle_Body_Type: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Violation_Precinct: integer (nullable = true)
 |-- Issuer_Precinct: integer (nullable = true)
 |-- Violation_Time: string (nullable = true)



In [30]:
# Check for any null values in any column
spark.sql('SELECT COUNT(1) FROM dfTable \
           WHERE Summons_Number IS NULL or Plate_ID is null or Registration_State is null\
           or Issue_Date is null or Violation_Code is null or Vehicle_Body_Type is null or Vehicle_Make is null\
           or Violation_Precinct is null or Issuer_Precinct is null or Violation_Time is null').show()

+--------+
|count(1)|
+--------+
|       0|
+--------+



We are assuming that only null values will indicate missing values. So as per the result of the above query there are no missing values in the dataset.

In [31]:
# Creating a new dataframe by dropping the null values, since there are no null values the count will remain same
nyc_taxi_dropnull = nyc_taxi.na.drop()

In [32]:
nyc_taxi_dropnull.count()

10803028

In [33]:
# Checking the violation time column
nyc_taxi_dropnull.select('Violation_Time').show()

+--------------+
|Violation_Time|
+--------------+
|         0143A|
|         0400P|
|         0233P|
|         1120A|
|         0555P|
|         0852P|
|         0215A|
|         0758A|
|         1005A|
|         0845A|
|         0015A|
|         0707A|
|         1022A|
|         1150A|
|         0525A|
|         0645P|
|         1122A|
|         0256P|
|         1232A|
|         1034A|
+--------------+
only showing top 20 rows



The violation time column has time in the format Hr+Min+TimeMeridiem. For eg: 0143A means 01Hr 43Mins AM. We will extract the hour, mins, and Meridiem part from the violation_time column and create separate columns for the same.

In [34]:
# Splitting the violation time in Hour Minutes and Meridiem columns
from pyspark.sql.functions import substring,concat,lit
nyc_taxi_final = nyc_taxi_dropnull.withColumn("Violation_Hour", substring("Violation_Time",1,2))
nyc_taxi_final = nyc_taxi_final.withColumn("Violation_Minute", substring("Violation_Time",3,2))
nyc_taxi_final = nyc_taxi_final.withColumn("Violation_AMPM", concat(substring("Violation_Time",5,1),lit("M")))

In [35]:
nyc_taxi_final.select('Violation_Time','Violation_Hour','Violation_Minute','Violation_AMPM').show()

+--------------+--------------+----------------+--------------+
|Violation_Time|Violation_Hour|Violation_Minute|Violation_AMPM|
+--------------+--------------+----------------+--------------+
|         0143A|            01|              43|            AM|
|         0400P|            04|              00|            PM|
|         0233P|            02|              33|            PM|
|         1120A|            11|              20|            AM|
|         0555P|            05|              55|            PM|
|         0852P|            08|              52|            PM|
|         0215A|            02|              15|            AM|
|         0758A|            07|              58|            AM|
|         1005A|            10|              05|            AM|
|         0845A|            08|              45|            AM|
|         0015A|            00|              15|            AM|
|         0707A|            07|              07|            AM|
|         1022A|            10|         

In [36]:
nyc_taxi_final.createOrReplaceTempView("dfTable")

In [37]:
# These are erroneous record having hour > 12
spark.sql('SELECT Violation_Time FROM dfTable WHERE Violation_Hour > 12').show()

+--------------+
|Violation_Time|
+--------------+
|         4733P|
|         2959P|
|         6815P|
|         1435P|
|         1820P|
|         8715P|
|         4930P|
|         5402P|
|         4317P|
|         5857P|
|         6820P|
|         3028P|
|         1305P|
|         5410P|
|         7640P|
|         2240P|
|         8735P|
|         6637P|
|         5959P|
|         8450P|
+--------------+
only showing top 20 rows



In [38]:
# Display all hours present in the dataset
spark.sql('SELECT distinct Violation_Hour FROM dfTable order by Violation_Hour').show(100)

+--------------+
|Violation_Hour|
+--------------+
|            .2|
|            .3|
|            .9|
|            0+|
|            0.|
|            00|
|            01|
|            02|
|            03|
|            04|
|            05|
|            06|
|            07|
|            08|
|            09|
|            10|
|            11|
|            12|
|            13|
|            14|
|            15|
|            16|
|            17|
|            18|
|            20|
|            21|
|            22|
|            23|
|            25|
|            26|
|            27|
|            28|
|            29|
|            30|
|            32|
|            33|
|            34|
|            36|
|            37|
|            38|
|            41|
|            43|
|            46|
|            47|
|            48|
|            49|
|            50|
|            51|
|            52|
|            54|
|            56|
|            57|
|            58|
|            59|
|            60|
|            6

It can be seen that there are lot of erroneous records with Hour as .2, .3, .9, 0+ etc. We will be ignoring them while segregating into bins.

In [39]:
spark.sql("SELECT Violation_Time FROM dfTable WHERE Violation_Hour = '.2'").show()

+--------------+
|Violation_Time|
+--------------+
|         .240P|
+--------------+



In [40]:
# There are records that have both 00xxAM as well as 12xxAM. Therefore we will replace all 00xxAM with 12xxAM
spark.sql("SELECT distinct Violation_Hour FROM dfTable WHERE Violation_AMPM = 'AM' ORDER BY Violation_Hour").show()

+--------------+
|Violation_Hour|
+--------------+
|            .3|
|            .9|
|            0+|
|            0.|
|            00|
|            01|
|            02|
|            03|
|            04|
|            05|
|            06|
|            07|
|            08|
|            09|
|            10|
|            11|
|            12|
+--------------+



In [41]:
spark.sql("SELECT distinct Violation_Hour FROM dfTable WHERE Violation_AMPM = 'PM' ORDER BY Violation_Hour").show()

+--------------+
|Violation_Hour|
+--------------+
|            .2|
|            0.|
|            00|
|            01|
|            02|
|            03|
|            04|
|            05|
|            06|
|            07|
|            08|
|            09|
|            10|
|            11|
|            12|
|            13|
|            14|
|            15|
|            16|
|            17|
+--------------+
only showing top 20 rows



We will be converting the 12 hour format into 24-hour format. During this process the erroneous rows will be replaced by null and rows having violation hour as 12 or 0 will be replaced by 0 hour and hour post 12 noon i.e PM will be replaced by hour+12. The data manipulation is done as follows:

In [42]:
spark.sql("SELECT Violation_Time,CAST(24_Violation_Hour AS INTEGER) 24_Violation_Hour\
           FROM\
           (\
           SELECT Violation_Time, \
           CASE WHEN Violation_AMPM = 'AM' AND (Violation_Hour = 12 OR Violation_Hour = 0) THEN 0\
                WHEN Violation_AMPM = 'AM' AND Violation_Hour BETWEEN 1 AND 11 THEN Violation_Hour\
                WHEN Violation_AMPM = 'PM' AND (Violation_Hour = 0 OR Violation_Hour = 12) THEN 12\
                WHEN Violation_AMPM = 'PM' AND Violation_Hour BETWEEN 1 AND 11 THEN Violation_Hour+12\
                ELSE NULL END 24_Violation_Hour\
           FROM dfTable)a").show()

+--------------+-----------------+
|Violation_Time|24_Violation_Hour|
+--------------+-----------------+
|         0143A|                1|
|         0400P|               16|
|         0233P|               14|
|         1120A|               11|
|         0555P|               17|
|         0852P|               20|
|         0215A|                2|
|         0758A|                7|
|         1005A|               10|
|         0845A|                8|
|         0015A|                0|
|         0707A|                7|
|         1022A|               10|
|         1150A|               11|
|         0525A|                5|
|         0645P|               18|
|         1122A|               11|
|         0256P|               14|
|         1232A|                0|
|         1034A|               10|
+--------------+-----------------+
only showing top 20 rows



In [43]:
# Adding a column 24_Violation_Hour to the dataframe having modified hour values 
nyc_taxi_final = spark.sql(
    "SELECT Summons_Number,\
            Plate_ID,\
            Registration_State,\
            Issue_Date,\
            Violation_Code,\
            Vehicle_Body_Type,\
            Vehicle_Make,\
            Violation_Precinct,\
            Issuer_Precinct,\
            Violation_Time,\
            Violation_Hour,\
            Violation_Minute,\
            Violation_AMPM,\
            CAST(CASE WHEN Violation_AMPM = 'AM' AND (Violation_Hour = 12 OR Violation_Hour = 0) THEN 0\
                WHEN Violation_AMPM = 'AM' AND Violation_Hour BETWEEN 1 AND 11 THEN Violation_Hour\
                WHEN Violation_AMPM = 'PM' AND (Violation_Hour = 0 OR Violation_Hour = 12) THEN 12\
                WHEN Violation_AMPM = 'PM' AND Violation_Hour BETWEEN 1 AND 11 THEN Violation_Hour+12\
                ELSE NULL END AS INTEGER) 24_Violation_Hour\
     FROM dfTable")

In [44]:
nyc_taxi_final.printSchema()

root
 |-- Summons_Number: long (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Issue_Date: timestamp (nullable = true)
 |-- Violation_Code: integer (nullable = true)
 |-- Vehicle_Body_Type: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Violation_Precinct: integer (nullable = true)
 |-- Issuer_Precinct: integer (nullable = true)
 |-- Violation_Time: string (nullable = true)
 |-- Violation_Hour: string (nullable = true)
 |-- Violation_Minute: string (nullable = true)
 |-- Violation_AMPM: string (nullable = true)
 |-- 24_Violation_Hour: integer (nullable = true)



In [45]:
nyc_taxi_final.createOrReplaceTempView("dfTable")

**Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.**

In [46]:
# Divide the hour into six equal discrete bins of time.
df_violation_hr = spark.sql("SELECT 24_Violation_Hour,\
                                       Violation_Code,\
                                       CASE WHEN 24_Violation_Hour BETWEEN 0 AND 3\
                                       THEN '0_3'\
                                       WHEN 24_Violation_Hour BETWEEN 4 AND 7\
                                       THEN '4_7'\
                                       WHEN 24_Violation_Hour BETWEEN 8 AND 11\
                                       THEN '8_11'\
                                       WHEN 24_Violation_Hour BETWEEN 12 AND 15\
                                       THEN '12_15' \
                                       WHEN 24_Violation_Hour BETWEEN 16 AND 19\
                                       THEN '16_19' \
                                       WHEN 24_Violation_Hour BETWEEN 20 AND 23\
                                       THEN '20_23' \
                                       END AS Violation_Hour_Bin\
                                       FROM dfTable WHERE 24_Violation_Hour is not null")

In [47]:
df_violation_hr.createOrReplaceTempView("df_violation_hr")

In [48]:
# Find three most commonly occurring violations
spark.sql("SELECT Violation_Hour_Bin,\
                                  Violation_Code,\
                                  Frequency_of_Tickets\
                                  FROM (SELECT Violation_Hour_Bin,\
                                  Violation_Code,\
                                  Frequency_of_Tickets,\
                                  dense_rank() over (partition by Violation_Hour_Bin order by Frequency_of_Tickets desc) Rnk\
                                  FROM (SELECT Violation_Hour_Bin,\
                                  Violation_Code,\
                                  count(*)as Frequency_of_Tickets\
                                  FROM df_violation_hr\
                                  GROUP BY Violation_Hour_Bin,\
                                  Violation_Code))\
                                  WHERE Rnk <= 3").show()

+------------------+--------------+--------------------+
|Violation_Hour_Bin|Violation_Code|Frequency_of_Tickets|
+------------------+--------------+--------------------+
|             16_19|            38|              203232|
|             16_19|            37|              145784|
|             16_19|            14|              144749|
|              8_11|            21|             1182689|
|              8_11|            36|              751422|
|              8_11|            38|              346518|
|               4_7|            14|              141276|
|               4_7|            21|              119469|
|               4_7|            40|              112186|
|             12_15|            36|              588395|
|             12_15|            38|              462758|
|             12_15|            37|              337075|
|               0_3|            21|               77461|
|               0_3|            40|               50948|
|               0_3|           

The above table shows the three most commonly occuring violations across different times of the day.

While observing the Frequency of Top-3 Violation Codes in each Violation Time Bin it is clear that the majority of the Tickets are issued between 0800-1100 Hrs and 1200-1500 Hrs.<br>

It is also important to note the exceptionally high number of tickets for Violation Code 21 [This is expected as Code 21 stands for No-Parking Zone Tickets. Majority of the public might park inappropriately during the morning rush] issued between
0800-1100 Hrs. <br>

There is also a high frequency of tickets for Violation Code 36 and 38 between 0800-1500 Hrs.
[This is also expected as Code 36 is due to exceeding speed limit near school zones, there are spikes between
0800-1500 Hrs during start and end of school day while Codes 37&38 are due to parking meter
between 1200-1500 Hrs. and 1600-1900 Hrs.]

**Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).**

In [49]:
spark.sql('SELECT Violation_Code,count(*) as no_of_tickets FROM df_violation_hr \
           group by Violation_Code order by no_of_tickets desc').show(3)

+--------------+-------------+
|Violation_Code|no_of_tickets|
+--------------+-------------+
|            21|      1528546|
|            36|      1400614|
|            38|      1062301|
+--------------+-------------+
only showing top 3 rows



The codes 21, 36 and 38 are most frequent. Now, let's see in which time bins these occur the most.

In [50]:
spark.sql("SELECT Violation_Code,\
          Violation_Hour_Bin,\
          count(*) no_of_tickets\
          FROM df_violation_hr\
          WHERE violation_code IN (21,36,38)\
          GROUP BY Violation_Code,Violation_Hour_Bin\
          ORDER BY Violation_Code,no_of_tickets desc").show()

+--------------+------------------+-------------+
|Violation_Code|Violation_Hour_Bin|no_of_tickets|
+--------------+------------------+-------------+
|            21|              8_11|      1182689|
|            21|             12_15|       148013|
|            21|               4_7|       119469|
|            21|               0_3|        77461|
|            21|             16_19|          551|
|            21|             20_23|          363|
|            36|              8_11|       751422|
|            36|             12_15|       588395|
|            36|               4_7|        33939|
|            36|             16_19|        26858|
|            38|             12_15|       462758|
|            38|              8_11|       346518|
|            38|             16_19|       203232|
|            38|             20_23|        47029|
|            38|               4_7|         2300|
|            38|               0_3|          464|
+--------------+------------------+-------------+


The violation code 21 and 36 issue maximum tickets in the morning time 0800-1100 Hrs. The code 38 is more frequently violated in the span of 1200-1500 Hrs, followed by next maximum tickets in the  0800-1100 Hrs time.

**6. Find some seasonality in this data: Divide the year into a certain number of seasons and find the frequencies of tickets for each season. Then, find the three most common violations for each of these seasons.**

In [51]:
nyc_taxi_final.printSchema()

root
 |-- Summons_Number: long (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Issue_Date: timestamp (nullable = true)
 |-- Violation_Code: integer (nullable = true)
 |-- Vehicle_Body_Type: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Violation_Precinct: integer (nullable = true)
 |-- Issuer_Precinct: integer (nullable = true)
 |-- Violation_Time: string (nullable = true)
 |-- Violation_Hour: string (nullable = true)
 |-- Violation_Minute: string (nullable = true)
 |-- Violation_AMPM: string (nullable = true)
 |-- 24_Violation_Hour: integer (nullable = true)



In [52]:
nyc_taxi_final.createOrReplaceTempView("dfTable")

In [53]:
# Addingcolumn issue_mon as the month in which the ticket has been issued
nyc_taxi_final=spark.sql("SELECT Summons_Number,\
            Plate_ID,\
            Registration_State,\
            Issue_Date,\
            Violation_Code,\
            Vehicle_Body_Type,\
            Vehicle_Make,\
            Violation_Precinct,\
            Issuer_Precinct,\
            Violation_Time,\
            Violation_Hour,\
            Violation_Minute,\
            Violation_AMPM,\
            24_Violation_Hour,\
            CAST(EXTRACT(MONTH FROM CAST(Issue_Date as DATE)) AS String) issue_mon\
            FROM dfTable")

In [54]:
nyc_taxi_final.printSchema()

root
 |-- Summons_Number: long (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Issue_Date: timestamp (nullable = true)
 |-- Violation_Code: integer (nullable = true)
 |-- Vehicle_Body_Type: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Violation_Precinct: integer (nullable = true)
 |-- Issuer_Precinct: integer (nullable = true)
 |-- Violation_Time: string (nullable = true)
 |-- Violation_Hour: string (nullable = true)
 |-- Violation_Minute: string (nullable = true)
 |-- Violation_AMPM: string (nullable = true)
 |-- 24_Violation_Hour: integer (nullable = true)
 |-- issue_mon: string (nullable = true)



In [55]:
nyc_taxi_final.createOrReplaceTempView("dfTable")

In [56]:
nyc_season=spark.sql("select issue_mon,Violation_Code,\
            CASE WHEN issue_mon IN (1,2,12)\
            THEN 'WINTER'\
            WHEN issue_mon BETWEEN 9 AND 11\
            THEN 'FALL'\
            WHEN issue_mon BETWEEN 3 AND 5\
            THEN 'SPRING'\
            WHEN issue_mon BETWEEN 6 AND 8\
            THEN 'SUMMER'\
            WHEN issue_mon like 12 THEN 'WINTER'\
             else  NULL\
            END AS SEASON\
            from dfTable where issue_mon is not null")

In [57]:
nyc_season.show(5)

+---------+--------------+------+
|issue_mon|Violation_Code|SEASON|
+---------+--------------+------+
|        7|             7|SUMMER|
|        7|             7|SUMMER|
|        8|             5|SUMMER|
|        6|            47|SUMMER|
|       11|            69|  FALL|
+---------+--------------+------+
only showing top 5 rows



In [58]:
nyc_season.createOrReplaceTempView("dfTable")

In [59]:
# Find the frequencies of tickets for each season
spark.sql('select SEASON,count(*) as Frequency_of_Tickets from dfTable\
            group by SEASON\
            order by Frequency_of_Tickets desc').show()

+------+--------------------+
|SEASON|Frequency_of_Tickets|
+------+--------------------+
|SPRING|             2880687|
|  FALL|             2830802|
|SUMMER|             2606208|
|WINTER|             2485331|
+------+--------------------+



Among the seasons Spring and Fall account for the highest number of parking tickets.

In [60]:
spark.sql('SELECT Season,\
                                  Violation_Code,\
                                  Frequency_of_Tickets\
                                  FROM (SELECT Season,\
                                  Violation_Code,\
                                  Frequency_of_Tickets,\
                                  dense_rank() over (partition by Season order by Frequency_of_Tickets desc) Rnk\
                                  FROM (SELECT Season,\
                                  Violation_Code,\
                                  count(*)as Frequency_of_Tickets\
                                  FROM dfTable\
                                  GROUP BY Season,\
                                  Violation_Code))\
                                  WHERE Rnk <= 3').show()

+------+--------------+--------------------+
|Season|Violation_Code|Frequency_of_Tickets|
+------+--------------+--------------------+
|WINTER|            21|              362341|
|WINTER|            36|              359338|
|WINTER|            38|              259723|
|SPRING|            21|              402807|
|SPRING|            36|              344834|
|SPRING|            38|              271192|
|  FALL|            36|              456046|
|  FALL|            21|              357479|
|  FALL|            38|              283828|
|SUMMER|            21|              405961|
|SUMMER|            38|              247561|
|SUMMER|            36|              240396|
+------+--------------+--------------------+



Except Fall season, it can be seen that the highest number of tickets are issued for violation code 21. For Fall season, highest number of tickets are issued for violation code 36.

**7. The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Estimate this for the three most commonly occurring codes:**

In [61]:
nyc_taxi_final.createOrReplaceTempView("dfTable")

**Total occurrences of the three most common violation codes**

In [62]:
top_violation = spark.sql('select Violation_Code, count(*) as Frequency_of_Tickets from dfTable group by Violation_Code order by Frequency_of_Tickets desc')
top_violation.show(3)

+--------------+--------------------+
|Violation_Code|Frequency_of_Tickets|
+--------------+--------------------+
|            21|             1528588|
|            36|             1400614|
|            38|             1062304|
+--------------+--------------------+
only showing top 3 rows



In [64]:
from pyspark.sql.functions import sum
top_violation.where('Violation_Code in (21,36,38)').select(sum('Frequency_of_Tickets').alias("Total_tickets_for_top_3_violations")).show()

+----------------------------------+
|Total_tickets_for_top_3_violations|
+----------------------------------+
|                           3991506|
+----------------------------------+



The top three most common violation codes are 21,36 and 38.
**[21: No Parking Violation, 36: Exceeding speed limit near School Zone, 38: Failed to show receipt or tag in windshield].**
The highest frequency of tickets was issued for Violation Code 21 followed by 36 and 38 respectively.
Total 3991506 tickets has been issued for the top 3 violation codes.

**The fines associated with different violation codes are divided into two categories: one for the highest-density locations in the city and the other for the rest of the city.**

**For the sake of simplicity, take the average of the two. Find the total amount collected for the three violation codes with the maximum tickets. State the code that has the highest total collection.**

In [65]:
collection=spark.sql("select Violation_Code, count(*) as freq_tickets,\
          CASE WHEN Violation_Code like 21 THEN CAST((65+45)/2 as INTEGER)\
          WHEN Violation_Code like 36 THEN CAST((50+50)/2 as INTEGER)\
          WHEN Violation_Code like 38 THEN CAST((65+35)/2 as INTEGER)\
          END AS FINE\
        from dfTable where Violation_Code IN (21,36,38) \
        group by Violation_Code order by freq_tickets desc")

In [66]:
collection.show()

+--------------+------------+----+
|Violation_Code|freq_tickets|FINE|
+--------------+------------+----+
|            21|     1528588|  55|
|            36|     1400614|  50|
|            38|     1062304|  50|
+--------------+------------+----+



In [67]:
collection.createOrReplaceTempView("dfTable")

In [68]:
# Find the total amount collected for the three violation codes with the maximum tickets
spark.sql("select Violation_Code,freq_tickets,FINE,freq_tickets*FINE as total_collection from dfTable").show()

+--------------+------------+----+----------------+
|Violation_Code|freq_tickets|FINE|total_collection|
+--------------+------------+----+----------------+
|            21|     1528588|  55|        84072340|
|            36|     1400614|  50|        70030700|
|            38|     1062304|  50|        53115200|
+--------------+------------+----+----------------+



The maximum revenue comes from the violation against code 21 which prohibits parking in areas having no parking sign, followed by code 36 which is violated if driven above the speedlimit near a school and 38 which is imposed on failure to show a receipt or tag in the windshield.

**Violation Code 21 brought in \$84 million in total fine, while Code 36 and 38 brought in \$70 million and \$53 million respectively.**

### Business Inference :

The business inferences drawn from the above project are as follows:
    
- The car type SUBN issues the maximum number of tickets around 0.87 million.
- The car types of Ford and Toyota issue maximum tickets around 12 Lakhs, when compared to other company models.
- The issuer precincts 19 issue maximum number of tickets against violation code 46 which restricts double parking and is mostly common in Mid Manhattan. Due to high demand of service delivery, this might occur and must be taken into checklist. Where as, precincts 14 and 1 issue maximum tickets against code 14 which is violated due to General No Standing: Standing or parking where standing is not allowed by sign, street marking or; traffic control device.
- The morning slot of 8-11 AM issue maximum number of tickets against code 21 (illegal parking) and 36 (driving beyond speed limit near a school). This might happen due to rush hours in the office-school time. 
- If we look into the data season-wise, then also code 21 is violated the most, followed by 36, thereby resulting in a huge revenue in these two.

**Hence from the EDA on the NYC Parking data, it is clear that from all the views created the most problematic area in the parking field is parking in the wrong place and breaking the speed limit. The department must dig in more in order to find out the driving factors leading to this and attempting to solve this. **

In [None]:
spark.stop()