In [None]:
pip install "jupyterlab>=3" "ipywidgets>=7.6"
pip install -U kaleido
pip install py4j
pip install plotly

In [6]:

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from config import *
from pyspark.sql.functions import to_date
import plotly.express as px
import plotly.io as pio
from plotly.subplots import make_subplots
import plotly.graph_objects as go
from py4j.protocol import Py4JJavaError
from pyspark.sql.functions import from_unixtime,unix_timestamp


In [7]:
sparkSession =(SparkSession
                .builder
                .master("local")
                .appName('event-violation')
                .enableHiveSupport()
                .config("spark.sql.warehouse.dir", warehouse_dir)
                .getOrCreate()
              ) 

In [3]:
event_listing=sparkSession.read.csv(base_path+event_listing_path,header=True)
event_locations=sparkSession.read.csv(base_path+event_locations_path,header=True)
event_listing_columns=['event_id', 'date']
event_locations_columns=['event_id', 'lat', 'long', 'borough', 'accessible']
event_table=event_locations[event_locations_columns].join(event_listing[event_listing_columns],how="inner",on="event_id") \
    .na.fill({"accessible":4}) \
    .replace({"X":"BX","M":"NY","B":"K"},subset=["borough"]) 
event_table=event_table.withColumn("accessible",event_table.accessible.cast("int")) \
    .withColumn("long",event_table.long.cast("double")) \
    .withColumn("lat",event_table.lat.cast("double")) \
    .withColumn("date",to_date(event_table.date,"MM/dd/yyyy")) \
    .na.drop(subset=["date","borough"]) \
    .orderBy("date")
event_table.write.saveAsTable("event_table")
last_date=event_table.orderBy(event_table.date.desc()).collect()[0]
first_date=event_table.orderBy(event_table.date.asc()).collect()[0]

In [4]:
parking_violations_columns_replaced=['Issue_Date','Violation_County','PLATE_TYPE','VIOLATION_CODE','Violation_Time','Summons_Number']
parking_violations_columns=['Issue Date','Violation County','PLATE TYPE','Violation Code','Violation Time','Summons Number']
test_result={}
for year in range(2014,2024):
    try:
        parking_violations=sparkSession.read.csv(base_path+parking_violations_path+str(year)+".csv",header=True)

        parking_violations=parking_violations[parking_violations_columns]
        for i in range(len(parking_violations_columns)):
                parking_violations=parking_violations.withColumnRenamed(parking_violations_columns[i],parking_violations_columns_replaced[i])
        parking_violations=parking_violations.withColumn("Issue_Date",to_date(parking_violations.Issue_Date,"MM/dd/yyyy"))
        parking_violations=parking_violations.withColumn("Summons_Number",parking_violations.Summons_Number.cast("int"))
        parking_violations=parking_violations.filter((parking_violations.Issue_Date>=first_date.date)&(parking_violations.Issue_Date<last_date.date) \
                                                    &(parking_violations.Violation_Time.isNotNull())&(parking_violations.PLATE_TYPE.isin(registration_class_code)) \
                                                     &(parking_violations.VIOLATION_CODE.isin(violation_code))) \
            .na.drop(subset=["Issue_Date","Violation_County"]) \
            .orderBy("Issue_Date")
        test_result[year]=parking_violations.count()
        parking_violations=parking_violations[['Summons_Number','Violation_County','Issue_Date']]        
        if year==2014:
            parking_violations.write.saveAsTable("parking_violations")
        parking_violations.write.mode('append').saveAsTable("parking_violations")
    except Py4JJavaError as e:
        parking_violations=sparkSession.read.csv(base_path+parking_violations_path+str(year)+".csv",header=True)
        parking_violations=parking_violations[parking_violations_columns]
        for i in range(len(parking_violations_columns)):
                parking_violations=parking_violations.withColumnRenamed(parking_violations_columns[i],parking_violations_columns_replaced[i])
        parking_violations=parking_violations.withColumn("Issue_Date",from_unixtime(unix_timestamp(parking_violations.Issue_Date,"MM/dd/yyyy hh:mm:ss a"),'yyyy-MM-dd').cast('date'))
        parking_violations=parking_violations.withColumn("Summons_Number",parking_violations.Summons_Number.cast("int"))
        parking_violations=parking_violations.filter((parking_violations.Issue_Date>=first_date.date)&(parking_violations.Issue_Date<last_date.date) \
                                                    &(parking_violations.Violation_Time.isNotNull())&(parking_violations.PLATE_TYPE.isin(registration_class_code)) \
                                                     &(parking_violations.VIOLATION_CODE.isin(violation_code))) \
            .na.drop(subset=["Issue_Date","Violation_County"]) \
            .orderBy("Issue_Date")
        test_result[year]=parking_violations.count()
        parking_violations=parking_violations[['Summons_Number','Violation_County','Issue_Date']]
        parking_violations.write.mode('append').saveAsTable("parking_violations")

In [66]:
test_result

{2014: 8255672,
 2015: 9801578,
 2016: 8754229,
 2017: 10608974,
 2018: 11502154,
 2019: 11223207,
 2021: 166,
 2022: 43,
 2023: 6}

In [36]:
detailed_proportion={}
for year in range(2013,2019):
    start_date=year
    end_date=year+1
    expr=str(start_date)+"_"+str(end_date)
    count_query=f"""
    select count(1) as cnt from parking_violations as p

    left join event_table as e on p.Issue_Date=e.date 
        and p.Violation_County=e.borough
    where p.Violation_County IN ('K','NY','Q','BX','R') and (p.Issue_Date>'{start_date}') and (p.Issue_Date<'{end_date}')
    """
    query_join=f"""select count(1) as cnt from parking_violations as p

    left join event_table as e on p.Issue_Date=e.date 
        and p.Violation_County=e.borough
    where p.Violation_County IN ('K','NY','Q','BX','R') and (p.Issue_Date>'{start_date}') and (p.Issue_Date<'{end_date}') and (e.date is null) and (e.borough is null)
    """
    joined_table=sparkSession.sql(query_join)
    count_all=sparkSession.sql(count_query)
    no_event=joined_table.select('cnt').rdd.flatMap(lambda x:x).collect()[0]
    violation_occurred=count_all.select('cnt').rdd.flatMap(lambda x:x).collect()[0]
    proportion_percentage=(no_event/violation_occurred)*100
    detailed_proportion[expr]=[no_event,violation_occurred,round(proportion_percentage,2)]
    

In [37]:
detailed_proportion

{'2013_2014': [160945, 76417706, 0.21],
 '2014_2015': [487574, 116767811, 0.42],
 '2015_2016': [712108, 74266545, 0.96],
 '2016_2017': [451558, 67150022, 0.67],
 '2017_2018': [1118683, 72899610, 1.53],
 '2018_2019': [1174226, 89337914, 1.31]}

In [62]:
for year in range(2013,2019):
    start_date=year
    end_date=year+1
    expr=str(start_date)+"_"+str(end_date)
    query_join=f"""select p.Issue_Date,count(*) as violation_observation from parking_violations as p

    left join event_table as e on p.Issue_Date=e.date 
        and p.Violation_County=e.borough
    where p.Violation_County IN ('K','NY','Q','BX','R') and (p.Issue_Date>'{start_date}') and (p.Issue_Date<'{end_date}')
    group by p.Issue_Date
    order by violation_observation
    desc
    """
    query_join_2=f"""select date,count(1) as event_observation  from event_table 
    where (date>'{start_date}') and (date<'{end_date}') 
    group by date
    order by event_observation
    desc
    """
    joined_table=sparkSession.sql(query_join)
    joined_table_event=sparkSession.sql(query_join_2)
    vc=joined_table.toPandas()
    ec=joined_table_event.toPandas()

    fig=make_subplots(specs=[[{"secondary_y":True}]])
    fig.add_trace(go.Scatter(x=vc.Issue_Date,y=vc.violation_observation,name="Violation Count",mode='markers'),secondary_y= False)
    fig.add_trace(go.Scatter(x=ec.date,y=ec.event_observation,name="Event Count",mode='markers'),secondary_y=True)
    fig.update_layout(title_text="Violation Observation vs. Event Observation between "+str(start_date)+"-"+str(end_date))           
    fig.update_xaxes(title_text="Date")    
    fig.update_yaxes(title_text="Violation Observation Count",secondary_y=False)
    fig.update_yaxes(title_text="Event Observation Count",secondary_y=True)
    pio.write_image(fig, "/home/jovyan/work/spark/"+expr+".png", format='png')

In [8]:
sparkSession.stop()