  ##           Case Study:                                  NYC Parking Tickets 

In [1]:
## Creating a Spark session Object to make use of spark functions

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Spark DF and SQL').getOrCreate()

In [3]:
## Creating a dataframe using sparksession object and load requried data into dataframe

In [94]:
## loading data into dataframe

NYC_park = spark.read.option("header","True").option("inferschema","true")\
.csv('/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv')

In [95]:
## looking at the data and structure of dataframe

In [96]:
NYC_park

DataFrame[Summons Number: bigint, Plate ID: string, Registration State: string, Issue Date: timestamp, Violation Code: int, Vehicle Body Type: string, Vehicle Make: string, Violation Precinct: int, Issuer Precinct: int, Violation Time: string]

In [97]:
NYC_park.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Violation Time: string (nullable = true)



In [8]:
# Number of records in dataframe
NYC_park.count()

10803028

In [10]:
NYC_park.describe

<bound method DataFrame.describe of DataFrame[Summons Number: bigint, Plate ID: string, Registration State: string, Issue Date: timestamp, Violation Code: int, Vehicle Body Type: string, Vehicle Make: string, Violation Precinct: int, Issuer Precinct: int, Violation Time: string]>

In [11]:
# checking top 5 rows of dataframe
NYC_park.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|         

In [9]:
NYC_park.head()

Row(Summons Number=5092469481, Plate ID='GZH7067', Registration State='NY', Issue Date=datetime.datetime(2016, 7, 10, 0, 0), Violation Code=7, Vehicle Body Type='SUBN', Vehicle Make='TOYOT', Violation Precinct=0, Issuer Precinct=0, Violation Time='0143A')

In [12]:
NYC_park.columns

['Summons Number',
 'Plate ID',
 'Registration State',
 'Issue Date',
 'Violation Code',
 'Vehicle Body Type',
 'Vehicle Make',
 'Violation Precinct',
 'Issuer Precinct',
 'Violation Time']

In [13]:
# Mentioned in case study that For the scope of this analysis, we will analyse the parking tickets over the year 2017. 
# so we will check the issue date column whether it have any data other than 2017.

# I am considering the data with the issue year of 2017 only.

In [99]:
from pyspark.sql.functions import col
NYC_park.groupBy(year(NYC_park['Issue Date'])).count().sort(col('count').desc()).show(20)

+----------------+-------+
|year(Issue Date)|  count|
+----------------+-------+
|            2017|5431918|
|            2016|5368391|
|            2018|   1057|
|            2019|    472|
|            2015|    419|
|            2000|    185|
|            2014|    120|
|            2012|     87|
|            2013|     70|
|            2027|     50|
|            2010|     48|
|            2026|     24|
|            2020|     22|
|            2021|     22|
|            2011|     22|
|            2007|     18|
|            2030|     12|
|            2006|      8|
|            2028|      8|
|            2025|      6|
+----------------+-------+
only showing top 20 rows



#### We can see that there are rows other than the year 2017. so I am considering here only for 2017 data and filter out the remaining data.

In [100]:
## Taking only 2017 year data

In [101]:
from pyspark.sql.functions import year,month, dayofmonth
NYC_park=NYC_park.filter((year('Issue Date')==2017))

In [102]:
NYC_park.show(2)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|                 0|              0|         0852P|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
only showing top 2 rows



In [124]:
## checking the count of records after filtering only 2017 year data

In [103]:
NYC_park.groupBy(year(NYC_park['Issue Date'])).count().sort(col('count').desc()).show(20)

+----------------+-------+
|year(Issue Date)|  count|
+----------------+-------+
|            2017|5431918|
+----------------+-------+



 ##  Examine the data

###  1. Find the total number of tickets for the year.

In [104]:
NYC_park.select(col('Summons Number')).count()

5431918

#### There are 5431918 total tickets for the year

### 2. Find out the number of unique states from where the cars that got parking tickets came. (Hint: Use the column 'Registration State'.
### There is a numeric entry '99' in the column, which should be corrected. Replace it with the state having the maximum entries. Provide the number of unique states again.

In [17]:
## countDistinct() is used to get unique values in a column

In [105]:
NYC_park.select(countDistinct("Registration State")).show()

+----------------------------------+
|count(DISTINCT Registration State)|
+----------------------------------+
|                                65|
+----------------------------------+



#### There are 65 unique states that got parking tickets

In [108]:
## checking which state got more parking tickets

In [107]:
(NYC_park.groupBy(NYC_park["Registration State"]).count().sort(col("count").desc()).show())

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|4273951|
|                NJ| 475825|
|                PA| 140286|
|                CT|  70403|
|                FL|  69468|
|                IN|  45525|
|                MA|  38941|
|                VA|  34367|
|                MD|  30213|
|                NC|  27152|
|                TX|  18827|
|                IL|  18666|
|                GA|  17537|
|                99|  16055|
|                AZ|  12379|
|                OH|  12281|
|                CA|  12153|
|                ME|  10806|
|                SC|  10395|
|                MN|  10083|
+------------------+-------+
only showing top 20 rows



In [22]:
### SO NY(New York) is the state from where cars that got parking tickets much higher than the other states.

In [109]:
## There is a numeric entry '99' in the column, which should be corrected. we replace "99" with STATE "NY" as it got 
## highest tickets i.e. maximum entries

In [110]:
NYC_park.filter(NYC_park["Registration State"]=='99').count()

16055

In [111]:
## There are 16055 entries need to be replaced with "NY"

In [112]:
## replacing 99 with  NY
from pyspark.sql.functions import *
NYC_park=NYC_park.withColumn("Registration State",regexp_replace(col("Registration State"),'99','NY'))

In [113]:
NYC_park.filter(NYC_park["Registration State"]=='NY').count()

4290006

In [29]:
## Again checking number of unique states again.

In [115]:
## SO we have 64 unique states

NYC_park.select(countDistinct("Registration State")).show()

+----------------------------------+
|count(DISTINCT Registration State)|
+----------------------------------+
|                                64|
+----------------------------------+



In [116]:
(NYC_park.groupBy(NYC_park["Registration State"]).count().sort(col("count").desc()).distinct().show())

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|4290006|
|                NJ| 475825|
|                PA| 140286|
|                CT|  70403|
|                FL|  69468|
|                IN|  45525|
|                MA|  38941|
|                VA|  34367|
|                MD|  30213|
|                NC|  27152|
|                TX|  18827|
|                IL|  18666|
|                GA|  17537|
|                AZ|  12379|
|                OH|  12281|
|                CA|  12153|
|                ME|  10806|
|                SC|  10395|
|                MN|  10083|
|                OK|   9088|
+------------------+-------+
only showing top 20 rows



In [117]:
## So successfully replaced the '99' with 'NY'

## Aggregation tasks

### 1. How often does each violation code occur? Display the frequency of the top five violation codes.

In [118]:
# distinct violation codes present in data
NYC_park.select('Violation code').distinct().count()

100

In [119]:
## so there are 100 unique violation codes. Below code shows how often each violation code occurs

In [120]:
NYC_park.groupBy(NYC_park['Violation Code']).count().show(100)

+--------------+------+
|Violation Code| count|
+--------------+------+
|            31| 80593|
|            85|  9316|
|            65|    26|
|            53| 19488|
|            78| 26776|
|            34|    11|
|            81|    14|
|            28|     5|
|            76|    18|
|            27|  3039|
|            26|   660|
|            44|     4|
|            12|    53|
|            91|   433|
|            22|    81|
|            93|     8|
|            47| 65440|
|             1|   674|
|            52|  1001|
|            13| 11673|
|            16| 74790|
|             6|   192|
|            86|     6|
|             3|   407|
|            20|319646|
|            40|277184|
|            94|   199|
|            57|     3|
|            54|     3|
|            96|    41|
|            48| 40987|
|             5| 48081|
|            19|149062|
|            92|    20|
|            64|  6764|
|            41|  2621|
|            43|   174|
|            15|     7|
|            37|

In [121]:
## Frequency of top 5 violation codes

In [122]:
NYC_park.groupBy(NYC_park['Violation Code']).count().sort(col('count').desc()).show(5)

+--------------+------+
|Violation Code| count|
+--------------+------+
|            21|768087|
|            36|662765|
|            38|542079|
|            14|476664|
|            20|319646|
+--------------+------+
only showing top 5 rows



#### 21,36,38,14 and 20 are top5 violation codes

### 2. How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? 

In [123]:
# count of distinct vehicle body types
NYC_park.select('Vehicle Body Type').distinct().count()

1165

In [124]:
## There are 1165 distinct vehicle body types. we need to show top 5 body type that got tickets.

In [125]:
NYC_park.groupBy(NYC_park["Vehicle Body Type"]).count().sort(col('count').desc()).show(5)

+-----------------+-------+
|Vehicle Body Type|  count|
+-----------------+-------+
|             SUBN|1883954|
|             4DSD|1547312|
|              VAN| 724029|
|             DELV| 358984|
|              SDN| 194197|
+-----------------+-------+
only showing top 5 rows



#### SUBN,4DSD,VAN,DELV and SDN are  top 5 vehicle body types got highest tickets

### how often Vehicle maker getting parking tickets

In [126]:
## count of distinct vehicle Make
NYC_park.select('Vehicle Make').distinct().count()

3179

In [127]:
## there are 3179 unique vehicle(car) makers. need to show top 5 vehicle makers who got tickets

In [128]:
NYC_park.groupBy(NYC_park["Vehicle Make"]).count().sort(col('count').desc()).show(5)

+------------+------+
|Vehicle Make| count|
+------------+------+
|        FORD|636844|
|       TOYOT|605291|
|       HONDA|538884|
|       NISSA|462017|
|       CHEVR|356032|
+------------+------+
only showing top 5 rows



#### Ford,Toyota,Honda,Nissan,Chevrolet are top 5 vehicle maker got highest tickets

### 3. A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following:

#### 1. 'Violation Precinct' 

In [129]:
## distinct number of violation precincts
NYC_park.select('Violation precinct').distinct().count()

171

In [130]:
NYC_park.groupBy(NYC_park["Violation precinct"]).count().sort(col('count').desc()).show(6)

+------------------+------+
|Violation precinct| count|
+------------------+------+
|                 0|925596|
|                19|274445|
|                14|203553|
|                 1|174702|
|                18|169131|
|               114|147444|
+------------------+------+
only showing top 6 rows



#### Given in problem statement that violation precinct '0' is erroneous entry. so showing top 6 entries. excluding 0, the top5 violation precincts are 19,14,1,18,114

#### 2. 'Issuer Precinct'

In [133]:
## distinct number of Issuer precincts in data
NYC_park.select('Issuer precinct').distinct().count()

511

In [134]:
NYC_park.groupBy(NYC_park["Issuer precinct"]).count().sort(col('count').desc()).show(6)

+---------------+-------+
|Issuer precinct|  count|
+---------------+-------+
|              0|1078406|
|             19| 266961|
|             14| 200495|
|              1| 168740|
|             18| 162994|
|            114| 144054|
+---------------+-------+
only showing top 6 rows



#### Given in problem statement that issuer precinct '0' is erroneous entry. so showing top 6 entries. excluding 0, top5 issuer precincts are 19, 14, 1, 18 and 114

### 4. Find the violation code frequencies for three precincts that have issued the most number of tickets.Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts?

In [141]:
## we got to know from above queries that top3 violation/issuer precincts are 1,14,19. we need to consider top 3 precincts
## that have issued most number of tickets. so they are 19,14,1

In [146]:
NYC_park.filter((col('Issuer Precinct')==1) | (col('Issuer Precinct')==14) | (col('Issuer Precinct')==19)).\
groupBy(NYC_park['Issuer Precinct'],NYC_park['Violation Code']).count().sort(col('count').desc()).show(30)

+---------------+--------------+-----+
|Issuer Precinct|Violation Code|count|
+---------------+--------------+-----+
|             19|            46|48445|
|             14|            14|45036|
|              1|            14|38354|
|             19|            38|36386|
|             19|            37|36056|
|             14|            69|30464|
|             19|            14|29797|
|             19|            21|28415|
|             14|            31|22555|
|              1|            16|19081|
|             14|            47|18364|
|              1|            20|15408|
|             19|            20|14629|
|              1|            46|12745|
|             19|            40|11416|
|             14|            42|10027|
|             19|            16| 9926|
|              1|            38| 8535|
|             14|            46| 7679|
|              1|            17| 7526|
|             19|            71| 7493|
|             14|            19| 7031|
|             19|        

In [143]:
## will check seperately for each of top precinct 1,14,19

In [150]:
NYC_park.filter((col('Violation Precinct')==1) | (col('issuer Precinct')==1)).\
groupBy(NYC_park['Violation Code']).count().sort(col('count').desc()).show(10)

+--------------+-----+
|Violation Code|count|
+--------------+-----+
|            14|40469|
|            16|19279|
|            20|16363|
|            46|13583|
|            38| 8592|
|            17| 7803|
|            37| 6493|
|            19| 6242|
|            31| 6064|
|            69| 5682|
+--------------+-----+
only showing top 10 rows



In [151]:
NYC_park.filter((col('Violation Precinct')==14) | (col('issuer Precinct')==14)).\
groupBy(NYC_park['Violation Code']).count().sort(col('count').desc()).show(10)

+--------------+-----+
|Violation Code|count|
+--------------+-----+
|            14|45952|
|            69|30465|
|            31|22651|
|            47|18692|
|            42|10027|
|            46| 8469|
|            19| 7459|
|            84| 6749|
|            82| 5052|
|            40| 3662|
+--------------+-----+
only showing top 10 rows



In [152]:
NYC_park.filter((col('Violation Precinct')==19) | (col('issuer Precinct')==19)).\
groupBy(NYC_park['Violation Code']).count().sort(col('count').desc()).show(10)

+--------------+-----+
|Violation Code|count|
+--------------+-----+
|            46|50865|
|            38|37484|
|            37|36468|
|            14|30395|
|            21|29415|
|            20|15137|
|            40|11520|
|            16|10007|
|            71| 7569|
|            19| 7069|
+--------------+-----+
only showing top 10 rows



In [154]:
## above queries shows violation code frequencies of the top3 precincts.

#### so common violation code across all the three precincts are 14 and 46, but among top5 violation codes, common violation code is 14.  These precinct zones doesn't have an exceptionally high frequency of certain violation codes.

####       

In [155]:
## Using SQL view

In [160]:
## minor change in column name
for col in NYC_park.columns:
    NYC_park=NYC_park.withColumnRenamed(col,col.replace(" ","_"))

In [161]:
## creating SQL tempview

NYC_park.createOrReplaceTempView("NYC_park_table")

In [162]:
spark.sql("SELECT issuer_precinct,violation_code,count(violation_code) FROM nyc_park_table where issuer_precinct=1"
          + " or issuer_precinct=14 or issuer_precinct=19 group by issuer_precinct,violation_code"
          + " order by count(violation_code) desc").show()

+---------------+--------------+---------------------+
|issuer_precinct|violation_code|count(violation_code)|
+---------------+--------------+---------------------+
|             19|            46|                48445|
|             14|            14|                45036|
|              1|            14|                38354|
|             19|            38|                36386|
|             19|            37|                36056|
|             14|            69|                30464|
|             19|            14|                29797|
|             19|            21|                28415|
|             14|            31|                22555|
|              1|            16|                19081|
|             14|            47|                18364|
|              1|            20|                15408|
|             19|            20|                14629|
|              1|            46|                12745|
|             19|            40|                11416|
|         

In [166]:
## Precinct code = 19
spark.sql('SELECT Violation_Code,count(Violation_Code) as count from NYC_park_table\
 where (Issuer_Precinct = "19" OR Violation_Precinct = "19") group by Violation_Code order by count desc').\
show(5)

## Precinct code = 14
spark.sql('SELECT Violation_Code,count(Violation_Code) as count from NYC_park_table\
 where (Issuer_Precinct = "14" OR Violation_Precinct = "14") group by Violation_Code order by count desc').\
show(5)

## Precinct code = 1
spark.sql('SELECT Violation_Code,count(Violation_Code) as count from NYC_park_table\
 where (Issuer_Precinct = "1" OR Violation_Precinct = "1") group by Violation_Code order by count desc').\
show(5)

+--------------+-----+
|Violation_Code|count|
+--------------+-----+
|            46|50865|
|            38|37484|
|            37|36468|
|            14|30395|
|            21|29415|
+--------------+-----+
only showing top 5 rows

+--------------+-----+
|Violation_Code|count|
+--------------+-----+
|            14|45952|
|            69|30465|
|            31|22651|
|            47|18692|
|            42|10027|
+--------------+-----+
only showing top 5 rows

+--------------+-----+
|Violation_Code|count|
+--------------+-----+
|            14|40469|
|            16|19279|
|            20|16363|
|            46|13583|
|            38| 8592|
+--------------+-----+
only showing top 5 rows



*top 5 violation codes for precinct code 19 are 46,38,37,14 and 21*<br>
*top 5 violation codes for precinct code 14 are 14,69,31,47 and 42*<br>
*top 5 violation codes for precinct code 1 are 14,16,20,46 and 38*<br>

*common violation among top 5 is violation code 14*<br>

In [170]:
## so obtaining same result through SQL also.

### 5. dealing with missing values

In [171]:
spark.sql("SELECT * from NYC_park_table where issuer_precinct is null").count()

0

In [172]:
spark.sql("SELECT * from NYC_park_table where plate_id is null").count()

0

In [173]:
spark.sql("SELECT * from NYC_park_table where summons_number is null or Registration_state is null"
         + " or Issue_date is null or Violation_code is null or vehicle_body_type is null"
         + " or vehicle_make is null or violation_precinct is null or violation_time is null").count()

0

#### so we got count as 0 which means there are no null values in data.

In [174]:
## But from below query, we found there are few rows with nan values. Spark didnt or found them as missing values as they
## present as string nan in csv file and not as blank(empty space). So, spark is giving count 0 for query of missing values.

In [176]:
spark.sql("SELECT * from NYC_park_table where summons_number ='nan' or Registration_state ='nan' "
         + " or Issue_date ='nan' or Violation_code ='nan' or vehicle_body_type = 'nan' "
         + " or vehicle_make= 'nan' or violation_precinct ='nan' or violation_time ='nan' ").show(4)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons_Number|Plate_ID|Registration_State|         Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Violation_Precinct|Issuer_Precinct|Violation_Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    1418425369| JYW5248|                PA|2017-03-21 00:00:00|            21|              SDN|         nan|                48|             48|         0952A|
|    1423945281| 2240653|                IN|2017-05-13 00:00:00|            14|              nan|       FRUEH|                14|            420|         1159A|
|    8546468965| 37489BB|                NY|2017-06-12 00:00:00|            46|              BUS|         nan|                68|             68|         0216P|
|    8583406443| 10217AV|         

*there are rows with nan as string type and not found by spark as missing values.<br>
*so if we need we can deal with those values by dropping them using filter option if we need to drop those because dropna() command doesnt work in this case.<br>

#### The Violation Time field is specified in a strange format. will convert into a time attribute

In [179]:
## Let's check for the unique values of Hours

spark.sql('select distinct(substring(Violation_Time, 1, 2)) FROM NYC_park_table').show(24)

+-------------------------------+
|substring(Violation_Time, 1, 2)|
+-------------------------------+
|                             07|
|                             51|
|                             54|
|                             15|
|                             11|
|                             73|
|                             87|
|                             64|
|                             30|
|                             59|
|                             01|
|                             22|
|                             28|
|                             85|
|                             16|
|                             18|
|                             27|
|                             61|
|                             00|
|                             17|
|                             26|
|                             09|
|                             46|
|                             78|
+-------------------------------+
only showing top 24 rows



In [180]:
## There are some weird hours, so will filter those.

In [183]:
## Filter data with valid violation hour

NYC_park_1 = spark.sql('select * from NYC_park_table where substring(violation_time, 1, 2)\
in ("01","02","03","04","05","06","07","08","09","10","11","12")')

In [184]:
#Create temp sql table
NYC_park_1.createOrReplaceTempView("park_tab1")

In [186]:
## Let's check for the count vs hour of the violation now

spark.sql('select substring(violation_time, 1, 2),count(substring(violation_time, 1, 2))\
from park_tab1 group by (substring(violation_time, 1, 2)) order by count(substring(violation_time, 1, 2)) desc').show(13)

+-------------------------------+--------------------------------------+
|substring(violation_time, 1, 2)|count(substring(violation_time, 1, 2))|
+-------------------------------+--------------------------------------+
|                             09|                                650953|
|                             11|                                603909|
|                             01|                                595356|
|                             08|                                553068|
|                             10|                                531998|
|                             12|                                527252|
|                             02|                                506380|
|                             03|                                346922|
|                             04|                                310530|
|                             07|                                296729|
|                             05|                  

In [187]:
## now valid hours

In [192]:
## Let's derive a new column with valid time format
## For example 0143A is converted to 01:43 AM
## This column is just for reference and not at all needed for seggregation

NYC_park_2 = spark.sql('SELECT *, CONCAT(substring(Violation_Time, 1, 2),":",substring(Violation_Time, 3, 2),\
" ",substring(Violation_Time, 5, 1),"M") as violat_time from park_tab1')

In [193]:
NYC_park_2.show(2)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+
|Summons_Number|Plate_ID|Registration_State|         Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Violation_Precinct|Issuer_Precinct|Violation_Time|violat_time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|   11:20 AM|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|                 0|              0|         0852P|   08:52 PM|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+------

In [194]:
#Create temp sql table
NYC_park_2.createOrReplaceTempView("park_tab2")

In [196]:
## Now let's seggregate a day into 6 time bins
## We are taking 12AM to 4AM as 1st, 4AM to 8AM as 2nd, 8AM to 12PM 3rd, 12PM to 4PM 4th, 4PM to 8PM 5th and 8Pm to 12 last
##Below is for extracting hour, am or pm info and writing to separate cols

NYC_park_3 = spark.sql('select *, substring(violat_time, 1, 2) as hour_vio,substring(violat_time, 6, 3) as am_pm from park_tab2')

NYC_park_3.createOrReplaceTempView("park_tab3") ## Temp sql 

NYC_park_4 = spark.sql('select *,\
(case when ((hour_vio in ("12","01","02","03")) and (substring(Violation_Time, 5, 1) = "A")) then "12 AM to 04 AM" else\
(case when ((hour_vio in ("12","01","02","03")) and (substring(Violation_Time, 5, 1) = "P")) then "12 PM to 04 PM" else\
(case when ((hour_vio in ("04","05","06","07")) and (substring(Violation_Time, 5, 1) = "A")) then "04 AM to 08 AM" else\
(case when ((hour_vio in ("04","05","06","07")) and (substring(Violation_Time, 5, 1) = "P")) then "04 PM to 08 PM" else\
(case when ((hour_vio in ("08","09","10","11")) and (substring(Violation_Time, 5, 1) = "A")) then "08 AM to 12 PM" else\
"08 PM to 12 AM" end) end) end) end) end) as time_bin from park_tab3')

In [209]:
NYC_park_4.show(2)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+--------+-----+--------------+
|Summons_Number|Plate_ID|Registration_State|         Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Violation_Precinct|Issuer_Precinct|Violation_Time|violat_time|hour_vio|am_pm|      time_bin|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+--------+-----+--------------+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|   11:20 AM|      11|   AM|08 AM to 12 PM|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|                 0|              0|         0852P|   08:52 PM|      08|   PM|0

In [203]:
### For each of these groups, find the three most commonly occurring violations.

NYC_park_4.createOrReplaceTempView("park_tab4") ## Temp sql 

##Below is the temp table to make our process easier
time_segg = spark.sql('select time_bin,Violation_Code,count(*) as total_cnt from park_tab4 group by time_bin,Violation_Code \
order by time_bin,total_cnt desc')

time_segg.filter((time_segg["time_bin"] == "04 AM to 08 AM")).sort(desc("total_cnt")).show(3)
time_segg.filter((time_segg["time_bin"] == "04 PM to 08 PM")).sort(desc("total_cnt")).show(3)
time_segg.filter((time_segg["time_bin"] == "12 AM to 04 AM")).sort(desc("total_cnt")).show(3)
time_segg.filter((time_segg["time_bin"] == "12 PM to 04 PM")).sort(desc("total_cnt")).show(3)
time_segg.filter((time_segg["time_bin"] == "08 AM to 12 PM")).sort(desc("total_cnt")).show(3)
time_segg.filter((time_segg["time_bin"] == "08 PM to 12 AM")).sort(desc("total_cnt")).show(3)

+--------------+--------------+---------+
|      time_bin|Violation_Code|total_cnt|
+--------------+--------------+---------+
|04 AM to 08 AM|            14|    74114|
|04 AM to 08 AM|            40|    60652|
|04 AM to 08 AM|            21|    57897|
+--------------+--------------+---------+
only showing top 3 rows

+--------------+--------------+---------+
|      time_bin|Violation_Code|total_cnt|
+--------------+--------------+---------+
|04 PM to 08 PM|            38|   102855|
|04 PM to 08 PM|            14|    75902|
|04 PM to 08 PM|            37|    70345|
+--------------+--------------+---------+
only showing top 3 rows

+--------------+--------------+---------+
|      time_bin|Violation_Code|total_cnt|
+--------------+--------------+---------+
|12 AM to 04 AM|            21|    26444|
|12 AM to 04 AM|            40|    22421|
|12 AM to 04 AM|            78|    13737|
+--------------+--------------+---------+
only showing top 3 rows

+--------------+--------------+---------+
|

#### common violations are 14,21,36,38 among few time_bins but not in all. All time bins have different common violation codes

#### For the three most commonly occurring violation codes, find the most common time of the day

In [206]:
## We know top 3 violation codes are 21,36 and 48 (from above queries)

time_segg.filter((time_segg["Violation_Code"] == "21")).sort(desc("total_cnt")).show(3)
time_segg.filter((time_segg["Violation_Code"] == "36")).sort(desc("total_cnt")).show(3)
time_segg.filter((time_segg["Violation_Code"] == "38")).sort(desc("total_cnt")).show(3)

+--------------+--------------+---------+
|      time_bin|Violation_Code|total_cnt|
+--------------+--------------+---------+
|08 AM to 12 PM|            21|   598069|
|12 PM to 04 PM|            21|    74693|
|04 AM to 08 AM|            21|    57897|
+--------------+--------------+---------+
only showing top 3 rows

+--------------+--------------+---------+
|      time_bin|Violation_Code|total_cnt|
+--------------+--------------+---------+
|08 AM to 12 PM|            36|   348165|
|12 PM to 04 PM|            36|   286284|
|04 AM to 08 AM|            36|    14782|
+--------------+--------------+---------+
only showing top 3 rows

+--------------+--------------+---------+
|      time_bin|Violation_Code|total_cnt|
+--------------+--------------+---------+
|12 PM to 04 PM|            38|   240721|
|08 AM to 12 PM|            38|   176570|
|04 PM to 08 PM|            38|   102855|
+--------------+--------------+---------+
only showing top 3 rows



#### Most common timings for violation code 21,36 are from 8AM to 12PM and for 38, it is from 12PM to 4PM

### 6. Let’s try and find some seasonality in this data:

In [211]:
## using issue_date to segregate into seasons. I am dividing data into 3 seasons- rainy, winter and summer

NYC_park_5 = spark.sql('select *,\
(case when (month(Issue_Date) in ("9","10","11")) then "Fall" else\
((case when (month(Issue_Date) in ("12","1","2")) then "Winter" else\
((case when (month(Issue_Date) in ("3","4","5")) then "Spring" else "Summer" end)) end)) end)\
as season from park_tab4')

In [235]:
## creating/replacing/updating the SQL view

NYC_park_5.createOrReplaceTempView("park_tab5") 

In [236]:
NYC_park_5.show(2)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+--------+-----+--------------+------+
|Summons_Number|Plate_ID|Registration_State|         Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Violation_Precinct|Issuer_Precinct|Violation_Time|violat_time|hour_vio|am_pm|      time_bin|season|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----------+--------+-----+--------------+------+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|   11:20 AM|      11|   AM|08 AM to 12 PM|Summer|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|                 0|              0|         0852P|

#### frequencies of tickets for each season. 

In [218]:
NYC_park_5.groupBy(NYC_park_5['season']).count().show()

+------+-------+
|season|  count|
+------+-------+
|Spring|2858117|
|Summer| 848440|
|  Fall|    970|
|Winter|1695736|
+------+-------+



#### three most common violations for each of these seasons.

In [251]:
## below are few rows

(NYC_park_5.filter((NYC_park_5["season"] == "Fall"))).groupBy("Violation_Code").agg({"Violation_Code" : "count"}).\
.sort().show(3)

(NYC_park_5.filter((NYC_park_5["season"] == "Spring"))).groupBy("Violation_Code").agg({"Violation_Code" : "count"}).\
show(3)

(NYC_park_5.filter((NYC_park_5["season"] == "Summer"))).groupBy("Violation_Code").agg({"Violation_Code" : "count"}).\
show(3)

(NYC_park_5.filter((NYC_park_5["season"] == "Winter"))).groupBy("Violation_Code").agg({"Violation_Code" : "count"}).\
show(3)

+--------------+---------------------+
|Violation_Code|count(Violation_Code)|
+--------------+---------------------+
|            31|                    2|
|            85|                    3|
|            53|                    6|
+--------------+---------------------+
only showing top 3 rows

+--------------+---------------------+
|Violation_Code|count(Violation_Code)|
+--------------+---------------------+
|            31|                42061|
|            85|                 4929|
|            65|                   14|
+--------------+---------------------+
only showing top 3 rows

+--------------+---------------------+
|Violation_Code|count(Violation_Code)|
+--------------+---------------------+
|            31|                12740|
|            85|                 1165|
|            53|                 3166|
+--------------+---------------------+
only showing top 3 rows

+--------------+---------------------+
|Violation_Code|count(Violation_Code)|
+--------------+------------

In [83]:
spark.sql("SELECT season,violation_code,count(violation_code) FROM Park_tab5"
          + " group by season,violation_code"
          + " order by count(violation_code) desc").show()

+------+--------------+---------------------+
|season|violation_code|count(violation_code)|
+------+--------------+---------------------+
|summer|            21|               767759|
|summer|            36|               662765|
|summer|            38|               542064|
|winter|            36|               497970|
|summer|            14|               476492|
| rainy|            21|               385410|
|winter|            21|               375419|
|summer|            20|               319512|
|summer|            46|               311892|
|summer|            37|               293142|
|summer|            40|               276954|
|winter|            38|               275268|
|summer|            71|               263351|
| rainy|            38|               244972|
| rainy|            36|               239879|
|winter|            14|               215487|
|summer|             7|               210176|
| rainy|            14|               201519|
|winter|             7|           

*We see top3 violation codes are same for spring,summer an dwinter seasons. so we can see that for each of the seasons i.e. winter,summer and rainy 3 most common violation codes are 21,36 and 38*

#### Find the total occurrences of the three most common violation codes.

In [259]:
spark.sql("select violation_code,count(*) as total_occurences from park_tab5 \
group by violation_code order by count(violation_code) desc").show(5)

+--------------+----------------+
|violation_code|total_occurences|
+--------------+----------------+
|            21|          757547|
|            36|          662765|
|            38|          542073|
|            14|          472616|
|            20|          317474|
+--------------+----------------+
only showing top 5 rows



In [254]:
## we know from above that 3 most common violaiton codes are 21,36 and 38

spark.sql("select count(*) as total_occurences from park_tab5 where violation_code = 21 or violation_code =36 or"
         + " violation_code = 38").show()

+----------------+
|total_occurences|
+----------------+
|         1962385|
+----------------+



#### total amount collected for the three violation codes with the maximum tickets

In [260]:
### we know top3 violation codes with max. tickets are 21,36 and 38

In [261]:
### from website given, we know about the fines for each of the violation code

### violation code 21 - average fine is $54.5
### violation code 36 and 38 - average fine is $50

In [265]:
spark.sql("SELECT violation_code,count(violation_code),count(violation_code)*54.5 as fee FROM park_tab5 where violation_code=21 "
          +" group by violation_code").show()

+--------------+---------------------+----------+
|violation_code|count(violation_code)|       fee|
+--------------+---------------------+----------+
|            21|               757547|41286311.5|
+--------------+---------------------+----------+



In [266]:
spark.sql("SELECT violation_code,count(violation_code),count(violation_code)*50 as fee FROM park_tab5 where violation_code=36 "
          +" group by violation_code").show()

+--------------+---------------------+--------+
|violation_code|count(violation_code)|     fee|
+--------------+---------------------+--------+
|            36|               662765|33138250|
+--------------+---------------------+--------+



In [267]:
spark.sql("SELECT violation_code,count(violation_code)*50 as fee FROM park_tab5 where violation_code=38 "
          +" group by violation_code").show()

+--------------+--------+
|violation_code|     fee|
+--------------+--------+
|            38|27103650|
+--------------+--------+



### We can see that violation code 21 has highest total collection

*Final inferences*<br>
As the count of the violation code 21 is high so amount collected also is high eventhough the average fineis 50 whereas for<br>
violation codes 36 and 38 it is 55.<br>
Spring season contains highest tickets followed by winter.<br>
8AM to 12PM seems to be bad one having high tickets*