## NYC Parking Tickets: An Exploratory Analysis using Apache Spark

The purpose of this case study is to conduct an exploratory data analysis of the parking-ticket data collected by the NYC Police Department. Spark lets us analyse the full files at high speed, rather than working from a series of random samples that only approximate the population.
#### For the scope of this analysis, we will analyse the parking tickets over the year 2017.

In [63]:
__author__ = 'Sweta Singh & Sudhanshu Singh'

## Creating a PySpark Session

In [64]:
# pyspark.sql.SparkSession is the entry point to programming Spark with the Dataset and DataFrame API.
# A SparkSession can be used to create DataFrames and perform actions on them.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Group Case Study - NYC Parking").getOrCreate()

In [65]:
spark

## Reading Parking Ticket data into a PySpark DataFrame

In [66]:
# Importing data from a csv file located in the HDFS into a PySpark dataframe
NYCparkingDF = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")
# cache dataframe for better performance
NYCparkingDF.cache() 

DataFrame[Summons Number: bigint, Plate ID: string, Registration State: string, Issue Date: timestamp, Violation Code: int, Vehicle Body Type: string, Vehicle Make: string, Violation Precinct: int, Issuer Precinct: int, Violation Time: string]

In [67]:
# printSchema returns schema in tree format
NYCparkingDF.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Violation Time: string (nullable = true)



In [68]:
# summary statistics
NYCparkingDF.describe().show()

+-------+--------------------+--------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|summary|      Summons Number|Plate ID|Registration State|    Violation Code| Vehicle Body Type|      Vehicle Make|Violation Precinct|  Issuer Precinct|   Violation Time|
+-------+--------------------+--------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|  count|            10803028|10803028|          10803028|          10803028|          10803028|          10803028|          10803028|         10803028|         10803028|
|   mean| 6.817447029065661E9|Infinity|              99.0|34.599430455979565|3.9258887134586864| 6519.974025974026| 45.01216260848347|46.82931211508477|909.2857142857143|
| stddev|2.3202339623282285E9|     NaN|               0.0|19.359868716323483|0.5013415469252523|18091.257389147086|40.552560268435805|62.66703577

In [69]:
# Check the number of rows.
NYCparkingDF.count()

10803028

In [70]:
# Check the number of columns.
len(NYCparkingDF.columns)

10

In [71]:
# Displaying the first 5 rows of the dataframe
NYCparkingDF.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|         

In [72]:
# Drop duplicate rows, if any exist.
NYCparkingDF = NYCparkingDF.dropDuplicates()
NYCparkingDF.count()

10803028

The row count is unchanged (10803028), so there are no duplicate rows in this dataframe.

## Examine the data

## 1. Find the total number of tickets for the year.

##### Filtering data to select only the tickets which belong to 2017
##### For the scope of this analysis, we will analyse the parking tickets over the year 2017. So we will filter the dataframe for tickets belonging to only 2017.

In [73]:
# We also register this as a temporary view so that we can query it with SQL and show basic transformations in SQL
NYCparkingDF.createOrReplaceTempView("parking_2017")

# Creating a new column called Issue_Year by taking the year part of 'Issue Date'
NYCparking2017DF = spark.sql("SELECT *, substr(`Issue Date`, 1,4) as Issue_Year FROM parking_2017 where substr(`Issue Date`, 1,4) = '2017'")
NYCparking2017DF.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue_Year|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|    8523552492|  BY917D|                NY|2017-03-07 00:00:00|            38|             4DSD|       NISSA|                52|             52|         1055A|      2017|
|    8539069866| HHN2773|                NY|2017-05-30 00:00:00|            21|             SUBN|        FORD|                94|             94|         1206P|      2017|
|    8486072323|  XADG70|                NJ|2017-06-19 00:00:00|            14|              VAN|        FORD|                14|           

In [74]:
# Printing total no of tickets before and after filtering for Issue Yr = 2017
from pyspark.sql.functions import count
from pyspark.sql.functions import countDistinct
NYCparkingDF.select(countDistinct("Summons Number").alias("total_tickets_before_filter")).show()
NYCparking2017DF.select(countDistinct("Summons Number").alias("total_tickets_in_2017")).show()

+---------------------------+
|total_tickets_before_filter|
+---------------------------+
|                   10803028|
+---------------------------+

+---------------------+
|total_tickets_in_2017|
+---------------------+
|              5431918|
+---------------------+



#### Total no of tickets in the dataframe = 10803028
#### Total no of tickets in the dataframe for the year 2017 = 5431918

## 2. Find out the number of unique states from where the cars that got parking tickets came.

### Finding out the top 5 states by number of parking tickets

In [75]:
# Calculating count of tickets after grouping the dataframe by "Registration State" and sorting in descending order
from pyspark.sql.functions import asc, col, column, desc, expr
NYCparking2017DF.groupBy("Registration State").agg(expr("count(`Summons Number`)")\
                                                .alias("total_tickets")).sort(desc("total_tickets")).show(5)

+------------------+-------------+
|Registration State|total_tickets|
+------------------+-------------+
|                NY|      4273951|
|                NJ|       475825|
|                PA|       140286|
|                CT|        70403|
|                FL|        69468|
+------------------+-------------+
only showing top 5 rows



New York-registered cars received by far the highest number of parking tickets (4273951).

#### Finding the distinct registration states (the states that ticketed cars come from) and the total number of states

In [76]:
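# Note: at this point the parking_2017 view is still registered on the unfiltered dataframe (all years),
# so the listing below is not restricted to 2017; the count on the following line uses the 2017 dataframe.
spark.sql("SELECT distinct `Registration State` FROM parking_2017").sort(asc("Registration State")).show()
NYCparking2017DF.select(countDistinct("Registration State")).show()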

+------------------+
|Registration State|
+------------------+
|                99|
|                AB|
|                AK|
|                AL|
|                AR|
|                AZ|
|                BC|
|                CA|
|                CO|
|                CT|
|                DC|
|                DE|
|                DP|
|                FL|
|                FO|
|                GA|
|                GV|
|                HI|
|                IA|
|                ID|
+------------------+
only showing top 20 rows

+----------------------------------+
|count(DISTINCT Registration State)|
+----------------------------------+
|                                65|
+----------------------------------+



#### Note that the Registration State column contains a numeric entry '99'. This is not a valid state code, so we replace it with the state that has the most entries, which in this case is NY.

In [77]:
# Replacing '99' by 'NY' (the column is a string, so compare against the string '99')
from pyspark.sql.functions import when

NYCparking2017DF = NYCparking2017DF.withColumn("Registration State", \
                                       when(NYCparking2017DF["Registration State"] == '99', 'NY').\
                                       otherwise(NYCparking2017DF["Registration State"]))

NYCparking2017DF.select(countDistinct("Registration State")).show()

+----------------------------------+
|count(DISTINCT Registration State)|
+----------------------------------+
|                                64|
+----------------------------------+



#### After replacing '99' by NY, total no of unique states = 64
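
The replacement step can be illustrated outside Spark. The following is a minimal plain-Python sketch, assuming a small made-up list of state codes (`states`) and a hypothetical helper `replace_placeholder_with_mode`; it is not the notebook's method, only the same idea of swapping a placeholder for the most frequent valid value.

```python
from collections import Counter

# Hypothetical sample of registration-state codes; '99' is the placeholder.
states = ["NY", "NJ", "99", "NY", "PA", "99", "NY"]


def replace_placeholder_with_mode(values, placeholder="99"):
    """Replace `placeholder` with the most frequent non-placeholder value."""
    counts = Counter(v for v in values if v != placeholder)
    mode_value, _ = counts.most_common(1)[0]
    return [mode_value if v == placeholder else v for v in values]


cleaned = replace_placeholder_with_mode(states)
print(cleaned)
print(len(set(states)), "distinct codes before,", len(set(cleaned)), "after")
```

Computing the mode from the data, rather than hard-coding 'NY', keeps the step valid if the same cleaning is applied to another year's file.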

In [78]:
# update the SQL view to record the changes done to the dataframe
NYCparking2017DF.createOrReplaceTempView("parking_2017")

#### Deleting null values

In [79]:
dropNullDF = NYCparking2017DF.na.drop()

In [80]:
#from pyspark.sql.functions import countDistinct
NYCparking2017DF.select(countDistinct("Summons Number")).show()
dropNullDF.select(countDistinct("Summons Number")).show()

+------------------------------+
|count(DISTINCT Summons Number)|
+------------------------------+
|                       5431918|
+------------------------------+

+------------------------------+
|count(DISTINCT Summons Number)|
+------------------------------+
|                       5431918|
+------------------------------+



The ticket count is unchanged after dropping rows that contain nulls, so there are no null values in the dataframe.

## Aggregation tasks

### 1. How often does each violation code occur? Display the frequency of the top five violation codes.

#### Grouping the data based on Violation Code and displaying the top 5 occurrences

In [81]:
from pyspark.sql.functions import sum, count, avg, expr
NYCparking2017DF.groupBy("Violation Code").agg(expr("count(`Summons Number`)").\
                                           alias("total_violations")).sort(desc("total_violations")).show(5)

+--------------+----------------+
|Violation Code|total_violations|
+--------------+----------------+
|            21|          768087|
|            36|          662765|
|            38|          542079|
|            14|          476664|
|            20|          319646|
+--------------+----------------+
only showing top 5 rows



#### The table above shows the total violations for the top 5 codes. Violation Code 21 occurred the maximum number of times (768087).

### 2. How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? (Hint: Find the top 5 for both.)

#### Grouping the data based on Vehicle Body Type and Vehicle Make and displaying the top 5 results for both

In [82]:
NYCparking2017DF.groupBy("Vehicle Body Type").agg(expr("count(`Summons Number`)").\
                                           alias("total_violations")).sort(desc("total_violations")).show(5)
                                           
NYCparking2017DF.groupBy("Vehicle Make").agg(expr("count(`Summons Number`)").\
                                           alias("total_violations")).sort(desc("total_violations")).show(5)                                           

+-----------------+----------------+
|Vehicle Body Type|total_violations|
+-----------------+----------------+
|             SUBN|         1883954|
|             4DSD|         1547312|
|              VAN|          724029|
|             DELV|          358984|
|              SDN|          194197|
+-----------------+----------------+
only showing top 5 rows

+------------+----------------+
|Vehicle Make|total_violations|
+------------+----------------+
|        FORD|          636844|
|       TOYOT|          605291|
|       HONDA|          538884|
|       NISSA|          462017|
|       CHEVR|          356032|
+------------+----------------+
only showing top 5 rows



#### The tables above show the top 5 Vehicle Body Types and Vehicle Makes with the total number of violations for each.
'SUBN' and 'FORD' account for the most violations in the Vehicle Body Type and Vehicle Make categories, respectively.

### 3. A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following: 
###### 1.'Violation Precinct' (This is the precinct of the zone where the violation occurred). Using this, can you draw any insights for parking violations in any specific areas of the city?
###### 2.'Issuer Precinct' (This is the precinct that issued the ticket.)
Here, you will notice that the dataframe has the 'Violation Precinct' or 'Issuer Precinct' as '0' for some records. These are erroneous entries. Hence, you need to provide the records for five correct precincts. (Hint: Print the top six entries after sorting.)

##### Grouping the data based on 'Violation Precinct' & 'Issuer Precinct' and displaying the top 6 occurrences

In [83]:
NYCparking2017DF.groupBy("Violation Precinct").agg(expr("count(`Summons Number`)").\
                                           alias("total_violations")).sort(desc("total_violations")).show(6)
                                           
NYCparking2017DF.groupBy("Issuer Precinct").agg(expr("count(`Summons Number`)").\
                                           alias("total_violations")).sort(desc("total_violations")).show(6)

+------------------+----------------+
|Violation Precinct|total_violations|
+------------------+----------------+
|                 0|          925596|
|                19|          274445|
|                14|          203553|
|                 1|          174702|
|                18|          169131|
|               114|          147444|
+------------------+----------------+
only showing top 6 rows

+---------------+----------------+
|Issuer Precinct|total_violations|
+---------------+----------------+
|              0|         1078406|
|             19|          266961|
|             14|          200495|
|              1|          168740|
|             18|          162994|
|            114|          144054|
+---------------+----------------+
only showing top 6 rows



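#### Precinct 19 recorded the maximum number of violations (274445) and also issued the maximum number of tickets (266961). The value 0 (925596 violation records and 1078406 issuer records) simply means the precinct was not filled in, so Precinct 19 tops the list among the valid entries. The top five valid precincts are 19, 14, 1, 18 and 114 in both cases.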
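
Excluding the placeholder precinct before ranking avoids having to print six rows and discard the first. A minimal plain-Python sketch of that idea follows, assuming a small made-up list of precinct numbers (`sample`) and a hypothetical helper `top_precincts`; it illustrates the logic only and is not the Spark code used in this notebook.

```python
from collections import Counter


def top_precincts(precincts, n=5, invalid=0):
    """Return the n most frequent precincts, ignoring the `invalid` marker."""
    counts = Counter(p for p in precincts if p != invalid)
    return counts.most_common(n)


# Hypothetical sample; 0 stands for "precinct not recorded".
sample = [0, 0, 0, 19, 19, 14, 1, 19, 14, 0]
result = top_precincts(sample, n=2)
print(result)
```

Although 0 is the most frequent value in the sample, it never reaches the ranking because it is filtered out before counting.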

### 4. Find the violation code frequencies for three precincts that have issued the most number of tickets. Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts? 
(Hint: In the SQL view, use the 'where' attribute to filter among three precincts.)

##### Displaying the three precincts that have issued the most number of tickets.

In [84]:
spark.sql("SELECT `Issuer Precinct`, COUNT(*) AS total_violation \
FROM parking_2017 \
WHERE `Issuer Precinct` != 0 \
group by `Issuer Precinct` \
order by total_violation desc limit 3").show()

+---------------+---------------+
|Issuer Precinct|total_violation|
+---------------+---------------+
|             19|         266961|
|             14|         200495|
|              1|         168740|
+---------------+---------------+



##### Displaying the 5 most commonly occurring violation codes from the three precincts that have issued the most tickets.

In [85]:
spark.sql("select `Issuer Precinct`, `Violation Code`, total_violation, RANK  from \
          (select *, row_number() over (PARTITION by `Issuer Precinct` order by total_violation desc) as RANK from \
            (SELECT b.`Violation Code`, b.`Issuer Precinct`, COUNT(*) AS total_violation \
            FROM (SELECT `Issuer Precinct`, COUNT(*) AS total_violation \
            FROM parking_2017 \
            WHERE `Issuer Precinct` != 0 \
            group by `Issuer Precinct` \
            order by total_violation desc limit 3) as a \
            inner join parking_2017 as b on a.`Issuer Precinct` = b.`Issuer Precinct` \
            group by b.`Violation Code`,b.`Issuer Precinct`) as c) where RANK < 6").show()

+---------------+--------------+---------------+----+
|Issuer Precinct|Violation Code|total_violation|RANK|
+---------------+--------------+---------------+----+
|              1|            14|          38354|   1|
|              1|            16|          19081|   2|
|              1|            20|          15408|   3|
|              1|            46|          12745|   4|
|              1|            38|           8535|   5|
|             19|            46|          48445|   1|
|             19|            38|          36386|   2|
|             19|            37|          36056|   3|
|             19|            14|          29797|   4|
|             19|            21|          28415|   5|
|             14|            14|          45036|   1|
|             14|            69|          30464|   2|
|             14|            31|          22555|   3|
|             14|            47|          18364|   4|
|             14|            42|          10027|   5|
+---------------+-----------

##### The most commonly occurring violation codes for the different precincts are -
 - Precinct 1 -> 14 (almost twice the count of the second-ranked violation code)
 - Precinct 14 -> 14 
 - Precinct 19 -> 46 (considerably higher than the second-ranked violation code)
 
### Note from above that violation code 14 appears among the top 5 most frequent violation codes for all 3 precincts
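
The "top k per group" pattern that `row_number() over (PARTITION by ...)` implements can be sketched in plain Python. The example below assumes a few made-up `(precinct, violation_code)` pairs and two hypothetical helpers, `top_k_per_group` and `common_items`; it mirrors the logic of the query rather than reproducing it.

```python
from collections import Counter, defaultdict


def top_k_per_group(rows, k):
    """rows: iterable of (group, item). Return {group: [(item, count), ...]}."""
    counts = defaultdict(Counter)
    for group, item in rows:
        counts[group][item] += 1
    return {group: counter.most_common(k) for group, counter in counts.items()}


def common_items(top):
    """Items that appear in the top-k list of every group."""
    item_sets = [{item for item, _ in ranked} for ranked in top.values()]
    return set.intersection(*item_sets) if item_sets else set()


# Hypothetical (issuer precinct, violation code) pairs.
rows = [(1, 14), (1, 14), (1, 16),
        (19, 46), (19, 46), (19, 14),
        (14, 14), (14, 69), (14, 14)]

top = top_k_per_group(rows, k=2)
shared = common_items(top)
print(top)
print(shared)
```

Intersecting the per-group lists answers the "are these codes common across precincts?" question directly instead of reading it off the table.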

### 5. Find out the properties of parking violations across different times of the day:
 - Find a way to deal with missing values, if any.
(Hint: Check for the null values using 'isNull' under the SQL. Also, to remove the null values, check the 'dropna' command in the API documentation.)

 - The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups.

 - Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. For each of these groups, find the three most commonly occurring violations.
(Hint: Use the CASE-WHEN in SQL view to segregate into bins. To find the most commonly occurring violations, you can use an approach similar to the one mentioned in the hint for question 4.)

 - Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).

##### Finding out the number of occurrences of the following in the 'Violation Time' column -

 - NULL
 - BLANK
 - Nan/nan

In [86]:
# Caveat: count("Violation Time") skips nulls, so for the isNull() filter it reports 0 regardless;
# dropNullDF1.count() gives the actual number of rows with a null Violation Time.
dropNullDF1 = NYCparking2017DF.filter(col("Violation Time").isNull())
dropNullDF2 = NYCparking2017DF.where("`Violation Time` == ''")
dropNullDF3 = NYCparking2017DF.where((col("Violation Time") == "nan") | (col("Violation Time") == "Nan"))
dropNullDF1.select(countDistinct("Violation Time"), count("Violation Time")).show()
dropNullDF2.select(countDistinct("Violation Time"), count("Violation Time")).show()
dropNullDF3.select(countDistinct("Violation Time"), count("Violation Time")).show()


+------------------------------+---------------------+
|count(DISTINCT Violation Time)|count(Violation Time)|
+------------------------------+---------------------+
|                             0|                    0|
+------------------------------+---------------------+

+------------------------------+---------------------+
|count(DISTINCT Violation Time)|count(Violation Time)|
+------------------------------+---------------------+
|                             0|                    0|
+------------------------------+---------------------+

+------------------------------+---------------------+
|count(DISTINCT Violation Time)|count(Violation Time)|
+------------------------------+---------------------+
|                             1|                   16|
+------------------------------+---------------------+



There are 16 records in the dataframe that have the value 'Nan' or 'nan' in the column 'Violation Time'. We will be dropping these records from the dataframe.
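
The three checks (null, blank, 'nan') can be folded into one pass. Below is a minimal plain-Python sketch, assuming a made-up list of raw values (`sample`) and a hypothetical helper `classify_missing`; it is an illustration of the classification, not the Spark code above.

```python
from collections import Counter


def classify_missing(value):
    """Label a raw Violation Time value as null, blank, nan or present."""
    if value is None:
        return "null"
    stripped = value.strip()
    if stripped == "":
        return "blank"
    if stripped.lower() == "nan":
        return "nan"
    return "present"


# Hypothetical raw values, including whitespace-only and mixed-case 'nan'.
sample = ["1055A", None, "", "nan", "Nan", "0400P", "  "]
tally = Counter(classify_missing(v) for v in sample)
print(tally)
```

Lower-casing before the comparison catches 'nan', 'Nan' and 'NAN' alike, and stripping whitespace treats a string of spaces as blank.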

In [87]:
NYCparking2017DF = NYCparking2017DF.filter((col("Violation Time") != "nan") & (col("Violation Time") != "Nan"))

NYCparking2017DF.select(countDistinct("Violation Time"), count("Violation Time")).show()

+------------------------------+---------------------+
|count(DISTINCT Violation Time)|count(Violation Time)|
+------------------------------+---------------------+
|                          1625|              5431902|
+------------------------------+---------------------+



##### After dropping 'Nan/nan' records from the dataframe, there are 5431902 records left.

##### The Violation Time field is specified in an unusual format (e.g. 1120A). We transform it into a new column, Violation Time Fixed, using the logic below:

 - If the `Violation Time` value ends with 'A' (AM), we drop the letter 'A'. E.g. 1120A -> 1120.
 
 - If the `Violation Time` value ends with 'P' (PM), there are 2 scenarios: 
 
  1. If the first 2 digits are '12', we concatenate '00' with the 3rd and 4th digits and drop the letter 'P'. E.g. 1220P -> 0020.
  2. Otherwise, we add 12 to the first 2 digits, concatenate the result with the 3rd and 4th digits, and drop the letter 'P'. E.g. 0852P -> 2052.

 - Note that rule 1 places 12xxP values in the 00xx range, whereas under the usual 12-hour convention 12:20 PM is 1220 and it is 12:20 AM that corresponds to 0020. The results below reflect the rules as implemented.
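
For comparison, here is a minimal plain-Python sketch of the conventional 12-hour to 24-hour conversion (12xxA -> 00xx, 12xxP -> 12xx). It is offered as an alternative to the rules listed above, not as the notebook's method. The helper name `to_24h` is hypothetical, and treating 00xxP like 12xxP is an assumption.

```python
import re

# Four digits followed by A or P, e.g. '1120A'.
_TIME_RE = re.compile(r"^(\d{2})(\d{2})([AP])$")


def to_24h(violation_time):
    """Convert 'HHMM' + 'A'/'P' to a 24-hour 'HHMM' string; None if malformed."""
    if violation_time is None:
        return None
    match = _TIME_RE.match(violation_time.strip().upper())
    if not match:
        return None
    hour, minute, meridiem = int(match.group(1)), int(match.group(2)), match.group(3)
    if hour > 12 or minute > 59:
        return None
    # 12 wraps to 0, so 12xxA -> 00xx and 12xxP -> 12xx.
    # Assumption: an hour of 00 is accepted and handled like 12.
    hour24 = hour % 12 + (12 if meridiem == "P" else 0)
    return f"{hour24:02d}{minute:02d}"


for raw in ["1120A", "1220P", "1220A", "0852P", "115+A", "6815P"]:
    print(raw, "->", to_24h(raw))
```

Returning `None` for malformed input lets the faulty rows be filtered in one step instead of surfacing later as out-of-range values.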

In [88]:
NYCparking2017DF = spark.sql("SELECT *, \
                        CASE WHEN substr(`Violation Time`, -1,1) == 'A' \
                            THEN substr(`Violation Time`, 1,4) \
                            WHEN substr(`Violation Time`, -1,1) == 'P' AND substr(`Violation Time`, 1,2) == '12'\
                            THEN CONCAT('00',substr(`Violation Time`, 3,2)) \
                            ELSE CONCAT(CAST((substr(`Violation Time`, 1,2) + 12) as int),substr(`Violation Time`, 3,2)) \
                            END AS `Violation Time Fixed` \
FROM parking_2017")
NYCparking2017DF.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+--------------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue_Year|Violation Time Fixed|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+--------------------+
|    8523552492|  BY917D|                NY|2017-03-07 00:00:00|            38|             4DSD|       NISSA|                52|             52|         1055A|      2017|                1055|
|    8539069866| HHN2773|                NY|2017-05-30 00:00:00|            21|             SUBN|        FORD|                94|             94|         1206P|      2017|                0006|
|    8486072323|  XADG70|          

In [89]:
# update the SQL view to record the changes done to the dataframe
NYCparking2017DF.createOrReplaceTempView("parking_2017")

##### Dividing the 24 hours into six equal discrete bins of time based on the new column Violation Time Fixed:
 - Between 0 and 400 --> Dawn
 - Between 400 and 800 --> Early Morning
 - Between 800 and 1200 --> Morning 
 - Between 1200 and 1600 --> Afternoon
 - Between 1600 and 2000 --> Evening
 - Between 2000 and 2400 --> Night
 - For everything else --> unknown
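
Because the six bins are equal in width, the label can be derived by integer division instead of a chain of comparisons. The following is a minimal plain-Python sketch with a hypothetical helper `time_span`; like the bins above, it accepts 2400 as 'Night' and returns 'unknown' for anything that is not a number in range.

```python
LABELS = ["Dawn", "Early Morning", "Morning", "Afternoon", "Evening", "Night"]


def time_span(hhmm):
    """Map a 24-hour 'HHMM' value to one of six 4-hour bins."""
    try:
        value = int(hhmm)
    except (TypeError, ValueError):
        # None or non-numeric strings such as '115+' cannot be binned.
        return "unknown"
    if 0 <= value <= 2400:
        # 2400 // 400 is 6, so clamp it into the last bin ('Night').
        return LABELS[min(value // 400, len(LABELS) - 1)]
    return "unknown"


for fixed in ["0006", "0400", "1017", "1348", "2400", "3403", "115+", None]:
    print(fixed, "->", time_span(fixed))
```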

In [90]:
NYCparking2017DF = spark.sql("SELECT *, \
                        CASE WHEN `Violation Time Fixed` >= 0 and `Violation Time Fixed` < 400\
                            THEN 'Dawn' \
                            WHEN `Violation Time Fixed` >= 400 and `Violation Time Fixed` < 800\
                            THEN 'Early Morning' \
                            WHEN `Violation Time Fixed` >= 800 and `Violation Time Fixed` < 1200\
                            THEN 'Morning' \
                            WHEN `Violation Time Fixed` >= 1200 and `Violation Time Fixed` < 1600\
                            THEN 'Afternoon' \
                            WHEN `Violation Time Fixed` >= 1600 and `Violation Time Fixed` < 2000\
                            THEN 'Evening' \
                            WHEN `Violation Time Fixed` >= 2000 and `Violation Time Fixed` <= 2400\
                            THEN 'Night' \
                            ELSE 'unknown' \
                            END AS `Violation Time Span` \
FROM parking_2017")
NYCparking2017DF.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+--------------------+-------------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue_Year|Violation Time Fixed|Violation Time Span|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+--------------------+-------------------+
|    4635192880| GGB4757|                NY|2017-06-16 00:00:00|            36|             4DSD|       TOYOT|                 0|              0|         1017A|      2017|                1017|            Morning|
|    4635220096| AHV5126|                NY|2017-06-16 00:00:00|            36|             SUBN|         KIA|                 0|              0|   

In [91]:
# update the SQL view to record the changes done to the dataframe
NYCparking2017DF.createOrReplaceTempView("parking_2017")

##### Checking the count of records where Violation Time Span = unknown

In [92]:
spark.sql("select `Violation Time`,`Violation Time Fixed`, `Violation Time Span`  \
from parking_2017 where `Violation Time Span` == 'unknown'").show(5)

spark.sql("select count(*) from parking_2017 where `Violation Time Span` == 'unknown'").show(5)

+--------------+--------------------+-------------------+
|Violation Time|Violation Time Fixed|Violation Time Span|
+--------------+--------------------+-------------------+
|         2203P|                3403|            unknown|
|         5028P|                6228|            unknown|
|         115+A|                115+|            unknown|
|           nan|                null|            unknown|
|         5620P|                6820|            unknown|
+--------------+--------------------+-------------------+
only showing top 5 rows

+--------+
|count(1)|
+--------+
|      89|
+--------+



##### There are 89 records where Violation Time Span = unknown. These are essentially faulty records and need to be removed. For example, the Violation Time values for some of these records are `6815P`, `110+A`, `093+A`, etc., so they could not be binned into a Violation Time Span. The 'nan' rows also appear here, because the SQL view used for the transformation was last refreshed before they were filtered out.
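
Faulty values can also be flagged on the raw `Violation Time` string, before any conversion. The sketch below is plain Python with a hypothetical helper `violation_time_problem`; it is stricter than the check used in this notebook (which only flags converted values outside the 0 to 2400 range) and reports why a value is rejected.

```python
import re


def violation_time_problem(value):
    """Return None if value looks like HHMM + A/P, else a short reason."""
    if value is None or not re.fullmatch(r"\d{4}[AP]", value):
        return "bad format"
    hour, minute = int(value[:2]), int(value[2:4])
    if hour > 12:
        return "hour out of range"
    if minute > 59:
        return "minute out of range"
    return None


for raw in ["1055A", "6815P", "110+A", "093+A", "nan", "1175A"]:
    print(raw, "->", violation_time_problem(raw))
```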

In [93]:
NYCparking2017DF = NYCparking2017DF.filter(col("`Violation Time Span`") != "unknown")
NYCparking2017DF.show(5)
NYCparking2017DF.select(countDistinct("Violation Time Span"), count("Violation Time Span")).show()

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+--------------------+-------------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue_Year|Violation Time Fixed|Violation Time Span|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+--------------------+-------------------+
|    8497054040| 31416MG|                NY|2017-03-13 00:00:00|            47|             DELV|       FRUEH|                13|             13|         0148P|      2017|                1348|          Afternoon|
|    8510966837| 60978MC|                NY|2017-06-01 00:00:00|            69|              VAN|       CHEVR|                19|             19|   

##### After removal of faulty records, the no of rows in the dataframe  = 5431829

In [94]:
# update the SQL view to record the changes done to the dataframe
NYCparking2017DF.createOrReplaceTempView("parking_2017")

##### Displaying the 3 most commonly occurring violation codes for each of the 6 different Violation Time Spans

In [95]:
spark.sql("select `Violation Time Span`, `Violation Code`, no_of_violations from \
         (select *, row_number() over (PARTITION by `Violation Time Span` order by no_of_violations desc) as num from \
         (select `Violation Time Span`, `Violation Code`, count(*) as no_of_violations \
         from parking_2017 group by `Violation Time Span`, `Violation Code`) as a) \
         where num < 4").show()

+-------------------+--------------+----------------+
|Violation Time Span|Violation Code|no_of_violations|
+-------------------+--------------+----------------+
|            Evening|            38|          102855|
|            Evening|            14|           75902|
|            Evening|            37|           70345|
|               Dawn|            21|          107065|
|               Dawn|            36|          101991|
|               Dawn|            38|           56204|
|            Morning|            21|          598062|
|            Morning|            36|          348165|
|            Morning|            38|          176570|
|      Early Morning|            14|           74113|
|      Early Morning|            40|           60652|
|      Early Morning|            21|           57894|
|          Afternoon|            38|          184829|
|          Afternoon|            36|          184293|
|          Afternoon|            37|          130692|
|              Night|       

##### Displaying the most common Violation Time Span for each of the three most commonly occurring violation codes

In [96]:
spark.sql("select`Violation Code`,`Violation Time Span`,total_violation from \
          (select *, row_number() over (PARTITION by `Violation Code` order by total_violation desc) as num from \
            (SELECT b.`Violation Code`, b.`Violation Time Span`, COUNT(*) AS total_violation \
            FROM (SELECT `Violation Code`, COUNT(*) AS total_violation \
            FROM parking_2017 \
            group by `Violation Code` \
            order by total_violation desc limit 3) as a \
            inner join parking_2017 as b on a.`Violation Code` = b.`Violation Code` \
            group by b.`Violation Code`,b.`Violation Time Span`) as c) where num < 2").show()

+--------------+-------------------+---------------+
|Violation Code|Violation Time Span|total_violation|
+--------------+-------------------+---------------+
|            38|          Afternoon|         184829|
|            21|            Morning|         598062|
|            36|            Morning|         348165|
+--------------+-------------------+---------------+



### 6. Let’s try and find some seasonality in this data:

 - First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season. (Hint: Use Issue Date to segregate into seasons.)

 - Then, find the three most common violations for each of these seasons. (Hint: You can use an approach similar to the one mentioned in the hint for question 4.)

In [97]:
# update the SQL view to record the changes done to the dataframe
NYCparking2017DF.createOrReplaceTempView("parking_2017")

##### Dividing the year into four seasons by adding a new column, Issue Season, derived from the month of Issue Date:
 - Month between 3 and 5 --> Spring
 - Month between 6 and 8 --> Summer
 - Month between 9 and 11 --> Fall
 - Else --> Winter
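
The month-to-season rule listed above can be sketched in plain Python before it is expressed in SQL. This is only an illustration: the helper name `issue_season` and the sample dates are assumptions, not part of the notebook.

```python
from datetime import date

def issue_season(issue_date):
    """Map a date to one of the four season buckets listed above."""
    month = issue_date.month
    if 3 <= month <= 5:
        return "Spring"
    if 6 <= month <= 8:
        return "Summer"
    if 9 <= month <= 11:
        return "Fall"
    return "Winter"  # December, January and February fall through to here

# Hypothetical sample dates, one per season bucket
samples = [date(2017, 3, 19), date(2017, 6, 20), date(2017, 10, 1), date(2017, 1, 15)]
seasons = [issue_season(d) for d in samples]
print(seasons)
```

Note that the final branch catches every month outside 3 to 11, which mirrors the `ELSE 'Winter'` branch of the SQL `CASE` expression.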

In [98]:
# Derive the season from the month of Issue Date (month() replaces the substring comparison)
NYCparking2017DF = spark.sql("SELECT *, \
CASE WHEN month(`Issue Date`) between 3 and 5 \
    THEN 'Spring' \
    WHEN month(`Issue Date`) between 6 and 8 \
    THEN 'Summer' \
    WHEN month(`Issue Date`) between 9 and 11 \
    THEN 'Fall' \
    ELSE 'Winter' \
    END AS `Issue Season` \
                        FROM parking_2017")
NYCparking2017DF.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+--------------------+-------------------+------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue_Year|Violation Time Fixed|Violation Time Span|Issue Season|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+--------------------+-------------------+------------+
|    8509712827| HNP9733|                NY|2017-03-19 00:00:00|            53|             SUBN|       TOYOT|                 5|              5|         1109A|      2017|                1109|            Morning|      Spring|
|    8550208723| 42181JM|                NY|2017-06-20 00:00:00|            10|              VAN

In [99]:
# update the SQL view to record the changes done to the dataframe
NYCparking2017DF.createOrReplaceTempView("parking_2017")

In [100]:
spark.sql("select `Issue Season`, `Violation Code`, no_of_violations from \
         (select *, row_number() over (PARTITION by `Issue Season` order by no_of_violations desc) as num from \
         (select `Issue Season`, `Violation Code`, count(*) as no_of_violations \
         from parking_2017 group by `Issue Season`, `Violation Code`) as a) \
         where num < 4").show()

+------------+--------------+----------------+
|Issue Season|Violation Code|no_of_violations|
+------------+--------------+----------------+
|      Spring|            21|          402401|
|      Spring|            36|          344834|
|      Spring|            38|          271167|
|      Summer|            21|          127345|
|      Summer|            36|           96663|
|      Summer|            38|           83518|
|        Fall|            46|             231|
|        Fall|            21|             128|
|        Fall|            40|             116|
|      Winter|            21|          238180|
|      Winter|            36|          221268|
|      Winter|            38|          187385|
+------------+--------------+----------------+
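
The query above follows a "count per group, rank within each partition, keep the first three" pattern. A minimal plain-Python sketch of the same idea is shown here, assuming a tiny hypothetical list of (season, violation code) pairs in place of the real ticket rows; the function name `top_n_per_group` is also an assumption.

```python
from collections import Counter, defaultdict

# Hypothetical (season, violation code) pairs standing in for ticket rows
tickets = (
    [("Spring", 21)] * 4 + [("Spring", 36)] * 3 + [("Spring", 38)] * 2
    + [("Spring", 14)] + [("Winter", 21)] * 2 + [("Winter", 38)]
)

def top_n_per_group(rows, n=3):
    """Return the n most frequent codes for each group, most frequent first."""
    # Equivalent of GROUP BY season, code with COUNT(*)
    counts = Counter(rows)
    grouped = defaultdict(list)
    for (group, code), count in counts.items():
        grouped[group].append((code, count))
    # Equivalent of row_number() over each partition, keeping rows with num <= n
    return {
        group: sorted(pairs, key=lambda pair: pair[1], reverse=True)[:n]
        for group, pairs in grouped.items()
    }

top3 = top_n_per_group(tickets)
print(top3)
```

As with `row_number()`, codes with equal counts are not ranked in any meaningful order, so a tie at the cut-off keeps only one of the tied codes.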



### 7. The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:
 - Find the total occurrences of the three most common violation codes.
 - Then, visit the website: http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page . It lists the fines associated with different violation codes. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.
 - Using this information, find the total amount collected for the three violation codes with the maximum tickets. State the code that has the highest total collection.
 - What can you intuitively infer from these findings?

##### Finding the total occurrences of the three most common violation codes.

In [101]:
top3violationcodes = spark.sql("select `Violation Code`, count(*) as no_of_violations \
         from parking_2017 group by `Violation Code` order by no_of_violations desc limit 3")
top3violationcodes.show()

+--------------+----------------+
|Violation Code|no_of_violations|
+--------------+----------------+
|            21|          768054|
|            36|          662765|
|            38|          542078|
+--------------+----------------+



In [102]:
# register DataFrame as temp table
top3violationcodes.createOrReplaceTempView("violation_codes_df") 


##### Listing the two fines associated with each of the 3 violation codes: one for the highest-density locations in the city and the other for the rest of the city.

In [103]:
# Attach both fine amounts to each code; the ELSE branches cover the remaining code, 38
top3violationcodes = spark.sql("SELECT *, \
                                CASE WHEN `Violation Code` = 21 \
                                THEN 65 \
                                WHEN `Violation Code` = 36 \
                                THEN 50 \
                                ELSE 65 \
                                END AS `High_Density_Loc_Fine`, \
                                CASE WHEN `Violation Code` = 21 \
                                THEN 45 \
                                WHEN `Violation Code` = 36 \
                                THEN 50 \
                                ELSE 35 \
                                END AS `Rest_of_City_Fine` \
                                FROM violation_codes_df")
top3violationcodes.show(5)           

+--------------+----------------+---------------------+-----------------+
|Violation Code|no_of_violations|High_Density_Loc_Fine|Rest_of_City_Fine|
+--------------+----------------+---------------------+-----------------+
|            21|          768054|                   65|               45|
|            36|          662765|                   50|               50|
|            38|          542078|                   65|               35|
+--------------+----------------+---------------------+-----------------+



In [104]:
# update the SQL view to record the changes done to the dataframe

top3violationcodes.createOrReplaceTempView("violation_codes_df") 


##### Taking the average of the 2 categories of fines for each of the 3 different violation codes.

In [105]:
top3violationcodes = spark.sql("SELECT *, \
                                 cast((`High_Density_Loc_Fine` + `Rest_of_City_Fine`)/2 as int) AS `Average Fine` \
                                FROM violation_codes_df")
top3violationcodes.show(5)           

+--------------+----------------+---------------------+-----------------+------------+
|Violation Code|no_of_violations|High_Density_Loc_Fine|Rest_of_City_Fine|Average Fine|
+--------------+----------------+---------------------+-----------------+------------+
|            21|          768054|                   65|               45|          55|
|            36|          662765|                   50|               50|          50|
|            38|          542078|                   65|               35|          50|
+--------------+----------------+---------------------+-----------------+------------+



In [106]:
# update the SQL view to record the changes done to the dataframe

top3violationcodes.createOrReplaceTempView("violation_codes_df") 


##### Finding the total amount collected for the three violation codes with the maximum tickets by multiplying the average fine per violation by the total occurrences of each violation

In [107]:
spark.sql("SELECT *, `Average Fine` * no_of_violations as `Total Fine` FROM violation_codes_df").show()

+--------------+----------------+---------------------+-----------------+------------+----------+
|Violation Code|no_of_violations|High_Density_Loc_Fine|Rest_of_City_Fine|Average Fine|Total Fine|
+--------------+----------------+---------------------+-----------------+------------+----------+
|            21|          768054|                   65|               45|          55|  42242970|
|            36|          662765|                   50|               50|          50|  33138250|
|            38|          542078|                   65|               35|          50|  27103900|
+--------------+----------------+---------------------+-----------------+------------+----------+
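
The totals above can be cross-checked with a few lines of plain Python. The ticket counts and fine amounts are copied from the tables in this section; the variable names are only illustrative.

```python
# Ticket counts per violation code, copied from the table above
violations = {21: 768054, 36: 662765, 38: 542078}
# (high-density fine, rest-of-city fine) per violation code, copied from the table above
fines = {21: (65, 45), 36: (50, 50), 38: (65, 35)}

totals = {}
for code, count in violations.items():
    high_density, rest_of_city = fines[code]
    average_fine = (high_density + rest_of_city) / 2  # simple average of the two categories
    totals[code] = int(average_fine * count)

# Code with the largest estimated collection
highest = max(totals, key=totals.get)
print(totals, highest)
```

Because every ticket is priced at the average of the two fines, these figures are estimates rather than actual collections.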



### Violation Code 21 has the highest estimated total fine collection (42,242,970), ahead of codes 36 and 38

### Stopping the SparkContext Object

In [108]:
spark.stop()