In [1]:
#importing SparkSession
from pyspark.sql import SparkSession
spark = SparkSession\
    .builder\
    .appName("NYC_Parking_CS")\
    .getOrCreate()

In [2]:
#reading and loading the CSV file
d_frame = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")

In [3]:
#show
d_frame.show(5)

+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|Summons Number|Plate ID|Registration State|         Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|
+--------------+--------+------------------+-------------------+--------------+-----------------+------------+------------------+---------------+--------------+
|    5092469481| GZH7067|                NY|2016-07-10 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0143A|
|    5092451658| GZH7067|                NY|2016-07-08 00:00:00|             7|             SUBN|       TOYOT|                 0|              0|         0400P|
|    4006265037| FZX9232|                NY|2016-08-23 00:00:00|             5|             SUBN|        FORD|                 0|              0|         0233P|
|    8478629828| 66623ME|         

#### Assumption: Assuming that the whole data belongs to the year 2017. We will not filter anything.

In [4]:
#all the columns 
d_frame.columns

['Summons Number',
 'Plate ID',
 'Registration State',
 'Issue Date',
 'Violation Code',
 'Vehicle Body Type',
 'Vehicle Make',
 'Violation Precinct',
 'Issuer Precinct',
 'Violation Time']

In [5]:
#All the columns seems to have space in their names, lets have them rectified, lets rename them.... 

d_frame = d_frame.withColumnRenamed("Summons Number", "Summons_Number")
d_frame = d_frame.withColumnRenamed("Plate ID","Plate_ID")
d_frame = d_frame.withColumnRenamed("Registration State", "Registration_State")
d_frame = d_frame.withColumnRenamed("Issue Date", "Issue_Date")
d_frame = d_frame.withColumnRenamed("Violation Code", "Violation_Code")
d_frame = d_frame.withColumnRenamed("Vehicle Body Type", "Vehicle_Body_Type")
d_frame = d_frame.withColumnRenamed("Vehicle Make", "Vehicle_Make")
d_frame = d_frame.withColumnRenamed("Violation Precinct", "Violation_Precinct")
d_frame = d_frame.withColumnRenamed("Issuer Precinct", "Issuer_Precinct")
d_frame = d_frame.withColumnRenamed("Violation Time", "Violation_Time")

#### Examine the dataframe

In [6]:
#examining
d_frame.schema

StructType(List(StructField(Summons_Number,LongType,true),StructField(Plate_ID,StringType,true),StructField(Registration_State,StringType,true),StructField(Issue_Date,TimestampType,true),StructField(Violation_Code,IntegerType,true),StructField(Vehicle_Body_Type,StringType,true),StructField(Vehicle_Make,StringType,true),StructField(Violation_Precinct,IntegerType,true),StructField(Issuer_Precinct,IntegerType,true),StructField(Violation_Time,StringType,true)))

In [7]:
#show
d_frame.describe().show()

+-------+-------------------+--------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|summary|     Summons_Number|Plate_ID|Registration_State|    Violation_Code| Vehicle_Body_Type|      Vehicle_Make|Violation_Precinct|  Issuer_Precinct|   Violation_Time|
+-------+-------------------+--------+------------------+------------------+------------------+------------------+------------------+-----------------+-----------------+
|  count|           10803028|10803028|          10803028|          10803028|          10803028|          10803028|          10803028|         10803028|         10803028|
|   mean|6.817447029065788E9|Infinity|              99.0|34.599430455979565|3.9258887134586864| 6519.974025974026| 45.01216260848347|46.82931211508477|909.2857142857143|
| stddev|2.320233962328227E9|     NaN|               0.0|19.359868716323483|0.5013415469252528|18091.257389147086|40.552560268435705|62.66703577269572

In [8]:
#scheman view
d_frame.printSchema()

root
 |-- Summons_Number: long (nullable = true)
 |-- Plate_ID: string (nullable = true)
 |-- Registration_State: string (nullable = true)
 |-- Issue_Date: timestamp (nullable = true)
 |-- Violation_Code: integer (nullable = true)
 |-- Vehicle_Body_Type: string (nullable = true)
 |-- Vehicle_Make: string (nullable = true)
 |-- Violation_Precinct: integer (nullable = true)
 |-- Issuer_Precinct: integer (nullable = true)
 |-- Violation_Time: string (nullable = true)



##### checking null values, column values

In [9]:
from pyspark.sql.functions import isnan
print(d_frame.filter((d_frame["Summons_Number"] == "") | d_frame["Summons_Number"].isNull() | isnan(d_frame["Summons_Number"])).count())
print(d_frame.filter((d_frame["Registration_State"] == "") | d_frame["Registration_State"].isNull() | isnan(d_frame["Registration_State"])).count())
print(d_frame.filter((d_frame["Plate_ID"] == "") | d_frame["Plate_ID"].isNull() | isnan(d_frame["Plate_ID"])).count())
print(d_frame.filter((d_frame["Violation_Code"] == "") | d_frame["Violation_Code"].isNull() | isnan(d_frame["Violation_Code"])).count())
print(d_frame.filter((d_frame["Vehicle_Body_Type"] == "") | d_frame["Vehicle_Body_Type"].isNull() | isnan(d_frame["Vehicle_Body_Type"])).count())
print(d_frame.filter((d_frame["Vehicle_Make"] == "") | d_frame["Vehicle_Make"].isNull() | isnan(d_frame["Vehicle_Make"])).count())
print(d_frame.filter((d_frame["Violation_Precinct"] == "") | d_frame["Violation_Precinct"].isNull() | isnan(d_frame["Violation_Precinct"])).count())
print(d_frame.filter((d_frame["Issuer_Precinct"] == "") | d_frame["Issuer_Precinct"].isNull() | isnan(d_frame["Issuer_Precinct"])).count())
print(d_frame.filter((d_frame["Violation_Time"] == "") | d_frame["Violation_Time"].isNull() | isnan(d_frame["Violation_Time"])).count())


0
0
0
0
0
0
0
0
0


##### looks like there are no null values in the columns

In [10]:
#dropping off NAs if any
d_frame = d_frame.na.drop()

### Examine the data(*Questions*)
#### 1. Find the total number of tickets for the year.

In [11]:
from pyspark.sql.functions import count
d_frame.count()

10803028

In [12]:
# After this checkpoint - we would require SQL queries, so to execute them, lets create a temp tableView.
d_frame.createOrReplaceTempView("temp_ViewTickets")

#### 2. Find out the number of unique states from where the cars that got parking tickets came. 
    (Hint: Use the column 'Registration State'.)
     There is a numeric entry '99' in the column, which should be corrected. 
       Replace it with the state having the maximum entries. Provide the number of unique states again.

In [13]:
print("Number of states with tickets: ")
states_tickets = spark.sql("SELECT count(distinct(Registration_State)) as states_tickets_unique_count FROM temp_ViewTickets")
print(states_tickets.show())

Number of states with tickets: 
+---------------------------+
|states_tickets_unique_count|
+---------------------------+
|                         67|
+---------------------------+

None


In [14]:
#tried writting the entire query in one cell, but it took quite a lot of time, so please refer below cells for th same...
#as per question, numeric etry '99' needs to be corrected, lets have a count

# Filter where Registration_State = "99"
from pyspark.sql.functions import col
temp_df = d_frame.where(col("Registration_State") == "99")
temp_df.count()

36625

In [15]:
#to replace '99' lets count the hoghest ticketing state
states_count = spark.sql("SELECT Registration_State, count(*) as total_count FROM temp_ViewTickets group by Registration_State order by total_count desc")
states_count.show(5)

+------------------+-----------+
|Registration_State|total_count|
+------------------+-----------+
|                NY|    8481061|
|                NJ|     925965|
|                PA|     285419|
|                FL|     144556|
|                CT|     141088|
+------------------+-----------+
only showing top 5 rows



In [16]:
#so its NewYork, replacing 99 with NY
d_frame = d_frame.na.replace(['99'], ['NY'], 'Registration_State')

In [18]:
#again the change needs to be encorporated, replacing the existing viewtable again
d_frame.createOrReplaceTempView("temp_ViewTickets")

In [19]:
# Below is the answer to the Que : Find out the number of unique states from where the cars that got parking tickets came. 
Unique_states_count = spark.sql("SELECT count(distinct(Registration_State)) as unique_states_count FROM temp_ViewTickets")
Unique_states_count.show()

+-------------------+
|unique_states_count|
+-------------------+
|                 66|
+-------------------+



##### Aggregation tasks

#### 1. How often does each violation code occur? Display the frequency of the top five violation codes.

In [20]:
voilation_code = spark.sql("SELECT Violation_Code, count(*) as total_count FROM temp_ViewTickets group by Violation_Code order by total_count desc")
voilation_code.show(5)

+--------------+-----------+
|Violation_Code|total_count|
+--------------+-----------+
|            21|    1528588|
|            36|    1400614|
|            38|    1062304|
|            14|     893498|
|            20|     618593|
+--------------+-----------+
only showing top 5 rows



#### 2. How often does each 'vehicle body type' get a parking ticket? How about the 'vehicle make'? 
  (Hint: Find the top 5 for both.)

In [21]:
print("How often does each 'vehicle body type' get a parking ticket?")
how_often = spark.sql("SELECT Vehicle_Body_Type, count(*) as total_count FROM temp_ViewTickets group by Vehicle_Body_Type order by total_count desc")
print(how_often.show(5))


print("How about the 'vehicle make'?")
v_make = spark.sql("SELECT Vehicle_Make, count(*) as total_count FROM temp_ViewTickets group by Vehicle_Make order by total_count desc")
v_make.show(5)
    

How often does each 'vehicle body type' get a parking ticket?
+-----------------+-----------+
|Vehicle_Body_Type|total_count|
+-----------------+-----------+
|             SUBN|    3719802|
|             4DSD|    3082020|
|              VAN|    1411970|
|             DELV|     687330|
|              SDN|     438191|
+-----------------+-----------+
only showing top 5 rows

None
How about the 'vehicle make'?
+------------+-----------+
|Vehicle_Make|total_count|
+------------+-----------+
|        FORD|    1280958|
|       TOYOT|    1211451|
|       HONDA|    1079238|
|       NISSA|     918590|
|       CHEVR|     714655|
+------------+-----------+
only showing top 5 rows



#### 3. A precinct is a police station that has a certain zone of the city under its command. Find the (5 highest) frequencies of tickets for each of the following:
'Violation Precinct' (This is the precinct of the zone where the violation occurred). Using this, can you draw any insights for parking violations in any specific areas of the city?
'Issuer Precinct' (This is the precinct that issued the ticket.)
Here, you would have noticed that the dataframe has the'Violating Precinct' or 'Issuing Precinct' as '0'. 
These are erroneous entries. Hence, you need to provide the records for five correct precincts. (Hint: Print the top six entries after sorting.)


In [22]:
top_v_precinct = spark.sql("SELECT Violation_Precinct, count(*) as total_count FROM temp_ViewTickets group by Violation_Precinct order by total_count desc")
top_v_precinct.show(6)

+------------------+-----------+
|Violation_Precinct|total_count|
+------------------+-----------+
|                 0|    2072400|
|                19|     535671|
|                14|     352450|
|                 1|     331810|
|                18|     306920|
|               114|     296514|
+------------------+-----------+
only showing top 6 rows



In [23]:
#Issuing Precincts (this is the precinct that issued the ticket)

top_i_precinct = spark.sql("SELECT Issuer_Precinct, count(*) as total_count FROM temp_ViewTickets group by Issuer_Precinct order by total_count desc")
top_i_precinct.show(6)

+---------------+-----------+
|Issuer_Precinct|total_count|
+---------------+-----------+
|              0|    2388479|
|             19|     521513|
|             14|     344977|
|              1|     321170|
|             18|     296553|
|            114|     289950|
+---------------+-----------+
only showing top 6 rows



#### 4. Find the violation code frequencies for three precincts that have issued the most number of tickets. 
   Do these precinct zones have an exceptionally high frequency of certain violation codes? 
   Are these codes common across precincts? 
   (Hint: In the SQL view, use the 'where' attribute to filter among three precincts.)

In [24]:
#It looks like '14', '19' and '1' are the top 3 precincts which were issued maximum tickets, so taking them into act..
issue_14 = spark.sql("SELECT Issuer_Precinct, Violation_Code, count(*) as total_tickets FROM temp_ViewTickets where Issuer_Precinct = '14' group by Issuer_Precinct, Violation_Code order by total_tickets desc")
issue_14.show(5)

issue_19 = spark.sql("SELECT Issuer_Precinct, Violation_Code, count(*) as total_tickets FROM temp_ViewTickets where Issuer_Precinct = '19' group by Issuer_Precinct, Violation_Code order by total_tickets desc")
issue_19.show(5)

issue_1 = spark.sql("SELECT Issuer_Precinct, Violation_Code, count(*) as total_tickets FROM temp_ViewTickets where Issuer_Precinct = '1' group by Issuer_Precinct, Violation_Code order by total_tickets desc")
issue_1.show(5)        

+---------------+--------------+-------------+
|Issuer_Precinct|Violation_Code|total_tickets|
+---------------+--------------+-------------+
|             14|            14|        73837|
|             14|            69|        58026|
|             14|            31|        39857|
|             14|            47|        30540|
|             14|            42|        20663|
+---------------+--------------+-------------+
only showing top 5 rows

+---------------+--------------+-------------+
|Issuer_Precinct|Violation_Code|total_tickets|
+---------------+--------------+-------------+
|             19|            46|        86390|
|             19|            37|        72437|
|             19|            38|        72344|
|             19|            14|        57563|
|             19|            21|        54700|
+---------------+--------------+-------------+
only showing top 5 rows

+---------------+--------------+-------------+
|Issuer_Precinct|Violation_Code|total_tickets|
+---------


######  For certain zones the precinct is high as compared above, and they are common accross, and we can say that the most common violation code is "14".    

#### 5. Find out the properties of parking violations across different times of the day:
    
  Find a way to deal with missing values, if any.
  (Hint: Check for the null values using 'isNull' under the SQL. Also, to remove the null values, check the 'dropna' command in the API documentation.)

The Violation Time field is specified in a strange format. Find a way to make this a time attribute that you can use to divide into groups.
Divide 24 hours into six equal discrete bins of time. Choose the intervals as you see fit. 
For each of these groups, find the three most commonly occurring violations.
(Hint: Use the CASE-WHEN in SQL view to segregate into bins. To find the most commonly occurring violations, you can use an approach similar to the one mentioned in the hint for question 4.)

Now, try another direction. For the three most commonly occurring violation codes, find the most common time of the day (in terms of the bins from the previous part).

In [25]:
#First of all, checking for null values..
null_times = spark.sql("select count(*) as null_time_count FROM temp_ViewTickets where Violation_Time is Null")
null_times.show()

+---------------+
|null_time_count|
+---------------+
|              0|
+---------------+



###### there are no null values.

In [26]:
##Further look into the column
violacing_column = spark.sql("select Violation_Time from temp_ViewTickets")
violacing_column.show(5)

+--------------+
|Violation_Time|
+--------------+
|         0143A|
|         0400P|
|         0233P|
|         1120A|
|         0555P|
+--------------+
only showing top 5 rows



Wierd looking time reported, lets make changes to that first.
well this particular question is huge and i don't want to replace the violatopn_time column with the new refined column in the existing d_frame.
So lets create a new d_frame just for the time, and find the analysis just for that, only for this question.
As stated in the queton moving to the 24 hour clock as PM and AM in the new data frame.

In [27]:
d_frame_Vtime = spark.sql("select Violation_Time, if( left(Violation_Time, 2) == '12' or right(Violation_Time, 1) == 'A', \
concat(substring(Violation_Time, 1,2),':', substring(Violation_Time, 3,2)), concat(int(substring(Violation_Time, 1,2)+12),':',\
substring(Violation_Time, 3,2))) as Refined_Violation_Time,Violation_Code from temp_ViewTickets")
    
d_frame_Vtime.show(10)

+--------------+----------------------+--------------+
|Violation_Time|Refined_Violation_Time|Violation_Code|
+--------------+----------------------+--------------+
|         0143A|                 01:43|             7|
|         0400P|                 16:00|             7|
|         0233P|                 14:33|             5|
|         1120A|                 11:20|            47|
|         0555P|                 17:55|            69|
|         0852P|                 20:52|             7|
|         0215A|                 02:15|            40|
|         0758A|                 07:58|            36|
|         1005A|                 10:05|            36|
|         0845A|                 08:45|             5|
+--------------+----------------------+--------------+
only showing top 10 rows



In [28]:
# As we will be needing SQL for this lets create a tempView for SQL...
d_frame_Vtime.createOrReplaceTempView("temp_ViewVTime")

In [29]:
spark.sql("select * from temp_ViewVTime").show()

+--------------+----------------------+--------------+
|Violation_Time|Refined_Violation_Time|Violation_Code|
+--------------+----------------------+--------------+
|         0143A|                 01:43|             7|
|         0400P|                 16:00|             7|
|         0233P|                 14:33|             5|
|         1120A|                 11:20|            47|
|         0555P|                 17:55|            69|
|         0852P|                 20:52|             7|
|         0215A|                 02:15|            40|
|         0758A|                 07:58|            36|
|         1005A|                 10:05|            36|
|         0845A|                 08:45|             5|
|         0015A|                 00:15|            78|
|         0707A|                 07:07|            19|
|         1022A|                 10:22|            36|
|         1150A|                 11:50|            21|
|         0525A|                 05:25|            40|
|         

In [30]:
#Now binning ... using case as mentioned in the question itself..

bins_Refined = spark.sql("select Refined_Violation_Time, Violation_Code, case \
                        when int(substring(`Refined_Violation_Time`,1,2)) between 00 and 04 then '[00-04]'\
                        when int(substring(`Refined_Violation_Time`,1,2)) between 05 and 08 then '[04-08]'\
                        when int(substring(`Refined_Violation_Time`,1,2)) between 09 and 12 then '[08-12]'\
                        when int(substring(`Refined_Violation_Time`,1,2)) between 13 and 16 then '[12-16]'\
                        when int(substring(`Refined_Violation_Time`,1,2)) between 16 and 20 then '[16-20]'\
                        else '[20-24]'\
                        end as bins_Refined_04 from temp_ViewVTime")

bins_Refined.show()    

+----------------------+--------------+---------------+
|Refined_Violation_Time|Violation_Code|bins_Refined_04|
+----------------------+--------------+---------------+
|                 01:43|             7|        [00-04]|
|                 16:00|             7|        [12-16]|
|                 14:33|             5|        [12-16]|
|                 11:20|            47|        [08-12]|
|                 17:55|            69|        [16-20]|
|                 20:52|             7|        [16-20]|
|                 02:15|            40|        [00-04]|
|                 07:58|            36|        [04-08]|
|                 10:05|            36|        [08-12]|
|                 08:45|             5|        [04-08]|
|                 00:15|            78|        [00-04]|
|                 07:07|            19|        [04-08]|
|                 10:22|            36|        [08-12]|
|                 11:50|            21|        [08-12]|
|                 05:25|            40|        [

In [31]:
#lets update the bins in the tempView
bins_Refined.createOrReplaceTempView("temp_ViewVTime")

In [32]:
#after binning lets try to find the most common voilating codes

common_codes = spark.sql("SELECT Violation_Code, count(*) as total_counts FROM temp_ViewVTime group by Violation_Code order by total_counts desc")
common_codes.show(3)

+--------------+------------+
|Violation_Code|total_counts|
+--------------+------------+
|            21|     1528588|
|            36|     1400614|
|            38|     1062304|
+--------------+------------+
only showing top 3 rows



In [33]:
#for each of the bin lets try find the top 3 like we did in the Question (4)

bin1 = spark.sql("SELECT bins_Refined_04, Violation_Code, count(*) as total_count \
                    FROM temp_ViewVTime where bins_Refined_04 = '[00-04]' \
                    group by bins_Refined_04, Violation_Code \
                    order by total_count desc limit 3")
print("for timeBin - [00-04]")
bin1.show()

#---------------------

bin2 = spark.sql("SELECT bins_Refined_04, Violation_Code, count(*) as total_count \
                    FROM temp_ViewVTime where bins_Refined_04 = '[04-08]' \
                    group by bins_Refined_04, Violation_Code \
                    order by total_count desc limit 3")
print("for timeBin - [04-08]")
bin2.show()

#---------------------

bin3 = spark.sql("SELECT bins_Refined_04, Violation_Code, count(*) as total_count \
                    FROM temp_ViewVTime where bins_Refined_04 = '[08-12]' \
                    group by bins_Refined_04, Violation_Code \
                    order by total_count desc limit 3")
print("for timeBin - [08-12]")
bin3.show()

#---------------------

bin4 = spark.sql("SELECT bins_Refined_04, Violation_Code, count(*) as total_count \
                    FROM temp_ViewVTime where bins_Refined_04 = '[12-16]' \
                    group by bins_Refined_04, Violation_Code \
                    order by total_count desc limit 3")
print("for timeBin - [12-16]")
bin4.show()

#---------------------

bin5 = spark.sql("SELECT bins_Refined_04, Violation_Code, count(*) as total_count \
                    FROM temp_ViewVTime where bins_Refined_04 = '[16-20]' \
                    group by bins_Refined_04, Violation_Code \
                    order by total_count desc limit 3")
print("for timeBin - [16-20]")
bin5.show()

#---------------------

bin6 = spark.sql("SELECT bins_Refined_04, Violation_Code, count(*) as total_count \
                    FROM temp_ViewVTime where bins_Refined_04 = '[20-24]' \
                    group by bins_Refined_04, Violation_Code \
                    order by total_count desc limit 3")
print("for timeBin - [20-24]")
bin6.show()

for timeBin - [00-04]
+---------------+--------------+-----------+
|bins_Refined_04|Violation_Code|total_count|
+---------------+--------------+-----------+
|        [00-04]|            21|      74996|
|        [00-04]|            40|      52000|
|        [00-04]|            78|      31628|
+---------------+--------------+-----------+

for timeBin - [04-08]
+---------------+--------------+-----------+
|bins_Refined_04|Violation_Code|total_count|
+---------------+--------------+-----------+
|        [04-08]|            21|     498338|
|        [04-08]|            14|     207958|
|        [04-08]|            36|     170484|
+---------------+--------------+-----------+

for timeBin - [08-12]
+---------------+--------------+-----------+
|bins_Refined_04|Violation_Code|total_count|
+---------------+--------------+-----------+
|        [08-12]|            21|     949967|
|        [08-12]|            36|     826311|
|        [08-12]|            38|     402756|
+---------------+--------------+

In [34]:
# Now as per the question, in the another approach since the 3 most common violation code: 21, 36, 38, lets have them sorted as well from previous part

code21 = spark.sql("SELECT Violation_Code, bins_Refined_04, count(*) as total_count \
                    FROM temp_ViewVTime where Violation_Code = '21' group by Violation_Code, bins_Refined_04 \
                    order by total_count desc")
print("for code21")
code21.show(1)

#---------------------

code36 = spark.sql("SELECT Violation_Code, bins_Refined_04, count(*) as total_count \
                    FROM temp_ViewVTime where Violation_Code = '36' group by Violation_Code, bins_Refined_04 \
                    order by total_count desc")
print("for code36")
code36.show(1)

#---------------------

code38 = spark.sql("SELECT Violation_Code, bins_Refined_04, count(*) as total_count \
                    FROM temp_ViewVTime where Violation_Code = '38' group by Violation_Code, bins_Refined_04 \
                    order by total_count desc")
print("for code38")
code38.show(1)

for code21
+--------------+---------------+-----------+
|Violation_Code|bins_Refined_04|total_count|
+--------------+---------------+-----------+
|            21|        [08-12]|     949967|
+--------------+---------------+-----------+
only showing top 1 row

for code36
+--------------+---------------+-----------+
|Violation_Code|bins_Refined_04|total_count|
+--------------+---------------+-----------+
|            36|        [08-12]|     826311|
+--------------+---------------+-----------+
only showing top 1 row

for code38
+--------------+---------------+-----------+
|Violation_Code|bins_Refined_04|total_count|
+--------------+---------------+-----------+
|            38|        [12-16]|     453644|
+--------------+---------------+-----------+
only showing top 1 row



#### 6. Let’s try and find some seasonality in this data:
    
   First, divide the year into a certain number of seasons, and find the frequencies of tickets for each season. 
        (Hint: Use Issue Date to segregate into seasons.)

   Then, find the three most common violations for each of these seasons.
        (Hint: You can use an approach similar to the one mentioned in the hint for question 4.)

In [35]:
spark.sql("select Violation_Code, Issue_Date from temp_ViewTickets where month(to_date(Issue_Date, 'MM/dd/yyyy')) > 12").show()        


+--------------+----------+
|Violation_Code|Issue_Date|
+--------------+----------+
+--------------+----------+



##### data has no months greater than 12, thats good
#### Since this is NY - USA we are talking about, so there could possibly be 4 seasons - Summer, Winter, Spring and Fall

In [36]:
d_frame_seasonsTickets = spark.sql("select Violation_Code, Issue_Date, case \
                    when month(to_date(Issue_Date, 'MM/dd/yyyy')) in (12,1,2)  then 'Winter' \
                    when month(to_date(Issue_Date, 'MM/dd/yyyy')) in (3,4,5)   then 'Spring' \
                    when month(to_date(Issue_Date, 'MM/dd/yyyy')) in (6,7,8)   then 'Summer' \
                    else 'Fall' \
                    end as NY_season from temp_ViewTickets")

In [37]:
d_frame_seasonsTickets.show()

+--------------+-------------------+---------+
|Violation_Code|         Issue_Date|NY_season|
+--------------+-------------------+---------+
|             7|2016-07-10 00:00:00|   Summer|
|             7|2016-07-08 00:00:00|   Summer|
|             5|2016-08-23 00:00:00|   Summer|
|            47|2017-06-14 00:00:00|   Summer|
|            69|2016-11-21 00:00:00|     Fall|
|             7|2017-06-13 00:00:00|   Summer|
|            40|2016-08-03 00:00:00|   Summer|
|            36|2016-12-21 00:00:00|   Winter|
|            36|2016-11-21 00:00:00|     Fall|
|             5|2016-10-05 00:00:00|     Fall|
|            78|2017-01-11 00:00:00|   Winter|
|            19|2016-09-27 00:00:00|     Fall|
|            36|2016-10-27 00:00:00|     Fall|
|            21|2016-09-30 00:00:00|     Fall|
|            40|2017-02-04 00:00:00|   Winter|
|            71|2016-07-07 00:00:00|   Summer|
|             7|2016-09-24 00:00:00|     Fall|
|            64|2017-01-26 00:00:00|   Winter|
|            

In [38]:
# creating a tempView table for analysis on seasons
d_frame_seasonsTickets.createOrReplaceTempView('temp_ViewSeasons')

In [39]:
# season wise tickets
spark.sql("Select NY_season, count(*) as total_tickets from temp_ViewSeasons group by NY_season order by total_tickets desc").show()

+---------+-------------+
|NY_season|total_tickets|
+---------+-------------+
|   Spring|      2880687|
|     Fall|      2830802|
|   Summer|      2606208|
|   Winter|      2485331|
+---------+-------------+



In [40]:
#Top 3 common violations for each season as done in Que(4)

print("For Spring:")
spark.sql("select NY_season, Violation_Code, count(*) as total_tickets from temp_ViewSeasons \
where NY_season = 'Spring' group by NY_season, Violation_Code order by total_tickets desc").show(3)

print("For Fall:")
spark.sql("select NY_season, Violation_Code, count(*) as total_tickets from temp_ViewSeasons \
where NY_season = 'Fall' group by NY_season, Violation_Code order by total_tickets desc").show(3)

print("For Summer:")
spark.sql("select NY_season, Violation_Code, count(*) as total_tickets from temp_ViewSeasons \
where NY_season = 'Summer' group by NY_season, Violation_Code order by total_tickets desc").show(3)

print("For Winter:")
spark.sql("select NY_season, Violation_Code, count(*) as total_tickets from temp_ViewSeasons \
where NY_season = 'Winter' group by NY_season, Violation_Code order by total_tickets desc").show(3)


For Spring:
+---------+--------------+-------------+
|NY_season|Violation_Code|total_tickets|
+---------+--------------+-------------+
|   Spring|            21|       402807|
|   Spring|            36|       344834|
|   Spring|            38|       271192|
+---------+--------------+-------------+
only showing top 3 rows

For Fall:
+---------+--------------+-------------+
|NY_season|Violation_Code|total_tickets|
+---------+--------------+-------------+
|     Fall|            36|       456046|
|     Fall|            21|       357479|
|     Fall|            38|       283828|
+---------+--------------+-------------+
only showing top 3 rows

For Summer:
+---------+--------------+-------------+
|NY_season|Violation_Code|total_tickets|
+---------+--------------+-------------+
|   Summer|            21|       405961|
|   Summer|            38|       247561|
|   Summer|            36|       240396|
+---------+--------------+-------------+
only showing top 3 rows

For Winter:
+---------+-------

######  As per the result sets we could infer that the codes like '21','36','38' are the most common occuring Violation_Codes for each the seasons.

#### 7. The fines collected from all the instances of parking violation constitute a source of revenue for the NYC Police Department. 
   Let’s take an example of estimating this for the three most commonly occurring codes.
   Find the total occurrences of the three most common violation codes.

In [41]:
spark.sql("select Violation_Code, count(*) as total_tickets from temp_ViewTickets \
                  group by Violation_Code order by total_tickets desc").show(3)

+--------------+-------------+
|Violation_Code|total_tickets|
+--------------+-------------+
|            21|      1528588|
|            36|      1400614|
|            38|      1062304|
+--------------+-------------+
only showing top 3 rows



#### Then, visit the website:
1. http://www1.nyc.gov/site/finance/vehicles/services-violation-codes.page
2. It lists the fines associated with different violation codes. 
3. They’re divided into two categories: one for the highest-density locations in the city and the other for the rest of the city. For the sake of simplicity, take the average of the two.
4. Using this information, find the total amount collected for the three violation codes with the maximum tickets. State the code that has the highest total collection.
5. What can you intuitively infer from these findings?

In [42]:
# Checked the site and here are Avegrages for the codes as found above:        
# Average fine amount for '21' = $55
## Average fine amount for '36' = $50
### Average fine amount for '38' = $50


print("For Code '21' whose Average_fine_amount = $55")
spark.sql("select Violation_Code,(count(*) * 55) as fine from temp_ViewTickets \
                  where Violation_Code = 21 group by Violation_Code order by fine desc").show()


print("For Code '36' whose Average_fine_amount = $50")
spark.sql("select Violation_Code,(count(*) * 50) as fine from temp_ViewTickets \
                  where Violation_Code = 36 group by Violation_Code order by fine desc").show()


print("For Code '38' whose Average_fine_amount = $50")
spark.sql("select Violation_Code,(count(*) * 50) as fine from temp_ViewTickets \
                  where Violation_Code = 38 group by Violation_Code order by fine desc").show()

For Code '21' whose Average_fine_amount = $55
+--------------+--------+
|Violation_Code|    fine|
+--------------+--------+
|            21|84072340|
+--------------+--------+

For Code '36' whose Average_fine_amount = $50
+--------------+--------+
|Violation_Code|    fine|
+--------------+--------+
|            36|70030700|
+--------------+--------+

For Code '38' whose Average_fine_amount = $50
+--------------+--------+
|Violation_Code|    fine|
+--------------+--------+
|            38|53115200|
+--------------+--------+



##### Inference

1. Total fine sums up to slightly greater than $207M.
2. Violation_Code that has the highest total collection: 21 
(~40% of total fine)
3. Main reason for fine:
(Street Cleaning: No parking where parking is not allowed by sign, street marking or traffic control device)

In [43]:
spark.stop()