# **INITIAL SETUP**

To automate the download of the dataset, we connect our notebook to the Kaggle API using a username and API key.

In [1]:
import os
os.environ['KAGGLE_USERNAME'] = "bron91" # username from the json file
os.environ['KAGGLE_KEY'] = "31537cf8741c703bf2fdc0fd193418ed" # key from the json file
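
Hard-coding the key in a notebook makes it easy to leak when the notebook is shared. A minimal sketch of an alternative, assuming the `kaggle.json` token file has been uploaded next to the notebook (the helper name `load_kaggle_credentials` and the default path are illustrative and not part of the original notebook):

```python
import json
import os


def load_kaggle_credentials(path="kaggle.json"):
    """Read the Kaggle API token file and export it as environment variables."""
    with open(path, encoding="utf-8") as fh:
        token = json.load(fh)  # the token file holds "username" and "key"
    os.environ["KAGGLE_USERNAME"] = token["username"]
    os.environ["KAGGLE_KEY"] = token["key"]
    return token["username"]


# Usage (assumes kaggle.json is in the working directory):
# load_kaggle_credentials("kaggle.json")
```

With this approach the key never appears in a cell or in the saved notebook output.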

Then we use a shell command to download the dataset through the Kaggle API, which in this case delivers it as a zip file. The dataset consists of NYC parking tickets issued between 2013 and 2017, along with all the related features recorded for each ticket.

In [2]:
!kaggle datasets download -d new-york-city/nyc-parking-tickets

Downloading nyc-parking-tickets.zip to /content
100% 2.02G/2.02G [00:31<00:00, 90.7MB/s]
100% 2.02G/2.02G [00:31<00:00, 69.6MB/s]


Here, we list the working directory to confirm where the dataset has been downloaded.

In [3]:
!ls -la

total 2120748
drwxr-xr-x 1 root root       4096 May 16 07:47 .
drwxr-xr-x 1 root root       4096 May 16 07:36 ..
drwxr-xr-x 4 root root       4096 May  6 13:43 .config
-rw-r--r-- 1 root root 2171622722 May 16 07:47 nyc-parking-tickets.zip
drwxr-xr-x 1 root root       4096 May  6 13:44 sample_data


Then we make sure the unzip package is installed so that we can extract the downloaded zip file.

In [4]:
!sudo apt-get install unzip

Reading package lists... Done
Building dependency tree       
Reading state information... Done
unzip is already the newest version (6.0-21ubuntu1.1).
The following package was automatically installed and is no longer required:
  libnvidia-common-460
Use 'sudo apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 34 not upgraded.


We unzip the dataset archive to extract the data files, which are in CSV format.

In [5]:
!unzip nyc-parking-tickets.zip

Archive:  nyc-parking-tickets.zip
  inflating: Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv  
  inflating: Parking_Violations_Issued_-_Fiscal_Year_2015.csv  
  inflating: Parking_Violations_Issued_-_Fiscal_Year_2016.csv  
  inflating: Parking_Violations_Issued_-_Fiscal_Year_2017.csv  


Then we remove the zip file as it is not required anymore.

In [6]:
!rm *.zip

We list the directory again to confirm that the zip file has been removed.

In [7]:
!ls -la

total 8761712
drwxr-xr-x 1 root root       4096 May 16 07:48 .
drwxr-xr-x 1 root root       4096 May 16 07:36 ..
drwxr-xr-x 4 root root       4096 May  6 13:43 .config
-rw-r--r-- 1 root root 1869025315 May 10  2020 Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv
-rw-r--r-- 1 root root 2864071408 May 10  2020 Parking_Violations_Issued_-_Fiscal_Year_2015.csv
-rw-r--r-- 1 root root 2151937808 May 10  2020 Parking_Violations_Issued_-_Fiscal_Year_2016.csv
-rw-r--r-- 1 root root 2086913576 May 10  2020 Parking_Violations_Issued_-_Fiscal_Year_2017.csv
drwxr-xr-x 1 root root       4096 May  6 13:44 sample_data


In [8]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [9]:
!wget -q https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz

In [10]:
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

In [11]:
!pip install -q findspark

In [12]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [13]:
import findspark
findspark.init()

In [14]:
findspark.find()

'/content/spark-3.1.1-bin-hadoop3.2'

In [15]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [16]:
spark

# **2013-2014 FISCAL YEAR**

Here, we import the necessary Spark SQL types and functions and then load the dataset CSV file into a DataFrame with the "read.csv" function.

In [17]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

df1 = spark.read.csv("Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv", inferSchema=True, header=True)

We call the printSchema function to inspect the data type of each column.

In [18]:
df1.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Coun

We use the describe function to compute summary statistics of the loaded dataset, which helps with our exploratory analysis.

In [19]:
#df1.describe().show()

The dataset has 9,100,278 rows and 51 columns.

In [20]:
(df1.count(), len(df1.columns))

(9100278, 51)

Here we try to find the null and missing values in each column.

In [21]:
df1_mis = df1.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df1.columns])
df1_mis.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+--------+---------+---------------+------------------+------------+-------+-------+-------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code

We use the dropDuplicates function to remove all duplicate rows. The row count stays at 9,100,278, so there are no duplicate rows in the 2013-2014 fiscal year.

In [22]:
df1 = df1.dropDuplicates()
df1.count()

9100278

We remove all the columns that have a high number of null or missing values.

In [23]:
df1_dropped = df1.drop('Time First Observed', 'Intersecting Street', 'Violation Legal Code', 'Unregistered Vehicle?', 'Meter Number', 'No Standing or Stopping Violation', 'Hydrant Violation', 'Double Parking Violation', 'Latitude', 'Longitude', 'Community Board', 'Community Council', 'Census Tract', 'BIN', 'BBL', 'NTA')
(len(df1_dropped.columns))

36
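
The choice of columns to drop can be made explicit with a threshold on the share of missing values. The following is a plain-Python sketch of that rule, not the procedure used in the notebook: the three null counts are made up for illustration, and the 50% threshold and the helper name `columns_to_drop` are assumptions.

```python
TOTAL_ROWS = 9_100_278  # row count reported above

# Hypothetical null counts for three columns; the real values come from df1_mis.
null_counts = {"Plate ID": 0, "Meter Number": 7_500_000, "Latitude": 9_100_278}


def columns_to_drop(counts, total, threshold=0.5):
    """Return the columns whose share of null values exceeds the threshold."""
    return sorted(name for name, n in counts.items() if n / total > threshold)


print(columns_to_drop(null_counts, TOTAL_ROWS))
```

A rule like this keeps the cut-off reproducible when the same cleaning step is repeated for the other fiscal years.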

Now we try to answer 5 real-world questions. In class, a student once asked whether to use the DataFrame API or Spark SQL in the project, and you said to use the DataFrame API at least once. We have therefore answered our first question with the DataFrame API.

**QUESTION 1**

When are tickets most likely to be issued? Any seasonality?

To answer this question, we analyze two columns, "Issue Date" and "Summons Number". First we convert "Issue Date" from string to date. We then select the two columns and keep only the dates that fall within this fiscal year, which runs from July 1 to June 30 as specified in the Kaggle overview of this dataset. Finally, we use groupBy, aggregate and sorting functions to count the "Summons Number" values per date as Ticket Frequency and order the result in descending order.

In [24]:
from datetime import datetime
from pyspark.sql.functions import col, udf
import pyspark.sql.functions as sql_fn

func =  udf (lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())

dfv1 = df1_dropped.withColumn('d_Issue Date', func(col('Issue Date')))

dfv1_select1 = dfv1.filter(dfv1["d_Issue Date"] >= lit('2013-07-01')) \
                    .filter(dfv1["d_Issue Date"] <= lit('2014-06-30')) \
                    .select(dfv1["d_Issue Date"], dfv1["Summons Number"].alias("Ticket Frequency"))
df_agg_sort1 = dfv1_select1.groupBy(dfv1_select1["d_Issue Date"]) \
                            .agg(sql_fn.count("Ticket Frequency").alias("Ticket Frequency")) \
                            .sort(sql_fn.col("Ticket Frequency").desc())
df_agg_sort1.show()

+------------+----------------+
|d_Issue Date|Ticket Frequency|
+------------+----------------+
|  2013-11-29|           46024|
|  2013-10-03|           41359|
|  2013-10-08|           41104|
|  2013-10-01|           40664|
|  2014-03-06|           40640|
|  2014-01-16|           40414|
|  2014-05-06|           40276|
|  2014-01-09|           39989|
|  2014-03-04|           39879|
|  2014-02-25|           39872|
|  2014-05-01|           39289|
|  2013-11-14|           39266|
|  2014-01-17|           38840|
|  2014-03-20|           38807|
|  2014-03-07|           38806|
|  2013-10-04|           38707|
|  2013-10-22|           38700|
|  2013-10-29|           38590|
|  2013-11-19|           38541|
|  2014-05-02|           38490|
+------------+----------------+
only showing top 20 rows
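
The parse, filter and count steps can be checked on a handful of rows outside Spark. This is a plain-Python sketch of the same logic on a hypothetical sample of date strings; the helper name `tickets_per_day` is illustrative. In Spark itself, the built-in `to_date` function with a date pattern could probably replace the Python UDF, but that variant was not run here.

```python
from collections import Counter
from datetime import date, datetime

# Hypothetical sample of "Issue Date" strings in the dataset's MM/DD/YYYY format.
issue_dates = ["11/29/2013", "11/29/2013", "10/03/2013",
               "06/30/2014", "07/15/2014", "01/05/2010"]

FY_START = date(2013, 7, 1)
FY_END = date(2014, 6, 30)


def tickets_per_day(raw_dates, start=FY_START, end=FY_END):
    """Parse the date strings, keep those inside [start, end], count per day."""
    parsed = (datetime.strptime(s, "%m/%d/%Y").date() for s in raw_dates)
    counts = Counter(d for d in parsed if start <= d <= end)
    return counts.most_common()  # sorted by descending count


daily = tickets_per_day(issue_dates)
print(daily[0])
```

Both boundary dates are inclusive, matching the `>=` and `<=` filters in the cell above.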



Here we check the data type of each column in the newly created DataFrame.

In [25]:
df_agg_sort1.printSchema()

root
 |-- d_Issue Date: date (nullable = true)
 |-- Ticket Frequency: long (nullable = false)



We then extract the year and month values from the "d_Issue Date" column into two new columns.

In [26]:
from pyspark.sql.functions import col, month, year  # month is used below; to_date was unused

df_date_Y = df_agg_sort1.withColumn('Year',year(col("d_Issue Date")))
df_date_YM = df_date_Y.withColumn('Month' ,month(col("d_Issue Date")))
df_date_YM.show()

+------------+----------------+----+-----+
|d_Issue Date|Ticket Frequency|Year|Month|
+------------+----------------+----+-----+
|  2013-11-29|           46024|2013|   11|
|  2013-10-03|           41359|2013|   10|
|  2013-10-08|           41104|2013|   10|
|  2013-10-01|           40664|2013|   10|
|  2014-03-06|           40640|2014|    3|
|  2014-01-16|           40414|2014|    1|
|  2014-05-06|           40276|2014|    5|
|  2014-01-09|           39989|2014|    1|
|  2014-03-04|           39879|2014|    3|
|  2014-02-25|           39872|2014|    2|
|  2014-05-01|           39289|2014|    5|
|  2013-11-14|           39266|2013|   11|
|  2014-01-17|           38840|2014|    1|
|  2014-03-20|           38807|2014|    3|
|  2014-03-07|           38806|2014|    3|
|  2013-10-04|           38707|2013|   10|
|  2013-10-22|           38700|2013|   10|
|  2013-10-29|           38590|2013|   10|
|  2013-11-19|           38541|2013|   11|
|  2014-05-02|           38490|2014|    5|
+----------

Now we add another column that assigns a season to each month. For NYC, March to May (3 to 5) is considered Spring, June to August (6 to 8) Summer, September to November (9 to 11) Fall and December to February (12 to 2) Winter.

In [27]:
df_seasons = df_date_YM.withColumn("Seasons", \
                                     when((df_date_YM['Month']==3) | (df_date_YM['Month']==4) | (df_date_YM['Month']==5), lit("Spring")) 
                                     .when((df_date_YM['Month']==6) | (df_date_YM['Month']==7) | (df_date_YM['Month']==8), lit("Summer")) 
                                     .when((df_date_YM['Month']==9) | (df_date_YM['Month']==10) | (df_date_YM['Month']==11), lit("Fall"))
                                     .when((df_date_YM['Month']==12) | (df_date_YM['Month']==1) | (df_date_YM['Month']==2), lit("Winter"))
                                     .otherwise(lit("Faulty")))
df_seasons.show(10)

+------------+----------------+----+-----+-------+
|d_Issue Date|Ticket Frequency|Year|Month|Seasons|
+------------+----------------+----+-----+-------+
|  2013-11-29|           46024|2013|   11|   Fall|
|  2013-10-03|           41359|2013|   10|   Fall|
|  2013-10-08|           41104|2013|   10|   Fall|
|  2013-10-01|           40664|2013|   10|   Fall|
|  2014-03-06|           40640|2014|    3| Spring|
|  2014-01-16|           40414|2014|    1| Winter|
|  2014-05-06|           40276|2014|    5| Spring|
|  2014-01-09|           39989|2014|    1| Winter|
|  2014-03-04|           39879|2014|    3| Spring|
|  2014-02-25|           39872|2014|    2| Winter|
+------------+----------------+----+-----+-------+
only showing top 10 rows
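
The month-to-season rule is easy to verify in isolation. A plain-Python sketch of the same mapping follows; the function name `season_for_month` is illustrative. In the Spark version, each chain of `==` comparisons could likely be shortened with the column method `isin`.

```python
def season_for_month(month):
    """Map a month number (1-12) to the season convention used above."""
    if month in (3, 4, 5):
        return "Spring"
    if month in (6, 7, 8):
        return "Summer"
    if month in (9, 10, 11):
        return "Fall"
    if month in (12, 1, 2):
        return "Winter"
    return "Faulty"  # same fallback label as the otherwise() branch above


# Months of the first six rows shown in the output above.
seasons = [season_for_month(m) for m in (11, 10, 10, 10, 3, 1)]
print(seasons)
```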



Then we create a new dataframe and consolidate the tickets by seasons. 

For the fiscal year 2013-2014, Spring has the highest number of tickets issued.

In [28]:
import pyspark.sql.functions as sql_fn
from pyspark.sql.functions import col

dfv1_seasons = df_seasons.select(sql_fn.col('Seasons'), sql_fn.col("Ticket Frequency")) \
                          .groupBy(sql_fn.col("Seasons")) \
                          .agg(sql_fn.sum(sql_fn.col("Ticket Frequency")).alias("Tickets by Seasons")) \
                          .sort(sql_fn.col("Tickets by Seasons").desc())

dfv1_seasons.show()

+-------+------------------+
|Seasons|Tickets by Seasons|
+-------+------------------+
| Spring|           2663265|
|   Fall|           2627552|
| Winter|           2107796|
| Summer|           1688708|
+-------+------------------+



Here, we replace the spaces in the column names with underscores ("_"), because column names without spaces are easier to reference in SQL queries. We then display the top 5 rows to inspect the result.

In [29]:
df1_ns= df1_dropped.toDF(*(c.replace(' ', '_') for c in df1_dropped.columns))
df1_ns.show(5)

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+----------------+---------------------------------+------------+------------------+-------------------+-----------+------------+--------------------------+--------------------+------------------+-------------+------------+--------------+-------------------+---------------------+------------------+
|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Issuing_Agency|Street_Code1|Street_Code2|Street_Code3|Vehicle_Expiration_Date|Violation_Location|Violation_Precinct|Issuer_Precinct|Issuer_Code|Issuer_Command|Issuer_Squad|Violation_Time|Violation_County|Violation_In_Front_Of_Or_Opposite|House_Number|       Street_Name|Date_First_Observed|

We now create a temporary table using the dataframe so that we can query data from it for further analysis.

In [30]:
df1_ns.createOrReplaceTempView("tickets")

tbl_output = spark.sql("""
  SELECT * FROM tickets LIMIT 5
  """)

tbl_output.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+----------------+---------------------------------+------------+------------------+-------------------+-----------+------------+--------------------------+--------------------+------------------+-------------+------------+--------------+-------------------+---------------------+------------------+
|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Issuing_Agency|Street_Code1|Street_Code2|Street_Code3|Vehicle_Expiration_Date|Violation_Location|Violation_Precinct|Issuer_Precinct|Issuer_Code|Issuer_Command|Issuer_Squad|Violation_Time|Violation_County|Violation_In_Front_Of_Or_Opposite|House_Number|       Street_Name|Date_First_Observed|

**QUESTION 2**

Out of all the vehicles issued tickets in this fiscal year, which states were they most registered to?

To answer this question, we wrote a SQL query that selects the Registration_State column and counts the Summons_Number values as TicketFrequency. The query keeps only rows whose Registration_State appears in the list of US state codes (which also includes DC and the US territories) and sorts the result by TicketFrequency in descending order.

Most vehicles ticketed were registered to New York state.

In [31]:
tickets_by_registration_state = spark.sql("""
  SELECT Registration_State, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Registration_State IN ('AL', 'AK', 'AS', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'DC', 'FM', 'FL', 'GA', 'GU', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MH', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'MP', 'OH', 'OK', 'OR', 'PW', 'PA', 'PR', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', 'VI', 'VA', 'WA', 'WV', 'WI', 'WY')
  AND Registration_State is not null
  GROUP BY Registration_State
  ORDER BY count(Summons_Number) desc
""")

tickets_by_registration_state.collect()

[Row(Registration_State='NY', TicketFrequency=7029804),
 Row(Registration_State='NJ', TicketFrequency=878677),
 Row(Registration_State='PA', TicketFrequency=225760),
 Row(Registration_State='CT', TicketFrequency=136973),
 Row(Registration_State='FL', TicketFrequency=111887),
 Row(Registration_State='MA', TicketFrequency=78650),
 Row(Registration_State='VA', TicketFrequency=60951),
 Row(Registration_State='MD', TicketFrequency=50407),
 Row(Registration_State='IN', TicketFrequency=49126),
 Row(Registration_State='NC', TicketFrequency=47117),
 Row(Registration_State='IL', TicketFrequency=31763),
 Row(Registration_State='GA', TicketFrequency=30837),
 Row(Registration_State='AZ', TicketFrequency=24245),
 Row(Registration_State='TX', TicketFrequency=24092),
 Row(Registration_State='OH', TicketFrequency=21995),
 Row(Registration_State='CA', TicketFrequency=20100),
 Row(Registration_State='OK', TicketFrequency=19688),
 Row(Registration_State='SC', TicketFrequency=19529),
 Row(Registration_Stat

**QUESTION 3**

Which vehicle colors received the most tickets in the given fiscal year?

To answer this question, we wrote a SQL query that builds a DataFrame with two columns, Vehicle_Color and the count of Summons_Number as TicketFrequency, in descending order of TicketFrequency. We found that Vehicle_Color has high cardinality, because many different labels are used to denote the same color.

In [32]:
tickets_by_vehicle_color = spark.sql("""
  SELECT Vehicle_Color, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Vehicle_Color is not null
  GROUP BY Vehicle_Color
  ORDER BY count(Summons_Number) desc
""")

tickets_by_vehicle_color.show(100)

+-------------+---------------+
|Vehicle_Color|TicketFrequency|
+-------------+---------------+
|        WHITE|        1349234|
|           GY|        1214514|
|           WH|        1192996|
|           BK|         941231|
|        BLACK|         665519|
|           BL|         442464|
|         GREY|         417286|
|        SILVE|         313929|
|         BLUE|         301353|
|           RD|         272836|
|        BROWN|         271122|
|          RED|         265215|
|           GR|         201825|
|        GREEN|         151007|
|           TN|         132612|
|        OTHER|          91436|
|           BR|          71921|
|          TAN|          70600|
|           YW|          70001|
|          BLK|          69288|
|         GRAY|          60667|
|           GL|          53681|
|         GOLD|          51034|
|        YELLO|          43884|
|          WHT|          38530|
|           MR|          30991|
|          GRY|          29071|
|           WT|          14703|
|       

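To tackle this cardinality, we closely examined the first 100 rows, picked out the color categories and their different labels as closely as possible, and then replaced the labels with the full color names using when and lit. We also added a fallback that replaces every value that does not meet any of our conditions with "OTHER". The output shows that a large share of the tickets ended up under "OTHER". High cardinality is only part of the reason: the conditions list abbreviations only, so labels that are already full names (for example WHITE and BLACK) and the label WH match nothing and are also counted as "OTHER".

If we ignore "OTHER", black is the color of the most ticketed vehicles for this fiscal year under this mapping. This ranking should be read with caution, because WHITE (1,349,234) and WH (1,192,996) alone account for more tickets than the BLACK total, and BL is counted as black although it may denote blue.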

In [33]:
tickets_by_vehicle_color_coding = tickets_by_vehicle_color.withColumn("Vehicle_Color", \
                                      when((tickets_by_vehicle_color['Vehicle_Color']=='RD') | (tickets_by_vehicle_color['Vehicle_Color']=='RE') , lit("RED")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='BUE') | (tickets_by_vehicle_color['Vehicle_Color']=='BLU') | (tickets_by_vehicle_color['Vehicle_Color']=='BU') | (tickets_by_vehicle_color['Vehicle_Color']=='BE'), lit("BLUE")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='GR') | (tickets_by_vehicle_color['Vehicle_Color']=='GN') | (tickets_by_vehicle_color['Vehicle_Color']=='GRN') | (tickets_by_vehicle_color['Vehicle_Color']=='GRE') | (tickets_by_vehicle_color['Vehicle_Color']=='GREE'), lit("GREEN")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='Y') | (tickets_by_vehicle_color['Vehicle_Color']=='YE') | (tickets_by_vehicle_color['Vehicle_Color']=='YW') | (tickets_by_vehicle_color['Vehicle_Color']=='YELLO') | (tickets_by_vehicle_color['Vehicle_Color']=='YEL') | (tickets_by_vehicle_color['Vehicle_Color']=='YELL'), lit("YELLOW")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='BLA') | (tickets_by_vehicle_color['Vehicle_Color']=='BLK') | (tickets_by_vehicle_color['Vehicle_Color']=='BLAC') | (tickets_by_vehicle_color['Vehicle_Color']=='BK') | (tickets_by_vehicle_color['Vehicle_Color']=='BL'), lit("BLACK")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='W') | (tickets_by_vehicle_color['Vehicle_Color']=='WT') | (tickets_by_vehicle_color['Vehicle_Color']=='WHT') | (tickets_by_vehicle_color['Vehicle_Color']=='WHI') | (tickets_by_vehicle_color['Vehicle_Color']=='WHIT'), lit("WHITE")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='GY') | (tickets_by_vehicle_color['Vehicle_Color']=='GRAY') | (tickets_by_vehicle_color['Vehicle_Color']=='GRA') | (tickets_by_vehicle_color['Vehicle_Color']=='GRY'), lit("GREY")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='T') | (tickets_by_vehicle_color['Vehicle_Color']=='TA') | (tickets_by_vehicle_color['Vehicle_Color']=='TN'), lit("TAN")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='O') | (tickets_by_vehicle_color['Vehicle_Color']=='OR') | (tickets_by_vehicle_color['Vehicle_Color']=='ORA') | (tickets_by_vehicle_color['Vehicle_Color']=='ORAN') | (tickets_by_vehicle_color['Vehicle_Color']=='ORANG'), lit("ORANGE")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='BR') | (tickets_by_vehicle_color['Vehicle_Color']=='BRO') | (tickets_by_vehicle_color['Vehicle_Color']=='BRN') | (tickets_by_vehicle_color['Vehicle_Color']=='BROW') | (tickets_by_vehicle_color['Vehicle_Color']=='BWN') | (tickets_by_vehicle_color['Vehicle_Color']=='BN'), lit("BROWN")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='SI') | (tickets_by_vehicle_color['Vehicle_Color']=='SIL') | (tickets_by_vehicle_color['Vehicle_Color']=='SILV') | (tickets_by_vehicle_color['Vehicle_Color']=='SILVE') | (tickets_by_vehicle_color['Vehicle_Color']=='SR') | (tickets_by_vehicle_color['Vehicle_Color']=='SL') | (tickets_by_vehicle_color['Vehicle_Color']=='SILVR') | (tickets_by_vehicle_color['Vehicle_Color']=='SLVR'), lit("SILVER")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='GL') | (tickets_by_vehicle_color['Vehicle_Color']=='GLD') | (tickets_by_vehicle_color['Vehicle_Color']=='GOL') | (tickets_by_vehicle_color['Vehicle_Color']=='GD'), lit("GOLD")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='MAROO') | (tickets_by_vehicle_color['Vehicle_Color']=='MA') | (tickets_by_vehicle_color['Vehicle_Color']=='MR') | (tickets_by_vehicle_color['Vehicle_Color']=='MO') | (tickets_by_vehicle_color['Vehicle_Color']=='MAR'), lit("MAROON")) \
                                     .otherwise(lit("OTHER")))

tickets_by_vehicle_color_coding.createOrReplaceTempView("colors")

tickets_by_vehicle_color_updated = spark.sql("""
  SELECT Vehicle_Color, sum(TicketFrequency) as TotalTickets
  FROM colors
  WHERE Vehicle_Color is not null
  GROUP BY Vehicle_Color
  ORDER BY sum(TicketFrequency) desc
""")

tickets_by_vehicle_color_updated.show()


+-------------+------------+
|Vehicle_Color|TotalTickets|
+-------------+------------+
|        OTHER|     4931045|
|        BLACK|     1453126|
|         GREY|     1304287|
|       SILVER|      335084|
|          RED|      272869|
|        GREEN|      213617|
|          TAN|      132632|
|       YELLOW|      116062|
|        BROWN|       89606|
|        WHITE|       59541|
|         GOLD|       55408|
|       MAROON|       33196|
|       ORANGE|       24434|
|         BLUE|        6929|
+-------------+------------+
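
A lookup table that also contains the full color names avoids sending already-clean labels to "OTHER". The sketch below is plain Python and is not the notebook's method. It uses a subset of the per-label counts printed by the earlier query, and it assumes that WH means white and that GY means grey. BL is deliberately left unmapped because its meaning is not stated in the source.

```python
from collections import defaultdict

# Alias table: every label, including the full names, maps to a canonical color.
# Treating WH as white is an assumption; BL is left out because it is ambiguous.
ALIASES = {
    "WHITE": ("WHITE", "WH", "WHT", "WT"),
    "BLACK": ("BLACK", "BK", "BLK"),
    "GREY": ("GREY", "GY", "GRAY", "GRY"),
}
CANONICAL = {label: color for color, labels in ALIASES.items() for label in labels}

# Counts copied from the per-label query output above (subset of rows).
label_counts = {
    "WHITE": 1349234, "GY": 1214514, "WH": 1192996, "BK": 941231,
    "BLACK": 665519, "BL": 442464, "GREY": 417286, "BLK": 69288,
    "GRAY": 60667, "WHT": 38530, "GRY": 29071, "WT": 14703,
}

totals = defaultdict(int)
for label, n in label_counts.items():
    totals[CANONICAL.get(label, "OTHER")] += n  # unmapped labels fall back to OTHER

ranking = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
print(ranking)
```

On this subset, white comes first, ahead of grey and black, which illustrates how sensitive the ranking is to the completeness of the alias table.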



**QUESTION 4**

In which county were most tickets issued in the given fiscal year?

We query two columns, Violation_County and Summons_Number, counting the values of the latter into the column TicketFrequency.

NYC has 5 counties, namely New York County (NYC), Kings County (KINGS), Queens County (QUEENS), Richmond County (RICHMOND) and Bronx County (BRONX).

From the output, we find that some values have been mislabelled.

In [34]:
tickets_by_violation_county = spark.sql("""
  SELECT Violation_County, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Violation_County is not null
  GROUP BY Violation_County
  ORDER BY count(Summons_Number) desc
""")


tickets_by_violation_county.collect()

[Row(Violation_County='NY', TicketFrequency=3547196),
 Row(Violation_County='K', TicketFrequency=1979048),
 Row(Violation_County='Q', TicketFrequency=1825974),
 Row(Violation_County='BX', TicketFrequency=943549),
 Row(Violation_County='R', TicketFrequency=99290),
 Row(Violation_County='RICH', TicketFrequency=4),
 Row(Violation_County='BRONX', TicketFrequency=3),
 Row(Violation_County='QUEEN', TicketFrequency=3),
 Row(Violation_County='RC', TicketFrequency=2),
 Row(Violation_County='NYC', TicketFrequency=2),
 Row(Violation_County='KINGS', TicketFrequency=1),
 Row(Violation_County='103', TicketFrequency=1)]

We then check the datatypes of each column.

In [35]:
tickets_by_violation_county.printSchema()

root
 |-- Violation_County: string (nullable = true)
 |-- TicketFrequency: long (nullable = false)



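We now consolidate the mislabelled values into 5 properly labelled county names. The label 103 is assigned to NYC. Labels that are already full names (NYC, BRONX, KINGS) match none of the conditions and, because the chain has no otherwise clause, become null; the query that follows filters them out, which leaves 6 tickets uncounted.

From the output, New York County (NYC) is the county where most tickets were issued in the fiscal year 2013-2014.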

In [36]:
tickets_by_violation_county_updated = tickets_by_violation_county.withColumn('Violation_County', 
                                                                  when((tickets_by_violation_county['Violation_County'] =='NY') ,regexp_replace(tickets_by_violation_county['Violation_County'],'NY','NYC')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='103'),regexp_replace(tickets_by_violation_county['Violation_County'],'103','NYC')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='K'),regexp_replace(tickets_by_violation_county['Violation_County'],'K','KINGS')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='BX'),regexp_replace(tickets_by_violation_county['Violation_County'],'BX','BRONX')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='R'),regexp_replace(tickets_by_violation_county['Violation_County'],'R','RICHMOND')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='RICH'),regexp_replace(tickets_by_violation_county['Violation_County'],'RICH','RICHMOND')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='RC'),regexp_replace(tickets_by_violation_county['Violation_County'],'RC','RICHMOND')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='Q'),regexp_replace(tickets_by_violation_county['Violation_County'],'Q','QUEENS')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='QUEEN'),regexp_replace(tickets_by_violation_county['Violation_County'],'QUEEN','QUEENS')) )

tickets_by_violation_county_updated.createOrReplaceTempView("counties")

tickets_by_violation_county_consolidated = spark.sql("""
  SELECT Violation_County, sum(TicketFrequency) as TotalTickets_Counties
  FROM counties
  WHERE Violation_County is not null
  GROUP BY Violation_County
  ORDER BY sum(TicketFrequency) desc
""")


tickets_by_violation_county_consolidated.collect()


[Row(Violation_County='NYC', TotalTickets_Counties=3547197),
 Row(Violation_County='KINGS', TotalTickets_Counties=1979048),
 Row(Violation_County='QUEENS', TotalTickets_Counties=1825977),
 Row(Violation_County='BRONX', TotalTickets_Counties=943549),
 Row(Violation_County='RICHMOND', TotalTickets_Counties=99296)]
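
The same consolidation can be written as a lookup table with an explicit fallback, so that no label is silently lost. The following plain-Python sketch uses the counts printed by the county query above. Mapping "103" to NYC mirrors the notebook's choice, and the "UNKNOWN" fallback label is an assumption added for this sketch.

```python
from collections import defaultdict

# Label-to-county table; full names map to themselves so they are not dropped.
COUNTY = {
    "NY": "NYC", "NYC": "NYC", "103": "NYC",
    "K": "KINGS", "KINGS": "KINGS",
    "Q": "QUEENS", "QUEEN": "QUEENS",
    "BX": "BRONX", "BRONX": "BRONX",
    "R": "RICHMOND", "RICH": "RICHMOND", "RC": "RICHMOND",
}

# Counts copied from the per-label query output above.
raw_counts = {
    "NY": 3547196, "K": 1979048, "Q": 1825974, "BX": 943549, "R": 99290,
    "RICH": 4, "BRONX": 3, "QUEEN": 3, "RC": 2, "NYC": 2, "KINGS": 1, "103": 1,
}

county_totals = defaultdict(int)
for label, n in raw_counts.items():
    county_totals[COUNTY.get(label, "UNKNOWN")] += n  # nothing is discarded

print(dict(county_totals))
```

Because every input label is accounted for, the consolidated totals add up to the same number of tickets as the raw counts.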

**QUESTION 5**

What was the year-of-manufacture for most vehicles ticketed in the given fiscal year?

As before, we use a SQL query to create a DataFrame with Vehicle_Year and the count of Summons_Number as TicketFrequency. We also make sure that all vehicle years are 2014 or earlier, since any year of manufacture later than 2014 must be erroneous for this fiscal year.

2013 is the year of manufacture for most vehicles ticketed. The output also suggests that newer vehicles tend to be ticketed more often than older ones.

In [37]:
tickets_by_vehicle_year = spark.sql("""
  SELECT Vehicle_Year, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Vehicle_Year is not null
  AND Vehicle_Year != '0'
  AND Vehicle_Year <= '2014'
  GROUP BY Vehicle_Year
  ORDER BY count(Summons_Number) desc
""")


tickets_by_vehicle_year.collect()

[Row(Vehicle_Year='2013', TicketFrequency=784427),
 Row(Vehicle_Year='2012', TicketFrequency=647424),
 Row(Vehicle_Year='2011', TicketFrequency=511126),
 Row(Vehicle_Year='2007', TicketFrequency=482137),
 Row(Vehicle_Year='2006', TicketFrequency=417380),
 Row(Vehicle_Year='2008', TicketFrequency=405123),
 Row(Vehicle_Year='2005', TicketFrequency=397236),
 Row(Vehicle_Year='2010', TicketFrequency=380297),
 Row(Vehicle_Year='2004', TicketFrequency=359990),
 Row(Vehicle_Year='2003', TicketFrequency=329742),
 Row(Vehicle_Year='2009', TicketFrequency=319685),
 Row(Vehicle_Year='2002', TicketFrequency=302565),
 Row(Vehicle_Year='2001', TicketFrequency=265094),
 Row(Vehicle_Year='2014', TicketFrequency=238863),
 Row(Vehicle_Year='1999', TicketFrequency=184449),
 Row(Vehicle_Year='1998', TicketFrequency=143733),
 Row(Vehicle_Year='1997', TicketFrequency=128593),
 Row(Vehicle_Year='1995', TicketFrequency=84712),
 Row(Vehicle_Year='1996', TicketFrequency=83098),
 Row(Vehicle_Year='1994', TicketF
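Because Vehicle_Year is a string in this DataFrame, the `<= '2014'` filter compares text rather than numbers. The plain-Python sketch below illustrates the difference and shows one possible numeric check. The helper name `is_plausible_year` and the 1900 lower bound are assumptions made for illustration, not part of the notebook.

```python
# Lexicographic comparison can disagree with numeric comparison.
print('999' <= '2014')   # False as text, although 999 <= 2014 as numbers
print('20' <= '2014')    # True as text, although '20' is not a real model year


def is_plausible_year(raw, latest, earliest=1900):
    """Return True if raw parses to an integer year within [earliest, latest].

    `earliest=1900` is an assumed lower bound, chosen only for this sketch.
    """
    try:
        year = int(raw)
    except (TypeError, ValueError):
        return False
    return earliest <= year <= latest


for value in ['2013', '2015', '0', '20', None]:
    print(value, is_plausible_year(value, latest=2014))
```

In Spark the same idea could be expressed by casting the column to an integer before comparing, which this sketch does not attempt to reproduce.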

**QUESTION 6**

Which issuing agency issued the most tickets this fiscal year?

We use the same approach as before and query two columns: Issuing_Agency and the count of Summons_Number as TicketFrequency.

Ordering the result by TicketFrequency in descending order shows that the issuing agency labeled "T" issued the most tickets in this fiscal year (7,258,568).

In [38]:
tickets_by_issuing_agency = spark.sql("""
  SELECT Issuing_Agency, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Issuing_Agency is not null
  GROUP BY Issuing_Agency
  ORDER BY count(Summons_Number) desc
""")


tickets_by_issuing_agency.collect()

[Row(Issuing_Agency='T', TicketFrequency=7258568),
 Row(Issuing_Agency='P', TicketFrequency=962220),
 Row(Issuing_Agency='V', TicketFrequency=654437),
 Row(Issuing_Agency='S', TicketFrequency=109434),
 Row(Issuing_Agency='X', TicketFrequency=106649),
 Row(Issuing_Agency='K', TicketFrequency=6644),
 Row(Issuing_Agency='A', TicketFrequency=1138),
 Row(Issuing_Agency='C', TicketFrequency=262),
 Row(Issuing_Agency='F', TicketFrequency=206),
 Row(Issuing_Agency='O', TicketFrequency=201),
 Row(Issuing_Agency='H', TicketFrequency=139),
 Row(Issuing_Agency='M', TicketFrequency=132),
 Row(Issuing_Agency='D', TicketFrequency=112),
 Row(Issuing_Agency='R', TicketFrequency=68),
 Row(Issuing_Agency='B', TicketFrequency=31),
 Row(Issuing_Agency='E', TicketFrequency=21),
 Row(Issuing_Agency='N', TicketFrequency=16)]

# **2014-2015 FISCAL YEAR**

For the fiscal year of 2014-2015, we use the same concepts and methods as earlier to analyze the dataset.

In [39]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

df2 = spark.read.csv("Parking_Violations_Issued_-_Fiscal_Year_2015.csv", inferSchema=True, header=True)

In [40]:
df2.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: string (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Count

In [41]:
#df2.describe().show()

In [42]:
(df2.count(), len(df2.columns))

(11809233, 51)

In [43]:
df2_mis = df2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df2.columns])
df2_mis.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+--------+---------+---------------+------------------+------------+--------+--------+--------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street C
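The cell above counts, for each column, the values that are either null or NaN. As a rough illustration of that rule outside Spark, here is a plain-Python sketch on a few made-up rows. The sample values and the helper names `is_missing` and `count_missing` are hypothetical.

```python
import math


def is_missing(value):
    """True for None and for float NaN, mirroring the isNull / isnan test."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def count_missing(rows):
    """Count missing values per column across a list of dict rows."""
    counts = {}
    for row in rows:
        for column, value in row.items():
            counts[column] = counts.get(column, 0) + (1 if is_missing(value) else 0)
    return counts


# Hypothetical rows, not taken from the dataset.
sample_rows = [
    {"Plate ID": "ABC123", "Violation Code": 21, "Latitude": None},
    {"Plate ID": None, "Violation Code": 38, "Latitude": float("nan")},
    {"Plate ID": "XYZ789", "Violation Code": 14, "Latitude": None},
]
missing_counts = count_missing(sample_rows)
print(missing_counts)
```

Columns whose missing count is close to the row count are the natural candidates for dropping, which is what the later `drop(...)` cell does.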

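Unlike the previous fiscal year, this dataset does contain duplicate rows: dropping them reduces the row count from 11,809,233 to 10,951,257, a difference of 857,976 rows.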

In [44]:
df2 = df2.dropDuplicates()
df2.count()

10951257
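The size of the reduction can be checked with a little arithmetic on the two counts printed above. This is only a sketch; the variable names are chosen for illustration.

```python
rows_before = 11_809_233   # df2.count() before dropDuplicates()
rows_after = 10_951_257    # df2.count() after dropDuplicates()

removed = rows_before - rows_after
removed_pct = 100 * removed / rows_before

print(f"{removed:,} duplicate rows removed ({removed_pct:.2f}% of the raw file)")
```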

We then recompute the missing-value counts on the deduplicated DataFrame.

In [45]:
df2_mis = df2.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df2.columns])
df2_mis.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+--------+---------+---------------+------------------+------------+--------+--------+--------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street C

In [46]:
df2_dropped = df2.drop('Time First Observed', 'Intersecting Street', 'Violation Legal Code', 'Unregistered Vehicle?', 'Meter Number', 'No Standing or Stopping Violation', 'Hydrant Violation', 'Double Parking Violation', 'Latitude', 'Longitude', 'Community Board', 'Community Council', 'Census Tract', 'BIN', 'BBL', 'NTA')
(len(df2_dropped.columns))

36

**QUESTION 1**

When are tickets most likely to be issued? Any seasonality?

In [47]:
from datetime import datetime
from pyspark.sql.functions import col, udf
import pyspark.sql.functions as sql_fn

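# Parse 'MM/DD/YYYY' strings into dates; a null Issue Date is passed through as null instead of raising.
func = udf(lambda x: datetime.strptime(x, '%m/%d/%Y') if x else None, DateType())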

dfv2 = df2_dropped.withColumn('d_Issue Date', func(col('Issue Date')))

dfv2_select1 = dfv2.filter(dfv2["d_Issue Date"] >= lit('2014-07-01')) \
                    .filter(dfv2["d_Issue Date"] <= lit('2015-06-30')) \
                    .select(dfv2["d_Issue Date"], dfv2["Summons Number"].alias("Ticket Frequency"))
df_agg_sort2 = dfv2_select1.groupBy(dfv2_select1["d_Issue Date"]) \
                            .agg(sql_fn.count("Ticket Frequency").alias("Ticket Frequency")) \
                            .sort(sql_fn.col("Ticket Frequency").desc())
df_agg_sort2.show()

+------------+----------------+
|d_Issue Date|Ticket Frequency|
+------------+----------------+
|  2015-03-12|           46493|
|  2015-03-03|           45820|
|  2014-09-18|           44444|
|  2015-05-21|           44304|
|  2015-02-10|           44253|
|  2014-10-14|           44199|
|  2015-03-09|           43964|
|  2015-05-07|           43788|
|  2015-03-13|           43654|
|  2015-06-05|           43531|
|  2015-03-10|           43494|
|  2015-06-11|           43475|
|  2014-09-12|           43469|
|  2014-10-02|           43331|
|  2015-06-09|           43209|
|  2015-06-16|           43150|
|  2014-09-11|           43057|
|  2014-09-19|           42973|
|  2014-09-09|           42910|
|  2015-06-18|           42899|
+------------+----------------+
only showing top 20 rows
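The cell above does two things: it parses the `MM/DD/YYYY` strings in Issue Date, and it keeps only dates inside the fiscal year (1 July 2014 to 30 June 2015). The plain-Python sketch below shows both steps on single values. The function names are hypothetical, and returning `None` for malformed input is an assumption about how bad dates should be handled.

```python
from datetime import date, datetime

FISCAL_START = date(2014, 7, 1)
FISCAL_END = date(2015, 6, 30)


def parse_issue_date(raw):
    """Parse 'MM/DD/YYYY'; return None for empty or malformed input."""
    if not raw:
        return None
    try:
        return datetime.strptime(raw, "%m/%d/%Y").date()
    except ValueError:
        return None


def in_fiscal_year(day):
    """True if day falls inside the 2014-2015 fiscal year (inclusive)."""
    return day is not None and FISCAL_START <= day <= FISCAL_END


for raw in ["03/12/2015", "07/01/2015", "13/45/2015", None]:
    parsed = parse_issue_date(raw)
    print(raw, parsed, in_fiscal_year(parsed))
```

Keeping the parse and the window check separate makes it easy to count how many rows are rejected at each step.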



In [48]:
df_agg_sort2.printSchema()

root
 |-- d_Issue Date: date (nullable = true)
 |-- Ticket Frequency: long (nullable = false)



In [49]:
from pyspark.sql.functions import col, month, year

df_date_Y = df_agg_sort2.withColumn('Year',year(col("d_Issue Date")))
df_date_YM = df_date_Y.withColumn('Month' ,month(col("d_Issue Date")))
df_date_YM.show()

+------------+----------------+----+-----+
|d_Issue Date|Ticket Frequency|Year|Month|
+------------+----------------+----+-----+
|  2015-03-12|           46493|2015|    3|
|  2015-03-03|           45820|2015|    3|
|  2014-09-18|           44444|2014|    9|
|  2015-05-21|           44304|2015|    5|
|  2015-02-10|           44253|2015|    2|
|  2014-10-14|           44199|2014|   10|
|  2015-03-09|           43964|2015|    3|
|  2015-05-07|           43788|2015|    5|
|  2015-03-13|           43654|2015|    3|
|  2015-06-05|           43531|2015|    6|
|  2015-03-10|           43494|2015|    3|
|  2015-06-11|           43475|2015|    6|
|  2014-09-12|           43469|2014|    9|
|  2014-10-02|           43331|2014|   10|
|  2015-06-09|           43209|2015|    6|
|  2015-06-16|           43150|2015|    6|
|  2014-09-11|           43057|2014|    9|
|  2014-09-19|           42973|2014|    9|
|  2014-09-09|           42910|2014|    9|
|  2015-06-18|           42899|2015|    6|
+----------

In [50]:
df_seasons = df_date_YM.withColumn("Seasons", \
                                     when((df_date_YM['Month']==3) | (df_date_YM['Month']==4) | (df_date_YM['Month']==5), lit("Spring")) 
                                     .when((df_date_YM['Month']==6) | (df_date_YM['Month']==7) | (df_date_YM['Month']==8), lit("Summer")) 
                                     .when((df_date_YM['Month']==9) | (df_date_YM['Month']==10) | (df_date_YM['Month']==11), lit("Fall"))
                                     .when((df_date_YM['Month']==12) | (df_date_YM['Month']==1) | (df_date_YM['Month']==2), lit("Winter"))
                                     .otherwise(lit("Faulty")))
df_seasons.show(10)

+------------+----------------+----+-----+-------+
|d_Issue Date|Ticket Frequency|Year|Month|Seasons|
+------------+----------------+----+-----+-------+
|  2015-03-12|           46493|2015|    3| Spring|
|  2015-03-03|           45820|2015|    3| Spring|
|  2014-09-18|           44444|2014|    9|   Fall|
|  2015-05-21|           44304|2015|    5| Spring|
|  2015-02-10|           44253|2015|    2| Winter|
|  2014-10-14|           44199|2014|   10|   Fall|
|  2015-03-09|           43964|2015|    3| Spring|
|  2015-05-07|           43788|2015|    5| Spring|
|  2015-03-13|           43654|2015|    3| Spring|
|  2015-06-05|           43531|2015|    6| Summer|
+------------+----------------+----+-----+-------+
only showing top 10 rows
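The chained `when` conditions above amount to a lookup from month number to season. A minimal plain-Python sketch of the same mapping, using a dictionary, is shown here. The names `SEASON_BY_MONTH` and `season_for_month` are illustrative only.

```python
# Month number -> meteorological season, matching the when(...) chain above.
SEASON_BY_MONTH = {
    3: "Spring", 4: "Spring", 5: "Spring",
    6: "Summer", 7: "Summer", 8: "Summer",
    9: "Fall", 10: "Fall", 11: "Fall",
    12: "Winter", 1: "Winter", 2: "Winter",
}


def season_for_month(month):
    """Return the season for a month number, or 'Faulty' if it is not 1-12."""
    return SEASON_BY_MONTH.get(month, "Faulty")


for m in (3, 6, 10, 2, 13):
    print(m, season_for_month(m))
```

A lookup table like this keeps the month-to-season rule in one place, which can be easier to audit than a long boolean chain.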



Spring was the season with the most tickets issued, narrowly ahead of summer.

In [51]:
import pyspark.sql.functions as sql_fn
from pyspark.sql.functions import col

dfv2_seasons = df_seasons.select(sql_fn.col('Seasons'), sql_fn.col("Ticket Frequency")) \
                          .groupBy(sql_fn.col("Seasons")) \
                          .agg(sql_fn.sum(sql_fn.col("Ticket Frequency")).alias("Tickets by Seasons")) \
                          .sort(sql_fn.col("Tickets by Seasons").desc())

dfv2_seasons.show()

+-------+------------------+
|Seasons|Tickets by Seasons|
+-------+------------------+
| Spring|           2860987|
| Summer|           2838306|
|   Fall|           2718502|
| Winter|           2180241|
+-------+------------------+
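To put the seasonal totals in proportion, the sketch below converts them to shares and compares their sum with the deduplicated row count printed earlier. The gap is presumably made up of rows dated outside the fiscal-year window; that interpretation is an assumption, not something the notebook verifies.

```python
# Totals copied from the table above.
season_totals = {
    "Spring": 2_860_987,
    "Summer": 2_838_306,
    "Fall": 2_718_502,
    "Winter": 2_180_241,
}
rows_after_dedup = 10_951_257  # df2.count() after dropDuplicates()

total_in_window = sum(season_totals.values())
outside_window = rows_after_dedup - total_in_window
season_shares = {s: n / total_in_window for s, n in season_totals.items()}

for season, share in season_shares.items():
    print(f"{season:<6} {share:.1%}")
print(f"Rows not counted in any season: {outside_window:,}")
```

Spring, summer, and fall each account for roughly a quarter of the tickets, while winter is noticeably lower.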



In [52]:
df2_ns= df2_dropped.toDF(*(c.replace(' ', '_') for c in df2_dropped.columns))
df2_ns.show(5)

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+----------------+---------------------------------+------------+-----------+--------------------+-----------+------------+--------------------------+--------------------+------------------+-------------+------------+--------------+-------------------+---------------------+------------------+
|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Issuing_Agency|Street_Code1|Street_Code2|Street_Code3|Vehicle_Expiration_Date|Violation_Location|Violation_Precinct|Issuer_Precinct|Issuer_Code|Issuer_Command|Issuer_Squad|Violation_Time|Violation_County|Violation_In_Front_Of_Or_Opposite|House_Number|Street_Name| Date_First_Observed|Law_Section|
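Replacing spaces with underscores lets the columns be referenced in Spark SQL without quoting. A slightly more defensive variant, sketched here in plain Python, also collapses other non-alphanumeric characters such as the `?` in `Unregistered Vehicle?`. The helper name `sql_safe` is hypothetical and this is not what the notebook itself runs.

```python
import re


def sql_safe(name):
    """Replace runs of non-word characters with '_' and trim stray underscores."""
    return re.sub(r"\W+", "_", name).strip("_")


columns = ["Summons Number", "Street Code1", "Vehicle Body Type", "Unregistered Vehicle?"]
renamed = [sql_safe(c) for c in columns]
print(renamed)
```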

In [53]:
df2_ns.createOrReplaceTempView("tickets")

tbl_output = spark.sql("""
  SELECT * FROM tickets LIMIT 5
  """)

tbl_output.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+----------------+---------------------------------+------------+-----------+--------------------+-----------+------------+--------------------------+--------------------+------------------+-------------+------------+--------------+-------------------+---------------------+------------------+
|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Issuing_Agency|Street_Code1|Street_Code2|Street_Code3|Vehicle_Expiration_Date|Violation_Location|Violation_Precinct|Issuer_Precinct|Issuer_Code|Issuer_Command|Issuer_Squad|Violation_Time|Violation_County|Violation_In_Front_Of_Or_Opposite|House_Number|Street_Name| Date_First_Observed|Law_Section|

**QUESTION 2**

Of the vehicles ticketed in this fiscal year, which registration states were most common?

Vehicles registered in New York State received the most tickets in fiscal year 2014-2015 (8,533,146).

In [54]:
tickets_by_registration_state = spark.sql("""
  SELECT Registration_State, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Registration_State IN ('AL', 'AK', 'AS', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'DC', 'FM', 'FL', 'GA', 'GU', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MH', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'MP', 'OH', 'OK', 'OR', 'PW', 'PA', 'PR', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', 'VI', 'VA', 'WA', 'WV', 'WI', 'WY')
  AND Registration_State is not null
  GROUP BY Registration_State
  ORDER BY count(Summons_Number) desc
""")

tickets_by_registration_state.collect()

[Row(Registration_State='NY', TicketFrequency=8533146),
 Row(Registration_State='NJ', TicketFrequency=996951),
 Row(Registration_State='PA', TicketFrequency=271388),
 Row(Registration_State='CT', TicketFrequency=149462),
 Row(Registration_State='FL', TicketFrequency=138892),
 Row(Registration_State='MA', TicketFrequency=94262),
 Row(Registration_State='IN', TicketFrequency=78789),
 Row(Registration_State='VA', TicketFrequency=73361),
 Row(Registration_State='MD', TicketFrequency=59185),
 Row(Registration_State='NC', TicketFrequency=55062),
 Row(Registration_State='IL', TicketFrequency=39827),
 Row(Registration_State='GA', TicketFrequency=36084),
 Row(Registration_State='TX', TicketFrequency=31697),
 Row(Registration_State='AZ', TicketFrequency=28760),
 Row(Registration_State='OH', TicketFrequency=25703),
 Row(Registration_State='ME', TicketFrequency=23170),
 Row(Registration_State='CA', TicketFrequency=23120),
 Row(Registration_State='SC', TicketFrequency=23113),
 Row(Registration_Stat

**QUESTION 3**

Which vehicle colors received the most tickets in the given fiscal year?

In [55]:
tickets_by_vehicle_color = spark.sql("""
  SELECT Vehicle_Color, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Vehicle_Color is not null
  GROUP BY Vehicle_Color
  ORDER BY count(Summons_Number) desc
""")

tickets_by_vehicle_color.show(100)

+-------------+---------------+
|Vehicle_Color|TicketFrequency|
+-------------+---------------+
|           GY|        1591551|
|           WH|        1540376|
|        WHITE|        1517019|
|           BK|        1257497|
|        BLACK|         714945|
|           BL|         558017|
|         GREY|         435758|
|           RD|         349244|
|        BROWN|         333803|
|        SILVE|         330494|
|         BLUE|         314180|
|          RED|         285486|
|           GR|         240506|
|        GREEN|         156805|
|           TN|         155774|
|        OTHER|         115816|
|           YW|          89019|
|           BR|          88346|
|          BLK|          77253|
|         GRAY|          70808|
|          TAN|          66454|
|           GL|          63803|
|         GOLD|          52203|
|        YELLO|          48808|
|          GRY|          38237|
|           MR|          36286|
|          WHT|          33310|
|           WT|          21160|
|       

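Ignoring OTHER as discussed earlier, Black comes out as the most-ticketed vehicle color for the 2014-2015 fiscal year in the consolidated table below. This ranking should be read with caution: the mapping sends every value it does not list to OTHER, including the already spelled-out WHITE, BLACK, GREY, and BLUE and the common code WH, and it assigns BL to BLACK. In the raw counts above, WH, WHITE, WHT, and WT alone add up to 3,111,865 tickets, more than the 1,892,909 reported for BLACK.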

In [56]:
tickets_by_vehicle_color_coding = tickets_by_vehicle_color.withColumn("Vehicle_Color", \
                                      when((tickets_by_vehicle_color['Vehicle_Color']=='RD') | (tickets_by_vehicle_color['Vehicle_Color']=='RE') , lit("RED")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='BUE') | (tickets_by_vehicle_color['Vehicle_Color']=='BLU') | (tickets_by_vehicle_color['Vehicle_Color']=='BU') | (tickets_by_vehicle_color['Vehicle_Color']=='BE'), lit("BLUE")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='GR') | (tickets_by_vehicle_color['Vehicle_Color']=='GN') | (tickets_by_vehicle_color['Vehicle_Color']=='GRN') | (tickets_by_vehicle_color['Vehicle_Color']=='GRE') | (tickets_by_vehicle_color['Vehicle_Color']=='GREE'), lit("GREEN")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='Y') | (tickets_by_vehicle_color['Vehicle_Color']=='YE') | (tickets_by_vehicle_color['Vehicle_Color']=='YW') | (tickets_by_vehicle_color['Vehicle_Color']=='YELLO') | (tickets_by_vehicle_color['Vehicle_Color']=='YEL') | (tickets_by_vehicle_color['Vehicle_Color']=='YELL'), lit("YELLOW")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='BLA') | (tickets_by_vehicle_color['Vehicle_Color']=='BLK') | (tickets_by_vehicle_color['Vehicle_Color']=='BLAC') | (tickets_by_vehicle_color['Vehicle_Color']=='BK') | (tickets_by_vehicle_color['Vehicle_Color']=='BL'), lit("BLACK")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='W') | (tickets_by_vehicle_color['Vehicle_Color']=='WT') | (tickets_by_vehicle_color['Vehicle_Color']=='WHT') | (tickets_by_vehicle_color['Vehicle_Color']=='WHI') | (tickets_by_vehicle_color['Vehicle_Color']=='WHIT'), lit("WHITE")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='GY') | (tickets_by_vehicle_color['Vehicle_Color']=='GRAY') | (tickets_by_vehicle_color['Vehicle_Color']=='GRA') | (tickets_by_vehicle_color['Vehicle_Color']=='GRY'), lit("GREY")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='T') | (tickets_by_vehicle_color['Vehicle_Color']=='TA') | (tickets_by_vehicle_color['Vehicle_Color']=='TN'), lit("TAN")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='O') | (tickets_by_vehicle_color['Vehicle_Color']=='OR') | (tickets_by_vehicle_color['Vehicle_Color']=='ORA') | (tickets_by_vehicle_color['Vehicle_Color']=='ORAN') | (tickets_by_vehicle_color['Vehicle_Color']=='ORANG'), lit("ORANGE")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='BR') | (tickets_by_vehicle_color['Vehicle_Color']=='BRO') | (tickets_by_vehicle_color['Vehicle_Color']=='BRN') | (tickets_by_vehicle_color['Vehicle_Color']=='BROW') | (tickets_by_vehicle_color['Vehicle_Color']=='BWN') | (tickets_by_vehicle_color['Vehicle_Color']=='BN'), lit("BROWN")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='SI') | (tickets_by_vehicle_color['Vehicle_Color']=='SIL') | (tickets_by_vehicle_color['Vehicle_Color']=='SILV') | (tickets_by_vehicle_color['Vehicle_Color']=='SILVE') | (tickets_by_vehicle_color['Vehicle_Color']=='SR') | (tickets_by_vehicle_color['Vehicle_Color']=='SL') | (tickets_by_vehicle_color['Vehicle_Color']=='SILVR') | (tickets_by_vehicle_color['Vehicle_Color']=='SLVR'), lit("SILVER")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='GL') | (tickets_by_vehicle_color['Vehicle_Color']=='GLD') | (tickets_by_vehicle_color['Vehicle_Color']=='GOL') | (tickets_by_vehicle_color['Vehicle_Color']=='GD'), lit("GOLD")) \
                                     .when((tickets_by_vehicle_color['Vehicle_Color']=='MAROO') | (tickets_by_vehicle_color['Vehicle_Color']=='MA') | (tickets_by_vehicle_color['Vehicle_Color']=='MR') | (tickets_by_vehicle_color['Vehicle_Color']=='MO') | (tickets_by_vehicle_color['Vehicle_Color']=='MAR'), lit("MAROON")) \
                                     .otherwise(lit("OTHER")))

tickets_by_vehicle_color_coding.createOrReplaceTempView("colors")

tickets_by_vehicle_color_updated = spark.sql("""
  SELECT Vehicle_Color, sum(TicketFrequency) as TotalTickets
  FROM colors
  WHERE Vehicle_Color is not null
  GROUP BY Vehicle_Color
  ORDER BY sum(TicketFrequency) desc
""")

tickets_by_vehicle_color_updated.show()


+-------------+------------+
|Vehicle_Color|TotalTickets|
+-------------+------------+
|        OTHER|     5663650|
|        BLACK|     1892909|
|         GREY|     1700634|
|       SILVER|      359325|
|          RED|      349279|
|        GREEN|      253607|
|          TAN|      155787|
|       YELLOW|      139656|
|        BROWN|      104713|
|        WHITE|       66472|
|         GOLD|       66270|
|       MAROON|       38843|
|       ORANGE|       30067|
|         BLUE|       10461|
+-------------+------------+
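The large OTHER bucket comes mostly from labels the mapping does not list, including canonical names such as WHITE and BLACK. One way to avoid that is an alias table in which each canonical name maps to itself. The plain-Python sketch below applies such a table to a subset of the raw counts shown earlier. The alias table is hypothetical, and BL is deliberately left out of the sketch because it could stand for either BLACK or BLUE.

```python
from collections import Counter

# Hypothetical alias table; canonical names map to themselves so they are kept.
COLOR_ALIASES = {
    "WHITE": "WHITE", "WH": "WHITE", "WHT": "WHITE", "WT": "WHITE",
    "BLACK": "BLACK", "BK": "BLACK", "BLK": "BLACK",
    "GREY": "GREY", "GY": "GREY", "GRAY": "GREY", "GRY": "GREY",
}

# Counts copied from the raw 2014-2015 color output (subset of rows).
raw_counts = {
    "GY": 1_591_551, "WH": 1_540_376, "WHITE": 1_517_019, "BK": 1_257_497,
    "BLACK": 714_945, "GREY": 435_758, "OTHER": 115_816, "BLK": 77_253,
    "GRAY": 70_808, "GRY": 38_237, "WHT": 33_310, "WT": 21_160,
}


def consolidate(counts, aliases):
    """Sum counts per canonical color; unlisted labels fall into OTHER."""
    totals = Counter()
    for label, n in counts.items():
        totals[aliases.get(label.strip().upper(), "OTHER")] += n
    return totals


color_totals = consolidate(raw_counts, COLOR_ALIASES)
top_color, top_count = color_totals.most_common(1)[0]
print(color_totals.most_common())
```

On this subset the ordering differs from the consolidated table above, which suggests the conclusion is sensitive to how the aliases are defined.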



**QUESTION 4**

In which county were most tickets issued in the given fiscal year?

In [57]:
tickets_by_violation_county = spark.sql("""
  SELECT Violation_County, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Violation_County is not null
  GROUP BY Violation_County
  ORDER BY count(Summons_Number) desc
""")


tickets_by_violation_county.collect()

[Row(Violation_County='NY', TicketFrequency=3844967),
 Row(Violation_County='K', TicketFrequency=2222069),
 Row(Violation_County='Q', TicketFrequency=2015864),
 Row(Violation_County='BX', TicketFrequency=1093346),
 Row(Violation_County='R', TicketFrequency=106024),
 Row(Violation_County='KINGS', TicketFrequency=10),
 Row(Violation_County='MAN', TicketFrequency=4),
 Row(Violation_County='QUEEN', TicketFrequency=3),
 Row(Violation_County='NEW Y', TicketFrequency=2),
 Row(Violation_County='NEWY', TicketFrequency=2),
 Row(Violation_County='KING', TicketFrequency=1),
 Row(Violation_County='BRONX', TicketFrequency=1),
 Row(Violation_County='MH', TicketFrequency=1),
 Row(Violation_County='QU', TicketFrequency=1)]

New York County (Manhattan, coded NY in the data and relabeled NYC below) had the most tickets issued in 2014-2015, with 3,844,967.

In [58]:
tickets_by_violation_county_updated = tickets_by_violation_county.withColumn('Violation_County', 
                                                                  when((tickets_by_violation_county['Violation_County'] =='NY') ,regexp_replace(tickets_by_violation_county['Violation_County'],'NY','NYC')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='103'),regexp_replace(tickets_by_violation_county['Violation_County'],'103','NYC')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='K'),regexp_replace(tickets_by_violation_county['Violation_County'],'K','KINGS')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='BX'),regexp_replace(tickets_by_violation_county['Violation_County'],'BX','BRONX')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='R'),regexp_replace(tickets_by_violation_county['Violation_County'],'R','RICHMOND')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='RICH'),regexp_replace(tickets_by_violation_county['Violation_County'],'RICH','RICHMOND')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='RC'),regexp_replace(tickets_by_violation_county['Violation_County'],'RC','RICHMOND')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='Q'),regexp_replace(tickets_by_violation_county['Violation_County'],'Q','QUEENS')) \
                                                                  .when((tickets_by_violation_county['Violation_County'] =='QUEEN'),regexp_replace(tickets_by_violation_county['Violation_County'],'QUEEN','QUEENS')) )

tickets_by_violation_county_updated.createOrReplaceTempView("counties")

tickets_by_violation_county_consolidated = spark.sql("""
  SELECT Violation_County, sum(TicketFrequency) as TotalTickets_Counties
  FROM counties
  WHERE Violation_County is not null
  GROUP BY Violation_County
  ORDER BY sum(TicketFrequency) desc
""")


tickets_by_violation_county_consolidated.collect()


[Row(Violation_County='NYC', TotalTickets_Counties=3844967),
 Row(Violation_County='KINGS', TotalTickets_Counties=2222069),
 Row(Violation_County='QUEENS', TotalTickets_Counties=2015867),
 Row(Violation_County='BRONX', TotalTickets_Counties=1093346),
 Row(Violation_County='RICHMOND', TotalTickets_Counties=106024)]
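The `when` chain above has no `.otherwise(...)`, so any label it does not list (for example KINGS, BRONX, or MAN) becomes null and is then removed by the `is not null` filter. The effect is small here, a few dozen tickets, but a lookup that falls back to the original label avoids it. The plain-Python sketch below applies such a lookup to the raw county counts shown earlier. The alias table is hypothetical, and treating MAN and MH as Manhattan is an assumption.

```python
# Hypothetical alias table; 'NYC' follows the notebook's label for New York County.
COUNTY_ALIASES = {
    "NY": "NYC", "MAN": "NYC", "MH": "NYC", "NEW Y": "NYC", "NEWY": "NYC",
    "K": "KINGS", "KING": "KINGS", "KINGS": "KINGS",
    "Q": "QUEENS", "QU": "QUEENS", "QUEEN": "QUEENS",
    "BX": "BRONX", "BRONX": "BRONX",
    "R": "RICHMOND",
}

# Counts copied from the raw 2014-2015 county output above.
raw_county_counts = {
    "NY": 3_844_967, "K": 2_222_069, "Q": 2_015_864, "BX": 1_093_346,
    "R": 106_024, "KINGS": 10, "MAN": 4, "QUEEN": 3, "NEW Y": 2,
    "NEWY": 2, "KING": 1, "BRONX": 1, "MH": 1, "QU": 1,
}


def consolidate_counties(counts, aliases):
    """Sum counts per county; labels missing from aliases are kept unchanged."""
    totals = {}
    for label, n in counts.items():
        county = aliases.get(label, label)
        totals[county] = totals.get(county, 0) + n
    return totals


county_totals = consolidate_counties(raw_county_counts, COUNTY_ALIASES)
for county, n in sorted(county_totals.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{county:<9} {n:,}")
```

The ranking of the five counties is unchanged; only the handful of previously dropped tickets is added back.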

**QUESTION 5**

Which year of manufacture was most common among vehicles ticketed in the given fiscal year?

2013 is again the most common year of manufacture among ticketed vehicles (968,588 tickets), followed by 2014 (883,941). The output also suggests that newer vehicles are ticketed more often than older ones, although the counts are not adjusted for how many vehicles of each year are on the road.

In [59]:
tickets_by_vehicle_year = spark.sql("""
  SELECT Vehicle_Year, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Vehicle_Year is not null
  AND Vehicle_Year != '0'
  AND Vehicle_Year <= '2015'
  GROUP BY Vehicle_Year
  ORDER BY count(Summons_Number) desc
""")


tickets_by_vehicle_year.collect()

[Row(Vehicle_Year=2013, TicketFrequency=968588),
 Row(Vehicle_Year=2014, TicketFrequency=883941),
 Row(Vehicle_Year=2012, TicketFrequency=675949),
 Row(Vehicle_Year=2007, TicketFrequency=561403),
 Row(Vehicle_Year=2011, TicketFrequency=524553),
 Row(Vehicle_Year=2006, TicketFrequency=492125),
 Row(Vehicle_Year=2008, TicketFrequency=468234),
 Row(Vehicle_Year=2005, TicketFrequency=459390),
 Row(Vehicle_Year=2010, TicketFrequency=427214),
 Row(Vehicle_Year=2004, TicketFrequency=416556),
 Row(Vehicle_Year=2009, TicketFrequency=369387),
 Row(Vehicle_Year=2003, TicketFrequency=365411),
 Row(Vehicle_Year=2002, TicketFrequency=324329),
 Row(Vehicle_Year=2015, TicketFrequency=308244),
 Row(Vehicle_Year=2001, TicketFrequency=282016),
 Row(Vehicle_Year=1999, TicketFrequency=180873),
 Row(Vehicle_Year=1998, TicketFrequency=142886),
 Row(Vehicle_Year=1997, TicketFrequency=128555),
 Row(Vehicle_Year=1996, TicketFrequency=77114),
 Row(Vehicle_Year=1995, TicketFrequency=73354),
 Row(Vehicle_Year=1994

**QUESTION 6**

Which issuing agency issued the most tickets this fiscal year?

The issuing agency labeled "T" issued the most tickets in this fiscal year (8,122,285).

In [60]:
tickets_by_issuing_agency = spark.sql("""
  SELECT Issuing_Agency, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Issuing_Agency is not null
  GROUP BY Issuing_Agency
  ORDER BY count(Summons_Number) desc
""")


tickets_by_issuing_agency.collect()

[Row(Issuing_Agency='T', TicketFrequency=8122285),
 Row(Issuing_Agency='V', TicketFrequency=1619124),
 Row(Issuing_Agency='P', TicketFrequency=899355),
 Row(Issuing_Agency='S', TicketFrequency=181802),
 Row(Issuing_Agency='X', TicketFrequency=117042),
 Row(Issuing_Agency='K', TicketFrequency=7348),
 Row(Issuing_Agency='R', TicketFrequency=784),
 Row(Issuing_Agency='H', TicketFrequency=680),
 Row(Issuing_Agency='A', TicketFrequency=652),
 Row(Issuing_Agency='C', TicketFrequency=527),
 Row(Issuing_Agency='M', TicketFrequency=350),
 Row(Issuing_Agency='F', TicketFrequency=344),
 Row(Issuing_Agency='D', TicketFrequency=300),
 Row(Issuing_Agency='O', TicketFrequency=238),
 Row(Issuing_Agency='B', TicketFrequency=166),
 Row(Issuing_Agency='U', TicketFrequency=132),
 Row(Issuing_Agency='E', TicketFrequency=71),
 Row(Issuing_Agency='N', TicketFrequency=55),
 Row(Issuing_Agency='L', TicketFrequency=2)]
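To see how dominant agency "T" is, the counts above can be turned into a share of all tickets. This is a small arithmetic sketch in plain Python with illustrative variable names. As a sanity check, the agency counts add up to the deduplicated row count reported earlier for this fiscal year.

```python
# Counts copied from the 2014-2015 issuing-agency output above.
agency_counts = {
    "T": 8_122_285, "V": 1_619_124, "P": 899_355, "S": 181_802, "X": 117_042,
    "K": 7_348, "R": 784, "H": 680, "A": 652, "C": 527, "M": 350, "F": 344,
    "D": 300, "O": 238, "B": 166, "U": 132, "E": 71, "N": 55, "L": 2,
}

total_tickets = sum(agency_counts.values())
top_agency = max(agency_counts, key=agency_counts.get)
top_share_pct = 100 * agency_counts[top_agency] / total_tickets

print(f"Total tickets: {total_tickets:,}")
print(f"Agency {top_agency} issued {top_share_pct:.1f}% of them")
```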

# **2015-2016 FISCAL YEAR**

For the fiscal year of 2015-2016, we use the same concepts and methods as earlier to analyze the dataset.

In [61]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

df3 = spark.read.csv("Parking_Violations_Issued_-_Fiscal_Year_2016.csv", inferSchema=True, header=True)

In [62]:
df3.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Coun

In [63]:
#df3.describe().show()

In [64]:
(df3.count(), len(df3.columns))

(10626899, 51)

In [65]:
df3_mis = df3.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df3.columns])
df3_mis.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+--------+---------+---------------+------------------+------------+--------+--------+--------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street C

In [66]:
df3 = df3.dropDuplicates()
df3.count()

10626899

In [67]:
df3_dropped = df3.drop('Time First Observed', 'Intersecting Street', 'Violation Legal Code', 'Unregistered Vehicle?', 'Meter Number', 'No Standing or Stopping Violation', 'Hydrant Violation', 'Double Parking Violation', 'Latitude', 'Longitude', 'Community Board', 'Community Council', 'Census Tract', 'BIN', 'BBL', 'NTA')
(len(df3_dropped.columns))

36

**QUESTION 1**

When are tickets most likely to be issued? Any seasonality?

In [68]:
from datetime import datetime
from pyspark.sql.functions import col, udf
import pyspark.sql.functions as sql_fn

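# Parse 'MM/DD/YYYY' strings into dates; a null Issue Date is passed through as null instead of raising.
func = udf(lambda x: datetime.strptime(x, '%m/%d/%Y') if x else None, DateType())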

dfv3 = df3_dropped.withColumn('d_Issue Date', func(col('Issue Date')))

dfv3_select1 = dfv3.filter(dfv3["d_Issue Date"] >= lit('2015-07-01')) \
                    .filter(dfv3["d_Issue Date"] <= lit('2016-06-30')) \
                    .select(dfv3["d_Issue Date"], dfv3["Summons Number"].alias("Ticket Frequency"))
df_agg_sort3 = dfv3_select1.groupBy(dfv3_select1["d_Issue Date"]) \
                            .agg(sql_fn.count("Ticket Frequency").alias("Ticket Frequency")) \
                            .sort(sql_fn.col("Ticket Frequency").desc())
df_agg_sort3.show()

+------------+----------------+
|d_Issue Date|Ticket Frequency|
+------------+----------------+
|  2015-09-22|           49425|
|  2015-10-16|           48146|
|  2015-10-13|           48055|
|  2015-10-01|           47864|
|  2015-10-08|           47649|
|  2015-09-17|           47496|
|  2015-10-20|           47358|
|  2015-10-27|           46660|
|  2015-09-18|           46617|
|  2015-10-15|           46333|
|  2015-10-22|           46204|
|  2015-10-23|           45836|
|  2015-10-19|           45193|
|  2015-10-07|           45145|
|  2015-09-21|           45132|
|  2016-03-01|           45087|
|  2015-10-29|           45051|
|  2015-11-06|           45000|
|  2015-10-30|           44994|
|  2015-09-11|           44870|
+------------+----------------+
only showing top 20 rows
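
The UDF above calls `datetime.strptime` directly, which raises an exception if an `Issue Date` value is null or malformed. A minimal sketch of a more forgiving parser follows. `parse_issue_date` is a hypothetical helper name, not part of the notebook, and it assumes every valid date uses the `MM/DD/YYYY` layout seen in this file.

```python
from datetime import date, datetime
from typing import Optional


def parse_issue_date(raw: Optional[str]) -> Optional[date]:
    """Parse an 'MM/DD/YYYY' string; return None for null or malformed input."""
    if raw is None:
        return None
    try:
        return datetime.strptime(raw.strip(), "%m/%d/%Y").date()
    except ValueError:
        return None


print(parse_issue_date("09/22/2015"))  # 2015-09-22
print(parse_issue_date("not a date"))  # None
print(parse_issue_date(None))          # None
```

It could be wrapped the same way as the original lambda, for example `udf(parse_issue_date, DateType())`. Spark's built-in `to_date(col, 'MM/dd/yyyy')` is another option that avoids a Python UDF altogether.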



In [69]:
df_agg_sort3.printSchema()

root
 |-- d_Issue Date: date (nullable = true)
 |-- Ticket Frequency: long (nullable = false)



In [70]:
from pyspark.sql.functions import col, month, year

df_date_Y = df_agg_sort3.withColumn('Year',year(col("d_Issue Date")))
df_date_YM = df_date_Y.withColumn('Month' ,month(col("d_Issue Date")))
df_date_YM.show()

+------------+----------------+----+-----+
|d_Issue Date|Ticket Frequency|Year|Month|
+------------+----------------+----+-----+
|  2015-09-22|           49425|2015|    9|
|  2015-10-16|           48146|2015|   10|
|  2015-10-13|           48055|2015|   10|
|  2015-10-01|           47864|2015|   10|
|  2015-10-08|           47649|2015|   10|
|  2015-09-17|           47496|2015|    9|
|  2015-10-20|           47358|2015|   10|
|  2015-10-27|           46660|2015|   10|
|  2015-09-18|           46617|2015|    9|
|  2015-10-15|           46333|2015|   10|
|  2015-10-22|           46204|2015|   10|
|  2015-10-23|           45836|2015|   10|
|  2015-10-19|           45193|2015|   10|
|  2015-10-07|           45145|2015|   10|
|  2015-09-21|           45132|2015|    9|
|  2016-03-01|           45087|2016|    3|
|  2015-10-29|           45051|2015|   10|
|  2015-11-06|           45000|2015|   11|
|  2015-10-30|           44994|2015|   10|
|  2015-09-11|           44870|2015|    9|
+----------

In [71]:
df_seasons = df_date_YM.withColumn("Seasons", \
                                     when((df_date_YM['Month']==3) | (df_date_YM['Month']==4) | (df_date_YM['Month']==5), lit("Spring")) 
                                     .when((df_date_YM['Month']==6) | (df_date_YM['Month']==7) | (df_date_YM['Month']==8), lit("Summer")) 
                                     .when((df_date_YM['Month']==9) | (df_date_YM['Month']==10) | (df_date_YM['Month']==11), lit("Fall"))
                                     .when((df_date_YM['Month']==12) | (df_date_YM['Month']==1) | (df_date_YM['Month']==2), lit("Winter"))
                                     .otherwise(lit("Faulty")))
df_seasons.show(10)

+------------+----------------+----+-----+-------+
|d_Issue Date|Ticket Frequency|Year|Month|Seasons|
+------------+----------------+----+-----+-------+
|  2015-09-22|           49425|2015|    9|   Fall|
|  2015-10-16|           48146|2015|   10|   Fall|
|  2015-10-13|           48055|2015|   10|   Fall|
|  2015-10-01|           47864|2015|   10|   Fall|
|  2015-10-08|           47649|2015|   10|   Fall|
|  2015-09-17|           47496|2015|    9|   Fall|
|  2015-10-20|           47358|2015|   10|   Fall|
|  2015-10-27|           46660|2015|   10|   Fall|
|  2015-09-18|           46617|2015|    9|   Fall|
|  2015-10-15|           46333|2015|   10|   Fall|
+------------+----------------+----+-----+-------+
only showing top 10 rows
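
The chained `when` calls above amount to a month-to-season lookup. As a plain-Python sketch of the same rule (the `SEASON_BY_MONTH` table and `season_for_month` helper are illustrative names, not part of the notebook):

```python
# Same month groupings as the when() chain: Mar-May, Jun-Aug, Sep-Nov, Dec-Feb.
SEASON_BY_MONTH = {
    3: "Spring", 4: "Spring", 5: "Spring",
    6: "Summer", 7: "Summer", 8: "Summer",
    9: "Fall", 10: "Fall", 11: "Fall",
    12: "Winter", 1: "Winter", 2: "Winter",
}


def season_for_month(month):
    """Return the season for a 1-12 month number, or 'Faulty' otherwise."""
    return SEASON_BY_MONTH.get(month, "Faulty")


for m in (9, 10, 3, 13):
    print(m, season_for_month(m))
```

Keeping the rule in one table makes it easier to reuse for each fiscal year instead of repeating the full `when` chain.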



Fall was the season with the most tickets issued, as the seasonal totals below confirm.

In [72]:
import pyspark.sql.functions as sql_fn
from pyspark.sql.functions import col

dfv3_seasons = df_seasons.select(sql_fn.col('Seasons'), sql_fn.col("Ticket Frequency")) \
                          .groupBy(sql_fn.col("Seasons")) \
                          .agg(sql_fn.sum(sql_fn.col("Ticket Frequency")).alias("Tickets by Seasons")) \
                          .sort(sql_fn.col("Tickets by Seasons").desc())

dfv3_seasons.show()

+-------+------------------+
|Seasons|Tickets by Seasons|
+-------+------------------+
|   Fall|           2971672|
| Spring|           2789066|
| Winter|           2421620|
| Summer|           2214536|
+-------+------------------+
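
To put these totals in proportion, the snippet below recomputes each season's share from the four figures printed above. It is plain arithmetic on the displayed output, not a new query.

```python
# Seasonal totals copied from the table above (fiscal year 2015-2016).
tickets_by_season = {
    "Fall": 2971672,
    "Spring": 2789066,
    "Winter": 2421620,
    "Summer": 2214536,
}

total = sum(tickets_by_season.values())
shares = {season: count / total for season, count in tickets_by_season.items()}

for season, share in shares.items():
    print(f"{season:<6} {share:6.1%}")
```

Fall comes out at a little under 29% and summer at a little over 21%, so the seasonal spread is real but moderate.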



In [73]:
df3_ns= df3_dropped.toDF(*(c.replace(' ', '_') for c in df3_dropped.columns))
df3_ns.show(5)

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+----------------+---------------------------------+------------+-------------+-------------------+-----------+------------+--------------------------+--------------------+------------------+-------------+------------+--------------+-------------------+---------------------+------------------+
|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Issuing_Agency|Street_Code1|Street_Code2|Street_Code3|Vehicle_Expiration_Date|Violation_Location|Violation_Precinct|Issuer_Precinct|Issuer_Code|Issuer_Command|Issuer_Squad|Violation_Time|Violation_County|Violation_In_Front_Of_Or_Opposite|House_Number|  Street_Name|Date_First_Observed|Law_Sectio

In [74]:
df3_ns.createOrReplaceTempView("tickets")

tbl_output = spark.sql("""
  SELECT * FROM tickets LIMIT 5
  """)

tbl_output.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+----------------+---------------------------------+------------+-------------+-------------------+-----------+------------+--------------------------+--------------------+------------------+-------------+------------+--------------+-------------------+---------------------+------------------+
|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Issuing_Agency|Street_Code1|Street_Code2|Street_Code3|Vehicle_Expiration_Date|Violation_Location|Violation_Precinct|Issuer_Precinct|Issuer_Code|Issuer_Command|Issuer_Squad|Violation_Time|Violation_County|Violation_In_Front_Of_Or_Opposite|House_Number|  Street_Name|Date_First_Observed|Law_Sectio

**QUESTION 2**

Of all the vehicles ticketed in this fiscal year, which states were they most often registered in?

Vehicles registered in New York State were issued the most tickets in fiscal year 2015-2016.

In [75]:
tickets_by_registration_state = spark.sql("""
  SELECT Registration_State, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Registration_State IN ('AL', 'AK', 'AS', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'DC', 'FM', 'FL', 'GA', 'GU', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MH', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'MP', 'OH', 'OK', 'OR', 'PW', 'PA', 'PR', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', 'VI', 'VA', 'WA', 'WV', 'WI', 'WY')
  AND Registration_State is not null
  GROUP BY Registration_State
  ORDER BY count(Summons_Number) desc
""")

tickets_by_registration_state.collect()

[Row(Registration_State='NY', TicketFrequency=8260189),
 Row(Registration_State='NJ', TicketFrequency=968839),
 Row(Registration_State='PA', TicketFrequency=259177),
 Row(Registration_State='CT', TicketFrequency=145153),
 Row(Registration_State='FL', TicketFrequency=138647),
 Row(Registration_State='MA', TicketFrequency=99115),
 Row(Registration_State='IN', TicketFrequency=81141),
 Row(Registration_State='VA', TicketFrequency=75093),
 Row(Registration_State='MD', TicketFrequency=60240),
 Row(Registration_State='NC', TicketFrequency=55629),
 Row(Registration_State='IL', TicketFrequency=37222),
 Row(Registration_State='GA', TicketFrequency=35300),
 Row(Registration_State='TX', TicketFrequency=32635),
 Row(Registration_State='AZ', TicketFrequency=26603),
 Row(Registration_State='ME', TicketFrequency=24277),
 Row(Registration_State='OH', TicketFrequency=23784),
 Row(Registration_State='CA', TicketFrequency=23532),
 Row(Registration_State='OK', TicketFrequency=21782),
 Row(Registration_Stat
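
The same list of state and territory codes is typed out again for the 2016-2017 query. One way to avoid the duplication is to keep the codes in a Python list and build the `IN (...)` clause from it. This is only a sketch: `VALID_STATE_CODES` and `build_state_query` are hypothetical names, and the returned string would be passed to `spark.sql` as in the cell above.

```python
# Codes copied from the WHERE clause above (states, DC, and territories).
VALID_STATE_CODES = [
    "AL", "AK", "AS", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FM", "FL",
    "GA", "GU", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MH",
    "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM",
    "NY", "NC", "ND", "MP", "OH", "OK", "OR", "PW", "PA", "PR", "RI", "SC",
    "SD", "TN", "TX", "UT", "VT", "VI", "VA", "WA", "WV", "WI", "WY",
]


def build_state_query(table="tickets"):
    """Return the ticket-count-by-state query for the given temp view."""
    in_list = ", ".join(f"'{code}'" for code in VALID_STATE_CODES)
    return (
        "SELECT Registration_State, count(Summons_Number) AS TicketFrequency\n"
        f"FROM {table}\n"
        f"WHERE Registration_State IN ({in_list})\n"
        "GROUP BY Registration_State\n"
        "ORDER BY TicketFrequency DESC"
    )


print(build_state_query())
```

The separate `is not null` test is left out here because a null value never satisfies `IN`.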

**QUESTION 3**

Which vehicle colors were issued the most tickets in the given fiscal year?

In [76]:
tickets_by_vehicle_color = spark.sql("""
  SELECT Vehicle_Color, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Vehicle_Color is not null
  GROUP BY Vehicle_Color
  ORDER BY count(Summons_Number) desc
""")

tickets_by_vehicle_color.show(100)

+-------------+---------------+
|Vehicle_Color|TicketFrequency|
+-------------+---------------+
|           GY|        1603940|
|           WH|        1547279|
|        WHITE|        1399108|
|           BK|        1312004|
|        BLACK|         681848|
|           BL|         551057|
|         GREY|         405448|
|           RD|         349860|
|        SILVE|         304719|
|        BROWN|         298121|
|         BLUE|         284579|
|          RED|         257919|
|           GR|         213001|
|           TN|         139866|
|        GREEN|         134586|
|        OTHER|         111617|
|           YW|          88487|
|           BR|          86364|
|          BLK|          77177|
|         GRAY|          67427|
|           GL|          58006|
|          TAN|          55465|
|        YELLO|          50625|
|         GOLD|          45949|
|          GRY|          35539|
|           MR|          34463|
|          WHT|          30702|
|           WT|          23678|
|       

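Ignoring OTHER, as discussed earlier, black appears to be the most-ticketed vehicle color in the 2015-2016 fiscal year according to the recoded totals below. Note that raw values the recoding does not list, such as WH, WHITE and BLACK, are also counted under OTHER, so this ranking is tentative.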

In [77]:
# Collapse abbreviated color codes into canonical names; unlisted values become OTHER
vc = tickets_by_vehicle_color['Vehicle_Color']
tickets_by_vehicle_color_coding = tickets_by_vehicle_color.withColumn("Vehicle_Color",
    when(vc.isin('RD', 'RE'), lit("RED"))
    .when(vc.isin('BUE', 'BLU', 'BU', 'BE'), lit("BLUE"))
    .when(vc.isin('GR', 'GN', 'GRN', 'GRE', 'GREE'), lit("GREEN"))
    .when(vc.isin('Y', 'YE', 'YW', 'YELLO', 'YEL', 'YELL'), lit("YELLOW"))
    .when(vc.isin('BLA', 'BLK', 'BLAC', 'BK', 'BL'), lit("BLACK"))
    .when(vc.isin('W', 'WT', 'WHT', 'WHI', 'WHIT'), lit("WHITE"))
    .when(vc.isin('GY', 'GRAY', 'GRA', 'GRY'), lit("GREY"))
    .when(vc.isin('T', 'TA', 'TN'), lit("TAN"))
    .when(vc.isin('O', 'OR', 'ORA', 'ORAN', 'ORANG'), lit("ORANGE"))
    .when(vc.isin('BR', 'BRO', 'BRN', 'BROW', 'BWN', 'BN'), lit("BROWN"))
    .when(vc.isin('SI', 'SIL', 'SILV', 'SILVE', 'SR', 'SL', 'SILVR', 'SLVR'), lit("SILVER"))
    .when(vc.isin('GL', 'GLD', 'GOL', 'GD'), lit("GOLD"))
    .when(vc.isin('MAROO', 'MA', 'MR', 'MO', 'MAR'), lit("MAROON"))
    .otherwise(lit("OTHER")))

tickets_by_vehicle_color_coding.createOrReplaceTempView("colors")

tickets_by_vehicle_color_updated = spark.sql("""
  SELECT Vehicle_Color, sum(TicketFrequency) as TotalTickets
  FROM colors
  WHERE Vehicle_Color is not null
  GROUP BY Vehicle_Color
  ORDER BY sum(TicketFrequency) desc
""")

tickets_by_vehicle_color_updated.show()


+-------------+------------+
|Vehicle_Color|TotalTickets|
+-------------+------------+
|        OTHER|     5349132|
|        BLACK|     1940317|
|         GREY|     1706938|
|          RED|      349876|
|       SILVER|      336621|
|        GREEN|      224297|
|       YELLOW|      140733|
|          TAN|      139877|
|        BROWN|      102331|
|        WHITE|       68832|
|         GOLD|       60264|
|       MAROON|       37055|
|       ORANGE|       30490|
|         BLUE|       10225|
+-------------+------------+
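
Because the `when` chain sends every value it does not list to `OTHER`, a lookup table that also covers full and truncated spellings may give a cleaner split. The sketch below is plain Python. `COLOR_ALIASES` and `normalize_color` are illustrative names, the alias lists are assumptions based on the values visible in the raw output, and treating `BL` as blue rather than black is a guess that should be checked against the data.

```python
# Canonical color -> raw spellings (assumed; extend after inspecting the data).
COLOR_ALIASES = {
    "WHITE": ["W", "WH", "WT", "WHT", "WHI", "WHIT", "WHITE"],
    "BLACK": ["BK", "BLK", "BLA", "BLAC", "BLACK"],
    "GREY": ["GY", "GRY", "GRA", "GRAY", "GREY"],
    "BLUE": ["BL", "BU", "BE", "BLU", "BUE", "BLUE"],  # 'BL' is ambiguous
    "RED": ["RD", "RE", "RED"],
    "SILVER": ["SI", "SL", "SR", "SIL", "SILV", "SILVE", "SILVR", "SLVR", "SILVER"],
    "BROWN": ["BR", "BN", "BRN", "BRO", "BWN", "BROW", "BROWN"],
    "GREEN": ["GR", "GN", "GRN", "GRE", "GREE", "GREEN"],
    "TAN": ["T", "TA", "TN", "TAN"],
    "YELLOW": ["Y", "YE", "YW", "YEL", "YELL", "YELLO", "YELLOW"],
    "GOLD": ["GL", "GD", "GLD", "GOL", "GOLD"],
    "MAROON": ["MA", "MR", "MO", "MAR", "MAROO", "MAROON"],
    "ORANGE": ["O", "OR", "ORA", "ORAN", "ORANG", "ORANGE"],
}

# Invert to raw spelling -> canonical color for direct lookups.
CANONICAL_BY_ALIAS = {
    alias: color for color, aliases in COLOR_ALIASES.items() for alias in aliases
}


def normalize_color(raw):
    """Map a raw Vehicle_Color value to a canonical name, else 'OTHER'."""
    if raw is None:
        return "OTHER"
    return CANONICAL_BY_ALIAS.get(raw.strip().upper(), "OTHER")


for value in ("WH", "WHITE", "gy", "SILVE", "XYZ"):
    print(value, "->", normalize_color(value))
```

A function like this could be registered as a UDF, or the inverted dictionary could be turned into a small lookup DataFrame and joined against the per-color counts.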



**QUESTION 4**

In which county were most tickets issued in the given fiscal year?

In [78]:
tickets_by_violation_county = spark.sql("""
  SELECT Violation_County, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Violation_County is not null
  GROUP BY Violation_County
  ORDER BY count(Summons_Number) desc
""")


tickets_by_violation_county.collect()

[Row(Violation_County='NY', TicketFrequency=3539259),
 Row(Violation_County='K', TicketFrequency=2198318),
 Row(Violation_County='Q', TicketFrequency=1836567),
 Row(Violation_County='BX', TicketFrequency=1080118),
 Row(Violation_County='R', TicketFrequency=93811),
 Row(Violation_County='BK', TicketFrequency=82330),
 Row(Violation_County='QN', TicketFrequency=61302),
 Row(Violation_County='ST', TicketFrequency=10401),
 Row(Violation_County='MN', TicketFrequency=9736),
 Row(Violation_County='KINGS', TicketFrequency=8),
 Row(Violation_County='QNS', TicketFrequency=4),
 Row(Violation_County='QUEEN', TicketFrequency=3),
 Row(Violation_County='KING', TicketFrequency=1),
 Row(Violation_County='BRONX', TicketFrequency=1),
 Row(Violation_County='QU', TicketFrequency=1),
 Row(Violation_County='00000', TicketFrequency=1)]


New York County (code NY, i.e. Manhattan; labeled NYC in the next cell) had the most tickets issued in 2015-2016.

In [79]:
# Expand county codes to full names; codes without a branch become null
vcounty = tickets_by_violation_county['Violation_County']
tickets_by_violation_county_updated = tickets_by_violation_county.withColumn('Violation_County',
    when(vcounty == 'NY', lit('NYC'))
    .when(vcounty == '103', lit('NYC'))
    .when(vcounty == 'K', lit('KINGS'))
    .when(vcounty == 'BX', lit('BRONX'))
    .when(vcounty.isin('R', 'RICH', 'RC'), lit('RICHMOND'))
    .when(vcounty.isin('Q', 'QUEEN'), lit('QUEENS')))

tickets_by_violation_county_updated.createOrReplaceTempView("counties")

tickets_by_violation_county_consolidated = spark.sql("""
  SELECT Violation_County, sum(TicketFrequency) as TotalTickets_Counties
  FROM counties
  WHERE Violation_County is not null
  GROUP BY Violation_County
  ORDER BY sum(TicketFrequency) desc
""")


tickets_by_violation_county_consolidated.collect()


[Row(Violation_County='NYC', TotalTickets_Counties=3539259),
 Row(Violation_County='KINGS', TotalTickets_Counties=2198318),
 Row(Violation_County='QUEENS', TotalTickets_Counties=1836570),
 Row(Violation_County='BRONX', TotalTickets_Counties=1080118),
 Row(Violation_County='RICHMOND', TotalTickets_Counties=93811)]
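
Several raw codes in the first listing (`BK`, `QN`, `ST`, `MN`, `KINGS` and others) have no matching `when` branch, so they do not appear in the consolidated result. The sketch below folds them back in using the counts printed above. The code-to-borough mapping is an assumption (for example, that `MN` means Manhattan and `ST` means Staten Island), and `BOROUGH_BY_CODE` is a hypothetical name.

```python
from collections import defaultdict

# (code, count) pairs copied from the unconsolidated output above.
raw_counts = [
    ("NY", 3539259), ("K", 2198318), ("Q", 1836567), ("BX", 1080118),
    ("R", 93811), ("BK", 82330), ("QN", 61302), ("ST", 10401),
    ("MN", 9736), ("KINGS", 8), ("QNS", 4), ("QUEEN", 3),
    ("KING", 1), ("BRONX", 1), ("QU", 1), ("00000", 1),
]

# Assumed meaning of each code; anything unlisted is kept as UNKNOWN.
BOROUGH_BY_CODE = {
    "NY": "MANHATTAN", "MN": "MANHATTAN",
    "K": "BROOKLYN", "BK": "BROOKLYN", "KINGS": "BROOKLYN", "KING": "BROOKLYN",
    "Q": "QUEENS", "QN": "QUEENS", "QNS": "QUEENS", "QUEEN": "QUEENS", "QU": "QUEENS",
    "BX": "BRONX", "BRONX": "BRONX",
    "R": "STATEN ISLAND", "ST": "STATEN ISLAND",
}

totals = defaultdict(int)
for code, count in raw_counts:
    totals[BOROUGH_BY_CODE.get(code, "UNKNOWN")] += count

for borough, count in sorted(totals.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{borough:<14}{count:>10}")
```

Under these assumptions the ranking of the five boroughs is unchanged, but Brooklyn, Queens, and Staten Island each gain the tickets that were recorded under their alternate codes.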

**QUESTION 5**

What was the year-of-manufacture for most vehicles ticketed in the given fiscal year?

2015 is the most common year of manufacture among ticketed vehicles. The output also suggests that newer vehicles are ticketed more often than older ones.

In [80]:
tickets_by_vehicle_year = spark.sql("""
  SELECT Vehicle_Year, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Vehicle_Year is not null
  AND Vehicle_Year != 0
  AND Vehicle_Year <= 2016
  GROUP BY Vehicle_Year
  ORDER BY count(Summons_Number) desc
""")


tickets_by_vehicle_year.collect()

[Row(Vehicle_Year=2015, TicketFrequency=1019168),
 Row(Vehicle_Year=2014, TicketFrequency=871868),
 Row(Vehicle_Year=2013, TicketFrequency=764576),
 Row(Vehicle_Year=2012, TicketFrequency=537226),
 Row(Vehicle_Year=2007, TicketFrequency=499047),
 Row(Vehicle_Year=2011, TicketFrequency=454452),
 Row(Vehicle_Year=2006, TicketFrequency=452176),
 Row(Vehicle_Year=2008, TicketFrequency=424819),
 Row(Vehicle_Year=2005, TicketFrequency=408521),
 Row(Vehicle_Year=2004, TicketFrequency=373878),
 Row(Vehicle_Year=2010, TicketFrequency=372865),
 Row(Vehicle_Year=2009, TicketFrequency=336425),
 Row(Vehicle_Year=2003, TicketFrequency=314888),
 Row(Vehicle_Year=2002, TicketFrequency=269069),
 Row(Vehicle_Year=2016, TicketFrequency=258734),
 Row(Vehicle_Year=2001, TicketFrequency=230282),
 Row(Vehicle_Year=1999, TicketFrequency=140700),
 Row(Vehicle_Year=1998, TicketFrequency=108599),
 Row(Vehicle_Year=1997, TicketFrequency=97224),
 Row(Vehicle_Year=1996, TicketFrequency=54857),
 Row(Vehicle_Year=199

**QUESTION 6**

Which issuing agency issued the most tickets this fiscal year?

The issuing agency labeled "T" issued the most tickets in this fiscal year.

In [81]:
tickets_by_issuing_agency = spark.sql("""
  SELECT Issuing_Agency, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Issuing_Agency is not null
  GROUP BY Issuing_Agency
  ORDER BY count(Summons_Number) desc
""")


tickets_by_issuing_agency.collect()

[Row(Issuing_Agency='T', TicketFrequency=7630736),
 Row(Issuing_Agency='V', TicketFrequency=1858356),
 Row(Issuing_Agency='P', TicketFrequency=812562),
 Row(Issuing_Agency='S', TicketFrequency=250030),
 Row(Issuing_Agency='X', TicketFrequency=63809),
 Row(Issuing_Agency='K', TicketFrequency=7397),
 Row(Issuing_Agency='R', TicketFrequency=656),
 Row(Issuing_Agency='C', TicketFrequency=636),
 Row(Issuing_Agency='H', TicketFrequency=585),
 Row(Issuing_Agency='F', TicketFrequency=560),
 Row(Issuing_Agency='D', TicketFrequency=485),
 Row(Issuing_Agency='M', TicketFrequency=352),
 Row(Issuing_Agency='O', TicketFrequency=196),
 Row(Issuing_Agency='A', TicketFrequency=186),
 Row(Issuing_Agency='B', TicketFrequency=180),
 Row(Issuing_Agency='E', TicketFrequency=96),
 Row(Issuing_Agency='N', TicketFrequency=77)]
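
As a quick consistency check on the listing above, the agency counts can be summed and compared with the row count reported earlier for this fiscal year (10,626,899). The snippet is plain arithmetic on the printed figures.

```python
# Ticket counts per issuing agency, copied from the output above.
agency_counts = {
    "T": 7630736, "V": 1858356, "P": 812562, "S": 250030, "X": 63809,
    "K": 7397, "R": 656, "C": 636, "H": 585, "F": 560, "D": 485,
    "M": 352, "O": 196, "A": 186, "B": 180, "E": 96, "N": 77,
}

total = sum(agency_counts.values())
top_agency = max(agency_counts, key=agency_counts.get)
top_share = agency_counts[top_agency] / total

print(total)                             # 10626899, the df3 row count
print(f"{top_agency}: {top_share:.1%}")  # T: 71.8%
```

The total matches the row count, so no ticket in this year has a null issuing agency, and agency "T" alone accounts for roughly 72% of tickets.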

# **2016-2017 FISCAL YEAR**

For the fiscal year of 2016-2017, we use the same concepts and methods as earlier to analyze the dataset.

In [82]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

df4 = spark.read.csv("Parking_Violations_Issued_-_Fiscal_Year_2017.csv", inferSchema=True, header=True)

In [83]:
df4.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Coun

In [84]:
#df4.describe().show()

In [85]:
(df4.count(), len(df4.columns))

(10803028, 43)

In [86]:
df4_mis = df4.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df4.columns])
df4_mis.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+---------------------+---------------------------------+-----------------+------------------------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code3|Vehicle Expiration Date|Violation Location|Violation Precinct|Issuer Precinct|Issuer Code

In [87]:
df4 = df4.dropDuplicates()
df4.count()

10803028

In [88]:
df4_dropped = df4.drop('Time First Observed', 'Intersecting Street', 'Violation Legal Code', 'Unregistered Vehicle?', 'Meter Number', 'No Standing or Stopping Violation', 'Hydrant Violation', 'Double Parking Violation')
(len(df4_dropped.columns))

35

**QUESTION 1**

When are tickets most likely to be issued? Any seasonality?

In [89]:
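from datetime import datetime
from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import DateType
import pyspark.sql.functions as sql_fn

# Convert 'MM/DD/YYYY' strings to dates for filtering and grouping
func = udf(lambda x: datetime.strptime(x, '%m/%d/%Y'), DateType())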

dfv4 = df4_dropped.withColumn('d_Issue Date', func(col('Issue Date')))

dfv4_select1 = dfv4.filter(dfv4["d_Issue Date"] >= lit('2016-07-01')) \
                    .filter(dfv4["d_Issue Date"] <= lit('2017-06-30')) \
                    .select(dfv4["d_Issue Date"], dfv4["Summons Number"].alias("Ticket Frequency"))
df_agg_sort4 = dfv4_select1.groupBy(dfv4_select1["d_Issue Date"]) \
                            .agg(sql_fn.count("Ticket Frequency").alias("Ticket Frequency")) \
                            .sort(sql_fn.col("Ticket Frequency").desc())
df_agg_sort4.show()

+------------+----------------+
|d_Issue Date|Ticket Frequency|
+------------+----------------+
|  2016-09-16|           46860|
|  2016-09-27|           46271|
|  2016-10-07|           45892|
|  2016-10-06|           45870|
|  2016-10-11|           45820|
|  2017-03-02|           45792|
|  2017-03-21|           45792|
|  2017-05-11|           45592|
|  2017-03-23|           45464|
|  2016-09-15|           45444|
|  2016-10-13|           45435|
|  2016-09-29|           45389|
|  2017-05-02|           45139|
|  2016-11-10|           45060|
|  2017-06-02|           45005|
|  2016-10-20|           44911|
|  2017-05-23|           44545|
|  2017-03-09|           44436|
|  2016-11-17|           44274|
|  2017-06-15|           44265|
+------------+----------------+
only showing top 20 rows



In [90]:
df_agg_sort4.printSchema()

root
 |-- d_Issue Date: date (nullable = true)
 |-- Ticket Frequency: long (nullable = false)



In [91]:
from pyspark.sql.functions import col, month, year

df_date_Y = df_agg_sort4.withColumn('Year',year(col("d_Issue Date")))
df_date_YM = df_date_Y.withColumn('Month' ,month(col("d_Issue Date")))
df_date_YM.show()

+------------+----------------+----+-----+
|d_Issue Date|Ticket Frequency|Year|Month|
+------------+----------------+----+-----+
|  2016-09-16|           46860|2016|    9|
|  2016-09-27|           46271|2016|    9|
|  2016-10-07|           45892|2016|   10|
|  2016-10-06|           45870|2016|   10|
|  2016-10-11|           45820|2016|   10|
|  2017-03-02|           45792|2017|    3|
|  2017-03-21|           45792|2017|    3|
|  2017-05-11|           45592|2017|    5|
|  2017-03-23|           45464|2017|    3|
|  2016-09-15|           45444|2016|    9|
|  2016-10-13|           45435|2016|   10|
|  2016-09-29|           45389|2016|    9|
|  2017-05-02|           45139|2017|    5|
|  2016-11-10|           45060|2016|   11|
|  2017-06-02|           45005|2017|    6|
|  2016-10-20|           44911|2016|   10|
|  2017-05-23|           44545|2017|    5|
|  2017-03-09|           44436|2017|    3|
|  2016-11-17|           44274|2016|   11|
|  2017-06-15|           44265|2017|    6|
+----------

In [92]:
df_seasons = df_date_YM.withColumn("Seasons", \
                                     when((df_date_YM['Month']==3) | (df_date_YM['Month']==4) | (df_date_YM['Month']==5), lit("Spring")) 
                                     .when((df_date_YM['Month']==6) | (df_date_YM['Month']==7) | (df_date_YM['Month']==8), lit("Summer")) 
                                     .when((df_date_YM['Month']==9) | (df_date_YM['Month']==10) | (df_date_YM['Month']==11), lit("Fall"))
                                     .when((df_date_YM['Month']==12) | (df_date_YM['Month']==1) | (df_date_YM['Month']==2), lit("Winter"))
                                     .otherwise(lit("Faulty")))
df_seasons.show(10)

+------------+----------------+----+-----+-------+
|d_Issue Date|Ticket Frequency|Year|Month|Seasons|
+------------+----------------+----+-----+-------+
|  2016-09-16|           46860|2016|    9|   Fall|
|  2016-09-27|           46271|2016|    9|   Fall|
|  2016-10-07|           45892|2016|   10|   Fall|
|  2016-10-06|           45870|2016|   10|   Fall|
|  2016-10-11|           45820|2016|   10|   Fall|
|  2017-03-21|           45792|2017|    3| Spring|
|  2017-03-02|           45792|2017|    3| Spring|
|  2017-05-11|           45592|2017|    5| Spring|
|  2017-03-23|           45464|2017|    3| Spring|
|  2016-09-15|           45444|2016|    9|   Fall|
+------------+----------------+----+-----+-------+
only showing top 10 rows



Spring was the season with the most tickets issued, as the seasonal totals below confirm.

In [93]:
import pyspark.sql.functions as sql_fn
from pyspark.sql.functions import col

dfv4_seasons = df_seasons.select(sql_fn.col('Seasons'), sql_fn.col("Ticket Frequency")) \
                          .groupBy(sql_fn.col("Seasons")) \
                          .agg(sql_fn.sum(sql_fn.col("Ticket Frequency")).alias("Tickets by Seasons")) \
                          .sort(sql_fn.col("Tickets by Seasons").desc())

dfv4_seasons.show()

+-------+------------------+
|Seasons|Tickets by Seasons|
+-------+------------------+
| Spring|           2873383|
|   Fall|           2829224|
| Winter|           2483036|
| Summer|           2353920|
+-------+------------------+
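
Comparing these totals with the 2015-2016 table shows how the seasonal mix shifted. The snippet below simply subtracts the printed figures for the two fiscal years; the dictionary names are illustrative.

```python
# Seasonal totals copied from the two "Tickets by Seasons" tables.
fy_2015_16 = {"Fall": 2971672, "Spring": 2789066, "Winter": 2421620, "Summer": 2214536}
fy_2016_17 = {"Spring": 2873383, "Fall": 2829224, "Winter": 2483036, "Summer": 2353920}

for season in ("Spring", "Summer", "Fall", "Winter"):
    change = fy_2016_17[season] - fy_2015_16[season]
    pct = change / fy_2015_16[season]
    print(f"{season:<6} {change:+9d} ({pct:+.1%})")
```

Fall is the only season that declined (by about 142,000 tickets), which is why spring moves into first place in 2016-2017.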



In [94]:
df4_ns= df4_dropped.toDF(*(c.replace(' ', '_') for c in df4_dropped.columns))
df4_ns.show(5)

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+----------------+---------------------------------+------------+------------+-------------------+-----------+------------+--------------------------+--------------------+------------------+-------------+------------+--------------+-------------------+---------------------+
|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Issuing_Agency|Street_Code1|Street_Code2|Street_Code3|Vehicle_Expiration_Date|Violation_Location|Violation_Precinct|Issuer_Precinct|Issuer_Code|Issuer_Command|Issuer_Squad|Violation_Time|Violation_County|Violation_In_Front_Of_Or_Opposite|House_Number| Street_Name|Date_First_Observed|Law_Section|Sub_Division|Days_P

In [95]:
df4_ns.createOrReplaceTempView("tickets")

tbl_output = spark.sql("""
  SELECT * FROM tickets LIMIT 5
  """)

tbl_output.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+----------------+---------------------------------+------------+------------+-------------------+-----------+------------+--------------------------+--------------------+------------------+-------------+------------+--------------+-------------------+---------------------+
|Summons_Number|Plate_ID|Registration_State|Plate_Type|Issue_Date|Violation_Code|Vehicle_Body_Type|Vehicle_Make|Issuing_Agency|Street_Code1|Street_Code2|Street_Code3|Vehicle_Expiration_Date|Violation_Location|Violation_Precinct|Issuer_Precinct|Issuer_Code|Issuer_Command|Issuer_Squad|Violation_Time|Violation_County|Violation_In_Front_Of_Or_Opposite|House_Number| Street_Name|Date_First_Observed|Law_Section|Sub_Division|Days_P

**QUESTION 2**

In which states were the vehicles ticketed in this fiscal year most often registered?

Vehicles registered in New York State received the most tickets in fiscal year 2016-2017 (8,481,061), followed by New Jersey and Pennsylvania.

In [96]:
tickets_by_registration_state = spark.sql("""
  SELECT Registration_State, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Registration_State IN ('AL', 'AK', 'AS', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'DC', 'FM', 'FL', 'GA', 'GU', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MH', 'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM', 'NY', 'NC', 'ND', 'MP', 'OH', 'OK', 'OR', 'PW', 'PA', 'PR', 'RI', 'SC', 'SD', 'TN', 'TX', 'UT', 'VT', 'VI', 'VA', 'WA', 'WV', 'WI', 'WY')
  AND Registration_State is not null
  GROUP BY Registration_State
  ORDER BY count(Summons_Number) desc
""")

tickets_by_registration_state.collect()

[Row(Registration_State='NY', TicketFrequency=8481061),
 Row(Registration_State='NJ', TicketFrequency=925965),
 Row(Registration_State='PA', TicketFrequency=285419),
 Row(Registration_State='FL', TicketFrequency=144556),
 Row(Registration_State='CT', TicketFrequency=141088),
 Row(Registration_State='MA', TicketFrequency=85547),
 Row(Registration_State='IN', TicketFrequency=80749),
 Row(Registration_State='VA', TicketFrequency=72626),
 Row(Registration_State='MD', TicketFrequency=61800),
 Row(Registration_State='NC', TicketFrequency=55806),
 Row(Registration_State='IL', TicketFrequency=37329),
 Row(Registration_State='GA', TicketFrequency=36852),
 Row(Registration_State='TX', TicketFrequency=36516),
 Row(Registration_State='AZ', TicketFrequency=26426),
 Row(Registration_State='OH', TicketFrequency=25302),
 Row(Registration_State='CA', TicketFrequency=24260),
 Row(Registration_State='SC', TicketFrequency=21836),
 Row(Registration_State='ME', TicketFrequency=21574),
 Row(Registration_Stat
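
The postal codes hard-coded in the SQL string are difficult to review. One option is to keep the list in a Python variable and generate the `IN (...)` clause from it. This is a sketch, not the notebook's method, and the names `VALID_STATE_CODES` and `build_state_query` are hypothetical.

```python
# The 59 registration codes used in the query above (states, DC, and territories).
VALID_STATE_CODES = [
    'AL', 'AK', 'AS', 'AZ', 'AR', 'CA', 'CO', 'CT', 'DE', 'DC', 'FM', 'FL',
    'GA', 'GU', 'HI', 'ID', 'IL', 'IN', 'IA', 'KS', 'KY', 'LA', 'ME', 'MH',
    'MD', 'MA', 'MI', 'MN', 'MS', 'MO', 'MT', 'NE', 'NV', 'NH', 'NJ', 'NM',
    'NY', 'NC', 'ND', 'MP', 'OH', 'OK', 'OR', 'PW', 'PA', 'PR', 'RI', 'SC',
    'SD', 'TN', 'TX', 'UT', 'VT', 'VI', 'VA', 'WA', 'WV', 'WI', 'WY',
]


def build_state_query(codes):
    """Return the ticket-count query restricted to the given registration codes."""
    in_clause = ", ".join(f"'{code}'" for code in codes)
    return f"""
  SELECT Registration_State, count(Summons_Number) AS TicketFrequency
  FROM tickets
  WHERE Registration_State IN ({in_clause})
  GROUP BY Registration_State
  ORDER BY TicketFrequency DESC
"""


query = build_state_query(VALID_STATE_CODES)
print(query)
```

The resulting string can be passed to `spark.sql(query)`. The sketch omits the separate `is not null` condition because a NULL value never satisfies an `IN` predicate.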

**QUESTION 3**

Which vehicle colors were ticketed most often in the given fiscal year?

In [97]:
tickets_by_vehicle_color = spark.sql("""
  SELECT Vehicle_Color, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Vehicle_Color is not null
  GROUP BY Vehicle_Color
  ORDER BY count(Summons_Number) desc
""")

tickets_by_vehicle_color.show(100)

+-------------+---------------+
|Vehicle_Color|TicketFrequency|
+-------------+---------------+
|           GY|        1744316|
|           WH|        1687526|
|           BK|        1506970|
|        WHITE|        1253173|
|        BLACK|         634915|
|           BL|         598679|
|           RD|         385858|
|         GREY|         376283|
|        SILVE|         276164|
|        BROWN|         269748|
|         BLUE|         252482|
|          RED|         228593|
|           GR|         201118|
|           TN|         135133|
|        OTHER|         106853|
|        GREEN|         106653|
|           YW|          89924|
|           BR|          88432|
|          BLK|          83865|
|         GRAY|          69524|
|           GL|          57916|
|        YELLO|          43867|
|          TAN|          43725|
|          GRY|          39396|
|         GOLD|          37989|
|           MR|          36219|
|          WHT|          27156|
|           WT|          25598|
|       

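Ignoring OTHER as discussed earlier, BLACK ranks first for fiscal year 2016-2017 once the color codes are consolidated in the next cell. This ranking depends on the mapping used there: the code WH and spelled-out values such as WHITE and BLACK are not listed in any group and are therefore counted as OTHER, while BL is counted as BLACK.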

In [98]:
from pyspark.sql.functions import col, lit, when

# Raw color codes grouped under one label per color.
# Values not listed here (including WH and spelled-out names such as WHITE or BLACK) fall through to OTHER.
color_groups = {
    "RED": ['RD', 'RE'],
    "BLUE": ['BUE', 'BLU', 'BU', 'BE'],
    "GREEN": ['GR', 'GN', 'GRN', 'GRE', 'GREE'],
    "YELLOW": ['Y', 'YE', 'YW', 'YELLO', 'YEL', 'YELL'],
    "BLACK": ['BLA', 'BLK', 'BLAC', 'BK', 'BL'],
    "WHITE": ['W', 'WT', 'WHT', 'WHI', 'WHIT'],
    "GREY": ['GY', 'GRAY', 'GRA', 'GRY'],
    "TAN": ['T', 'TA', 'TN'],
    "ORANGE": ['O', 'OR', 'ORA', 'ORAN', 'ORANG'],
    "BROWN": ['BR', 'BRO', 'BRN', 'BROW', 'BWN', 'BN'],
    "SILVER": ['SI', 'SIL', 'SILV', 'SILVE', 'SR', 'SL', 'SILVR', 'SLVR'],
    "GOLD": ['GL', 'GLD', 'GOL', 'GD'],
    "MAROON": ['MAROO', 'MA', 'MR', 'MO', 'MAR'],
}

# Build the chained when(...).when(...).otherwise("OTHER") expression from the groups.
recoded = None
for label, codes in color_groups.items():
    condition = col("Vehicle_Color").isin(codes)
    recoded = when(condition, lit(label)) if recoded is None else recoded.when(condition, lit(label))

tickets_by_vehicle_color_coding = tickets_by_vehicle_color.withColumn(
    "Vehicle_Color", recoded.otherwise(lit("OTHER")))

tickets_by_vehicle_color_coding.createOrReplaceTempView("colors")

tickets_by_vehicle_color_updated = spark.sql("""
  SELECT Vehicle_Color, sum(TicketFrequency) as TotalTickets
  FROM colors
  WHERE Vehicle_Color is not null
  GROUP BY Vehicle_Color
  ORDER BY sum(TicketFrequency) desc
""")

tickets_by_vehicle_color_updated.show()


+-------------+------------+
|Vehicle_Color|TotalTickets|
+-------------+------------+
|        OTHER|     5124342|
|        BLACK|     2189605|
|         GREY|     1853279|
|          RED|      385888|
|       SILVER|      302829|
|        GREEN|      210708|
|          TAN|      135149|
|       YELLOW|      135024|
|        BROWN|      102725|
|        WHITE|       69574|
|         GOLD|       60309|
|       MAROON|       38714|
|       ORANGE|       31289|
|         BLUE|       11261|
+-------------+------------+
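
The size of the OTHER bucket suggests that many recognizable values are not being matched. Below is a minimal sketch in plain Python of a fuller normalization that groups abbreviations and spelled-out names under one label. It uses only the 28 rows visible in the truncated raw output above. The mapping is an assumption, in particular treating BL as BLUE where the notebook's mapping counts it as BLACK.

```python
from collections import Counter

# (code, count) pairs copied from the visible rows of the raw Vehicle_Color output.
raw_counts = [
    ("GY", 1744316), ("WH", 1687526), ("BK", 1506970), ("WHITE", 1253173),
    ("BLACK", 634915), ("BL", 598679), ("RD", 385858), ("GREY", 376283),
    ("SILVE", 276164), ("BROWN", 269748), ("BLUE", 252482), ("RED", 228593),
    ("GR", 201118), ("TN", 135133), ("OTHER", 106853), ("GREEN", 106653),
    ("YW", 89924), ("BR", 88432), ("BLK", 83865), ("GRAY", 69524),
    ("GL", 57916), ("YELLO", 43867), ("TAN", 43725), ("GRY", 39396),
    ("GOLD", 37989), ("MR", 36219), ("WHT", 27156), ("WT", 25598),
]

# Assumed mapping: abbreviations and spelled-out names share one canonical label.
CANONICAL = {
    "WHITE": ["WH", "WHITE", "WHT", "WT"],
    "BLACK": ["BK", "BLACK", "BLK"],
    "GREY": ["GY", "GREY", "GRAY", "GRY"],
    "BLUE": ["BL", "BLUE"],  # assumption: BL abbreviates BLUE
    "RED": ["RD", "RED"],
    "SILVER": ["SILVE"],
    "BROWN": ["BR", "BROWN"],
    "GREEN": ["GR", "GREEN"],
    "TAN": ["TN", "TAN"],
    "YELLOW": ["YW", "YELLO"],
    "GOLD": ["GL", "GOLD"],
    "MAROON": ["MR"],
}
LOOKUP = {code: label for label, codes in CANONICAL.items() for code in codes}


def normalize(code):
    """Map a raw color code to its canonical label, or OTHER if unknown."""
    return LOOKUP.get(code.strip().upper(), "OTHER")


totals = Counter()
for code, count in raw_counts:
    totals[normalize(code)] += count

for label, count in totals.most_common(5):
    print(label, count)
```

For these 28 rows the sketch ranks WHITE first, ahead of GREY and BLACK, which suggests that OTHER in the table above absorbs many white vehicles. Rows beyond the truncated output are not included, so the totals are partial.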



**QUESTION 4**

In which county were most tickets issued in the given fiscal year?

In [99]:
tickets_by_violation_county = spark.sql("""
  SELECT Violation_County, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Violation_County is not null
  GROUP BY Violation_County
  ORDER BY count(Summons_Number) desc
""")


tickets_by_violation_county.collect()

[Row(Violation_County='NY', TicketFrequency=3433026),
 Row(Violation_County='K', TicketFrequency=2218843),
 Row(Violation_County='Q', TicketFrequency=1838985),
 Row(Violation_County='BX', TicketFrequency=1362463),
 Row(Violation_County='BK', TicketFrequency=880657),
 Row(Violation_County='QN', TicketFrequency=674245),
 Row(Violation_County='ST', TicketFrequency=137542),
 Row(Violation_County='R', TicketFrequency=121416),
 Row(Violation_County='MN', TicketFrequency=96298),
 Row(Violation_County='KINGS', TicketFrequency=2),
 Row(Violation_County='QNS', TicketFrequency=2),
 Row(Violation_County='BRONX', TicketFrequency=2)]

New York County (Manhattan, coded NY here and relabeled NYC in the next cell) accounted for the most tickets in 2016-2017.

In [100]:
from pyspark.sql.functions import col, lit, when

# Map raw county codes to one label per county.
# There is no .otherwise(), so codes not listed here (BK, QN, ST, MN, KINGS, QNS, BRONX)
# become NULL and are dropped by the "is not null" filter in the query below.
county = col('Violation_County')
tickets_by_violation_county_updated = tickets_by_violation_county.withColumn(
    'Violation_County',
    when(county.isin('NY', '103'), lit('NYC'))
    .when(county == 'K', lit('KINGS'))
    .when(county == 'BX', lit('BRONX'))
    .when(county.isin('R', 'RICH', 'RC'), lit('RICHMOND'))
    .when(county.isin('Q', 'QUEEN'), lit('QUEENS')))

tickets_by_violation_county_updated.createOrReplaceTempView("counties")

tickets_by_violation_county_consolidated = spark.sql("""
  SELECT Violation_County, sum(TicketFrequency) as TotalTickets_Counties
  FROM counties
  WHERE Violation_County is not null
  GROUP BY Violation_County
  ORDER BY sum(TicketFrequency) desc
""")


tickets_by_violation_county_consolidated.collect()


[Row(Violation_County='NYC', TotalTickets_Counties=3433026),
 Row(Violation_County='KINGS', TotalTickets_Counties=2218843),
 Row(Violation_County='QUEENS', TotalTickets_Counties=1838985),
 Row(Violation_County='BRONX', TotalTickets_Counties=1362463),
 Row(Violation_County='RICHMOND', TotalTickets_Counties=121416)]
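
The consolidated totals above equal the counts of the single codes NY, K, Q, BX and R, so codes such as BK, QN, ST and MN are not contributing. Below is a minimal sketch in plain Python that folds all twelve raw codes into five counties. The code-to-county mapping is an assumption (BK as Brooklyn/Kings, QN and QNS as Queens, ST as Staten Island/Richmond, MN as Manhattan/New York), and New York County is labeled NEW YORK rather than NYC.

```python
from collections import Counter

# Counts copied from the raw Violation_County output above.
raw_counts = {
    "NY": 3433026, "K": 2218843, "Q": 1838985, "BX": 1362463,
    "BK": 880657, "QN": 674245, "ST": 137542, "R": 121416,
    "MN": 96298, "KINGS": 2, "QNS": 2, "BRONX": 2,
}

# Assumed mapping from raw code to county name.
COUNTY_OF = {
    "NY": "NEW YORK", "MN": "NEW YORK",
    "K": "KINGS", "BK": "KINGS", "KINGS": "KINGS",
    "Q": "QUEENS", "QN": "QUEENS", "QNS": "QUEENS",
    "BX": "BRONX", "BRONX": "BRONX",
    "R": "RICHMOND", "ST": "RICHMOND",
}

totals = Counter()
for code, count in raw_counts.items():
    # Unknown codes keep their own name so they stay visible instead of being dropped.
    totals[COUNTY_OF.get(code, code)] += count

for county_name, count in totals.most_common():
    print(county_name, count)
```

Under this mapping New York County still ranks first, and no tickets are lost: the five totals add up to the sum of the twelve raw counts.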

**QUESTION 5**

What was the year-of-manufacture for most vehicles ticketed in the given fiscal year?

2015 is the most common year of manufacture among ticketed vehicles. The output also suggests that newer vehicles are ticketed more often than older ones, although these are raw counts and are not normalized by how many vehicles of each model year are on the road.

In [101]:
tickets_by_vehicle_year = spark.sql("""
  SELECT Vehicle_Year, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Vehicle_Year is not null
  AND Vehicle_Year != 0
  AND Vehicle_Year <= 2017
  GROUP BY Vehicle_Year
  ORDER BY count(Summons_Number) desc
""")


tickets_by_vehicle_year.collect()

[Row(Vehicle_Year=2015, TicketFrequency=1082841),
 Row(Vehicle_Year=2016, TicketFrequency=1024243),
 Row(Vehicle_Year=2014, TicketFrequency=719433),
 Row(Vehicle_Year=2013, TicketFrequency=612714),
 Row(Vehicle_Year=2012, TicketFrequency=484171),
 Row(Vehicle_Year=2007, TicketFrequency=462711),
 Row(Vehicle_Year=2011, TicketFrequency=417977),
 Row(Vehicle_Year=2006, TicketFrequency=415179),
 Row(Vehicle_Year=2008, TicketFrequency=400453),
 Row(Vehicle_Year=2005, TicketFrequency=374042),
 Row(Vehicle_Year=2004, TicketFrequency=348800),
 Row(Vehicle_Year=2010, TicketFrequency=342015),
 Row(Vehicle_Year=2009, TicketFrequency=319620),
 Row(Vehicle_Year=2017, TicketFrequency=296778),
 Row(Vehicle_Year=2003, TicketFrequency=287789),
 Row(Vehicle_Year=2002, TicketFrequency=236675),
 Row(Vehicle_Year=2001, TicketFrequency=195769),
 Row(Vehicle_Year=1999, TicketFrequency=113641),
 Row(Vehicle_Year=1998, TicketFrequency=86889),
 Row(Vehicle_Year=1997, TicketFrequency=75753),
 Row(Vehicle_Year=19
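
The year filter in the query above behaves as intended only when the comparison is numeric. The integer years in the output suggest that the column is numeric. Below is a minimal plain-Python sketch of how string and numeric comparison can differ. The sample values are made up for illustration, and Python's string ordering is used only as an analogy.

```python
# Made-up year values, stored as text.
years_as_text = ["999", "2015", "2017", "2069"]

# Lexicographic comparison: "999" is rejected because "9" sorts after "2".
kept_as_text = [y for y in years_as_text if y <= "2017"]

# Numeric comparison after converting to int: 999 passes, 2069 is rejected.
kept_as_int = [int(y) for y in years_as_text if int(y) <= 2017]

print(kept_as_text)
print(kept_as_int)
```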

**QUESTION 6**

Which issuing agency issued the most tickets this fiscal year?

The issuing agency labeled "T" issued the most tickets this fiscal year: 7,613,149, roughly 70% of all tickets with a recorded agency.

In [102]:
tickets_by_issuing_agency = spark.sql("""
  SELECT Issuing_Agency, count(Summons_Number) as TicketFrequency
  FROM tickets
  WHERE Issuing_Agency is not null
  GROUP BY Issuing_Agency
  ORDER BY count(Summons_Number) desc
""")


tickets_by_issuing_agency.collect()

[Row(Issuing_Agency='T', TicketFrequency=7613149),
 Row(Issuing_Agency='V', TicketFrequency=2062644),
 Row(Issuing_Agency='P', TicketFrequency=798244),
 Row(Issuing_Agency='S', TicketFrequency=297754),
 Row(Issuing_Agency='X', TicketFrequency=20470),
 Row(Issuing_Agency='K', TicketFrequency=8025),
 Row(Issuing_Agency='C', TicketFrequency=830),
 Row(Issuing_Agency='F', TicketFrequency=548),
 Row(Issuing_Agency='R', TicketFrequency=533),
 Row(Issuing_Agency='E', TicketFrequency=352),
 Row(Issuing_Agency='H', TicketFrequency=257),
 Row(Issuing_Agency='D', TicketFrequency=134),
 Row(Issuing_Agency='M', TicketFrequency=54),
 Row(Issuing_Agency='B', TicketFrequency=27),
 Row(Issuing_Agency='G', TicketFrequency=4),
 Row(Issuing_Agency='O', TicketFrequency=2),
 Row(Issuing_Agency='N', TicketFrequency=1)]

# **CONCLUSION**

Some general conclusions can be drawn by considering the insights gained from each fiscal year.



*   In most fiscal years, Spring is the season with the most tickets, with Fall a close second; in 2015-2016 Fall even surpassed Spring.

*   Vehicles registered in New York State receive the most tickets.

*   Although many color values were mislabeled, Black ranks first among the consolidated color labels once OTHER is excluded.

*   New York County (Manhattan) had the most tickets, which is plausible given the concentration of office buildings in Manhattan.

*   Across the fiscal years, newer vehicles are consistently ticketed more often than older ones.

*   The issuing agency labeled "T" has issued more tickets than any other issuing agency over the years.



# **THE END**