# NY Parking Assignment

## Setting up environment variables and parameters

In [1]:
# The parameters below need to be validated for each environment

import os
import sys
# Interpreter, JVM and Spark install locations for this cluster.
os.environ["PYSPARK_PYTHON"] = "/opt/cloudera/parcels/Anaconda/bin/python"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_232-cloudera/jre"
os.environ["SPARK_HOME"] = "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/"

# SPARK_HOME already ends with a separator; os.path.join avoids the doubled
# "/" that plain string concatenation with "/python/lib" would produce.
os.environ["PYLIB"] = os.path.join(os.environ["SPARK_HOME"], "python", "lib")

# Each archive is inserted at position 0, so the last one listed (pyspark.zip)
# ends up first on sys.path, ahead of the py4j archive.
for spark_archive in ("py4j-0.10.6-src.zip", "pyspark.zip"):
    sys.path.insert(0, os.path.join(os.environ["PYLIB"], spark_archive))

## Setting up Spark Session and Context

In [2]:
from pyspark.sql import SparkSession
# Build (or reuse) the local-mode Spark session used by every cell below.
spark = (
    SparkSession.builder
    .master("local")
    .appName('NY_Parking_Assignment')
    .getOrCreate()
)
# Bare expression so the notebook renders the session summary.
spark

## Pulling the data from S3 bucket to local dataframe will bring the default datatypes as shown below

In [3]:
# Source CSV on S3; the first row holds the column names.
s3path = 's3a://upgrad-data/Parking_Violation_Tickets.csv'
tickets = spark.read.option("header", "true").csv(s3path)


In [4]:
# Print the schema of the raw load; no schema or inferSchema option was
# supplied to the reader, and the output below lists every column as string.
tickets.printSchema()

root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: string (nullable = true)
 |-- Street Code2: string (nullable = true)
 |-- Street Code3: string (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Violation Location: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Issuer Code: string (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation County: str

## As shown above, the extracted metadata has the default datatype (string) for every column, and not all columns are needed, so a new DataFrame is created with only the columns required for the analysis

In [5]:
# Keep only the columns that the analysis below actually uses.
relevant_columns = [
    "Summons Number",
    "Plate ID",
    "Registration State",
    "Issue Date",
    "Violation Code",
    "Vehicle Body Type",
    "Vehicle Make",
]
tickets_with_relevant_columns = tickets.select(*relevant_columns)

## Let's keep the datatype in sync with the data for the relevant columns

In [6]:
from pyspark.sql.types import LongType,IntegerType,StringType;
from pyspark.sql.functions import *

# Give the analysis columns proper types:
#   Summons Number -> long, Violation Code -> integer,
#   Issue Date     -> timestamp, parsed from the 'MM/dd/yyyy' text in the CSV.
# to_timestamp replaces the manual unix_timestamp(...).cast('timestamp')
# round-trip, and col() avoids reading columns off the DataFrame that is
# being reassigned in this same statement.
tickets_with_relevant_columns = (
    tickets_with_relevant_columns
    .withColumn("Summons Number", col("Summons Number").cast(LongType()))
    .withColumn("Violation Code", col("Violation Code").cast(IntegerType()))
    .withColumn("Issue Date", to_timestamp(col("Issue Date"), 'MM/dd/yyyy'))
)



## Updated datatype of the analysis dataset 

In [7]:
# Print the schema after the casts so the new column types can be checked.
tickets_with_relevant_columns.printSchema()
# ny_analysis is a second name for the same DataFrame (plain assignment, no
# copy); the cells below use it as the base dataset.
ny_analysis=tickets_with_relevant_columns



root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)



### Below is executed for performance improvement

In [8]:

# Cache the base DataFrame: every later query starts from it.
# Average full-notebook run time measured by the author:
#   with this cell    ->  6 min 23 s
#   without this cell -> 23 min 54 s
# cache() returns the same DataFrame, so the preview can be chained onto it.
ny_analysis.cache().show(3)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+
only showing top 3 rows



## Creating Temporary table


In [8]:
# Expose the base DataFrame to Spark SQL as "ny_parking".
# registerTempTable is deprecated; createOrReplaceTempView is its replacement
# and also makes this cell safe to re-run.
ny_analysis.createOrReplaceTempView("ny_parking")

## Below is the analysis done for the exploratory questions
## Examine the data

### Question 1:Find the total number of tickets for the year.

In [9]:
# Using dataframes -->Result 10803028
# Row count of the base DataFrame, i.e. one row per ticket in the dataset.
ny_analysis.count()

10803028

In [10]:
# Same row count, this time through Spark SQL on the ny_parking view.
total_tickets_query = "select count(1) as count from ny_parking"
spark.sql(total_tickets_query).show()

+--------+
|   count|
+--------+
|10803028|
+--------+



### Solution 1 : The total number of tickets for the year was 10803028 

### Question 2: Find out the number of unique states from where the cars that got parking tickets came. (Hint: Use the column 'Registration  State'.) There is a numeric entry '99' in the column, which should be corrected. Replace it with the state having the maximum entries. Provide the number of unique states again.

#### Number of unique states from where the cars that got parking tickets came before data cleaning

In [11]:
# Count the distinct registration states before any cleanup.
distinct_states_before = countDistinct("Registration State").alias("Total_States_Before_Cleanup")
ny_analysis.agg(distinct_states_before).show()

+---------------------------+
|Total_States_Before_Cleanup|
+---------------------------+
|                         67|
+---------------------------+



### Solution : Before Data Clean up ,the total number of unique states from where the cars that got parking tickets came was 67

### Below tasks are done to clean the data as per the expectation

#### Below script is written to find the State with maximum tickets before cleanup

In [12]:
# Rank registration states by ticket volume to find the state with the most tickets.
tickets_per_state_raw = (
    ny_analysis
    .groupBy("Registration State")
    .agg(count("Summons Number").alias("Tickets Count"))
    .orderBy(col("Tickets Count").desc())
)
tickets_per_state_raw.show()




+------------------+-------------+
|Registration State|Tickets Count|
+------------------+-------------+
|                NY|      8481061|
|                NJ|       925965|
|                PA|       285419|
|                FL|       144556|
|                CT|       141088|
|                MA|        85547|
|                IN|        80749|
|                VA|        72626|
|                MD|        61800|
|                NC|        55806|
|                IL|        37329|
|                GA|        36852|
|                99|        36625|
|                TX|        36516|
|                AZ|        26426|
|                OH|        25302|
|                CA|        24260|
|                SC|        21836|
|                ME|        21574|
|                MN|        18227|
+------------------+-------------+
only showing top 20 rows



#### Updating the datapoint of Registration State of 99 with NY as it has the most number of tickets

In [13]:
# Replace the bogus registration state "99" with "NY", the state with the
# most tickets; every other value is left untouched.
# The branch only fires when the value is exactly "99", so a plain literal is
# enough and no regexp_replace is needed on the matched rows.
ny_analysis_data_updated = ny_analysis.withColumn(
    "Registration State",
    when(col("Registration State") == "99", lit("NY"))
    .otherwise(col("Registration State")),
)

#### The datapoint is updated and now total count of NY cases has increased

In [14]:
# Number of tickets attributed to NY once the "99" rows have been folded in.
ny_rows_after_cleanup = ny_analysis_data_updated.where(col("Registration State") == "NY")
ny_rows_after_cleanup.count()

8517686

#### Number of unique states from where the cars that got parking tickets came after data cleaning

In [15]:
# Count the distinct registration states again, now on the cleaned data.
distinct_states_after = countDistinct("Registration State").alias("Total_States_After_Cleanup")
ny_analysis_data_updated.agg(distinct_states_after).show()

+--------------------------+
|Total_States_After_Cleanup|
+--------------------------+
|                        66|
+--------------------------+



### Solution 2: After Data Clean up ,the total number of unique states from where the cars that got parking tickets came was 66

### Question 3 :Display the top 20 states with the most number of tickets along with their ticket count.(After Cleanup)

In [16]:
# Same per-state ranking as before the cleanup, run on the cleaned data;
# show() prints the top 20 rows by default.
tickets_per_state_clean = (
    ny_analysis_data_updated
    .groupBy("Registration State")
    .agg(count("Summons Number").alias("Tickets Count"))
    .orderBy(col("Tickets Count").desc())
)
tickets_per_state_clean.show()


+------------------+-------------+
|Registration State|Tickets Count|
+------------------+-------------+
|                NY|      8517686|
|                NJ|       925965|
|                PA|       285419|
|                FL|       144556|
|                CT|       141088|
|                MA|        85547|
|                IN|        80749|
|                VA|        72626|
|                MD|        61800|
|                NC|        55806|
|                IL|        37329|
|                GA|        36852|
|                TX|        36516|
|                AZ|        26426|
|                OH|        25302|
|                CA|        24260|
|                SC|        21836|
|                ME|        21574|
|                MN|        18227|
|                OK|        18165|
+------------------+-------------+
only showing top 20 rows



### Solution 3 : Above is the list of states with most number of tickets 

In [17]:
# Expose the cleaned DataFrame to Spark SQL as "ny_parking_updated".
# registerTempTable is deprecated; createOrReplaceTempView is its replacement
# and also makes this cell safe to re-run.
ny_analysis_data_updated.createOrReplaceTempView("ny_parking_updated")

## Aggregation Tasks

### Question 1: How often does each violation code occur? Display the frequency of the top five violation codes.

In [18]:
# DataFrame API: tickets per violation code, most frequent first (top 5 shown).
violation_code_frequency = (
    ny_analysis_data_updated
    .groupBy("Violation Code")
    .agg(count("Summons Number").alias("Frequency_Violation_Code"))
    .orderBy(col("Frequency_Violation_Code").desc())
)
violation_code_frequency.show(5)

+--------------+------------------------+
|Violation Code|Frequency_Violation_Code|
+--------------+------------------------+
|            21|                 1528588|
|            36|                 1400614|
|            38|                 1062304|
|            14|                  893498|
|            20|                  618593|
+--------------+------------------------+
only showing top 5 rows



In [19]:
# Spark SQL: tickets per violation code, most frequent first (top 5 shown).
violation_code_frequency_query = (
    "select `Violation Code`,count(`Summons Number`) as Frequency_Violation_Code "
    "from ny_parking_updated "
    "group by `Violation Code` "
    "order by count(`Summons Number`) desc "
)
spark.sql(violation_code_frequency_query).show(5)

+--------------+------------------------+
|Violation Code|Frequency_Violation_Code|
+--------------+------------------------+
|            21|                 1528588|
|            36|                 1400614|
|            38|                 1062304|
|            14|                  893498|
|            20|                  618593|
+--------------+------------------------+
only showing top 5 rows



### Solution 1: Above are the top 5 violation codes, among which code 21 has occurred the most number of times

### Question 2: How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? Find the top 5 for both.

#### Vehicle Body Type Ticket Frequency

In [20]:
# DataFrame API: tickets received per vehicle body type (top 5 shown).
tickets_per_body_type = (
    ny_analysis_data_updated
    .groupBy("Vehicle Body Type")
    .agg(count("Summons Number").alias("Tickets_Recieved"))
    .orderBy(col("Tickets_Recieved").desc())
)
tickets_per_body_type.show(5)
    

+-----------------+----------------+
|Vehicle Body Type|Tickets_Recieved|
+-----------------+----------------+
|             SUBN|         3719802|
|             4DSD|         3082020|
|              VAN|         1411970|
|             DELV|          687330|
|              SDN|          438191|
+-----------------+----------------+
only showing top 5 rows



In [21]:
# Spark SQL: tickets received per vehicle body type (top 5 shown).
body_type_frequency_query = (
    "select `Vehicle Body Type`,count(`Summons Number`) as Tickets_Recieved "
    "from ny_parking_updated "
    "group by `Vehicle Body Type` "
    "order by count(`Summons Number`) desc"
)
spark.sql(body_type_frequency_query).show(5)



+-----------------+----------------+
|Vehicle Body Type|Tickets_Recieved|
+-----------------+----------------+
|             SUBN|         3719802|
|             4DSD|         3082020|
|              VAN|         1411970|
|             DELV|          687330|
|              SDN|          438191|
+-----------------+----------------+
only showing top 5 rows



#### Vehicle Make Ticket Frequency

In [22]:
# DataFrame API: tickets received per vehicle make (top 5 shown).
tickets_per_make = (
    ny_analysis_data_updated
    .groupBy("Vehicle Make")
    .agg(count("Summons Number").alias("Tickets_Recieved"))
    .orderBy(col("Tickets_Recieved").desc())
)
tickets_per_make.show(5)

+------------+----------------+
|Vehicle Make|Tickets_Recieved|
+------------+----------------+
|        FORD|         1280958|
|       TOYOT|         1211451|
|       HONDA|         1079238|
|       NISSA|          918590|
|       CHEVR|          714655|
+------------+----------------+
only showing top 5 rows



In [23]:
# Spark SQL: tickets received per vehicle make (top 5 shown).
vehicle_make_frequency_query = (
    "select `Vehicle Make`,count(`Summons Number`) as Tickets_Recieved "
    "from ny_parking_updated "
    "group by `Vehicle Make` "
    "order by count(`Summons Number`) desc"
)
spark.sql(vehicle_make_frequency_query).show(5)


+------------+----------------+
|Vehicle Make|Tickets_Recieved|
+------------+----------------+
|        FORD|         1280958|
|       TOYOT|         1211451|
|       HONDA|         1079238|
|       NISSA|          918590|
|       CHEVR|          714655|
+------------+----------------+
only showing top 5 rows



### Solution 2 : Above we have the tickets received by both Vehicle Make and Vehicle Body Type, wherein the FORD make has the most tickets (1280958) and the SUBN body type has the most tickets (3719802)

### Question 3: Let’s try and find some seasonality in this data:
### Question 3.1: First, divide the year into 4 seasons, and find the frequencies of tickets for each season. (Hints: Use Issue Date to segregate into seasons. You may use a UDF or when-otherwise statement to do so. You have to cast the date format before you can get the month of IssueDate. )

#### Assumptions based on the seasons taken from https://www.nyc.com/visitor_guide/weather_facts.75835/
#### Fall Season          ->  September, October, November (9,10,11)
#### Winter Season     ->  December, January, February (12,1,2)
#### Spring Season     ->  March, April, May (3,4,5)
#### Summer Season  ->  June, July, August (6,7,8)

##### Creating udf for setting seasons and creating new df

In [24]:
## Season lookup built from native Spark column expressions instead of a Python UDF

from pyspark.sql.functions import *
from pyspark.sql.types import *

def seasons(month_col):
    """Map a month-number Column to its season name.

    Args:
        month_col: Column holding the month number, e.g. ``month(<date column>)``.

    Returns:
        A string Column with "Fall" (9, 10, 11), "Winter" (12, 1, 2),
        "Spring" (3, 4, 5) and "Summer" for every other value. A null month
        also lands in "Summer", because it matches none of the earlier
        branches.
    """
    # when/otherwise is evaluated inside Spark, so rows are not shipped to a
    # Python worker as they would be with udf(...).
    return (
        when(month_col.isin(9, 10, 11), "Fall")
        .when(month_col.isin(12, 1, 2), "Winter")
        .when(month_col.isin(3, 4, 5), "Spring")
        .otherwise("Summer")
    )

# Tag every ticket with the season of its issue date.
ny_analysis_seasonal = ny_analysis_data_updated.withColumn(
    "Seasons", seasons(month(col("Issue Date")))
)



In [25]:
# DataFrame API: number of tickets per season, busiest season first.
tickets_per_season = (
    ny_analysis_seasonal
    .groupBy("Seasons")
    .agg(count("Summons Number").alias("Ticket Frequency"))
    .orderBy(col("Ticket Frequency").desc())
)
tickets_per_season.show()



+-------+----------------+
|Seasons|Ticket Frequency|
+-------+----------------+
| Spring|         2880687|
|   Fall|         2830802|
| Summer|         2606208|
| Winter|         2485331|
+-------+----------------+



In [26]:
# Spark SQL: number of tickets per season, busiest season first.
# The inner query derives the month of each ticket; the outer CASE maps that
# month to a season, using the same month groups as the DataFrame version.
season_frequency_query = 'select case \
                    when Month in (9,10,11) then "Fall" \
                    when Month in (12,1,2) then "Winter" \
                    when Month in (3,4,5) then "Spring" \
                    else "Summer" end as Seasons \
                    ,count(`Summons Number`) \
                from \
              (select `Summons Number`,month(`Issue Date`) as Month \
              from ny_parking_updated) \
          group by Seasons \
          order by count(`Summons Number`) desc'

spark.sql(season_frequency_query).show()

+-------+---------------------+
|Seasons|count(Summons Number)|
+-------+---------------------+
| Spring|              2880687|
|   Fall|              2830802|
| Summer|              2606208|
| Winter|              2485331|
+-------+---------------------+



### Solution 3.1 : The Spring season has the most tickets and winter has the least

### Question 3.2 : Find the three most common violations for each of these seasons. 

In [27]:
# DataFrame API: the three most frequent violation codes within each season.
from pyspark.sql.window import Window

# Rank codes inside each season by descending ticket count.
season_rank_window = Window.partitionBy("Seasons").orderBy(col("Ticket Frequency").desc())

violations_per_season = (
    ny_analysis_seasonal
    .groupby("Seasons", "Violation Code")
    .agg(count("Summons Number").alias("Ticket Frequency"))
)
ranked_violations_per_season = violations_per_season.withColumn(
    "rank", dense_rank().over(season_rank_window)
)
ranked_violations_per_season.filter("rank <=3").show()

+-------+--------------+----------------+----+
|Seasons|Violation Code|Ticket Frequency|rank|
+-------+--------------+----------------+----+
| Spring|            21|          402807|   1|
| Spring|            36|          344834|   2|
| Spring|            38|          271192|   3|
| Summer|            21|          405961|   1|
| Summer|            38|          247561|   2|
| Summer|            36|          240396|   3|
|   Fall|            36|          456046|   1|
|   Fall|            21|          357479|   2|
|   Fall|            38|          283828|   3|
| Winter|            21|          362341|   1|
| Winter|            36|          359338|   2|
| Winter|            38|          259723|   3|
+-------+--------------+----------------+----+



In [28]:
# Spark SQL: the three most frequent violation codes within each season.
# Layers, innermost first:
#   1. project summons number, violation code and issue month per ticket
#   2. map month -> season and count tickets per (season, violation code)
#   3. dense_rank the codes inside each season by descending count
#   4. keep ranks 1-3
# The aggregated subquery carries no ORDER BY: the window function applies
# its own ordering, so sorting the rows that feed it adds nothing.
top_violations_per_season_query = """
    select * from (
        select *, dense_rank() over (partition by Seasons order by Ticket_frequency desc) as rank
        from (
            select case
                       when Month in (9,10,11) then 'Fall'
                       when Month in (12,1,2) then 'Winter'
                       when Month in (3,4,5) then 'Spring'
                       else 'Summer'
                   end as Seasons,
                   violation_code,
                   count(tickets) as Ticket_frequency
            from (
                select `Summons Number` tickets,
                       `Violation Code` violation_code,
                       month(`Issue Date`) as Month
                from ny_parking_updated
            )
            group by Seasons, violation_code
        )
    ) where rank <= 3
"""

spark.sql(top_violations_per_season_query).show()

+-------+--------------+----------------+----+
|Seasons|violation_code|Ticket_frequency|rank|
+-------+--------------+----------------+----+
| Spring|            21|          402807|   1|
| Spring|            36|          344834|   2|
| Spring|            38|          271192|   3|
| Summer|            21|          405961|   1|
| Summer|            38|          247561|   2|
| Summer|            36|          240396|   3|
|   Fall|            36|          456046|   1|
|   Fall|            21|          357479|   2|
|   Fall|            38|          283828|   3|
| Winter|            21|          362341|   1|
| Winter|            36|          359338|   2|
| Winter|            38|          259723|   3|
+-------+--------------+----------------+----+



### Solution 3.2 : Violation codes 21, 36 and 38 are the most frequently occurring violations across the seasons; violation code 21 tops spring, summer and winter, while during fall violation code 36 tops the chart

### Question 4: The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:

### Question 4.1 : Find the total occurrences of the three most common violation codes.

In [29]:
# DataFrame API: total occurrences of the three most common violation codes.
occurrences_per_code = (
    ny_analysis_data_updated
    .groupBy("Violation Code")
    .agg(count("Summons Number").alias("Ticket Frequecy"))
    .orderBy(col("Ticket Frequecy").desc())
)
occurrences_per_code.show(3)


+--------------+---------------+
|Violation Code|Ticket Frequecy|
+--------------+---------------+
|            21|        1528588|
|            36|        1400614|
|            38|        1062304|
+--------------+---------------+
only showing top 3 rows



In [30]:
# Spark SQL: total occurrences of the three most common violation codes.
top_codes_query = (
    "select `Violation Code`,count(`Summons Number`) as `Ticket Frequecy` "
    "from ny_parking_updated "
    "group by `Violation Code` "
    "order by count(`Summons Number`) desc"
)
spark.sql(top_codes_query).show(3)

+--------------+---------------+
|Violation Code|Ticket Frequecy|
+--------------+---------------+
|            21|        1528588|
|            36|        1400614|
|            38|        1062304|
+--------------+---------------+
only showing top 3 rows



### Solution 4.1  : Violation codes 21, 36 and 38 are the top 3 most common violation codes

### Expectation 4.2:
### Then, visit the website:
### http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page
### It lists the fines associated with different violation codes. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.

### Question 4.3 : Using this information, find the total amount collected for each of the three violation codes with the maximum tickets. State the code that has the highest total collection (only based on the top 3 tickets). (Hint: It may be a wise idea to store the fines in a separate column, based on the violation code)

### Utilization based on 4.2 expectation
#### Violation Code Fines based on above question
#### Violation Code 21 -> 65 dollars
#### Violation Code 36 -> 50 dollars
#### Violation Code 38 -> 50 dollars

In [31]:
# Estimated fine revenue for the three most frequent violation codes.
# Fine per ticket: 21 -> 65, 36 -> 50, 38 -> 50; any other code gets a null Fine.
# NOTE(review): the stated rule is "average of the two location categories";
# confirm that 65 for code 21 is that average on the NYC fine schedule.
# col() resolves against the aggregated frame instead of reaching back into
# ny_analysis_data_updated, whose columns are not the ones being computed on here.
ny_analysis_fine_df = (
    ny_analysis_data_updated
    .groupby("Violation Code")
    .agg(count("Summons Number").alias("Ticket Frequecy"))
    .withColumn(
        "Fine",
        when(col("Violation Code") == 21, 65)
        .when(col("Violation Code") == 36, 50)
        .when(col("Violation Code") == 38, 50),
    )
    .withColumn("Fine_Amount", col("Ticket Frequecy") * col("Fine"))
    .sort(col("Ticket Frequecy").desc())
    .limit(3)
)

# Amount per code, highest first; the first row is the code with the highest collection.
ny_analysis_fine_df.sort(col("Fine_Amount").desc()).show()

# Combined amount across the three codes.
ny_analysis_fine_df.agg(sum("Fine_Amount")).show()

+----------------+
|sum(Fine_Amount)|
+----------------+
|       222504120|
+----------------+



### Solution 4.3 :  Total amount collected was 222504120

### Question 4.4: Find the top 3 states that have the highest ticket revenue based on the top 3 violation codes alone. (Hint: Use the column 'Registration State'.)

In [32]:
# Fine revenue per registration state, restricted to violation codes 21, 36 and 38.
# Fine per ticket: 21 -> 65, 36 -> 50, 38 -> 50.
# NOTE(review): the stated rule is "average of the two location categories";
# confirm that 65 for code 21 is that average on the NYC fine schedule.
# col() resolves against the aggregated frame instead of reaching back into
# ny_analysis_data_updated after the groupby.
ny_analysis_fineamount_statewise = (
    ny_analysis_data_updated
    .filter(col("Violation Code").isin([21, 36, 38]))
    .groupby("Registration State", "Violation Code")
    .agg(count("Summons Number").alias("Ticket Frequecy"))
    .withColumn(
        "Fine",
        when(col("Violation Code") == 21, 65)
        .when(col("Violation Code") == 36, 50)
        .when(col("Violation Code") == 38, 50),
    )
    .withColumn("Fine_Amount", col("Ticket Frequecy") * col("Fine"))
)

# Sum the per-code amounts for each state and show the three highest totals.
ny_analysis_fineamount_statewise.groupby("Registration State") \
                                .agg(sum("Fine_Amount").alias("Total_fine")) \
                                .sort(col("Total_fine").desc()).show(3)


+------------------+----------+
|Registration State|Total_fine|
+------------------+----------+
|                NY| 177078880|
|                NJ|  14753770|
|                PA|   6944560|
+------------------+----------+
only showing top 3 rows



### Solution 4.4: NY , NJ and PA pay the most fines when considering only violations of 21, 36 and 38

### Observations (Question 4.5)

#### 1. No Parking, Overspeeding, or Exceeding Parking Time Limit were the violations that occurred most frequently
#### 2. The total occurrence of violations did not vary much across the seasons
#### 3. Revenue collected in NY is far greater, on a cumulative scale, than in the other states; action needs to be taken here to mitigate this
