In [None]:
spark

In [None]:
s3path = 's3a://upgrad-spark-data/Parking_Violations_Issued_-_Fiscal_Year_2017.csv'

In [None]:
tickets_2017 = spark.read.format("csv").option("header", "true").load(s3path)

In [None]:
tickets_2017 = tickets_2017.select("Summons Number","Plate ID","Registration State","Issue Date","Violation Code","Vehicle Body Type","Vehicle Make","Violation Precinct","Issuer Precinct","Violation Time")


In [None]:
tickets_2017.printSchema()

In [None]:
tickets_2017.show(5)

In [None]:
tickets_2017.describe().show()

In [None]:
# 1. Find the total number of tickets for the year.
tickets_2017.count()

In [None]:
# For using SQL, you need to create a temporary view
tickets_2017.createOrReplaceTempView("data_2017")

In [None]:
# 2. Find out the number of unique states from where the cars 
# that got parking tickets came from. (Hint: Use 'Registration State')
spark.sql("SELECT count(distinct `Registration State`) as count FROM data_2017").show()

In [None]:
# Arranging the dataframe based on number of entries
spark.sql("SELECT `Registration State`, count(*) as count \
        FROM data_2017 group by `Registration State` order by `count` desc").show()

In [None]:
#Replacing '99' by 'NY' in the dataframe
from pyspark.sql import functions as F
tickets_2017_new = tickets_2017.withColumn('Registration State',F.when(tickets_2017['Registration State']=='99','NY').otherwise(tickets_2017['Registration State']))

In [None]:
# The temporary view needs to be recreated as values have been updated in tickets_2017_new
tickets_2017_new.createOrReplaceTempView("data_2017")

In [None]:
spark.sql("SELECT `Registration State`, count(*) as count \
        FROM data_2017 group by `Registration State` order by `count` desc").show()

In [None]:
spark.sql('SELECT count(distinct `Registration State`) as count \
                 FROM data_2017').show()

### AGGREGATION TASKS

In [None]:
spark.sql('SELECT `Violation Code`, count(*) as count \
                    FROM data_2017 group by `Violation Code` order by `count` desc limit 5').show()

In [None]:
## Here, `Violation Code` will be replaced by the each of the following variables:
spark.sql('SELECT `Vehicle Body Type`, count(*) as count FROM data_2017 \
                    group by `Vehicle Body Type` order by `count` desc limit 5').show()

In [None]:
## `Vehicle Make`
spark.sql('SELECT `Vehicle Make`, count(*) as count FROM data_2017 \
                    group by `Vehicle Make` order by `count` desc limit 5').show()

In [None]:
## Here, `Violation Code` will be replaced by 'Violation Precinct' and 'Issuer Precinct'
spark.sql('SELECT `Violation Precinct`, count(*) as count FROM data_2017 \
                    group by `Violation Precinct` order by `count` desc limit 6').show()

In [None]:
spark.sql('SELECT `Issuer Precinct`, count(*) as count FROM data_2017\
                    group by `Issuer Precinct` order by `count` desc limit 6').show()

In [None]:
def show_top_codes_for_issuer_precinct(precinct, n=5):
    """Print the `n` most frequent violation codes recorded by one issuer precinct.

    Args:
        precinct: `Issuer Precinct` value as stored in `data_2017` (compared as a string).
        n: number of codes to display.
    """
    spark.sql("SELECT `Issuer Precinct`, `Violation Code`, count(*) as count_tickets "
              "FROM data_2017 where `Issuer Precinct` = '{0}' "
              "group by `Issuer Precinct`, `Violation Code` "
              "order by `count_tickets` desc limit {1}".format(precinct, n)).show()


# NOTE(review): these precincts appear to be picked by hand from the Issuer Precinct
# ranking above — re-check them whenever the underlying data changes.
for precinct in ['19', '14', '1']:
    show_top_codes_for_issuer_precinct(precinct)

In [None]:
## Check for missing values
spark.sql("select count(*) as count\
                 FROM data_2017 where `Violation Time` is Null").show()

In [None]:
tickets_2017_new.select('Violation Time').dropna().count()

In [None]:
# Check for the operation to be performed

In [None]:
spark.sql("select `Violation Time`, if(right(`Violation Time`, 1) == 'A' or left(`Violation Time`, 2) == '12',concat(substring(`Violation Time`, 1,2),\
                    ':', substring(`Violation Time`, 3,2)), concat(int(substring(`Violation Time`, 1,2) + 12),\
                    ':', substring(`Violation Time`, 3,2))) as `Violation Time 2`\
                    from data_2017 limit 50").show()

In [None]:
# Creating a separate df with the required fields for analysis
time_violation_analysis = spark.sql("select if(right(`Violation Time`, 1) == 'A' or left(`Violation Time`, 2) == '12',\
  concat(substring(`Violation Time`, 1,2),':', substring(`Violation Time`,3,2)),\
  concat(int(substring(`Violation Time`, 1,2) + 12),':', substring(`Violation Time`, 3,2)))\
  as `Violation Time`, `Violation Code` from data_2017")

In [None]:
time_violation_analysis.show()

In [None]:
# For using SQL, you need to create a temporary view
time_violation_analysis.createOrReplaceTempView('time_violation_data')

In [None]:
time_violation_analysis = spark.sql('''select case
                                       when int(substring(`Violation Time`,1,2)) between 00 and 03
                                       then '00:00-03:59'
                                       when int(substring(`Violation Time`,1,2)) between 04 and 07
                                       then '04:00-07:59'
                                       when int(substring(`Violation Time`,1,2)) between 08 and 11
                                       then '08:00-11:59'
                                       when int(substring(`Violation Time`,1,2)) between 12 and 15
                                       then '12:00-15:59'
                                       when int(substring(`Violation Time`,1,2)) between 16 and 19
                                       then '16:00-19:59'
                                       else '20:00-23:59'
                                       end as bins,  `Violation Time`, `Violation Code`
                                       from time_violation_data''')

In [None]:
time_violation_analysis.show()

In [None]:
# Updating the SQL view
time_violation_analysis.createOrReplaceTempView('time_violation_data')

In [None]:
def show_top_codes_in_bin(bin_label, n=3):
    """Print the `n` most frequent violation codes within one time bin.

    Args:
        bin_label: a value of the `bins` column of the `time_violation_data` view.
        n: number of codes to display.
    """
    spark.sql("SELECT bins, `Violation Code`, count(*) as `count` "
              "FROM time_violation_data where bins = '{0}' "
              "group by bins, `Violation Code` "
              "order by `count` desc limit {1}".format(bin_label, n)).show()


# One table per 4-hour window, in chronological order.
time_bins = ['00:00-03:59', '04:00-07:59', '08:00-11:59',
             '12:00-15:59', '16:00-19:59', '20:00-23:59']
for bin_label in time_bins:
    show_top_codes_in_bin(bin_label)

In [None]:
# Finding the 3 most commonly occurring violation codes
spark.sql("SELECT `Violation Code`, count(*) as `count`\
                    FROM time_violation_data\
                    group by `Violation Code`\
                    order by `count` desc limit 3").show()

In [None]:
spark.sql("SELECT `Violation Code`, bins, count(*) as `count`\
                    FROM time_violation_data where `Violation Code` = '21'\
                    group by `Violation Code`, bins\
                    order by `count` desc limit 1").show()

In [None]:
spark.sql("SELECT `Violation Code`, bins, count(*) as `count`\
                    FROM time_violation_data where `Violation Code` = '36'\
                    group by `Violation Code`, bins\
                    order by `count` desc limit 1").show()

In [None]:
spark.sql("SELECT `Violation Code`, bins, count(*) as `count`\
                    FROM time_violation_data where `Violation Code` = '38'\
                    group by `Violation Code`, bins\
                    order by `count` desc limit 1").show()

In [None]:
tickets_seasonality = spark.sql('''select `Violation Code`, `Issue Date`, case
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) between 03 and 05
                                  then 'spring'
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) between 06 and 08
                                  then 'summer'
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) between 09 and 11
                                  then 'autumn'
                                  when month(to_date(`Issue Date`, 'yyyy-MM-dd')) in (1,2,12)
                                  then 'winter'
                                  else 'unknown'
                                  end as season
                                  from data_2017''')

In [None]:
tickets_seasonality.show()

In [None]:
# For using SQL, you need to create a temporary view
tickets_seasonality.createOrReplaceTempView('seasonal_data')

In [None]:
spark.sql("select `season`, count(*) as no_of_tickets\
                    from seasonal_data\
                    group by `season`\
                    order by no_of_tickets desc").show()

In [None]:
spark.sql("select `season`, count(*) as no_of_tickets\
                    from seasonal_data\
                    group by `season`\
                    order by no_of_tickets desc").show()

In [None]:
spark.sql("select `season`, `Violation Code`, count(*) as no_of_tickets\
                    from seasonal_data where `season` = 'autumn' \
                    group by season, `Violation Code` order by no_of_tickets desc\
                    limit 3").show()

In [None]:
spark.sql("select `season`, `Violation Code`, count(*) as no_of_tickets\
                    from seasonal_data where `season` = 'summer' \
                    group by season, `Violation Code` order by no_of_tickets desc\
                    limit 3").show()

In [None]:
spark.sql("select `season`, `Violation Code`, count(*) as no_of_tickets\
                    from seasonal_data where `season` = 'winter' \
                    group by season, `Violation Code` order by no_of_tickets desc\
                    limit 3").show()

In [None]:
## Total occurrences of the 3 most common violation codes
spark.sql("select `Violation Code`, count(*) as `no_of_tickets`\
                    from data_2017\
                    group by `Violation Code`\
                    order by `no_of_tickets` desc\
                    limit 3").show()

In [None]:
spark.sql('''select `Violation Code`, case
                    when `Violation Code` = 21
                    then 55 * count(*)
                    when `Violation Code` = 36
                    then 50* count(*)
                    when `Violation Code` = 38
                    then 50* count(*)
                    else '0'
                    end as `fine_amount`
                    from data_2017
                    group by `Violation Code`
                    order by `fine_amount` desc
                    limit 3''').show()

In [None]:
# Stop the SparkSession (and its SparkContext); no further Spark queries can run after this.
spark.stop()