##                              NYC Parking Tickets Data Assignment

In [1]:
#### Import Libraries and Functions
import pyspark.sql.functions as functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, when, lit, col, max, regexp_replace, from_unixtime, isnan, expr, substring
from pyspark.sql.types import IntegerType

In [2]:
### Create a PySPark Session
spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame and Sql") \
    .getOrCreate()

In [3]:
## Load Data from CSV to a DataFrame
nycDF = spark.read.format("csv").option("header", "true").load( \
    "/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")

In [4]:
#### Count Number of Rows in the DF.
nycDF.select('Summons Number').count()

10803028

#### Total Rows in the JSON File
There are 10803028 rows in the CSV Provided.

In [5]:
## List First 5 rows
nycDF.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|

#### We can see many rows with 2016 as Issue Date as well we need to remove them
First list the datatype of each column

In [6]:
nycDF.dtypes

[('Summons Number', 'string'),
 ('Plate ID', 'string'),
 ('Registration State', 'string'),
 ('Issue Date', 'string'),
 ('Violation Code', 'string'),
 ('Vehicle Body Type', 'string'),
 ('Vehicle Make', 'string'),
 ('Violation Precinct', 'string'),
 ('Issuer Precinct', 'string'),
 ('Violation Time', 'string')]

In [7]:
### Convert Issue Date Column from String to Date for processing
nycDate = nycDF.withColumn("Issue_Date", (col("Issue Date").cast("date")))
nycDate.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue_Date|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|2016-07-10|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|2016-07-08|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|         0233P|2016-08-23|
|    8478629828| 66623

In [8]:
#### Print the Datatype again to confirm
nycDate.dtypes

[('Summons Number', 'string'),
 ('Plate ID', 'string'),
 ('Registration State', 'string'),
 ('Issue Date', 'string'),
 ('Violation Code', 'string'),
 ('Vehicle Body Type', 'string'),
 ('Vehicle Make', 'string'),
 ('Violation Precinct', 'string'),
 ('Issuer Precinct', 'string'),
 ('Violation Time', 'string'),
 ('Issue_Date', 'date')]

In [9]:
### Print first 5 Rows to review
nycDate.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue_Date|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|    5092469481| GZH7067|                NY|2016-07-10|             7|             SUBN|       TOYOT|                 0|              0|         0143A|2016-07-10|
|    5092451658| GZH7067|                NY|2016-07-08|             7|             SUBN|       TOYOT|                 0|              0|         0400P|2016-07-08|
|    4006265037| FZX9232|                NY|2016-08-23|             5|             SUBN|        FORD|                 0|              0|         0233P|2016-08-23|
|    8478629828| 66623

In [10]:
#### Filter Rows between 31st of Dec 2016 and 1st of Jan 2019
nycDF = nycDate.filter((nycDate["Issue_Date"] > "2016-12-31") & (nycDate["Issue_Date"] < "2018-01-01"))
nycDF.show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue_Date|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|         1120A|2017-06-14|
|    5096917368| FZD8593|                NY|2017-06-13|             7|             SUBN|       ME/BE|                 0|              0|         0852P|2017-06-13|
|    1407740258| 2513JMG|                NY|2017-01-11|            78|             DELV|       FRUEH|               106|            106|         0015A|2017-01-11|
|    1413656420|T67237

In [11]:
#### Count Number of Rows in the DF.
nycDF.select('Summons Number').count()

5431918

In [12]:
### Just to make sure there are no Duplicates printing out unique Tickets
#nycDF.select("Summons Number").unique().show()
nycDF.select('Summons Number').distinct().count()

5431918

### Ques 1. Find the total number of tickets for the year.
Ans: There are 5431918 Tickets Issues in the year 2017

In [13]:
#### Checking for City with value 99
nycDF.filter(nycDF['Registration State'] == '99').show()

+--------------+----------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|Summons Number|  Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue_Date|
+--------------+----------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|    1420029915|   53468JZ|                99|2017-04-27|            21|              VAN|       CHEVR|               108|              0|         1255A|2017-04-27|
|    1411263467|BLANKPLATE|                99|2017-02-13|            74|              SDN|       NISSA|               121|            121|         0951A|2017-02-13|
|    1420388526|   7CH4978|                99|2017-04-10|            99|              SDN|       CHEVR|                44|             44|         1050P|2017-04-10|
|    84741

##### As mentioned in the Problem Statement Registration State 99 doesnt mean anything so replacing that with the State with most number of appearences in the Dataset

In [14]:
### Get the State with Maximum count
maxCountCity = nycDF.select('Registration State').groupBy(nycDF['Registration State']
                                                         ).count().sort(col("count").desc()).collect()[0][0]
maxCountCity

'NY'

In [15]:
## Replace 99 with the State Found from above statement
nycDF = nycDF.withColumn('Registration State', when(nycDF['Registration State'] == '99', lit(maxCountCity)).otherwise(nycDF['Registration State']))

In [16]:
### Checking for rows with Registration State as 99 Again 
nycDF.filter(nycDF['Registration State'] == '99').show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue_Date|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+



###### ===================== So no more Registration State with Values as 99 ====================

In [17]:
#### Checking for Null Values
nycDFNull = nycDF.agg(*[functions.count(functions.when(functions.isnull(c), c)).alias(c) for c in nycDF.columns])
nycDFNull.show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue_Date|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|             0|       0|                 0|         0|             0|                0|           0|                 0|              0|             0|         0|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+----------+



###### ================================== No Null Values =========================================

### Ques 2. Find out the number of unique states from where the cars that got parking tickets came
Ans: There are 64 Unique states in the Dataset.

In [18]:
### CReating a new DF with Unique Registration State
DistinctState = nycDF.groupby(nycDF["Registration State"]).count()
DistinctState.select('Registration State').distinct().count()

64

In [19]:
#### Renaming Columns
nycDF = nycDF.withColumnRenamed("Summons Number", "Summons_num").withColumnRenamed("Plate ID", "Plate_ID") \
.withColumnRenamed("Registration State", "Registration_State") \
.withColumnRenamed("Issue Date", "Issue_Date") \
.withColumnRenamed("Violation Code", "Violation_Code") \
.withColumnRenamed("Vehicle Body Type", "Vehicle_BodyType") \
.withColumnRenamed("Vehicle Make", "Vehicle_Make") \
.withColumnRenamed("Violation Precinct", "Violation_Prec") \
.withColumnRenamed("Issuer Precinct", "Issuer_Precinct") \
.withColumnRenamed("Violation Time", "Violation_Time") \
.withColumnRenamed("New Date", "New_Date") \

nycDF.show(5)

+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+
|Summons_num|Plate_ID|Registration_State|Issue_Date|Violation_Code|Vehicle_BodyType|Vehicle_Make|Violation_Prec|Issuer_Precinct|Violation_Time|Issue_Date|
+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+
| 8478629828| 66623ME|                NY|2017-06-14|            47|            REFG|       MITSU|            14|             14|         1120A|2017-06-14|
| 5096917368| FZD8593|                NY|2017-06-13|             7|            SUBN|       ME/BE|             0|              0|         0852P|2017-06-13|
| 1407740258| 2513JMG|                NY|2017-01-11|            78|            DELV|       FRUEH|           106|            106|         0015A|2017-01-11|
| 1413656420|T672371C|                NY|2017-02-04|            40|   

### =================================== Aggregation Task ===================================

### Ques. 1 How often does each violation code occur? Display the frequency of the top five violation codes.

In [20]:
### 5 Most Occuring Voilation Codes OverAll
nycDF.select('Violation_Code').groupBy(nycDF['Violation_Code']
                                                         ).count().sort(col("count").desc()).show(5)

+--------------+------+
|Violation_Code| count|
+--------------+------+
|            21|768087|
|            36|662765|
|            38|542079|
|            14|476664|
|            20|319646|
+--------------+------+
only showing top 5 rows



### Ques. 2 How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? 

In [21]:
### Displaying top 5 Vehicle Body Type getting tickets more often
nycDF.select('Vehicle_BodyType').groupBy(nycDF['Vehicle_BodyType']
                                                         ).count().sort(col("count").desc()).show(5)

+----------------+-------+
|Vehicle_BodyType|  count|
+----------------+-------+
|            SUBN|1883954|
|            4DSD|1547312|
|             VAN| 724029|
|            DELV| 358984|
|             SDN| 194197|
+----------------+-------+
only showing top 5 rows



In [22]:
### Displaying top 5 Vehicle Make getting tickets more often
nycDF.select('Vehicle_Make').groupBy(nycDF['Vehicle_Make']
                                                         ).count().sort(col("count").desc()).show(5)

+------------+------+
|Vehicle_Make| count|
+------------+------+
|        FORD|636844|
|       TOYOT|605291|
|       HONDA|538884|
|       NISSA|462017|
|       CHEVR|356032|
+------------+------+
only showing top 5 rows



### Ques. 3 A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following:
### --> 'Violation Precinct' (This is the precinct of the zone where the violation occurred). Using this, can you draw any insights for parking violations in any specific areas of the city?
### --> 'Issuer Precinct' (This is the precinct that issued the ticket.) Here, you would have noticed that the dataframe has the'Violating Precinct' or 'Issuing Precinct' as '0'. These are erroneous entries. Hence, you need to provide the records for five correct precincts. 

In [23]:
#### Displaying First 6 Rows for Voilation Precint 
nycDF.select('Violation_Prec').groupBy(nycDF['Violation_Prec']
                                                         ).count().sort(col("count").desc()).show(6)

+--------------+------+
|Violation_Prec| count|
+--------------+------+
|             0|925596|
|            19|274445|
|            14|203553|
|             1|174702|
|            18|169131|
|           114|147444|
+--------------+------+
only showing top 6 rows



In [24]:
#### Displaying First 6 Rows for Issues Precint 
nycDF.select('Issuer_Precinct').groupBy(nycDF['Issuer_Precinct']
                                                         ).count().sort(col("count").desc()).show(6)

+---------------+-------+
|Issuer_Precinct|  count|
+---------------+-------+
|              0|1078406|
|             19| 266961|
|             14| 200495|
|              1| 168740|
|             18| 162994|
|            114| 144054|
+---------------+-------+
only showing top 6 rows



### Ques. 4 Find the violation code frequencies for three precincts that have issued the most number of tickets. Do these precinct zones have an exceptionally high frequency of certain violation codes? Are these codes common across precincts? 

In [25]:
nycDF.filter((nycDF["Issuer_Precinct"] == "19") | (nycDF["Issuer_Precinct"] == "14") | \
             (nycDF["Issuer_Precinct"] == "1")).select('Issuer_Precinct', 'Violation_Code') \
.groupby('Issuer_Precinct', 'Violation_Code').count() \
.sort(col("count").desc()).show()


+---------------+--------------+-----+
|Issuer_Precinct|Violation_Code|count|
+---------------+--------------+-----+
|             19|            46|48445|
|             14|            14|45036|
|              1|            14|38354|
|             19|            38|36386|
|             19|            37|36056|
|             14|            69|30464|
|             19|            14|29797|
|             19|            21|28415|
|             14|            31|22555|
|              1|            16|19081|
|             14|            47|18364|
|              1|            20|15408|
|             19|            20|14629|
|              1|            46|12745|
|             19|            40|11416|
|             14|            42|10027|
|             19|            16| 9926|
|              1|            38| 8535|
|             14|            46| 7679|
|              1|            17| 7526|
+---------------+--------------+-----+
only showing top 20 rows



#### Listing out Frequencies of Voilation Codes for the top three Issuer Precinct 
we can see there are few Voilation code featuring more often than others,

##### Yes some precinct zones have an exceptionally high frequency of certain violation codes and they are common as well.
Here are some of the Voilation Codes:
###### Voilation Code No. 46.
Issuer_Precinct: 1      Count: 12745

Issuer_Precinct: 14     Count: 7679

Issuer_Precinct: 19     Count: 48445

###### Voilation Code No. 14.
Issuer_Precinct: 1      Count:12745

Issuer_Precinct: 14     Count: 45036

Issuer_Precinct: 19     Count: 48445

Also Some other Voilation Codes with high count,
###### Voilation Code 
###### 38
Issuer_Precinct: 19     Count: 36386

Issuer_Precinct: 1      Count: 8535



### Ques. 5 Find out the properties of parking violations across different times of the day:
#### Find a way to deal with missing values, if any.

In [26]:
#### Checking for Null Values
# No Null Values Returned
nycDF.where(nycDF.Violation_Time.isNull()).show()

+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+
|Summons_num|Plate_ID|Registration_State|Issue_Date|Violation_Code|Vehicle_BodyType|Vehicle_Make|Violation_Prec|Issuer_Precinct|Violation_Time|Issue_Date|
+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+
+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+



In [27]:
### Checking for Blank entries
nycDF.where(nycDF.Violation_Time == '').show()

+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+
|Summons_num|Plate_ID|Registration_State|Issue_Date|Violation_Code|Vehicle_BodyType|Vehicle_Make|Violation_Prec|Issuer_Precinct|Violation_Time|Issue_Date|
+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+
+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+



In [28]:
### Checking for Zero Values
nycDF.where(nycDF.Violation_Time == 0).show()

+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+
|Summons_num|Plate_ID|Registration_State|Issue_Date|Violation_Code|Vehicle_BodyType|Vehicle_Make|Violation_Prec|Issuer_Precinct|Violation_Time|Issue_Date|
+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+
+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+



In [29]:
### Although we have already checked for Null values again checking for Same, and we do not find a Null Values in
### in the DataFrame.
nycDFnoDate = nycDF.drop('Issue_Date')
nycDFnoDate.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in nycDFnoDate.columns]).show()

+-----------+--------+------------------+--------------+----------------+------------+--------------+---------------+--------------+
|Summons_num|Plate_ID|Registration_State|Violation_Code|Vehicle_BodyType|Vehicle_Make|Violation_Prec|Issuer_Precinct|Violation_Time|
+-----------+--------+------------------+--------------+----------------+------------+--------------+---------------+--------------+
|          0|       0|                 0|             0|               0|           0|             0|              0|             0|
+-----------+--------+------------------+--------------+----------------+------------+--------------+---------------+--------------+



In [30]:
#### Check for the Format of Voilation Time.
nycDF.select(nycDF.Violation_Time).show()

+--------------+
|Violation_Time|
+--------------+
|         1120A|
|         0852P|
|         0015A|
|         0525A|
|         0256P|
|         1232A|
|         1034A|
|         1021A|
|         0721A|
|         0940A|
|         1223P|
|         1028A|
|         0148A|
|         1206P|
|         0141P|
|         0822A|
|         0820A|
|         1043A|
|         0204P|
|         0853A|
+--------------+
only showing top 20 rows



In [31]:
from pyspark.sql.functions import concat

In [32]:
### Create New Columns for Time and Bucketing of Times
nycDF = nycDF.withColumn("TimeAbbr", substring(col('Violation_Time'), -1, 1))
nycDF = nycDF.withColumn("TimeHour", substring(col('Violation_Time'), 1, 2))
nycDF = nycDF.withColumn("MinHour", substring(col('Violation_Time'), 3, 2))
nycDF = nycDF.withColumn("Violation_Time",expr("substring(Violation_Time, 1, length(Violation_Time)-1)"))
nycDF = nycDF.withColumn('TimeHour', when(nycDF['TimeAbbr'] == 'P', \
                                                   nycDF['TimeHour'] + 12).otherwise( nycDF['TimeHour']))
nycDF = nycDF.withColumn("TimeHour", when((nycDF["TimeHour"] == 24) & (nycDF["TimeAbbr"] == "A"), 0) \
                         .otherwise(nycDF['TimeHour']))
nycDF = nycDF.withColumn("TimeHour", when((nycDF["TimeHour"] == 24) & (nycDF["TimeAbbr"] == "P"), 12) \
                         .otherwise(nycDF['TimeHour']).cast("int"))
nycDF.withColumn("Violation_TimeNew", concat(col("TimeHour"), lit(":"), col("MinHour"))).show()

+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+--------+--------+-------+-----------------+
|Summons_num|Plate_ID|Registration_State|Issue_Date|Violation_Code|Vehicle_BodyType|Vehicle_Make|Violation_Prec|Issuer_Precinct|Violation_Time|Issue_Date|TimeAbbr|TimeHour|MinHour|Violation_TimeNew|
+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+--------+--------+-------+-----------------+
| 8478629828| 66623ME|                NY|2017-06-14|            47|            REFG|       MITSU|            14|             14|          1120|2017-06-14|       A|      11|     20|            11:20|
| 5096917368| FZD8593|                NY|2017-06-13|             7|            SUBN|       ME/BE|             0|              0|          0852|2017-06-13|       P|      20|     52|            20:52|
| 140

In [33]:
#### Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. 
#### For each of these groups, find the three most commonly occurring violations.
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=[ 0, 4, 8, 12, 16, 20, 24],inputCol="TimeHour", outputCol="buckets")
nycDF1 = bucketizer.setHandleInvalid("keep").transform(nycDF)

In [34]:
nycDF1 = nycDF1.withColumn("buckets", (col("buckets").cast("string")))

In [35]:
nycDF1.printSchema()

root
 |-- Summons_num: string (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Issue_Date: string (nullable = true)
 |-- Violation_Code: string (nullable = true)
 |-- Vehicle_BodyType: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Violation_Prec: string (nullable = true)
 |-- Issuer_Precinct: string (nullable = true)
 |-- Violation_Time: string (nullable = true)
 |-- Issue_Date: date (nullable = true)
 |-- TimeAbbr: string (nullable = true)
 |-- TimeHour: integer (nullable = true)
 |-- MinHour: string (nullable = true)
 |-- buckets: string (nullable = true)



In [36]:
nycDF1.show(5)

+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+--------+--------+-------+-------+
|Summons_num|Plate_ID|Registration_State|Issue_Date|Violation_Code|Vehicle_BodyType|Vehicle_Make|Violation_Prec|Issuer_Precinct|Violation_Time|Issue_Date|TimeAbbr|TimeHour|MinHour|buckets|
+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+--------+--------+-------+-------+
| 8478629828| 66623ME|                NY|2017-06-14|            47|            REFG|       MITSU|            14|             14|          1120|2017-06-14|       A|      11|     20|    2.0|
| 5096917368| FZD8593|                NY|2017-06-13|             7|            SUBN|       ME/BE|             0|              0|          0852|2017-06-13|       P|      20|     52|    5.0|
| 1407740258| 2513JMG|                NY|2017-01-11|   

### Ques 6. Let’s try and find some seasonality in this data:

First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season. (Hint: Use Issue Date to segregate into seasons.)

Then, find the three most common violations for each of these seasons.
(Hint: You can use an approach similar to the one mentioned in the hint for question 4.)

#### Assumptions base on NYC Weather Facts

###### Fall Season. September, October, November. ...
###### Winter Season. December, January, February. ...
###### Spring Season. March, April, May. ...
###### Summer Season. June, July, August. ...

In [37]:
nycDF.createOrReplaceTempView("nycDF_SQL")

In [38]:
spark.sql("select count(1) from nycDF_SQL").show()

+--------+
|count(1)|
+--------+
| 5431918|
+--------+



In [39]:
spark.sql("select * from nycDF_SQL limit 10").show()

+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+--------+--------+-------+
|Summons_num|Plate_ID|Registration_State|Issue_Date|Violation_Code|Vehicle_BodyType|Vehicle_Make|Violation_Prec|Issuer_Precinct|Violation_Time|Issue_Date|TimeAbbr|TimeHour|MinHour|
+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+--------+--------+-------+
| 8509075645| GZJ9338|                NY|2017-04-11|            20|            SUBN|       HYUND|             1|              1|          1255|2017-04-11|       P|      12|     55|
| 8508901987|   82484|                NY|2017-03-28|            31|            SUBN|       ME/BE|             6|              6|          0544|2017-03-28|       P|      17|     44|
| 8526478370| GXB2290|                NY|2017-06-19|            71|            SUBN|       ACUR

In [41]:
nycDF = nycDF.withColumn("Violation_Time",expr("substring(Violation_Time, 1, length(Violation_Time)-1)"))
nycDF.show(5)

+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+--------+--------+-------+
|Summons_num|Plate_ID|Registration_State|Issue_Date|Violation_Code|Vehicle_BodyType|Vehicle_Make|Violation_Prec|Issuer_Precinct|Violation_Time|Issue_Date|TimeAbbr|TimeHour|MinHour|
+-----------+--------+------------------+----------+--------------+----------------+------------+--------------+---------------+--------------+----------+--------+--------+-------+
| 8478629828| 66623ME|                NY|2017-06-14|            47|            REFG|       MITSU|            14|             14|           112|2017-06-14|       A|      11|     20|
| 5096917368| FZD8593|                NY|2017-06-13|             7|            SUBN|       ME/BE|             0|              0|           085|2017-06-13|       P|      20|     52|
| 1407740258| 2513JMG|                NY|2017-01-11|            78|            DELV|       FRUE

In [42]:
nycDF.columns

['Summons_num',
 'Plate_ID',
 'Registration_State',
 'Issue_Date',
 'Violation_Code',
 'Vehicle_BodyType',
 'Vehicle_Make',
 'Violation_Prec',
 'Issuer_Precinct',
 'Violation_Time',
 'Issue_Date',
 'TimeAbbr',
 'TimeHour',
 'MinHour']

### Ques 7. The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:
*---> Find the total occurrences of the three most common violation codes.

In [43]:
### Listing the Top 3 Voilations.
topViolations = nycDF.select('Violation_Code').groupBy(nycDF['Violation_Code'] 
                                                         ).count().sort(col("count").desc()).limit(3)
topViolations.show()

+--------------+------+
|Violation_Code| count|
+--------------+------+
|            21|768087|
|            36|662765|
|            38|542079|
+--------------+------+



#### The top 3 Voilation codes are 21, 36 and 38, following are the fines for these Parking Tickets Source(https://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page), by averaging out the NY Metropolitan Area and other Areas 

Code: 21, Fine: $55

Code: 36, Fine: $50

Code: 38, Fine: $50


In [44]:
topViolations.dtypes

[('Violation_Code', 'string'), ('count', 'bigint')]

In [45]:
#nycDF = nycDF.withColumn('Registration State', when(nycDF['Registration State'] == '99', 
 #                                                   lit(maxCountCity)).otherwise(nycDF['Registration State']))
topViolations = topViolations.withColumn('Total_Penalty', when(topViolations['Violation_Code'] == '21', topViolations['count'] * 55).otherwise \
                                        (when(topViolations['Violation_Code'] == '36', topViolations['count'] * 50).otherwise \
                                       (when(topViolations['Violation_Code'] == '38', topViolations['count'] * 50))))
topViolations.show()

+--------------+------+-------------+
|Violation_Code| count|Total_Penalty|
+--------------+------+-------------+
|            21|768087|     42244785|
|            36|662765|     33138250|
|            38|542079|     27103950|
+--------------+------+-------------+

