# PROBLEM STATEMENT

#### New York City is a thriving metropolis. Just like most other metros its size, one of the biggest problems its citizens face is parking. The classic combination of a huge number of cars and cramped geography leads to a huge number of parking tickets.
#### In an attempt to scientifically analyse this phenomenon, the NYC Police Department has collected data for parking tickets. Of these, the data files for multiple years are publicly available on Kaggle. We will try and perform some exploratory analysis on #a part of this data. Spark will allow us to analyse the full files at high speeds as opposed to taking a series of random #samples that will approximate the population. For the scope of this analysis, we will analyse the parking tickets over the year 2017. 

In [1]:
# class pyspark.sql.SparkSession, The entry point to programming Spark with the Dataset and DataFrame API.
#A SparkSession can be used create DataFrame, register DataFrame as tables, execute SQL over tables, cache tables, and 
# read parquet files.To create a SparkSession, use the following builder pattern:

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("NYC Parking Ticket Assignment") \
    .getOrCreate()

In [2]:
# Datafram can be created by by calling read method on spark object

tickets = spark.read.csv("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv", inferSchema=True,header=True)
tickets.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|         

In [3]:
tickets.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: timestamp (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Violation Time: string (nullable = true)



### PART 1: Examine the data

In [4]:
#1.the total number of tickets for the year 2017
from pyspark.sql.functions import count, year

total_ticket_count_2017 = tickets.filter(year("Issue Date")==2017).agg(count("Summons Number"))
total_ticket_count_2017 = total_ticket_count_2017.withColumnRenamed("count(Summons Number)", "Total Count Of Tickets(2017)")
total_ticket_count_2017.show()

+----------------------------+
|Total Count Of Tickets(2017)|
+----------------------------+
|                     5431918|
+----------------------------+



In [5]:
# the number of parking tickets per unique states from where the cars that got parking tickets came.
from pyspark.sql.functions import count, countDistinct, desc, regexp_replace, sum, col

unique_states = tickets.filter(year("Issue Date")==2017).groupby("Registration State")\
                .agg(countDistinct("Summons Number"))
unique_states = unique_states.withColumnRenamed("count(DISTINCT Summons Number)","count")
unique_states.sort(desc("count")).show(100)

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|4273951|
|                NJ| 475825|
|                PA| 140286|
|                CT|  70403|
|                FL|  69468|
|                IN|  45525|
|                MA|  38941|
|                VA|  34367|
|                MD|  30213|
|                NC|  27152|
|                TX|  18827|
|                IL|  18666|
|                GA|  17537|
|                99|  16055|
|                AZ|  12379|
|                OH|  12281|
|                CA|  12153|
|                ME|  10806|
|                SC|  10395|
|                MN|  10083|
|                OK|   9088|
|                TN|   8514|
|                DE|   7905|
|                MI|   7231|
|                RI|   5814|
|                NH|   4119|
|                VT|   3683|
|                AL|   3178|
|                WA|   3052|
|                OR|   2622|
|                MO|   2483|
|             

In [6]:
# replacing 99 in Registration State with NY
state_replace = unique_states.withColumn("Registration State", regexp_replace("Registration State", "99", "NY"))
state_replace = state_replace.where(col("Registration State") == "NY").groupby("Registration State")\
                        .agg(sum("count").alias("count"))
# total number of cars from NY
state_replace.show(100)

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|4290006|
+------------------+-------+



In [7]:
temp = unique_states.select("Registration State", "count")\
                    .where((col("Registration State") != "NY") & (col("Registration State") != "99"))\
                    .sort(desc("count"))
# combining dataframes with NY only count and remaining state counts to get the final dataframe
unique_states_count = state_replace.union(temp)
unique_states_count.sort(desc("count")).show(64)

+------------------+-------+
|Registration State|  count|
+------------------+-------+
|                NY|4290006|
|                NJ| 475825|
|                PA| 140286|
|                CT|  70403|
|                FL|  69468|
|                IN|  45525|
|                MA|  38941|
|                VA|  34367|
|                MD|  30213|
|                NC|  27152|
|                TX|  18827|
|                IL|  18666|
|                GA|  17537|
|                AZ|  12379|
|                OH|  12281|
|                CA|  12153|
|                ME|  10806|
|                SC|  10395|
|                MN|  10083|
|                OK|   9088|
|                TN|   8514|
|                DE|   7905|
|                MI|   7231|
|                RI|   5814|
|                NH|   4119|
|                VT|   3683|
|                AL|   3178|
|                WA|   3052|
|                OR|   2622|
|                MO|   2483|
|                ON|   2460|
|             

In [8]:
# the number of unique states from where the cars that got parking tickets came.
num_unique_states = unique_states_count.agg(countDistinct("Registration State").alias("unique_states_count"))
num_unique_states.show()

+-------------------+
|unique_states_count|
+-------------------+
|                 64|
+-------------------+



### PART 1: Examine the data: RESULTS
##### 1. Find the total number of tickets for the year.
##### 5431918

##### 2. Find out the number of unique states from where the cars that got parking tickets came.
##### 64

### Aggregation Tasks

In [9]:
# Converting the dataframe into a spark sql table/view
tickets = tickets.withColumn("Registration State", regexp_replace("Registration State", "99", "NY"))

tickets.cache()
tickets.createOrReplaceTempView("v_tickets")

In [15]:
# 1. How often does each violation code occur? Display the frequency of the top five violation codes.
violation_code_freq = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) AS Frequency FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 GROUP BY `Violation Code` ORDER BY Frequency desc LIMIT 5""") 
violation_code_freq.show()

+--------------+---------+
|Violation Code|Frequency|
+--------------+---------+
|            21|   768087|
|            36|   662765|
|            38|   542079|
|            14|   476664|
|            20|   319646|
+--------------+---------+



In [16]:
#2(a).How often does each 'vehicle body type' get a parking ticket?
parking_tick_by_vehicle_body_type = spark.sql("""SELECT `Vehicle Body Type`, count(`Summons Number`) AS Frequency FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 GROUP BY `Vehicle Body Type` ORDER BY Frequency desc LIMIT 5""")
parking_tick_by_vehicle_body_type.show()

+-----------------+---------+
|Vehicle Body Type|Frequency|
+-----------------+---------+
|             SUBN|  1883954|
|             4DSD|  1547312|
|              VAN|   724029|
|             DELV|   358984|
|              SDN|   194197|
+-----------------+---------+



In [12]:
#2(b).How often does each 'Vehicle Make' get a parking ticket?
parking_tick_by_vehicle_Make = spark.sql("""SELECT `Vehicle Make`, count(*) AS Frequency FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 GROUP BY `Vehicle Make` ORDER BY Frequency desc LIMIT 5""")
parking_tick_by_vehicle_Make.show()

+------------+---------+
|Vehicle Make|Frequency|
+------------+---------+
|        FORD|   636844|
|       TOYOT|   605291|
|       HONDA|   538884|
|       NISSA|   462017|
|       CHEVR|   356032|
+------------+---------+



In [13]:
#3. Find the (5 highest) frequencies of tickets for each of the following:
# Ignoring the entry with Violation & Issue Precinct as 0.
#3(a): 'Violation Precinct'
parking_tick_by_Violation_Precint = spark.sql("""SELECT `Violation Precinct`, count(`Summons Number`) AS Frequency FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 GROUP BY `Violation Precinct` ORDER BY Frequency desc LIMIT 6""")
parking_tick_by_Violation_Precint.show()

#3(b): 'Issuer Precinct'
parking_tick_by_Issuer_Precint = spark.sql("""SELECT `Issuer Precinct`, count(`Summons Number`) AS Frequency FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 GROUP BY `Issuer Precinct` ORDER BY Frequency desc LIMIT 6""")
parking_tick_by_Issuer_Precint.show()

+------------------+---------+
|Violation Precinct|Frequency|
+------------------+---------+
|                 0|   925596|
|                19|   274445|
|                14|   203553|
|                 1|   174702|
|                18|   169131|
|               114|   147444|
+------------------+---------+

+---------------+---------+
|Issuer Precinct|Frequency|
+---------------+---------+
|              0|  1078406|
|             19|   266961|
|             14|   200495|
|              1|   168740|
|             18|   162994|
|            114|   144054|
+---------------+---------+



#### Precinct 19 tops as the highest number of parking tickets for a Violation Precinct. 
#### Precinct 19 also tops as the highest number of parking tickets for a Issuer Precinct.
#### Address for Precint 19 is 153 E 67th St, New York, NY 10065, USA which means there is a higher number of parking violations happening around the 67th Street in NYC i.e. this area has severe problem of parking space.

In [24]:
#4. Find the violation code frequencies for three precincts that have issued the most number of tickets
# violation code frequncies for the top three precincts with highest frequency of tickets

#From above question, we found out that precinct 19, 14 & 1 has issued the most number of parking tickets.
#1. Finding the frequency of violation codes for precinct 19
freq_prcnt_19_VCode = spark.sql("""SELECT `Violation Code`,count(`Violation Code`) AS `Freq of Violation Codes Prnct 19` 
                                FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 AND `Issuer Precinct` = 19 
                                GROUP BY `Violation Code` ORDER BY `Freq of Violation Codes Prnct 19` desc LIMIT 5""")

freq_prcnt_19_VCode.show()

#1. Finding the frequency of violation codes for precinct 19
freq_prcnt_14_VCode = spark.sql("""SELECT `Violation Code`,count(`Violation Code`) AS `Freq of Violation Codes Prnct 14` 
                                FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 AND `Issuer Precinct` = 14 
                                GROUP BY `Violation Code` ORDER BY `Freq of Violation Codes Prnct 14` desc LIMIT 5""")

freq_prcnt_14_VCode.show()

#1. Finding the frequency of violation codes for precinct 19
freq_prcnt_1_VCode = spark.sql("""SELECT `Violation Code`,count(`Violation Code`) AS `Freq of Violation Codes Prnct 1` 
                                FROM v_tickets WHERE YEAR(`ISSUE DATE`) = 2017 AND `Issuer Precinct` = 1
                                GROUP BY `Violation Code` ORDER BY `Freq of Violation Codes Prnct 1` desc LIMIT 5""")

freq_prcnt_1_VCode.show()

+--------------+--------------------------------+
|Violation Code|Freq of Violation Codes Prnct 19|
+--------------+--------------------------------+
|            46|                           48445|
|            38|                           36386|
|            37|                           36056|
|            14|                           29797|
|            21|                           28415|
+--------------+--------------------------------+

+--------------+--------------------------------+
|Violation Code|Freq of Violation Codes Prnct 14|
+--------------+--------------------------------+
|            14|                           45036|
|            69|                           30464|
|            31|                           22555|
|            47|                           18364|
|            42|                           10027|
+--------------+--------------------------------+

+--------------+-------------------------------+
|Violation Code|Freq of Violation Codes Prnct 1|


In [158]:
#5(a) Find a way to deal with missing values
from pyspark.sql.functions import isnan, isnull

# Filtering Record for year 2017 only
tickets_2017 = tickets.filter(year("Issue Date")==2017)

# Checking null or nan values in the Violation Time field
null_count = tickets_2017.where(col("Violation Time") == "null").count()
print("Count when column is null", null_count)

nan_count = tickets_2017.where(col("Violation Time") == "nan").count()
print("Count when column has 'nan' value", nan_count)

#Since, the count of nan values is quite low, so we are safe to drop the rows with nan values 
#as it won't affect our results much.
print("+++++++++++AFTER CLEANING THE DATA++++++++++++++")
tickets_2017_filtered = tickets_2017.filter(col("Violation Time") != "nan")
print("Count when column has 'nan':", tickets_2017_filtered.filter(col("Violation Time") == "nan").count())

Count when column is null 0
Count when column has 'nan' value 16
+++++++++++AFTER CLEANING THE DATA++++++++++++++
Count when column has 'nan': 0


In [159]:
#5(b)The Violation Time field is specified in a strange format. 
#    Find a way to make this a time attribute that you can use to divide into groups.

#converting the Violation Time column to timestamp
from pyspark.sql.functions import substring

tickets_2017_hour = tickets_2017_filtered.withColumn("Hour",col("Violation Time").substr(1,2))
tickets_2017_min = tickets_2017_hour.withColumn("Minutes", col("Violation Time").substr(3,2))
tickets_2017_new = tickets_2017_min.withColumn("TimeOfDay",col("Violation Time").substr(5,1))
tickets_2017_new.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----+-------+---------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Hour|Minutes|TimeOfDay|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----+-------+---------+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|  11|     20|        A|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|                 0|              0|         0852P|  08|     52|        P|
|    1407740258| 2513JMG|                NY|2017-01-11 00:00:00|            78| 

In [165]:
#5(b)Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. 
#    For each of these groups, find the three most commonly occurring violations.

# creating view after removing the null or nan values from Violation Field
tickets_2017_new.createOrReplaceTempView("tickets_2017_new")

# Dividing Violation Time into 6 buckets
violation_time_interval = spark.sql("""SELECT *, CASE WHEN ((HOUR =="00") OR (HOUR =="01") OR (HOUR=="02") OR (HOUR=="03") AND (TimeOfDay == "A")) THEN "DAWN" WHEN ((HOUR =="04") OR (HOUR =="05") OR (HOUR=="06") OR (HOUR=="07") AND (TimeOfDay == "A")) THEN "MORNING" WHEN ((HOUR =="08") OR (HOUR =="09") OR (HOUR=="10") OR (HOUR=="11") AND (TimeOfDay == "A")) THEN "LATE MORNING" WHEN ((HOUR =="12") OR (HOUR =="01") OR (HOUR=="02") OR (HOUR=="03") AND (TimeOfDay == "P")) THEN "AFTERNOON" WHEN ((HOUR =="04") OR (HOUR =="05") OR (HOUR=="06") OR (HOUR=="07") AND (TimeOfDay == "P")) THEN "DUSK" WHEN ((HOUR =="08") OR (HOUR =="09") OR (HOUR=="10") OR (HOUR=="11") AND (TimeOfDay == "P")) THEN "NIGHT" END AS `Time of the Day` FROM tickets_2017_new""")

violation_time_interval.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----+-------+---------+---------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Hour|Minutes|TimeOfDay|Time of the Day|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----+-------+---------+---------------+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|  11|     20|        A|   LATE MORNING|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|                 0|              0|         0852P|  08|     52|        P|   LATE MORNING|


#### Divided the Violation Time into 6 time intervals:
#### 1. DAWN - 00 HRS to 03 Hrs (AM)
#### 2. MORNING - 04 HRS to 07 Hrs (AM)
#### 3. LATE MORNING - 08 HRS to 11 Hrs (AM)
#### 4. AFTERNOON - 12 HRS to 03 Hrs (PM)
#### 5. DUSK - 04 HRS to 07 Hrs (PM)
#### 6. NIGHT - 08 HRS to 11 Hrs (PM)

In [166]:
#three most occuring violations for each time interval
violation_time_interval.createOrReplaceTempView("violation_per_time_interval")
# 3 most violation for dawn
three_most_violation_dawn = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_dawn FROM violation_per_time_interval WHERE `Time of the Day`== "DAWN" GROUP BY `Violation Code` LIMIT 3""")
three_most_violation_dawn.show()

# 3 most violation for morning
three_most_violation_morning = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_morning FROM violation_per_time_interval WHERE `Time of the Day`== "MORNING" GROUP BY `Violation Code` LIMIT 3""")
three_most_violation_morning.show()

# 3 most violation for late morning
three_most_violation_late_morning = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_late_mor FROM violation_per_time_interval WHERE `Time of the Day`== "LATE MORNING" GROUP BY `Violation Code` LIMIT 3""")
three_most_violation_late_morning.show()

# 3 most violation for afternoon
three_most_violation_afternoon = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_aftrn FROM violation_per_time_interval WHERE `Time of the Day`== "AFTERNOON" GROUP BY `Violation Code` LIMIT 3""")
three_most_violation_afternoon.show()

# 3 most violation for dusk
three_most_violation_dusk= spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_dusk FROM violation_per_time_interval WHERE `Time of the Day`== "DUSK" GROUP BY `Violation Code` LIMIT 3""")
three_most_violation_dusk.show()

# 3 most violation for night
three_most_violation_night = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) As count_VCode_night FROM violation_per_time_interval WHERE `Time of the Day`== "NIGHT" GROUP BY `Violation Code` LIMIT 3""")
three_most_violation_night.show()

+--------------+----------------+
|Violation Code|count_VCode_dawn|
+--------------+----------------+
|            85|            6654|
|            76|               6|
|            91|              75|
+--------------+----------------+

+--------------+-------------------+
|Violation Code|count_VCode_morning|
+--------------+-------------------+
|            85|                670|
|            76|                  5|
|            22|                 20|
+--------------+-------------------+

+--------------+--------------------+
|Violation Code|count_VCode_late_mor|
+--------------+--------------------+
|            53|                6840|
|            78|                7375|
|            27|                1276|
+--------------+--------------------+

+--------------+-----------------+
|Violation Code|count_VCode_aftrn|
+--------------+-----------------+
|            31|            16583|
|            78|             1670|
|            27|              440|
+--------------+--------

In [167]:
#For the three most commonly occurring violation codes, find the most common time of the day
common_time_for_violation = spark.sql("""SELECT `Violation Code`, count(`Summons Number`) as Three_most_common_VCode FROM violation_per_time_interval GROUP BY `Violation Code` LIMIT 3""")
common_time_for_violation.show()

common_time_for_violation= spark.sql("""SELECT `Time Of the Day`, count(`Summons Number`) `Violation Code` FROM violation_per_time_interval WHERE `Violation Code` in (53,27,26) GROUP BY `Time Of the Day`""")
common_time_for_violation.show()

+--------------+-----------------------+
|Violation Code|Three_most_common_VCode|
+--------------+-----------------------+
|            53|                  19488|
|            81|                     14|
|            44|                      4|
+--------------+-----------------------+

+---------------+--------------+
|Time Of the Day|Violation Code|
+---------------+--------------+
|           DAWN|          5243|
|      AFTERNOON|          3274|
|   LATE MORNING|          8207|
|           DUSK|           180|
|        MORNING|          6125|
|          NIGHT|           158|
+---------------+--------------+



**6.Let’s try and find some seasonality in this data:
First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season. (Hint: Use Issue Date to segregate into seasons.)
Then, find the three most common violations for each of these seasons..**

* For this first we convert string issue date into date 
* then we extract month from it 
* according to month we will assign season.
* count number of tickits for each season
* then we count top 3 code for each season.

In [168]:
tickets_2017.createOrReplaceTempView("Q6")
df_Q6=spark.sql('select *,cast(`Issue Date` AS date) from Q6')
df_Q6.show(10)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Issue Date|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+----------+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|2017-06-14|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|                 0|              0|         0852P|2017-06-13|
|    1407740258| 2513JMG|                NY|2017-01-11 00:00:00|            78|             DELV|       FRUEH|               106|           

In [171]:
tickets_2017.createOrReplaceTempView("Q61")

df_Q61=spark.sql('select *,month(`Issue Date`)as Month from Q61')
df_Q61.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Month|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|    6|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|                 0|              0|         0852P|    6|
|    1407740258| 2513JMG|                NY|2017-01-11 00:00:00|            78|             DELV|       FRUEH|               106|            106|         0015A|    1

In [172]:
df_Q61.createOrReplaceTempView("Q62")
df_Q62=spark.sql('select *,Case When Month in(7,8,9,10) then "Monsson" When Month in(2,3,4,5,6) then "Summer" Else"Winter" End as Season from Q62')
df_Q62.show(5)
df_Q62.groupby('Season').agg({'Summons Number' : 'count'}).sort(col('count(Summons Number)').desc()).show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----+------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|Month|Season|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+-----+------+
|    8478629828| 66623ME|                NY|2017-06-14 00:00:00|            47|             REFG|       MITSU|                14|             14|         1120A|    6|Summer|
|    5096917368| FZD8593|                NY|2017-06-13 00:00:00|             7|             SUBN|       ME/BE|                 0|              0|         0852P|    6|Summer|
|    1407740258| 2513JMG|                NY|2017-01-11 00:00:00|            78|             DELV|       FRUEH|               106| 

* Summer has higest frequency

In [173]:
df_Q62.createOrReplaceTempView("Q63")

** Winter **

In [174]:
df_Q6_winter=spark.sql('select `Violation code` from Q63 where `Season`="Winter"')
df_Q6_winter.groupby('Violation code').agg({'Violation code' : 'count'}).sort(col('count(Violation code)').desc()).show(3)

+--------------+---------------------+
|Violation code|count(Violation code)|
+--------------+---------------------+
|            36|               129769|
|            21|               119931|
|            38|                95451|
+--------------+---------------------+
only showing top 3 rows



** Summer **

In [175]:
df_Q6_Summer=spark.sql('select `Violation code` from Q63 where `Season`="Summer"')
df_Q6_Summer.groupby('Violation code').agg({'Violation code' : 'count'}).sort(col('count(Violation code)').desc()).show(3)

+--------------+---------------------+
|Violation code|count(Violation code)|
+--------------+---------------------+
|            21|               647909|
|            36|               532996|
|            38|               446618|
+--------------+---------------------+
only showing top 3 rows



** Monsson **

In [176]:
df_Q6_Monsson=spark.sql('select `Violation code` from Q63 where `Season`="Monsson"')
df_Q6_Monsson.groupby('Violation code').agg({'Violation code' : 'count'}).sort(col('count(Violation code)').desc()).show(3)

+--------------+---------------------+
|Violation code|count(Violation code)|
+--------------+---------------------+
|            46|                  287|
|            21|                  247|
|            40|                  146|
+--------------+---------------------+
only showing top 3 rows



* For each season we get same code in top3 which are 21,36,38

**7.The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. Let’s take an example of estimating this for the three most commonly occurring codes:**

* We count number of ticket fo each code then we multiply top 3 code with find which we get from previous question. 

In [177]:
dfQ7=tickets_2017.cube('Violation code').count().sort(col('count').desc())

In [178]:
dfQ7.show(2)

+--------------+-------+
|Violation code|  count|
+--------------+-------+
|          null|5431918|
|            21| 768087|
+--------------+-------+
only showing top 2 rows



* For 21 code total  768087 tickets are issued
 for 36 1400614 tickets and for 38 1062304 tickets are issued

In [179]:
spark.stop()