In [2]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [3]:
# !pip install pyspark
!pip install pandas findspark --user



In [4]:
import findspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, expr, when, udf, struct, lit

In [5]:
findspark.init("c:\spark")

In [6]:
# spark = SparkSession \
#     .builder \
#     .appName("Violation Rate") \
#     .getOrCreate()

spark = SparkSession.builder.config("spark.driver.memory", "4g").appName("Violation Rate").getOrCreate()

In [7]:
spark.version

'3.5.0'

In [8]:
selected_columns = [
    'Street Code1',
    'Street Code2',
    'Street Code3',
    'Street Name',
    'Violation Post Code',
    'Intersecting Street',
    'Violation Precinct',
    'Violation County',
    'Vehicle Body Type',
    'Issue Date',
    'Violation Time',
    'Violation Code',
]

parking_violation = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load('Parking_Violations_2022.csv') \
    .select(*[col(col_name) for col_name in selected_columns]) \
    .limit(5000)

In [9]:
parking_violation.show(5)

+------------+------------+------------+------------------+-------------------+-------------------+------------------+----------------+-----------------+----------+--------------+--------------+
|Street Code1|Street Code2|Street Code3|       Street Name|Violation Post Code|Intersecting Street|Violation Precinct|Violation County|Vehicle Body Type|Issue Date|Violation Time|Violation Code|
+------------+------------+------------+------------------+-------------------+-------------------+------------------+----------------+-----------------+----------+--------------+--------------+
|       63430|       69230|       13490|NORTH PORTLAND AVE|               NULL|               NULL|                88|               K|              VAN|06/25/2021|         0130A|            40|
|       13490|       40404|       40404|      AUBURN PLACE|               NULL|               NULL|                88|               K|             SUBN|06/25/2021|         0225A|            20|
|       79430|       4713

In [10]:
parking_violation.filter(col('Violation Precinct').isNull()).count()

0

In [11]:
parking_violation.head()

Row(Street Code1=63430, Street Code2=69230, Street Code3=13490, Street Name='NORTH PORTLAND AVE', Violation Post Code=None, Intersecting Street=None, Violation Precinct=88, Violation County='K', Vehicle Body Type='VAN', Issue Date='06/25/2021', Violation Time='0130A', Violation Code=40)

In [12]:
parking_violation.describe().show()

+-------+------------------+-----------------+------------------+------------------+-------------------+-------------------+------------------+----------------+-----------------+----------+------------------+------------------+
|summary|      Street Code1|     Street Code2|      Street Code3|       Street Name|Violation Post Code|Intersecting Street|Violation Precinct|Violation County|Vehicle Body Type|Issue Date|    Violation Time|    Violation Code|
+-------+------------------+-----------------+------------------+------------------+-------------------+-------------------+------------------+----------------+-----------------+----------+------------------+------------------+
|  count|              5000|             5000|              5000|              4990|                  0|                773|              5000|            4289|             4724|      5000|              4997|              5000|
|   mean|        32016.9234|       32162.7564|        30802.9034|             143.0|    

In [13]:
# datatype of columns
parking_violation.printSchema()

root
 |-- Street Code1: integer (nullable = true)
 |-- Street Code2: integer (nullable = true)
 |-- Street Code3: integer (nullable = true)
 |-- Street Name: string (nullable = true)
 |-- Violation Post Code: string (nullable = true)
 |-- Intersecting Street: string (nullable = true)
 |-- Violation Precinct: integer (nullable = true)
 |-- Violation County: string (nullable = true)
 |-- Vehicle Body Type: string (nullable = true)
 |-- Issue Date: string (nullable = true)
 |-- Violation Time: string (nullable = true)
 |-- Violation Code: integer (nullable = true)



In [14]:
parking_violation.count()

5000

In [15]:
len(parking_violation.columns)

12

In [16]:
# Drop duplicates
parking_violation = parking_violation.dropDuplicates()
parking_violation.count()

4977

In [17]:
parking_violation = parking_violation.toDF(*(c.replace(' ', '_') for c in parking_violation.columns))
parking_violation.show(5)

+------------+------------+------------+------------------+-------------------+-------------------+------------------+----------------+-----------------+----------+--------------+--------------+
|Street_Code1|Street_Code2|Street_Code3|       Street_Name|Violation_Post_Code|Intersecting_Street|Violation_Precinct|Violation_County|Vehicle_Body_Type|Issue_Date|Violation_Time|Violation_Code|
+------------+------------+------------+------------------+-------------------+-------------------+------------------+----------------+-----------------+----------+--------------+--------------+
|       63430|       69230|       13490|NORTH PORTLAND AVE|               NULL|               NULL|                88|               K|              VAN|06/25/2021|         0130A|            40|
|       13490|       40404|       40404|      AUBURN PLACE|               NULL|               NULL|                88|               K|             SUBN|06/25/2021|         0225A|            20|
|       79430|       4713

In [18]:
# Assuming 'parking_violation' is your PySpark DataFrame
parking_violation = parking_violation.withColumn('Violation_Date', to_date(col('Issue_Date')))
parking_violation = parking_violation.withColumn('Violation_Date_Month', col('Violation_Date').cast('string').substr(6, 2).cast('int'))
parking_violation = parking_violation.withColumn('Violation_Date_Day', col('Violation_Date').cast('string').substr(9, 2).cast('int'))
parking_violation = parking_violation.withColumn('Violation_Date_Year', col('Violation_Date').cast('string').substr(1, 4).cast('int'))

In [19]:
parking_violation = parking_violation.drop('Issue_Date')

In [20]:
parking_violation.show(10)

+------------+------------+------------+------------------+-------------------+-------------------+------------------+----------------+-----------------+--------------+--------------+--------------+--------------------+------------------+-------------------+
|Street_Code1|Street_Code2|Street_Code3|       Street_Name|Violation_Post_Code|Intersecting_Street|Violation_Precinct|Violation_County|Vehicle_Body_Type|Violation_Time|Violation_Code|Violation_Date|Violation_Date_Month|Violation_Date_Day|Violation_Date_Year|
+------------+------------+------------+------------------+-------------------+-------------------+------------------+----------------+-----------------+--------------+--------------+--------------+--------------------+------------------+-------------------+
|       63430|       69230|       13490|NORTH PORTLAND AVE|               NULL|               NULL|                88|               K|              VAN|         0130A|            40|          NULL|                NULL|    

In [21]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def convert_to_24hr_format_udf(time_str):
    try:
        if '.' in time_str:
            return None
        if time_str[-1] == 'P':
            return '{:02d}:{:02d}'.format((int(time_str[:2]) + 12) % 24, int(time_str[2:4]))
        elif time_str[-1] == 'A':
            return '{:02d}:{:02d}'.format(int(time_str[:2]) % 12, int(time_str[2:4]))
        else:
            return None
    except (ValueError, TypeError):
        return None


convert_to_24hr_format_udf_spark = udf(convert_to_24hr_format_udf, StringType())

parking_violation = parking_violation.withColumn('Violation_Time', convert_to_24hr_format_udf_spark(col('Violation_Time')))

parking_violation = parking_violation.dropna(subset=['Violation_Time'])

In [22]:
parking_violation.show(5)

+------------+------------+------------+-------------+-------------------+-------------------+------------------+----------------+-----------------+--------------+--------------+--------------+--------------------+------------------+-------------------+
|Street_Code1|Street_Code2|Street_Code3|  Street_Name|Violation_Post_Code|Intersecting_Street|Violation_Precinct|Violation_County|Vehicle_Body_Type|Violation_Time|Violation_Code|Violation_Date|Violation_Date_Month|Violation_Date_Day|Violation_Date_Year|
+------------+------------+------------+-------------+-------------------+-------------------+------------------+----------------+-----------------+--------------+--------------+--------------+--------------------+------------------+-------------------+
|       39202|           0|           0|MIDLAND BEACH|               NULL|              LOT 8|               122|               R|              VAN|         14:54|            27|          NULL|                NULL|              NULL|     

In [23]:
parking_violation.select('Violation_Time').show(5)

+--------------+
|Violation_Time|
+--------------+
|         14:54|
|         10:50|
|         00:36|
|         08:17|
|         08:21|
+--------------+
only showing top 5 rows



In [24]:
parking_violation = parking_violation.orderBy(col('Violation_Date'), col('Violation_Time'))

In [25]:
parking_violation = parking_violation.withColumn(
    'Violation_Hour',
    (expr("hour(`Violation_Time`) % 24")).cast('int')
)

In [26]:
parking_violation.select('Violation_Hour').show(5)

+--------------+
|Violation_Hour|
+--------------+
|             0|
|             0|
|             0|
|             0|
|             0|
+--------------+
only showing top 5 rows



In [27]:
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.types import TimestampType

parking_violation = parking_violation.withColumn(
    'Violation_Time',
    unix_timestamp('Violation_Time', 'HH:mm:ss').cast(TimestampType())
)

In [28]:
print(parking_violation.dtypes)

[('Street_Code1', 'int'), ('Street_Code2', 'int'), ('Street_Code3', 'int'), ('Street_Name', 'string'), ('Violation_Post_Code', 'string'), ('Intersecting_Street', 'string'), ('Violation_Precinct', 'int'), ('Violation_County', 'string'), ('Vehicle_Body_Type', 'string'), ('Violation_Time', 'timestamp'), ('Violation_Code', 'int'), ('Violation_Date', 'date'), ('Violation_Date_Month', 'int'), ('Violation_Date_Day', 'int'), ('Violation_Date_Year', 'int'), ('Violation_Hour', 'int')]


In [29]:
from datetime import datetime, time

def calculate_location_violation_rate(parking_violation, location, time_range, precinct, violation_date):
    start_time = datetime.strptime(time_range[0], "%H:%M").time()
    end_time = datetime.strptime(time_range[1], "%H:%M").time()

    violation_date = datetime.strptime(violation_date, "%Y-%m-%d").date()

    filtered_data = parking_violation.filter(
        (col('Street_Name') == location) &
        (col('Violation_Date') == violation_date) &
        (col('Violation_Time').isNotNull()) &
        (col('Violation_Time').between(lit(datetime.combine(violation_date, start_time)), 
                                       lit(datetime.combine(violation_date, end_time))))
    )

    print("filtered data:", filtered_data.count())

    total_location_violations = parking_violation.filter(
        (col('Violation_Time').between(lit(datetime.combine(violation_date, start_time)),
                                        lit(datetime.combine(violation_date, end_time)))) &
        (col('Violation_Precinct') == precinct) &
        (col('Violation_Date') == violation_date)
    ).count()

    print("total_location_violations:", total_location_violations)

    if total_location_violations == 0:
        return 0

    location_violations = filtered_data.count()

    location_violation_rate = (location_violations / total_location_violations) * 100

    return location_violation_rate

location = 'GRANDVIEW AVE'
location_precinct = 121
time_range = ('00:00', '23:00')
violation_date = '2021-07-25'

# Assuming 'parking_violation' is your PySpark DataFrame
rate = calculate_location_violation_rate(parking_violation, location, time_range, location_precinct, violation_date)
print(f'Location Violation Rate for {location} between {time_range[0]} and {time_range[1]}: {rate:.2f}%')

filtered data: 0
total_location_violations: 0
Location Violation Rate for GRANDVIEW AVE between 00:00 and 23:00: 0.00%


In [53]:
from pyspark.sql.functions import col, when, lit
from datetime import datetime

def calculate_violation_rate(hour, violation_date, precinct, street_name):

    total_location_violations = parking_violation.filter(
        (col('Violation_Hour').between(hour,hour+1)) &
        (col('Violation_Precinct') == precinct) &
        (col('Violation_Date') == violation_date)
    ).count()

    location_violations = parking_violation.filter(
        (col('Street_Name') == street_name) &
        (col('Violation_Hour').between(hour,hour+1)) &
        (col('Violation_Date') == violation_date)
    ).count()

    print("total_location_violations:", total_location_violations)
    if total_location_violations == 0:
            return 0

    return when(total_location_violations != 0,(location_violations / total_location_violations) * 100).otherwise(0)

if 'Violation_Rate' in parking_violation.columns:
    parking_violation = parking_violation.drop('Violation_Rate')

parking_violation = parking_violation.withColumn(
    'Violation_Rate',
    calculate_violation_rate(
        col('Violation_Hour'),
        col('Violation_Date'),
        col('Violation_Precinct'),
        col('Street_Name')
    )
)

total_location_violations: 0


PySparkTypeError: [NOT_COLUMN] Argument `col` should be a Column, got int.