In [2]:
# class pyspark.sql.SparkSession, The entry point to programming Spark with the Dataset and DataFrame API.

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame and Sql") \
    .getOrCreate()

# Create DataFrames by loading data file from HDFS

In [3]:
# Datafram can be created by by calling read method on spark object

df = spark.read.format("csv").option("header", "true").load("/common_folder/nyc_parking/Parking_Violations_Issued_-_Fiscal_Year_2017.csv")
df.take(3)

[Row(Summons Number='5092469481', Plate ID='GZH7067', Registration State='NY', Issue Date='2016-07-10', Violation Code='7', Vehicle Body Type='SUBN', Vehicle Make='TOYOT', Violation Precinct='0', Issuer Precinct='0', Violation Time='0143A'),
 Row(Summons Number='5092451658', Plate ID='GZH7067', Registration State='NY', Issue Date='2016-07-08', Violation Code='7', Vehicle Body Type='SUBN', Vehicle Make='TOYOT', Violation Precinct='0', Issuer Precinct='0', Violation Time='0400P'),
 Row(Summons Number='4006265037', Plate ID='FZX9232', Registration State='NY', Issue Date='2016-08-23', Violation Code='5', Vehicle Body Type='SUBN', Vehicle Make='FORD', Violation Precinct='0', Issuer Precinct='0', Violation Time='0233P')]

#### Observation: Date loaded seems to have tickets issued in the year 2016 

In [4]:
#DataFame will have columns, and we use a schema to define them.
df.printSchema()

# printSchema returns schema in tree format

root
 |-- Summons Number: string (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Violation Precinct: string (nullable = true)
 |-- Issuer Precinct: string (nullable = true)
 |-- Violation Time: string (nullable = true)



In [5]:
df.schema
#Returns the schema of this DataFrame as a pyspark.sql.types.StructType.

StructType(List(StructField(Summons Number,StringType,true),StructField(Plate ID,StringType,true),StructField(Registration State,StringType,true),StructField(Issue Date,StringType,true),StructField(Violation Code,StringType,true),StructField(Vehicle Body Type,StringType,true),StructField(Vehicle Make,StringType,true),StructField(Violation Precinct,StringType,true),StructField(Issuer Precinct,StringType,true),StructField(Violation Time,StringType,true)))

In [6]:
# Register initial DataFrame df as temp table dfTable
df.createOrReplaceTempView("dfTable")
spark.sql('SELECT MIN(`Issue Date`) AS mindate, MAX(`Issue Date`) AS maxdate FROM dfTable').take(1)

[Row(mindate='1972-03-30', maxdate='2069-11-19')]

#### Observation: Data loaded has tickets issued across many years. 1972 is the oldest and 2069 is latest. 2069 year does not make any sense. This is bad data.

Our focus is to analyze the tickets issued in the year of 2017. Time to clean up the data to ensure that we only analyze tickets issued in the year 2017.

In [7]:
# Filtering data for year 2017
df_2017_Issued = spark.sql('SELECT * FROM dfTable where year(`Issue Date`) = 2017')
df_2017_Issued.createOrReplaceTempView("df_2017_Issued_Table")

spark.sql('SELECT MIN(`Issue Date`) AS mindate, MAX(`Issue Date`) AS maxdate FROM df_2017_Issued_Table').take(1)

[Row(mindate='2017-01-01', maxdate='2017-12-31')]

#### Observation: Now the data seems to be limited to only year 2017.

We will continue with this data for further analysis.

In [8]:
print(spark.sql('SELECT count(`Summons Number`) AS summons_count FROM df_2017_Issued_Table').take(1))
print(spark.sql('SELECT count(distinct `Summons Number`) AS distinct_summons_count FROM df_2017_Issued_Table').take(1))

[Row(summons_count=5431918)]
[Row(distinct_summons_count=5431918)]


#### Observation: We do not seem to have any duplicate Summons Number. We are good to continue with further analysis.
### Examine the data 1. The total number of tickets for the year 2017 is 5431918

#### Number of tickets issued per state vehicle.

In [9]:
print('Total number of states that have cars from with issued tickets is ' + str(spark.sql('SELECT count (distinct `Registration State`) no_of_states  FROM df_2017_Issued_Table').first()[0]))
spark.sql('SELECT `Registration State`, count(`Registration State`) `Violation Count` FROM df_2017_Issued_Table group by 1 order by 2 desc limit 15').show()

Total number of states that have cars from with issued tickets is 65
+------------------+---------------+
|Registration State|Violation Count|
+------------------+---------------+
|                NY|        4273951|
|                NJ|         475825|
|                PA|         140286|
|                CT|          70403|
|                FL|          69468|
|                IN|          45525|
|                MA|          38941|
|                VA|          34367|
|                MD|          30213|
|                NC|          27152|
|                TX|          18827|
|                IL|          18666|
|                GA|          17537|
|                99|          16055|
|                AZ|          12379|
+------------------+---------------+



#### Observation: Total number of states that have issued tickets is 65. Invalid state 99 found issuing 16055 tickets.
NY cars having maximum number of tickets issued, tickets with 99 state will be updated to NY.

In [10]:
from pyspark.sql.functions import *
df_2017_Issued = df_2017_Issued.withColumn('Registration State', regexp_replace('Registration State', '99', 'NY'))

In [11]:
df_2017_Issued.createOrReplaceTempView("df_2017_Issued_Table")
spark.sql('SELECT `Registration State`, count(`Registration State`) `Violation Count` FROM df_2017_Issued_Table group by 1 order by 2 desc limit 15').show()
print('Total number of states that have cars from with issued tickets is ' + str(spark.sql('SELECT count (distinct `Registration State`) no_of_states FROM df_2017_Issued_Table').first()[0]))

+------------------+---------------+
|Registration State|Violation Count|
+------------------+---------------+
|                NY|        4290006|
|                NJ|         475825|
|                PA|         140286|
|                CT|          70403|
|                FL|          69468|
|                IN|          45525|
|                MA|          38941|
|                VA|          34367|
|                MD|          30213|
|                NC|          27152|
|                TX|          18827|
|                IL|          18666|
|                GA|          17537|
|                AZ|          12379|
|                OH|          12281|
+------------------+---------------+

Total number of states that have cars from with issued tickets is 64


#### Observation: 16055 tickets are now updated to NY state now having 4290006 tickets issued.
### Examine the data 2. Total number of states that have cars from with issued tickets is 64.

### Aggregation 1. Frequency of the top five violation codes.

In [12]:
spark.sql('select `Violation Code`, count(`Violation Code`) `Violation Count` from df_2017_Issued_Table group by 1 order by 2 desc limit 5').show()

+--------------+---------------+
|Violation Code|Violation Count|
+--------------+---------------+
|            21|         768087|
|            36|         662765|
|            38|         542079|
|            14|         476664|
|            20|         319646|
+--------------+---------------+



### Aggregation 2. How often does each 'vehicle body type' and 'vehicle make' get a parking ticket? 

In [13]:
spark.sql('select `Vehicle Body Type`, count(`Vehicle Body Type`) `Tickets count` from df_2017_Issued_Table group by 1 order by 2 desc limit 5').show()
spark.sql('select `Vehicle Make`, count(`Vehicle Make`) `Tickets count` from df_2017_Issued_Table group by 1 order by 2 desc limit 5').show()

+-----------------+-------------+
|Vehicle Body Type|Tickets count|
+-----------------+-------------+
|             SUBN|      1883954|
|             4DSD|      1547312|
|              VAN|       724029|
|             DELV|       358984|
|              SDN|       194197|
+-----------------+-------------+

+------------+-------------+
|Vehicle Make|Tickets count|
+------------+-------------+
|        FORD|       636844|
|       TOYOT|       605291|
|       HONDA|       538884|
|       NISSA|       462017|
|       CHEVR|       356032|
+------------+-------------+



### Aggregation 3.

In [14]:
spark.sql('select `Violation Precinct`, count(`Violation Precinct`) `Violation Count` from df_2017_Issued_Table group by 1 order by 2 desc limit 5').show()
spark.sql('select `Issuer Precinct`, count(`Issuer Precinct`) `Violation Count` from df_2017_Issued_Table group by 1 order by 2 desc limit 5').show()

+------------------+---------------+
|Violation Precinct|Violation Count|
+------------------+---------------+
|                 0|         925596|
|                19|         274445|
|                14|         203553|
|                 1|         174702|
|                18|         169131|
+------------------+---------------+

+---------------+---------------+
|Issuer Precinct|Violation Count|
+---------------+---------------+
|              0|        1078406|
|             19|         266961|
|             14|         200495|
|              1|         168740|
|             18|         162994|
+---------------+---------------+



In [15]:
spark.sql('select `Violation Precinct`, count(`Violation Precinct`) `Violation Count` from df_2017_Issued_Table group by 1 order by 2 desc limit 6').show()
spark.sql('select `Issuer Precinct`, count(`Issuer Precinct`) `Violation Count` from df_2017_Issued_Table group by 1 order by 2 desc limit 6').show()

+------------------+---------------+
|Violation Precinct|Violation Count|
+------------------+---------------+
|                 0|         925596|
|                19|         274445|
|                14|         203553|
|                 1|         174702|
|                18|         169131|
|               114|         147444|
+------------------+---------------+

+---------------+---------------+
|Issuer Precinct|Violation Count|
+---------------+---------------+
|              0|        1078406|
|             19|         266961|
|             14|         200495|
|              1|         168740|
|             18|         162994|
|            114|         144054|
+---------------+---------------+



#### Observation: Precinct 0 assumed as invalid/erraneous, Precinct 19, 14, 1, 18, 114 in the descending order have maximum violations recorded.

### Aggregation 4. Violation code frequencies for three precincts that have issued the most number of tickets

In [16]:
spark.sql('select `Issuer Precinct`, `Violation code`, count(`Violation code`) `Violation count` from df_2017_Issued_Table group by 1, 2 order by 3 desc').where("`Issuer Precinct` in ('19')").show(10)
spark.sql('select `Issuer Precinct`, `Violation code`, count(`Violation code`) `Violation count` from df_2017_Issued_Table group by 1, 2 order by 3 desc').where("`Issuer Precinct` in ('14')").show(10)
spark.sql('select `Issuer Precinct`, `Violation code`, count(`Violation code`) `Violation count` from df_2017_Issued_Table group by 1, 2 order by 3 desc').where("`Issuer Precinct` in ('1')").show(10)

+---------------+--------------+---------------+
|Issuer Precinct|Violation code|Violation count|
+---------------+--------------+---------------+
|             19|            46|          48445|
|             19|            38|          36386|
|             19|            37|          36056|
|             19|            14|          29797|
|             19|            21|          28415|
|             19|            20|          14629|
|             19|            40|          11416|
|             19|            16|           9926|
|             19|            71|           7493|
|             19|            19|           6856|
+---------------+--------------+---------------+
only showing top 10 rows

+---------------+--------------+---------------+
|Issuer Precinct|Violation code|Violation count|
+---------------+--------------+---------------+
|             14|            14|          45036|
|             14|            69|          30464|
|             14|            31|          2

#### Observations:
 - Violation Code **14** seems to be the most common across all the top 3 Precincts.
 - Precinct 19 has 46, 38, 37, **14**, 21 Violation codes in the descending order of most common.
 - Precinct 14 has **14**, 69, 31, 47, 42 Violation codes in the descending order of most common.
 - Precinct 1 has **14**, 16, 20, 46, 38 Violation codes in the descending order of most common.
 - Violation code **19** does not seem to be occuring too often when compared to the most common violation across all the top 3 precincts.

### Aggregation 5. Properties of parking violations across different times of the day.
Taking out the hour from the violation time and picking only rows which have valid data i.e. violation time < 24

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date
timedf = spark.sql("select *, case when substr(`Violation Time`,5,1) = 'P' and substr(`Violation Time`,0,2) <> '12' then int(substr(`Violation Time`,0,2) + 12) else int(substr(`Violation Time`,0,2)+0) end as vtime from df_2017_Issued_Table").where("substr(`Violation Time`,1,2) < 13")
timedf.show(2)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|vtime|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|         1120A|   11|
|    5096917368| FZD8593|                NY|2017-06-13|             7|             SUBN|       ME/BE|                 0|              0|         0852P|   20|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
only showing top 2 rows



# Dropping Null values

In [19]:
timedf.filter("`Violation Time` is null").show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|vtime|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+



No Null values in the data

In [20]:
timedf.createOrReplaceTempView("df_2017_timedf_Table");
spark.sql('select * from df_2017_timedf_Table').show(5)

+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|Summons Number|Plate ID|Registration State|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Violation Precinct|Issuer Precinct|Violation Time|vtime|
+--------------+--------+------------------+----------+--------------+-----------------+------------+------------------+---------------+--------------+-----+
|    8478629828| 66623ME|                NY|2017-06-14|            47|             REFG|       MITSU|                14|             14|         1120A|   11|
|    5096917368| FZD8593|                NY|2017-06-13|             7|             SUBN|       ME/BE|                 0|              0|         0852P|   20|
|    1407740258| 2513JMG|                NY|2017-01-11|            78|             DELV|       FRUEH|               106|            106|         0015A|    0|
|    1413656420|T672371C|                NY|2017-02-

# Checking which code falls under what time of the day

In [21]:
df_2017_timeOfDay   =   spark.sql("""select  *, 
                                case 
                                    when vtime between 0 and 3 
                                    then 'Mid Night' 
                                    when vtime between 4 and 7 
                                    then 'Early Morning' 
                                    when vtime between 8 and 11
                                    then 'Late morning' 
                                    when vtime between 12 and 15
                                    then 'Afternoon'
                                    when vtime between 16 and 19
                                    then 'Evening'
                                    when vtime between 20 and 23
                                    then 'Night'
                                    else 'Unknown'
                                end as TimeOFDay
                                from df_2017_timedf_Table""");

In [22]:
df_2017_timeOfDay.createOrReplaceTempView("df_2017_timeOfDay_Table");

In [23]:
spark.sql('select  TimeOFDay, count(1) `Violation Count` from df_2017_timeOfDay_Table group by 1 order by 2 desc').show()

+-------------+---------------+
|    TimeOFDay|Violation Count|
+-------------+---------------+
| Late morning|        2163568|
|    Afternoon|        1857194|
|      Evening|         637540|
|Early Morning|         449885|
|        Night|         176360|
|    Mid Night|         147300|
+-------------+---------------+



In [24]:
spark.sql('select TimeOFDay,`Violation Code`, count(`Violation Code`) from df_2017_timeOfDay_Table group by 1,2 order by 3 desc').where("`Violation Code` in ('21')").show(1)
spark.sql('select TimeOFDay,`Violation Code`, count(`Violation Code`) from df_2017_timeOfDay_Table group by 1,2 order by 3 desc').where("`Violation Code` in ('36')").show(1)
spark.sql('select TimeOFDay,`Violation Code`, count(`Violation Code`) from df_2017_timeOfDay_Table group by 1,2 order by 3 desc').where("`Violation Code` in ('38')").show(1)

+------------+--------------+---------------------+
|   TimeOFDay|Violation Code|count(Violation Code)|
+------------+--------------+---------------------+
|Late morning|            21|               598070|
+------------+--------------+---------------------+
only showing top 1 row

+------------+--------------+---------------------+
|   TimeOFDay|Violation Code|count(Violation Code)|
+------------+--------------+---------------------+
|Late morning|            36|               348165|
+------------+--------------+---------------------+
only showing top 1 row

+---------+--------------+---------------------+
|TimeOFDay|Violation Code|count(Violation Code)|
+---------+--------------+---------------------+
|Afternoon|            38|               240795|
+---------+--------------+---------------------+
only showing top 1 row



Observations:
- Across the time of the day, Mostly violation occurs in the late morning, that is from 8 AM to 12 PM.

### Aggregation 6. Seasonality in the data.
Dividing the data into real seasons to see if that will identify any pattern. 

 - Winter Season - December, January, February
 - Spring Season - March, April, May
 - Summer Season - June, July, August
 - Fall Season - September, October, November

In [25]:
df_2017_seasonal = spark.sql("""select  *, 
                                case 
                                    when month(`Issue Date`) = 12 or
                                         month(`Issue Date`) = 1 or 
                                         month(`Issue Date`) = 2 
                                    then 'Winter' 
                                    when month(`Issue Date`) = 3 or 
                                         month(`Issue Date`) = 4 or 
                                         month(`Issue Date`) = 5 
                                    then 'Spring' 
                                    when month(`Issue Date`) = 6 or 
                                         month(`Issue Date`) = 7 or 
                                         month(`Issue Date`) = 8 
                                    then 'Summer' 
                                    when month(`Issue Date`) = 9 or
                                         month(`Issue Date`) = 10 or
                                         month(`Issue Date`) = 11
                                    then 'Fall'
                                    else 'Unknown'
                                end as Season
                                from df_2017_timeOfDay_Table""");
df_2017_seasonal.createOrReplaceTempView("df_2017_Seasonal_Table");

spark.sql('select  Season, count(1) `Violation Count` from df_2017_Seasonal_Table group by 1 order by 2 desc').show()

+------+---------------+
|Season|Violation Count|
+------+---------------+
|Spring|        2873340|
|Winter|        1704673|
|Summer|         852855|
|  Fall|            979|
+------+---------------+



#### Observations: 
 - No data found for 'Unknown'. Hence no bad dates.
 - **Spring** season followed by **Winter** season seem to have maximum number of violations. These seasons are known for cold climate. 

In [26]:
spark.sql('select  Season, `Violation Code`, count(1) `Violation Count` from df_2017_Seasonal_Table group by 1, 2 order by 3 desc').where("Season = 'Spring'").show(5)
spark.sql('select  Season, `Violation Code`, count(1) `Violation Count` from df_2017_Seasonal_Table group by 1, 2 order by 3 desc').where("Season = 'Winter'").show(5)
spark.sql('select  Season, `Violation Code`, count(1) `Violation Count` from df_2017_Seasonal_Table group by 1, 2 order by 3 desc').where("Season = 'Summer'").show(5)
spark.sql('select  Season, `Violation Code`, count(1) `Violation Count` from df_2017_Seasonal_Table group by 1, 2 order by 3 desc').where("Season = 'Fall'").show(5)

+------+--------------+---------------+
|Season|Violation Code|Violation Count|
+------+--------------+---------------+
|Spring|            21|         402408|
|Spring|            36|         344834|
|Spring|            38|         271167|
|Spring|            14|         256396|
|Spring|            46|         173437|
+------+--------------+---------------+
only showing top 5 rows

+------+--------------+---------------+
|Season|Violation Code|Violation Count|
+------+--------------+---------------+
|Winter|            21|         238180|
|Winter|            36|         221268|
|Winter|            38|         187385|
|Winter|            14|         142262|
|Winter|            20|          97996|
+------+--------------+---------------+
only showing top 5 rows

+------+--------------+---------------+
|Season|Violation Code|Violation Count|
+------+--------------+---------------+
|Summer|            21|         127347|
|Summer|            36|          96663|
|Summer|            38|       

#### Observations: 
- Across the **Spring/Winter/Summer** seasons, Violation codes 21, 36, 38  in descending order seem to the most common.
- Violation codes 21, 14 also are among the top few in Fall season.
- In **Fall** season, Violation Codes 46, 21, 40 in descending order seem to be the most common.

### Aggregation 7. 
### Finding the total occurrences of the three most common violation codes

In [27]:
spark.sql('select  `Violation code`, count(`Violation code`) from df_2017_Seasonal_Table group by 1 order by 2 desc').show(3)

+--------------+---------------------+
|Violation code|count(Violation code)|
+--------------+---------------------+
|            21|               768063|
|            36|               662765|
|            38|               542078|
+--------------+---------------------+
only showing top 3 rows



## Computing fines associated with different violation codes

In [28]:
# Compute for total fine amount collected for vioaltaion code 21 

high_dens_fine_viol_code_21 = 65
others_fine_viol_code_21 = 45

avg_fine_viol_code_21 = (high_dens_fine_viol_code_21 + others_fine_viol_code_21)/2
print ('Average fine for violation code 21 : $' + str(avg_fine_viol_code_21))

Total_count_viol_code_21  = spark.sql('select  `Violation code`, count(`Violation code`)  from df_2017_Seasonal_Table group by 1 order by 2 desc').where("`Violation code` in ('21')").first()[1]
print ('Toatl  count for violation code 21 : ' + str(Total_count_viol_code_21))

Total_fine_viol_code_21 = Total_count_viol_code_21*avg_fine_viol_code_21
print ('Total fine for  violation code 21 : $' + str(Total_fine_viol_code_21))

Average fine for violation code 21 : $55.0
Toatl  count for violation code 21 : 768063
Total fine for  violation code 21 : $42243465.0


In [29]:
# Compute for total fine amount collected for vioaltaion code 36 

high_dens_fine_viol_code_36 = 50
others_fine_viol_code_36 = 50

avg_fine_viol_code_36 = (high_dens_fine_viol_code_36 + others_fine_viol_code_36)/2
print ('Average fine for violation code 36 : $' + str(avg_fine_viol_code_36))

Total_count_viol_code_36  = spark.sql('select  `Violation code`, count(`Violation code`)  from df_2017_Seasonal_Table group by 1 order by 2 desc').where("`Violation code` in ('36')").first()[1]
print ('Toatl  count for violation code 36 : ' + str(Total_count_viol_code_36))

Total_fine_viol_code_36 = Total_count_viol_code_36*avg_fine_viol_code_36
print ('Total fine for  violation code 36 : $' + str(Total_fine_viol_code_36))

Average fine for violation code 36 : $50.0
Toatl  count for violation code 36 : 662765
Total fine for  violation code 36 : $33138250.0


In [30]:
# Compute for total fine amount collected for vioaltaion code 38 

high_dens_fine_viol_code_38 = 65
others_fine_viol_code_38 = 35

avg_fine_viol_code_38 = (high_dens_fine_viol_code_38 + others_fine_viol_code_38)/2
print ('Average fine for violation code 38 : $' + str(avg_fine_viol_code_38))

Total_count_viol_code_38  = spark.sql('select  `Violation code`, count(`Violation code`)  from df_2017_Seasonal_Table group by 1 order by 2 desc').where("`Violation code` in ('38')").first()[1]
print ('Toatl  count for violation code 38 : ' + str(Total_count_viol_code_38))

Total_fine_viol_code_38 = Total_count_viol_code_38*avg_fine_viol_code_38
print ('Total fine for  violation code 38 : $' + str(Total_fine_viol_code_38))

Average fine for violation code 38 : $50.0
Toatl  count for violation code 38 : 542078
Total fine for  violation code 38 : $27103900.0


 ### Observation : The code that has the highest total collection is 21

 #### We can Infer from the above  findings :
 #### The violation code 21 results highest average penalty/fine  among top three viaolation code 
 #### also the highest total  collection genetares  from violation code 21
 