In [80]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, LongType, TimestampType
from pyspark.sql.functions import col, to_date
from pyspark.sql import functions as F, types as T

In [2]:
spark = SparkSession.builder.appName("NYC Parking Tickets").getOrCreate()

### **DATA**

In [194]:
df = spark.read.csv('/content/sample_data/nyc_parking_tickets/*.csv', header=True, inferSchema=True)
df2= spark.read.csv('/content/sample_data/violation_codes/*.csv', header=True, inferSchema=True)

In [173]:
df.show()

+--------------+--------+------------------+----------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+---------------------+------------+------------+--------------+-------------------+
|Summons Number|Plate ID|Registration State|Plate Type|Issue Date|Violation Code|Vehicle Body Type|Vehicle Make|Issuing Agency|Street Code1|Street Code2|Street Code3|Vehicle Expiration Date|Violation Location|Violation Precinct|Issuer Precinct|Issuer Code|Issuer Command|Issuer Squad|Violation Time|Time First Observed|Violation County|Violation

In [195]:
df2.show()

+--------------+---------------------+---------------+------------+
|VIOLATION CODE|VIOLATION DESCRIPTION|Manhattan_FineA|Others_FineA|
+--------------+---------------------+---------------+------------+
|             1| FAILURE TO DISPLA...|            515|         515|
|             2| NO OPERATOR NAM/A...|            515|         515|
|             3| UNAUTHORIZED PASS...|            515|         515|
|             4| BUS PARKING IN LO...|            115|         115|
|             5|   BUS LANE VIOLATION|             50|          50|
|             6| OVERNIGHT TRACTOR...|            265|         265|
|             7| FAILURE TO STOP A...|             50|          50|
|             8|               IDLING|            115|         115|
|             9| OBSTRUCTING TRAFF...|            115|         115|
|            10| NO STOPPING-DAY/T...|            115|         115|
|            11| NO STANDING-HOTEL...|            115|         115|
|            12| MOBILE BUS LANE V...|          

In [174]:
df.printSchema()

root
 |-- Summons Number: long (nullable = true)
 |-- Plate ID: string (nullable = true)
 |-- Registration State: string (nullable = true)
 |-- Plate Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Code: integer (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Vehicle Make: string (nullable = true)
 |-- Issuing Agency: string (nullable = true)
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Vehicle Expiration Date: integer (nullable = true)
 |-- Violation Location: integer (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Issuer Precinct: integer (nullable = true)
 |-- Issuer Code: integer (nullable = true)
 |-- Issuer Command: string (nullable = true)
 |-- Issuer Squad: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Time First Observed: string (nullable = true)
 |-- Violation Coun

In [196]:
df2.printSchema()

root
 |-- VIOLATION CODE: integer (nullable = true)
 |-- VIOLATION DESCRIPTION: string (nullable = true)
 |-- Manhattan_FineA: integer (nullable = true)
 |-- Others_FineA: integer (nullable = true)



In [62]:
df.count()

3532970

In [202]:
#Update the structure of the data
schema = StructType([
    StructField('Summons Number', LongType(), True),
    StructField('Plate ID', StringType(), True),
    StructField('Registration State', StringType(), True),
    StructField('Plate Type', StringType(), True),
    StructField('Issue Date', StringType(), True),
    StructField('Violation Code', IntegerType(), True),
    StructField('Vehicle Body Type', StringType(), True),
    StructField('Vehicle Make', StringType(), True),
    StructField('Issuing Agency', StringType(), True),
    StructField('Street Code1', IntegerType(), True),
    StructField('Street Code2', IntegerType(), True),
    StructField('Street Code3', IntegerType(), True),
    StructField('Vehicle Expiration Date', IntegerType(), True),
    StructField('Violation Location', StringType(), True),
    StructField('Violation Precinct', IntegerType(), True),
    StructField('Issuer Precinct', IntegerType(), True),
    StructField('Issuer Code', IntegerType(), True),
    StructField('Issuer Command', StringType(), True),
    StructField('Issuer Squad', StringType(), True),
    StructField('Violation Time', StringType(), True),
    StructField('Time First Observed', StringType(), True),
    StructField('Violation County', StringType(), True),
    StructField('Violation In Front Of Or Opposite', StringType(), True),
    StructField('House Number', StringType(), True),
    StructField('Street Name', StringType(), True),
    StructField('Intersecting Street', StringType(), True),
    StructField('Date First Observed', IntegerType(), True),
    StructField('Law Section', IntegerType(), True),
    StructField('Sub Division', StringType(), True),
    StructField('Violation Legal Code', StringType(), True),
    StructField('Days Parking In Effect    ', StringType(), True),
    StructField('From Hours In Effect', StringType(), True),
    StructField('To Hours In Effect', StringType(), True),
    StructField('Vehicle Color', StringType(), True),
    StructField('Unregistered Vehicle?:', StringType(), True),
    StructField('Vehicle Year', StringType(), True),
    StructField('Meter Number', StringType(), True),
    StructField('Feet From Curb', StringType(), True),
    StructField('Violation Post Code', StringType(), True)

])

In [237]:
#Update the structure of the data
schema2 = StructType([
    StructField('VIOLATION CODE', IntegerType(), True),
    StructField('VIOLATION DESCRIPTION', StringType(), True),
    StructField('Manhattan_FineA', IntegerType(), True),
    StructField('Others_FineA', IntegerType(), True)

])


In [203]:
#update the dataframe with the structure
df = spark.read.csv('/content/sample_data/nyc_parking_tickets/*.csv', header=True, schema=schema)


In [204]:
#Update the columns name
for col in df.columns:
    df = df.withColumnRenamed(col, col.lower().replace(' ', '_').replace('-', '_'))

In [201]:
for col in df2.columns:
    df2 = df2.withColumnRenamed(col, col.lower().replace(' ', '_').replace('-', '_'))

In [235]:
df.show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+-------------------+----------------+------------------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_

In [236]:
df2.show()

+--------------+---------------------+---------------+------------+
|violation_code|violation_description|manhattan_finea|others_finea|
+--------------+---------------------+---------------+------------+
|             1| FAILURE TO DISPLA...|            515|         515|
|             2| NO OPERATOR NAM/A...|            515|         515|
|             3| UNAUTHORIZED PASS...|            515|         515|
|             4| BUS PARKING IN LO...|            115|         115|
|             5|   BUS LANE VIOLATION|             50|          50|
|             6| OVERNIGHT TRACTOR...|            265|         265|
|             7| FAILURE TO STOP A...|             50|          50|
|             8|               IDLING|            115|         115|
|             9| OBSTRUCTING TRAFF...|            115|         115|
|            10| NO STOPPING-DAY/T...|            115|         115|
|            11| NO STANDING-HOTEL...|            115|         115|
|            12| MOBILE BUS LANE V...|          

In [207]:
#Review the schema of the data
df.printSchema()

root
 |-- summons_number: long (nullable = true)
 |-- plate_id: string (nullable = true)
 |-- registration_state: string (nullable = true)
 |-- plate_type: string (nullable = true)
 |-- issue_date: string (nullable = true)
 |-- violation_code: integer (nullable = true)
 |-- vehicle_body_type: string (nullable = true)
 |-- vehicle_make: string (nullable = true)
 |-- issuing_agency: string (nullable = true)
 |-- street_code1: integer (nullable = true)
 |-- street_code2: integer (nullable = true)
 |-- street_code3: integer (nullable = true)
 |-- vehicle_expiration_date: integer (nullable = true)
 |-- violation_location: string (nullable = true)
 |-- violation_precinct: integer (nullable = true)
 |-- issuer_precinct: integer (nullable = true)
 |-- issuer_code: integer (nullable = true)
 |-- issuer_command: string (nullable = true)
 |-- issuer_squad: string (nullable = true)
 |-- violation_time: string (nullable = true)
 |-- time_first_observed: string (nullable = true)
 |-- violation_count

In [89]:
df.describe().show()

+-------+--------------------+--------+------------------+----------+----------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+-----------------------+------------------+------------------+------------------+------------------+------------------+--------------------+-----------------+-------------------+----------------+---------------------------------+--------------------+-----------------+-------------------+-------------------+-----------------+------------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------------+------------------+-------------------+-------------------+
|summary|      summons_number|plate_id|registration_state|plate_type|issue_date|    violation_code|  vehicle_body_type|      vehicle_make|    issuing_agency|      street_code1|      street_code2|      street_code3|vehicle_expiration_date

In [90]:
df = df.dropDuplicates()

No duplicate values found

In [91]:
df.count()

3532970

In [208]:
df = df.withColumn('issue_date2', F.to_timestamp('issue_date', 'MM/dd/yyyy')).drop('issue_date')

In [209]:
df = df.withColumnRenamed('issue_date2', 'issue_date')

In [113]:
df.show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+-------------------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_observed|violation_county|violation_

## **Filters**

In [210]:
df.filter(df.violation_code.isNull()).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+----------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_observed|violation_county|violation_in_front_of_or_opp

In [104]:
df.filter(df.plate_id.isNull()).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+-----------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_observed|violation_county|violation_in_front_of_or_op

We have the Violation Code for all the tickets as well as the plate ID

In [105]:
df.filter(df.vehicle_year >= 2025).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+-------------------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_observed|violation_county|violation_

In [106]:
df.filter((df.vehicle_make == 'DODGE') & ((df.vehicle_color == 'White' ) |(df.vehicle_color=='WH'))).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------------+--------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+-------------------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_observed|violation_county|violation

In [107]:
df.filter((df.registration_state != 'NY') & (df.registration_state!= 'NJ')).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+-------------------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_observed|violation_county|violation_

In [108]:
df.filter((df.vehicle_body_type == 'SPOR')& (df.violation_code != 5) & (df.violation_code != 36) ).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------------+--------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+-------------------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_observed|violation_county|violation

## **New Columns**

In [211]:
#New column to see Month in the date
df = df.withColumn('Month_Issue_Date', F.month('issue_date'))

In [212]:
# Extract - F.regexp_extract(str, pattern, idx)
df = df.withColumn('violation_day_time', F.regexp_extract('violation_time','(.)(.)(.)(.)(.)' ,5))



In [213]:
# Regex Replace - F.regexp_replace(str, pattern, replacement)[source]
df = df.withColumn('violation_day_time', F.regexp_replace('violation_day_time', 'A', 'Morning'))
df = df.withColumn('violation_day_time', F.regexp_replace('violation_day_time', 'P', 'Afternoon'))

In [214]:
df.show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+--------------------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+-------------------+----------------+------------------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_

## **Queries**

In [238]:
df_data = df
df2_data =df2

In [239]:
df_data.createOrReplaceTempView('tickets')
df2_data.createOrReplaceTempView('codes')

**Top 10 Most Frequent Violation Areas**

In [248]:
df.filter(df.street_code1.isNull()).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+----------+----------------+------------------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_observed|violation

In [249]:
f_sc1_df = df.filter(df.street_code1 != 0)
f_sc1_df.count()

1895416

In [250]:
f_sc1_df.createOrReplaceTempView("f_sc1_tickets")

In [251]:
top10_df = spark.sql("""
    SELECT `violation_county`, `street_code1`, COUNT(*) AS ticket_count
    FROM f_sc1_tickets
    GROUP BY `violation_county`, `street_code1`
    ORDER BY ticket_count DESC
    LIMIT 10
    """).show()

+----------------+------------+------------+
|violation_county|street_code1|ticket_count|
+----------------+------------+------------+
|              NY|       13610|       33119|
|              NY|       10210|       21732|
|              NY|       25390|       18271|
|              NY|       24890|       15376|
|              NY|       10110|       14880|
|              NY|       10010|       14050|
|              NY|       10410|       13140|
|              NY|       11710|       11266|
|               K|       28830|       10781|
|              NY|       10510|       10276|
+----------------+------------+------------+



| Code | Administrative District


---


| NY |	Manhattan

| K |	Brooklyn

In [252]:
top10_df = spark.sql("""
    SELECT
        CASE
            WHEN `violation_county` IN ('K') THEN 'Brooklyn'
            WHEN `violation_county` IN ('NY') THEN 'Manhattan'
            ELSE `violation_county`
        END AS `violation_county`,
        `street_code1`,
        COUNT(*) AS ticket_count
    FROM f_sc1_tickets
    GROUP BY `violation_county`, `street_code1`
    ORDER BY ticket_count DESC
    LIMIT 10
""")

top10_df.show()

+----------------+------------+------------+
|violation_county|street_code1|ticket_count|
+----------------+------------+------------+
|       Manhattan|       13610|       33119|
|       Manhattan|       10210|       21732|
|       Manhattan|       25390|       18271|
|       Manhattan|       24890|       15376|
|       Manhattan|       10110|       14880|
|       Manhattan|       10010|       14050|
|       Manhattan|       10410|       13140|
|       Manhattan|       11710|       11266|
|        Brooklyn|       28830|       10781|
|       Manhattan|       10510|       10276|
+----------------+------------+------------+



**Top 5 For Each Most Frequent Violation Areas.**

In [253]:
top5_df = spark.sql("""
WITH renamed_tickets AS (
  SELECT
    CASE
      WHEN `violation_county` IN ('K', 'BK') THEN 'Brooklyn'
      WHEN `violation_county` = 'NY' THEN 'Manhattan'
      ELSE 'Other'
    END AS borough,
    `street_code1`,
    COUNT(*) AS ticket_count
  FROM f_sc1_tickets
  WHERE `violation_county` IN ('K', 'BK', 'NY')
  GROUP BY borough, `street_code1`
),
ranked_per_borough AS (
  SELECT *,
         ROW_NUMBER() OVER(PARTITION BY borough ORDER BY ticket_count DESC) AS rank
  FROM renamed_tickets
)
SELECT *
FROM ranked_per_borough
WHERE rank <= 5
ORDER BY ticket_count DESC, borough
    """).show()

+---------+------------+------------+----+
|  borough|street_code1|ticket_count|rank|
+---------+------------+------------+----+
|Manhattan|       13610|       33119|   1|
|Manhattan|       10210|       21732|   2|
|Manhattan|       25390|       18271|   3|
|Manhattan|       24890|       15376|   4|
|Manhattan|       10110|       14880|   5|
| Brooklyn|       28830|       10781|   1|
| Brooklyn|       67730|        5859|   2|
| Brooklyn|       13430|        5320|   3|
| Brooklyn|        5580|        5306|   4|
| Brooklyn|       54050|        4866|   5|
+---------+------------+------------+----+



| Code | Administrative District


---


| NY |	Manhattan

| K |	Brooklyn

| BK |	Brooklyn（Old Writing Style）

**What is the Top3 common issuing agency?**

In [254]:
df.filter(df.issuing_agency.isNull()).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+----------+----------------+------------------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_observed|violation

In [255]:
df.createOrReplaceTempView("f_ia_tickets")

In [None]:
top3_df = spark.sql("""
WITH agency_counts AS (
  SELECT
    `issuing_agency`,
    COUNT(*) AS ticket_count
  FROM f_ia_tickets
  GROUP BY `issuing_agency`
),
ranked_agencies AS (
  SELECT *,
         RANK() OVER (ORDER BY ticket_count DESC) AS rank
  FROM agency_counts
)
SELECT *
FROM ranked_agencies
WHERE rank <= 3
ORDER BY ticket_count DESC
    """).show()

T-TRAFFIC

V-DEPARTMENT OF TRANSPORTATION

M-TRANSIT AUTHORITY

**What is the most common violation code?**

In [256]:
df.filter(df.violation_code.isNull()).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+----------+----------------+------------------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_observed|violation

In [258]:
df2.filter(df2.violation_code.isNull()).show()

+--------------+---------------------+---------------+------------+
|violation_code|violation_description|manhattan_finea|others_finea|
+--------------+---------------------+---------------+------------+
+--------------+---------------------+---------------+------------+



In [259]:
top10_vc_df = spark.sql("""
    SELECT
      t.`violation_code`,
      COUNT(*) AS ticket_count,
      vc.`violation_description`
    FROM tickets AS t
    JOIN codes AS vc
    ON t.`violation_code` = vc.`violation_code`
    GROUP BY t.`violation_code`, vc.`violation_description`
    ORDER BY ticket_count DESC
    LIMIT 10
    """).show(truncate=False)

+--------------+------------+------------------------------+
|violation_code|ticket_count|violation_description         |
+--------------+------------+------------------------------+
|36            |938024      |PHTO SCHOOL ZN SPEED VIOLATION|
|21            |398682      |NO PARKING-STREET CLEANING    |
|38            |272844      |FAIL TO DSPLY MUNI METER RECPT|
|14            |191770      |NO STANDING-DAY/TIME LIMITS   |
|43            |162783      |EXPIRED METER-COMM METER ZONE |
|5             |152779      |BUS LANE VIOLATION            |
|40            |148959      |FIRE HYDRANT                  |
|15            |138523      |NO STANDING-OFF-STREET LOT    |
|20            |129074      |NO PARKING-DAY/TIME LIMITS    |
|7             |124776      |FAILURE TO STOP AT RED LIGHT  |
+--------------+------------+------------------------------+



**Which vehicle years have the most violations?**

In [260]:
df.filter(df.vehicle_year.isNull()).show()

+--------------+--------+------------------+----------+--------------+-----------------+------------+--------------+------------+------------+------------+-----------------------+------------------+------------------+---------------+-----------+--------------+------------+--------------+-------------------+----------------+---------------------------------+------------+-----------+-------------------+-------------------+-----------+------------+--------------------+--------------------------+--------------------+------------------+-------------+----------------------+------------+------------+--------------+-------------------+----------+----------------+------------------+
|summons_number|plate_id|registration_state|plate_type|violation_code|vehicle_body_type|vehicle_make|issuing_agency|street_code1|street_code2|street_code3|vehicle_expiration_date|violation_location|violation_precinct|issuer_precinct|issuer_code|issuer_command|issuer_squad|violation_time|time_first_observed|violation

In [261]:
f_vy_df = df.filter(df.vehicle_year != 0)
f_vy_df.count()

2916930

In [262]:
f_vy_df.createOrReplaceTempView("f_vy_tickets")

In [263]:
top10_vy_df = spark.sql("""
    SELECT
      `vehicle_year`,
      COUNT(*) AS violation_count
    FROM f_vy_tickets
    WHERE `vehicle_year` IS NOT NULL
    GROUP BY `vehicle_year`
    ORDER BY violation_count DESC
    LIMIT 10
""").show()

+------------+---------------+
|vehicle_year|violation_count|
+------------+---------------+
|        2024|         316546|
|        2023|         291611|
|        2022|         211793|
|        2019|         209685|
|        2021|         196030|
|        2020|         189677|
|        2018|         171224|
|        2017|         163106|
|        2016|         141310|
|        2015|         133599|
+------------+---------------+



**What month has the most violations?**

In [217]:
spark.sql("""
          SELECT month_issue_date, COUNT(plate_id) as Number_Tickets
          FROM tickets
          GROUP BY month_issue_date
          ORDER BY Number_Tickets DESC
          """).show()

+----------------+--------------+
|month_issue_date|Number_Tickets|
+----------------+--------------+
|               3|       1183173|
|               2|       1176807|
|               1|       1172934|
|               4|            56|
+----------------+--------------+



**What kind of car has the most violations? **

In [122]:
spark.sql("""
          SELECT vehicle_body_type, COUNT(plate_id) as Number_Tickets
          FROM tickets
          GROUP BY vehicle_body_type
          ORDER BY Number_Tickets DESC
          """).show()

+-----------------+--------------+
|vehicle_body_type|Number_Tickets|
+-----------------+--------------+
|             SUBN|       1603966|
|             4DSD|        795446|
|              VAN|        306965|
|             PICK|        115865|
|             DELV|        104736|
|             SPOR|         90679|
|             SEDA|         58295|
|             2DSD|         49717|
|              SDN|         49068|
|               SW|         39216|
|               UT|         36330|
|               4D|         28789|
|             REFG|         25055|
|             TRAC|         17819|
|             TAXI|         16617|
|               SD|         13311|
|             CONV|         12217|
|               4S|         11754|
|             UTIL|         11524|
|             TRLR|          9720|
+-----------------+--------------+
only showing top 20 rows



**When is the most common violation time?**

In [133]:
df_data = df.filter(df.violation_time.isNotNull())
df_data.createOrReplaceTempView('tickets')


In [134]:
spark.sql("""
          SELECT violation_time, COUNT(plate_id) as Number_Tickets
          FROM tickets
          GROUP BY violation_time
          ORDER BY Number_Tickets DESC
          """).show()

+--------------+--------------+
|violation_time|Number_Tickets|
+--------------+--------------+
|         1142A|          6693|
|         1140A|          6652|
|         0840A|          6578|
|         1141A|          6531|
|         0836A|          6492|
|         0906A|          6486|
|         1143A|          6481|
|         0908A|          6473|
|         0838A|          6471|
|         1145A|          6467|
|         1144A|          6467|
|         1146A|          6462|
|         1139A|          6462|
|         0839A|          6455|
|         1138A|          6434|
|         0910A|          6401|
|         0843A|          6374|
|         0842A|          6368|
|         1148A|          6355|
|         1200P|          6320|
+--------------+--------------+
only showing top 20 rows



In [218]:
df_data = df.filter(df.violation_day_time.isNotNull())
df_data.createOrReplaceTempView('tickets')

In [242]:
spark.sql("""
          SELECT violation_day_time, COUNT(plate_id) as Number_Tickets
          FROM tickets
          WHERE violation_day_time ='Morning' OR violation_day_time ='Afternoon'
          GROUP BY violation_day_time
          ORDER BY Number_Tickets DESC
          """).show()

+------------------+--------------+
|violation_day_time|Number_Tickets|
+------------------+--------------+
|           Morning|       1768544|
|         Afternoon|       1764385|
+------------------+--------------+



**How much do we collect in tickets per violation code? **

In [247]:
spark.sql("""
          SELECT r.violation_code , c.manhattan_finea, c.others_finea, Number_Tickets_M,Number_Tickets_M*c.manhattan_finea as Total_M, Number_Tickets_O, Number_Tickets_O * c.others_finea as Total_O, Total_o+Total_M as Total
          FROM(
          SELECT violation_code, COUNT(CASE when registration_state='NY'THEN 1 END) as Number_Tickets_M, COUNT(CASE when registration_state!='NY' THEN 1 END) as Number_Tickets_O
          FROM tickets
          GROUP BY violation_code) r
          INNER JOIN codes c
          ON r.violation_code =c.violation_code
          ORDER BY Total DESC
          """).show()

+--------------+---------------+------------+----------------+--------+----------------+--------+--------+
|violation_code|manhattan_finea|others_finea|Number_Tickets_M| Total_M|Number_Tickets_O| Total_O|   Total|
+--------------+---------------+------------+----------------+--------+----------------+--------+--------+
|            36|             50|          50|          685722|34286100|          252302|12615100|46901200|
|            21|             65|          45|          271748|17663620|          126934| 5712030|23375650|
|            14|            115|         115|          126458|14542670|           65312| 7510880|22053550|
|            40|            115|         115|           93843|10791945|           55116| 6338340|17130285|
|            15|            115|         115|           99602|11454230|           38921| 4475915|15930145|
|            38|             65|          35|          193145|12554425|           79699| 2789465|15343890|
|            43|             65|     