In [103]:
#### Siddhartha
### Creating the Spark Context through the SparkSession
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("NYC_Parking_Tickets") \
    .getOrCreate()

In [104]:
NYC_Parking = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load('/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv')


In [105]:
NYC_Parking 

DataFrame[Summons Number: bigint, Plate ID: string, Registration State: string, Issue Date: timestamp, Violation Code: int, Vehicle Body Type: string, Vehicle Make: string, Violation Precinct: int, Issuer Precinct: int, Violation Time: string]

In [106]:
### Number of rows
NYC_Parking.count()

10803028

In [107]:
NYC_Parking.show()

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|         

In [108]:
### Dropping duplicates
NYC_Parking=NYC_Parking.dropDuplicates()
NYC_Parking.count()

10803028

In [109]:
# Droping null values
NYC_Parking =NYC_Parking.dropna()
NYC_Parking.count()

10803028

In [110]:
#### Creating the temporary view
NYC_Parking.createOrReplaceTempView("NYC_Parking")

In [111]:
spark.sql('Select * from NYC_Parking')

DataFrame[Summons Number: bigint, Plate ID: string, Registration State: string, Issue Date: timestamp, Violation Code: int, Vehicle Body Type: string, Vehicle Make: string, Violation Precinct: int, Issuer Precinct: int, Violation Time: string]

In [112]:
### Replacing the space in the field name with a underscore 
NYC_Parking= NYC_Parking.toDF(*(a.replace(' ', '_') for a in NYC_Parking.columns))
NYC_Parking.show

<bound method DataFrame.show of DataFrame[Summons_Number: bigint, Plate_ID: string, Registration_State: string, Issue_Date: timestamp, Violation_Code: int, Vehicle_Body_Type: string, Vehicle_Make: string, Violation_Precinct: int, Issuer_Precinct: int, Violation_Time: string]>

In [113]:
NYC_Parking.createOrReplaceTempView("NYC_Parking")

In [114]:
spark.sql('Select * from NYC_Parking')

DataFrame[Summons_Number: bigint, Plate_ID: string, Registration_State: string, Issue_Date: timestamp, Violation_Code: int, Vehicle_Body_Type: string, Vehicle_Make: string, Violation_Precinct: int, Issuer_Precinct: int, Violation_Time: string]

In [115]:
tickets_year = spark.sql("select year(Issue_Date) as Year, count(Summons_Number) as no_of_tickets from NYC_Parking group by Year order by Year Desc")
tickets_year.show(100)

+----+-------------+
|Year|no_of_tickets|
+----+-------------+
|2069|            4|
|2068|            1|
|2063|            2|
|2062|            2|
|2061|            1|
|2060|            2|
|2053|            1|
|2047|            2|
|2041|            1|
|2036|            1|
|2033|            2|
|2031|            5|
|2030|           12|
|2029|            2|
|2028|            8|
|2027|           50|
|2026|           24|
|2025|            6|
|2024|            3|
|2023|            5|
|2022|            4|
|2021|           22|
|2020|           22|
|2019|          472|
|2018|         1057|
|2017|      5431918|
|2016|      5368391|
|2015|          419|
|2014|          120|
|2013|           70|
|2012|           87|
|2011|           22|
|2010|           48|
|2009|            3|
|2008|            4|
|2007|           18|
|2006|            8|
|2005|            1|
|2004|            2|
|2003|            1|
|2002|            1|
|2001|            2|
|2000|          185|
|1997|            1|
|1996|       

In [116]:
### From the above results we con find that the number of tickets in 2016 and 2017 are as follows
### We are interested in the number of tickets data for 2017
### 2016 - 5368391 and 2017- 5431918

In [117]:
# Filtering only 2017 data
NYC_Parking.createOrReplaceTempView("2017_parking")
NYC_Parking=spark.sql("select * from 2017_parking where year(TO_DATE(CAST(UNIX_TIMESTAMP(Issue_Date,'MM/dd/yyyy') AS TIMESTAMP))) = 2017 ")
NYC_Parking.count()

5431918

In [118]:
### Removing the Blank Plates

NYC_Parking=NYC_Parking[NYC_Parking.Plate_ID!='BLANKPLATE']
NYC_Parking.count()

5426657

In [119]:
#### Replacing the temporary view
NYC_Parking.createOrReplaceTempView("2017_parking")


In [120]:
## Q1 Find the total number of tickets for the year.

Q1=spark.sql("Select count(distinct(Summons_Number)) from 2017_parking")
Q1.show()

### Total number of tickets issued in 2017 is 5426657

+------------------------------+
|count(DISTINCT Summons_Number)|
+------------------------------+
|                       5426657|
+------------------------------+



In [121]:
### Q2 Find out the number of unique states from where the cars that got parking tickets came

Q2 = spark.sql("SELECT distinct(Registration_State) from 2017_parking")
Q2.count()

### Total 65 registration states from which the parking tickets came

65

In [122]:
Q2 = spark.sql("SELECT distinct(Registration_State) AS Reg_State, count(*) Num_Tickets from 2017_parking group by Reg_State Order by  Num_Tickets desc")

Q2.show()



+---------+-----------+
|Reg_State|Num_Tickets|
+---------+-----------+
|       NY|    4273951|
|       NJ|     475825|
|       PA|     140286|
|       CT|      70403|
|       FL|      69468|
|       IN|      45525|
|       MA|      38941|
|       VA|      34367|
|       MD|      30213|
|       NC|      27152|
|       TX|      18827|
|       IL|      18666|
|       GA|      17537|
|       AZ|      12379|
|       OH|      12281|
|       CA|      12153|
|       ME|      10806|
|       99|      10794|
|       SC|      10395|
|       MN|      10083|
+---------+-----------+
only showing top 20 rows



In [123]:
### There is an column entry with numeric value of 99 which needs to be replaced with NY as NY is the state from which maximum tickets were issued

from pyspark.sql.functions import when,lit
NYC_Parking=NYC_Parking.withColumn('Registration_State',when(NYC_Parking["Registration_State"]=="99",lit('NY')).otherwise(NYC_Parking["Registration_State"]))

In [124]:
#### Replacing the temporary view after 99 is replaced by NY
NYC_Parking.createOrReplaceTempView("2017_parking")

In [125]:
### Q2 Find out the number of unique states from where the cars that got parking tickets came after replacing 99 with NY

Q2 = spark.sql("SELECT distinct(Registration_State) AS Reg_State, count(*) Num_Tickets from 2017_parking group by Reg_State Order by  Num_Tickets desc")

Q2.show()

+---------+-----------+
|Reg_State|Num_Tickets|
+---------+-----------+
|       NY|    4284745|
|       NJ|     475825|
|       PA|     140286|
|       CT|      70403|
|       FL|      69468|
|       IN|      45525|
|       MA|      38941|
|       VA|      34367|
|       MD|      30213|
|       NC|      27152|
|       TX|      18827|
|       IL|      18666|
|       GA|      17537|
|       AZ|      12379|
|       OH|      12281|
|       CA|      12153|
|       ME|      10806|
|       SC|      10395|
|       MN|      10083|
|       OK|       9088|
+---------+-----------+
only showing top 20 rows



In [126]:
### There are 64 distinct states from which the cars got picked (after replacing 99 with NY)
Q2.count()

64

In [127]:
# Aggregation Code
### How often does each violation code occur?  (Part of Q1 Aggregation Task)
from pyspark.sql.functions import count,desc,countDistinct
NYC_Parking.select(countDistinct("Violation_Code")).show()

+------------------------------+
|count(DISTINCT Violation_Code)|
+------------------------------+
|                           100|
+------------------------------+



In [128]:
# Aggregation Code
### How often does each violation code occur? 
Q3 = spark.sql("SELECT distinct(Violation_Code) AS Violation_Code, count(*) Num_Tickets from 2017_parking group by Violation_Code Order by  Num_Tickets desc")
Q3.count()

100

In [129]:
###  Display the frequency of the top five violation codes (Aggregation Task Q1)

Q3.show(5)

+--------------+-----------+
|Violation_Code|Num_Tickets|
+--------------+-----------+
|            21|     767740|
|            36|     662765|
|            38|     541526|
|            14|     476405|
|            20|     319439|
+--------------+-----------+
only showing top 5 rows



In [130]:
### How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? (Q2 in Aggregation Task)

vehicleBodyType = spark.sql("SELECT Vehicle_Body_Type, count(*) as Ticket from 2017_parking group by Vehicle_Body_Type order by Ticket desc")
vehicleBodyType.show(5)



+-----------------+-------+
|Vehicle_Body_Type| Ticket|
+-----------------+-------+
|             SUBN|1882978|
|             4DSD|1547063|
|              VAN| 723796|
|             DELV| 358924|
|              SDN| 192927|
+-----------------+-------+
only showing top 5 rows



In [131]:
### Vehicle make (Q2 in Aggregation Task)

vehicleMake = spark.sql("SELECT Vehicle_Make, count(*) as Ticket from 2017_parking group by Vehicle_Make order by Ticket desc")
vehicleMake.show(5)

+------------+------+
|Vehicle_Make|Ticket|
+------------+------+
|        FORD|636527|
|       TOYOT|605011|
|       HONDA|538460|
|       NISSA|461725|
|       CHEVR|355868|
+------------+------+
only showing top 5 rows



In [132]:
#### 'Violation Precinct' (This is the precinct of the zone where the violation occurred). (Q3 Part 1 in Aggregation Task)
### Using this, can you draw any insights for parking violations in any specific areas of the city?
#### Here, you would have noticed that the dataframe has the'Violating Precinct' or 'Issuing Precinct' as '0'. 
### These are erroneous entries. Hence, you need to provide the records for five correct precincts. 
### (Hint: Print the top six entries after sorting.)
Violation_Precinct = spark.sql("SELECT Violation_Precinct, count(*) as Ticket from 2017_parking group by Violation_Precinct order by Ticket desc")
Violation_Precinct.show(6)



+------------------+------+
|Violation_Precinct|Ticket|
+------------------+------+
|                 0|925395|
|                19|274264|
|                14|203375|
|                 1|174620|
|                18|169043|
|               114|147223|
+------------------+------+
only showing top 6 rows



In [133]:
### 'Issuer Precinct' (This is the precinct that issued the ticket.)(Q3 part 2 in Aggregation Task)

Issue_precinct = spark.sql("SELECT Issuer_Precinct, count(*) as Ticket from 2017_parking  group by Issuer_Precinct order by Ticket desc")  
Issue_precinct.show(6)


+---------------+-------+
|Issuer_Precinct| Ticket|
+---------------+-------+
|              0|1077884|
|             19| 266790|
|             14| 200328|
|              1| 168630|
|             18| 162908|
|            114| 143900|
+---------------+-------+
only showing top 6 rows



In [134]:
### Find the violation code frequencies for three precincts that have issued the most number of tickets. (Q4 Aggregation Task)


violation_code_freq = spark.sql("select Issuer_Precinct,Violation_Code, count(*) as Frequency from 2017_parking group by Issuer_Precinct, Violation_Code order by Frequency desc" )
violation_code_freq.show(3)


+---------------+--------------+---------+
|Issuer_Precinct|Violation_Code|Frequency|
+---------------+--------------+---------+
|              0|            36|   662765|
|              0|             7|   210175|
|              0|            21|   125923|
+---------------+--------------+---------+
only showing top 3 rows



In [135]:
### let us look at the top 10 since the top three issuer precinct are having code 0 which we are not considering
### So the top 3 precincts are 18, 19 and 14 (Q4 Aggregation Task)

violation_code_freq.show(10)


+---------------+--------------+---------+
|Issuer_Precinct|Violation_Code|Frequency|
+---------------+--------------+---------+
|              0|            36|   662765|
|              0|             7|   210175|
|              0|            21|   125923|
|             18|            14|    50135|
|             19|            46|    48422|
|              0|             5|    48076|
|             14|            14|    45019|
|              1|            14|    38345|
|             19|            38|    36332|
|             19|            37|    36046|
+---------------+--------------+---------+
only showing top 10 rows



In [136]:
### Looking for top 3 violation codes within precinct 18 having maximum tickets issued

violation_code_freq_18 = spark.sql("select Violation_Code, count(*) as Frequency from 2017_parking where Issuer_Precinct=18 group by Violation_Code order by Frequency desc" )
violation_code_freq_18.show(3)

+--------------+---------+
|Violation_Code|Frequency|
+--------------+---------+
|            14|    50135|
|            69|    20188|
|            47|    14105|
+--------------+---------+
only showing top 3 rows



In [137]:
### Looking for top 3 violation codes within precinct 19 having maximum tickets issued

violation_code_freq_19 = spark.sql("select Violation_Code, count(*) as Frequency from 2017_parking where Issuer_Precinct=19 group by Violation_Code order by Frequency desc" )
violation_code_freq_19.show(3)

+--------------+---------+
|Violation_Code|Frequency|
+--------------+---------+
|            46|    48422|
|            38|    36332|
|            37|    36046|
+--------------+---------+
only showing top 3 rows



In [138]:
### Looking for top 3 violation codes within precinct 14 having maximum tickets issued

violation_code_freq_14 = spark.sql("select Violation_Code, count(*) as Frequency from 2017_parking where Issuer_Precinct=14 group by Violation_Code order by Frequency desc" )
violation_code_freq_14.show(3)

+--------------+---------+
|Violation_Code|Frequency|
+--------------+---------+
|            14|    45019|
|            69|    30453|
|            31|    22528|
+--------------+---------+
only showing top 3 rows



In [139]:
###  The top three precincts which have issued maximum tickets on violation are precincts 18, 19 and 14
### Do these precinct zones have an exceptionally high frequency of certain violation codes? 
### Are these codes common across precincts? 
### To find out the above, we need to look at the combined top 3 violations for precincts 18, 19 and 14(Q4 Part 2 Aggreagtion Task)

combined_violation_codes =spark.sql("select Violation_Code, count(*) as Frequency from 2017_parking where Issuer_Precinct in (18,19,14) group by Violation_Code order by Frequency desc")
combined_violation_codes.show(3)

#### From the below results it is clear that violation codes 14, 46 and 69 are the top 3 violation codes for the combined data
#### As we can see from this, violation codes 14 and 69 are common across precincts 14 and 18 (this was also clear from the individual
### zone analysis above, precinct 19 has no common violation code shared with the other two zones)

+--------------+---------+
|Violation_Code|Frequency|
+--------------+---------+
|            14|   124945|
|            46|    63958|
|            69|    53549|
+--------------+---------+
only showing top 3 rows



In [140]:
# Should not see any null value as we had dropped the null values in the beginning
null_values = spark.sql("SELECT count(*) as null_values from 2017_parking where Violation_Time is NULL")
null_values.show()

+-----------+
|null_values|
+-----------+
|          0|
+-----------+



In [141]:
### Finding the seasonality in the violations data(Q6)
#### Spring runs from March 1 to May 31;
#### Summer runs from June 1 to August 31;
#### Fall (autumn) runs from September 1 to November 30; and.
#### Winter runs from December 1 to February 28

##### Source: Northern Meteorological Seasons from https://www.timeanddate.com


seasonality = spark.sql("select Violation_Code , Issuer_Precinct,Issue_Date, case when MONTH(TO_DATE(Issue_Date, 'MM/dd/yyyy')) between 03 and 05 then 'spring' when MONTH(TO_DATE(Issue_Date, 'MM/dd/yyyy')) between 06 and 08 then 'summer' when MONTH(TO_DATE(Issue_Date, 'MM/dd/yyyy')) between 09 and 11 then 'autumn' when MONTH(TO_DATE(Issue_Date, 'MM/dd/yyyy')) in (1,2,12) then 'winter' else 'unknown' end  as season from 2017_parking")
seasonality.show()


+--------------+---------------+-------------------+------+
|Violation_Code|Issuer_Precinct|         Issue_Date|season|
+--------------+---------------+-------------------+------+
|            53|              5|2017-03-19 00:00:00|spring|
|            10|             61|2017-06-20 00:00:00|summer|
|            14|            109|2017-02-03 00:00:00|winter|
|            36|              0|2017-05-23 00:00:00|spring|
|            14|            109|2017-06-01 00:00:00|summer|
|            84|             17|2017-04-05 00:00:00|spring|
|            69|             17|2017-05-06 00:00:00|spring|
|             7|              0|2017-03-08 00:00:00|spring|
|            37|              9|2017-04-18 00:00:00|spring|
|            14|              1|2017-02-21 00:00:00|winter|
|            38|             24|2017-02-04 00:00:00|winter|
|             7|              0|2017-01-10 00:00:00|winter|
|            24|            112|2017-04-20 00:00:00|spring|
|            37|              6|2017-03-

In [142]:
##### Creating a temp view for seasonality data
seasonality.createOrReplaceTempView("seasonality")

In [143]:
### Frequency of tickets for each season(Q6 Part 1)
seasonality_freq = spark.sql("select season, count(*) as tickets from seasonality  group by season order by tickets desc")
seasonality_freq.show()

+------+-------+
|season|tickets|
+------+-------+
|spring|2870491|
|winter|1702786|
|summer| 852405|
|autumn|    975|
+------+-------+



In [144]:
##### Then, find the three most common violations for each of these seasons.(Q6 Part2)

# Top 3 violations in the Spring season (we have not filtered the data for any precincts)
violation_spring = spark.sql("select Violation_Code, count(*) as Frequency from seasonality where season = 'spring' group by Violation_Code order by Frequency desc" )
violation_spring.show(3)


+--------------+---------+
|Violation_Code|Frequency|
+--------------+---------+
|            21|   402234|
|            36|   344834|
|            38|   270913|
+--------------+---------+
only showing top 3 rows



In [145]:
# Top 3 violations in the winter season (we have not filtered the data for any precincts)(Q6 Part2)
violation_winter = spark.sql("select Violation_Code, count(*) as Frequency from seasonality where season = 'winter' group by Violation_Code order by Frequency desc" )
violation_winter.show(3)

+--------------+---------+
|Violation_Code|Frequency|
+--------------+---------+
|            21|   238059|
|            36|   221268|
|            38|   187102|
+--------------+---------+
only showing top 3 rows



In [146]:
# Top 3 violations in the autumn season (we have not filtered the data for any precincts)(Q6 Part2)
violation_autumn = spark.sql("select Violation_Code, count(*) as Frequency from seasonality where season = 'autumn' group by Violation_Code order by Frequency desc" )
violation_autumn.show(3)

+--------------+---------+
|Violation_Code|Frequency|
+--------------+---------+
|            46|      231|
|            21|      128|
|            40|      115|
+--------------+---------+
only showing top 3 rows



In [147]:
# Top 3 violations in the summer season (we have not filtered the data for any precincts)(Q6 Part2)
violation_summer = spark.sql("select Violation_Code, count(*) as Frequency from seasonality where season = 'summer' group by Violation_Code order by Frequency desc" )
violation_summer.show(3)

+--------------+---------+
|Violation_Code|Frequency|
+--------------+---------+
|            21|   127319|
|            36|    96663|
|            38|    83503|
+--------------+---------+
only showing top 3 rows



In [148]:
## Find the total occurrences of the three most common violation codes.(Q7 Part1)

common_Violation = spark.sql("select Violation_Code, count(*) as Count_Violations from 2017_parking group by Violation_Code Order By Count_Violations desc")
common_Violation.show(3)

### Total occurrences of three most common violation codes are 1972031
### Violation Codes 21, 36 and 38 are among the top 3 in terms of the number of violations


+--------------+----------------+
|Violation_Code|Count_Violations|
+--------------+----------------+
|            21|          767740|
|            36|          662765|
|            38|          541526|
+--------------+----------------+
only showing top 3 rows



In [149]:
### Using the information from https://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page
### find the total amount collected for the three violation codes with the maximum tickets. 
### State the code that has the highest total collection.
### Violation Code 21 has average fine across locations is avg(65,45) = 55
### Violation Code 36 has average fine across locations is avg(50,50) = 50
### Violation Code 38 has average fine across locations is avg(65,35) = 50

#### Using the above average fine figures for each Violation Code, the Total COllection from the three violation codes
### for 2017 is (Q7 Part 2)

print('Total collection = ',767740*55+662765*50+541526*50)
print('Individual collection for Violation Code 21 = ',767740*55)
print('Individual collection for Violation Code 36 = ',662765*50)
print('Individual collection for Violation Code 38 = ',541526*50)



Total collection =  102440250
Individual collection for Violation Code 21 =  42225700
Individual collection for Violation Code 36 =  33138250
Individual collection for Violation Code 38 =  27076300


In [151]:
#### Following are the inferences(Q7 Part 3)

#### Violation Code 21 - Street Cleaning: No parking where parking is not allowed by sign, street marking or traffic control device
### has resulted in the maximum collection 

##### Top 3 Violations across all areas in NY are 21, 36 amd 38 resulting in total revenue of 102440250 USD in 2017
##### Top 3 precincts with highest violations are precincts 18, 19 and 14 
##### The most commonly occured vilations in these 3 precincts are violation code 14 and 69
##### Spring and Winter have the maximum number of violations followed by summer and autumn

In [152]:
spark.stop()