In [2]:
!pip install pathway bokeh --quiet

In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

In [4]:
df = pd.read_csv("dataset.csv")

# Encode the nearby-traffic level as an ordinal score: low=-1, average=0, high=1.
# `.map` (instead of `.replace`) produces a numeric column directly and avoids pandas'
# "Downcasting behavior in `replace` is deprecated" FutureWarning. Missing values and
# labels outside the mapping become NaN and are then treated as 'average' (0).
TRAFFIC_LEVELS = {'low': -1, 'average': 0, 'high': 1}
df['TrafficConditionNearby'] = (
    df['TrafficConditionNearby']
    .map(TRAFFIC_LEVELS)
    .fillna(0)
    .astype(int)
)

df

  df['TrafficConditionNearby'] = df['TrafficConditionNearby'].replace({


Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,-1,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,-1,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,-1,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,-1,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,-1,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,0,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,-1,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,-1,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,-1,2,0,19-12-2016,16:03:00


In [5]:
# Merge the separate date ('dd-mm-yyyy') and time ('HH:MM:SS') text columns into one
# datetime column, then put every row in chronological order with a fresh 0..n-1 index.
stamp_text = df['LastUpdatedDate'].str.cat(df['LastUpdatedTime'], sep=' ')
df['Timestamp'] = pd.to_datetime(stamp_text, format='%d-%m-%Y %H:%M:%S')

df = df.sort_values(by='Timestamp').reset_index(drop=True)


In [6]:
df['VehicleType'].unique()  # distinct vehicle labels, in order of first appearance

array(['car', 'bike', 'cycle', 'truck'], dtype=object)

In [7]:
# Encode vehicle type as a pricing weight: the numbers are chosen to reflect the relative
# surcharge (truck > car > bike) or discount (cycle) each vehicle type should carry.
VEHICLE_TYPE_WEIGHTS = {'car': 3, 'bike': 1, 'cycle': -1, 'truck': 5}

# `.map` yields a numeric column without pandas' "Downcasting behavior in `replace` is
# deprecated" FutureWarning. Only encode while the column still holds labels, so that
# re-running this cell does not turn the already-encoded integers into NaN.
if not pd.api.types.is_numeric_dtype(df['VehicleType']):
    # Fail loudly on labels the weights do not cover (replace() would have left them as
    # strings, which break the integer stream schema later on).
    unknown_types = set(df['VehicleType'].dropna()) - VEHICLE_TYPE_WEIGHTS.keys()
    assert not unknown_types, f"Unmapped vehicle types: {sorted(unknown_types)}"
    df['VehicleType'] = df['VehicleType'].map(VEHICLE_TYPE_WEIGHTS)

df

  df.VehicleType = df.VehicleType.replace({


Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,3,-1,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00
1,5248,BHMNCPHST01,1200,26.140014,91.731000,237,1,-1,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
2,3936,BHMMBMMBX01,687,20.000035,78.000003,264,3,-1,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
3,6560,BHMNCPNST01,485,26.140048,91.730972,249,3,-1,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
4,17056,Shopping,1920,26.150504,91.733531,614,-1,-1,2,0,04-10-2016,07:59:00,2016-10-04 07:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...
18363,3935,BHMEURBRD01,470,26.149020,91.739503,373,3,-1,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18364,2623,BHMBCCTHL01,387,26.144495,91.736205,387,3,-1,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18365,1311,BHMBCCMKT01,577,26.144536,91.736172,193,-1,-1,2,0,19-12-2016,16:30:00,2016-12-19 16:30:00
18366,17055,Others-CCCPS98,3103,26.147500,91.727978,1671,3,-1,3,0,19-12-2016,16:30:00,2016-12-19 16:30:00


In [8]:
# One DataFrame per parking lot, keyed by SystemCodeNumber (rows keep df's chronological order).
lot_codes = df['SystemCodeNumber'].unique()
df_for_each_code = {lot: df[df['SystemCodeNumber'] == lot] for lot in lot_codes}

# target_lot is the lot whose prices are being optimised; every other lot is a competitor.
target_lot = 'BHMBCCMKT01'
other_lots = {lot for lot in lot_codes if lot != target_lot}

df_for_each_code[target_lot]

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,3,-1,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00
27,1,BHMBCCMKT01,577,26.144536,91.736172,64,3,-1,1,0,04-10-2016,08:25:00,2016-10-04 08:25:00
37,2,BHMBCCMKT01,577,26.144536,91.736172,80,3,-1,2,0,04-10-2016,08:59:00,2016-10-04 08:59:00
50,3,BHMBCCMKT01,577,26.144536,91.736172,107,3,-1,2,0,04-10-2016,09:32:00,2016-10-04 09:32:00
67,4,BHMBCCMKT01,577,26.144536,91.736172,150,1,-1,2,0,04-10-2016,09:59:00,2016-10-04 09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...,...
18303,1307,BHMBCCMKT01,577,26.144536,91.736172,309,3,0,4,0,19-12-2016,14:30:00,2016-12-19 14:30:00
18319,1308,BHMBCCMKT01,577,26.144536,91.736172,300,-1,-1,3,0,19-12-2016,15:03:00,2016-12-19 15:03:00
18327,1309,BHMBCCMKT01,577,26.144536,91.736172,274,-1,-1,2,0,19-12-2016,15:29:00,2016-12-19 15:29:00
18352,1310,BHMBCCMKT01,577,26.144536,91.736172,230,3,0,2,0,19-12-2016,16:03:00,2016-12-19 16:03:00


In [9]:
# location_info: one row per competing lot with its great-circle (haversine) distance
# from the target lot. effect_coeff = 1 / distance, so the nearer a competitor is, the
# more its price influences the target lot's price.
# NOTE(review): a competitor only a few metres away (BHMBCCTHL01 is ~5.7 m in the output
# below) gets an effect_coeff far larger than all other competitors combined; a lot at the
# exact same coordinates would get an infinite coefficient.
location_info = df[['SystemCodeNumber', 'Latitude', 'Longitude']].drop_duplicates().copy()

# Reference point = the target lot's own coordinates, looked up from the data so the cell
# stays correct if `target_lot` is changed (they used to be hard-coded).
target_coords = location_info.loc[location_info['SystemCodeNumber'] == target_lot].iloc[0]
target_lat, target_long = target_coords['Latitude'], target_coords['Longitude']

deg_to_rad = np.pi / 180
location_info['del_lat'] = (location_info['Latitude'] - target_lat) * deg_to_rad
location_info['del_long'] = (location_info['Longitude'] - target_long) * deg_to_rad

r = 6371000  # mean Earth radius in metres
haversine = (
    np.sin(location_info['del_lat'] / 2) ** 2
    + np.cos(location_info['Latitude'] * deg_to_rad) * np.cos(target_lat * deg_to_rad)
    * np.sin(location_info['del_long'] / 2) ** 2
)
location_info['distance_in_metres'] = 2 * r * np.arcsin(np.sqrt(haversine))

# Remove the target lot by its code. Dropping row label 0 only worked because the target
# lot happened to be the first row after the timestamp sort.
location_info = location_info[location_info['SystemCodeNumber'] != target_lot].copy()
location_info['effect_coeff'] = 1 / location_info['distance_in_metres']
location_info = location_info.sort_values('distance_in_metres')
location_info

Unnamed: 0,SystemCodeNumber,Latitude,Longitude,del_lat,del_long,distance_in_metres,effect_coeff
10,BHMBCCTHL01,26.144495,91.736205,-7.227408e-07,5.782276e-07,5.66906,0.1763961
5,BHMEURBRD01,26.14902,91.739503,7.825969e-05,5.814564e-05,599.3126,0.001668578
4,Shopping,26.150504,91.733531,0.0001041604,-4.609257e-05,714.0445,0.001400473
3,BHMNCPNST01,26.140048,91.730972,-7.833858e-05,-9.075136e-05,720.0618,0.00138877
1,BHMNCPHST01,26.140014,91.731,-7.892623e-05,-9.027419e-05,720.7044,0.001387532
8,Others-CCCPS105a,26.147473,91.728049,5.126015e-05,-0.0001417707,874.0976,0.001144037
6,Broad Street,26.137958,91.740994,-0.0001148121,8.416763e-05,875.6559,0.001142001
11,Others-CCCPS135a,26.147499,91.728005,5.172161e-05,-0.000142543,879.2937,0.001137276
12,Others-CCCPS202,26.147491,91.727997,5.156628e-05,-0.0001426828,879.6649,0.001136796
13,Others-CCCPS98,26.1475,91.727978,5.173121e-05,-0.0001430161,881.8259,0.001134011


In [10]:
effect_summation = location_info['effect_coeff'].sum()  # total weight of all competitors, used to normalise each one's pull

In [11]:
dist_sorted_lots = location_info.SystemCodeNumber.values  # competitor codes, nearest first (location_info is sorted by distance)

In [12]:
# Columns written to each per-lot stream file (they mirror the Pathway schema defined below).
STREAM_COLUMNS = ["Timestamp", "Occupancy", "Capacity", "QueueLength", "TrafficConditionNearby", "IsSpecialDay", "VehicleType"]

# One CSV per lot — the target lot first, then every competitor — replayed later as a stream.
for lot in [target_lot, *other_lots]:
    lot_rows = df_for_each_code[lot][STREAM_COLUMNS]
    lot_rows.to_csv(f"parking_stream_{lot}.csv", index=False)




In [13]:
# Pathway schema for the simulated per-lot streams.
# Each row of `parking_stream_<lot>.csv` (written by the export cell) is parsed into these
# typed columns; the field names mirror that CSV's header. The categorical features were
# already encoded as integers in pandas before export.

class ParkingSchema(pw.Schema):
    Timestamp: str   # observation time as text, e.g. '2016-10-04 07:59:00'; parsed into a datetime later
    Occupancy: int   # number of occupied parking spots
    Capacity: int    # total parking capacity at the location
    QueueLength : int  # queue length at the location
    TrafficConditionNearby : int  # traffic near the location: -1 low, 0 average (also used for missing values), 1 high
    IsSpecialDay : int            # special-day flag (presumably 0/1 — only 0 appears in the sample rows)
    VehicleType : int             # vehicle-type pricing weight: car 3, bike 1, cycle -1, truck 5

In [14]:
# Replay each per-lot CSV as a simulated real-time Pathway stream.
# input_rate is the approximate number of rows ingested per second.
REPLAY_INPUT_RATE = 100


def _replay_lot_stream(lot_code):
    """Stream `parking_stream_<lot_code>.csv` through ParkingSchema at REPLAY_INPUT_RATE rows/s."""
    return pw.demo.replay_csv(
        f"parking_stream_{lot_code}.csv", schema=ParkingSchema, input_rate=REPLAY_INPUT_RATE
    )


data_target = _replay_lot_stream(target_lot)
data_other = {code: _replay_lot_stream(code) for code in other_lots}

In [15]:
# Timestamps were written by pandas as 'YYYY-MM-DD HH:MM:SS'.
fmt = "%Y-%m-%d %H:%M:%S"


def _with_time_columns(table):
    """Return `table` plus two derived columns.

    - `t`: the parsed event datetime, used as the windowing time column.
    - `day`: the same date reset to midnight, formatted as text, used as a per-day key.
    """
    return table.with_columns(
        t=table.Timestamp.dt.strptime(fmt),
        day=table.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00"),
    )


data_target_with_time = _with_time_columns(data_target)
data_other_with_time = {code: _with_time_columns(data_other[code]) for code in other_lots}




## Step 2: Making a simple pricing function


In [16]:
# Step 2a — competitor prices.
# Each competing lot's stream is cut into 3-hour tumbling windows over event time `t`, with a
# separate window sequence per calendar day (`instance=day`). Within each window, the spread
# between peak and lowest occupancy relative to capacity drives a simple price:
#     price = 10 + 5 * (occ_max - occ_min) / cap
# i.e. a $10 base that rises towards $15 as demand fluctuates (as long as occupancy stays
# within capacity), comparable to the target lot's price computed next.
# Every row also carries the lot's effect_coeff (1 / distance to the target lot).
import datetime  # re-import: `from datetime import datetime` in the imports cell shadows the module


def _competitor_price_windows(table, coeff):
    """Window one competitor stream; return per-window occupancy stats, price and effect_coeff."""
    windowed = table.windowby(
        pw.this.t,
        instance=pw.this.day,
        window=pw.temporal.tumbling(datetime.timedelta(hours=3)),
        behavior=pw.temporal.exactly_once_behavior(),
    )
    stats = windowed.reduce(
        t=pw.this._pw_window_end,                        # window end timestamp
        day=pw.this._pw_instance,
        occ_max=pw.reducers.max(pw.this.Occupancy),
        occ_min=pw.reducers.min(pw.this.Occupancy),
        cap=pw.reducers.max(pw.this.Capacity),
        effect_coeff=coeff,
    )
    return stats.with_columns(
        price=10 + 5 * ((pw.this.occ_max - pw.this.occ_min) / pw.this.cap)
    )


delta_window_other = {}
target_lot_price_contribution = {}  # NOTE(review): not used by any visible cell
for code in other_lots:
    effect_coeff = location_info.loc[location_info['SystemCodeNumber'] == code, 'effect_coeff'].iloc[0]
    delta_window_other[code] = _competitor_price_windows(data_other_with_time[code], effect_coeff)




In [17]:
# Step 2b — target-lot price from a weighted demand score.
# Weights: occupancy ratio, queue length, traffic, special day, vehicle type.
# The special-day weight is negative, so special days get a discount.
a_1, a_2, a_3, a_4, a_5 = 1, 0.1, 1, -1, 0.2

# Same 3-hour, per-day tumbling windows as the competitor lots.
target_windows = data_target_with_time.windowby(
    pw.this.t,
    instance=pw.this.day,
    window=pw.temporal.tumbling(datetime.timedelta(hours=3)),
    behavior=pw.temporal.exactly_once_behavior(),
)

target_window_stats = target_windows.reduce(
    t=pw.this._pw_window_end,                                   # window end timestamp
    day=pw.this._pw_instance,
    occ=pw.reducers.max(pw.this.Occupancy),                     # peak occupancy in the window
    cap=pw.reducers.max(pw.this.Capacity),
    que_length=pw.reducers.avg(pw.this.QueueLength),
    traffic_condition=pw.reducers.avg(pw.this.TrafficConditionNearby),
    is_special_day=pw.reducers.max(pw.this.IsSpecialDay),
    vehicle_type=pw.reducers.avg(pw.this.VehicleType),
)

# norm_factor is the intended upper bound of demand (occ/cap <= 1, queue length <= 10,
# traffic <= 1, vehicle weight <= 5), so price = 10 * (1 + demand / norm_factor).
# NOTE(review): the bound only holds while the average queue length stays <= 10 and occupancy
# <= capacity; demand can also go negative (low traffic, cycles, special days), pushing the
# price below $10.
delta_window_target = (
    target_window_stats
    .with_columns(
        demand=a_1 * (pw.this.occ / pw.this.cap) + a_2 * (pw.this.que_length) + a_3 * (pw.this.traffic_condition) + a_4 * (pw.this.is_special_day) + a_5 * (pw.this.vehicle_type),
        norm_factor=a_1 + a_2 * (10) + a_3 + a_5 * (5),
    )
    .with_columns(
        price=10 * (1 + (pw.this.demand / pw.this.norm_factor))
    )
)

In [18]:
# Nudge the target lot's price towards each competitor's price, one competitor at a time.
# Each step closes a fraction 0.6 * effect_coeff / effect_summation of the gap between the
# competitor's price and the current target price, so nearby lots (large effect_coeff =
# 1 / distance) pull harder than distant ones.
# The update is sequential, so the iteration order changes the final price. Iterate over
# `dist_sorted_lots` (nearest competitor first, as intended) rather than the `other_lots` set,
# whose iteration order for strings varies between interpreter runs (hash randomisation)
# and made the prices non-reproducible.
# NOTE(review): each step is an inner join on (t, day), so a window disappears from the
# target table whenever any competitor has no data for it. Re-running this cell re-applies
# the adjustment on top of the already-adjusted table; re-run the previous cell first.
for code in dist_sorted_lots:
    joint_stats = (
        delta_window_target.join(
            delta_window_other[code], pw.left.t == pw.right.t, pw.left.day == pw.right.day
        )
        .select(
            *pw.left,
            **{f"price_competitor_{code}": pw.right.price, f"effect_coeff_{code}": pw.right.effect_coeff}
        )
        .with_columns(
            price=pw.this.price
            + 0.6 * (pw.this[f"price_competitor_{code}"] - pw.this.price) * pw.this[f"effect_coeff_{code}"] / effect_summation
        )
    )
    # Carry the adjusted table into the next competitor's step.
    delta_window_target = joint_stats

In [30]:
# Activate the Panel extension so the Bokeh figures render in the notebook.
pn.extension()


def target_price_plotter(source, lot_code=None):
    """Build a line-and-marker chart of the target lot's price per 3-hour window.

    Args:
        source: data source handed over by Pathway's ``Table.plot``; it must expose the
            ``t`` (window end, datetime) and ``price`` columns.
        lot_code: lot named in the title; defaults to the notebook's ``target_lot``.

    Returns:
        The configured ``bokeh.plotting.figure``.
    """
    if lot_code is None:
        lot_code = target_lot
    fig = bokeh.plotting.figure(
        height=400,
        width=600,
        # Prices are computed per 3-hour tumbling window, not per day.
        title=f"Target Lot ({lot_code}) - Parking Price per 3-Hour Window",
        x_axis_type="datetime",
        tools="pan,wheel_zoom,box_zoom,reset,save"
    )
    fig.line("t", "price", source=source, line_width=2, color="navy", alpha=0.8)
    # `circle(size=...)` is deprecated since Bokeh 3.4; `scatter(marker="circle")` replaces it.
    fig.scatter("t", "price", source=source, size=6, marker="circle", color="navy", alpha=0.8)
    fig.yaxis.axis_label = "Price ($)"
    fig.xaxis.axis_label = "Time"
    return fig

# Plotting function for a single competitor lot.
def comparison_price_plotter(source, lot_code):
    """Build a line-and-marker chart of one competitor lot's price per 3-hour window.

    Args:
        source: data source handed over by Pathway's ``Table.plot``; it must expose the
            ``t`` (window end, datetime) and ``price`` columns.
        lot_code: SystemCodeNumber of the competitor, shown in the title.

    Returns:
        The configured ``bokeh.plotting.figure``.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=600,
        # Prices are computed per 3-hour tumbling window, not per day.
        title=f"Comparison Lot ({lot_code}) - Parking Price per 3-Hour Window",
        x_axis_type="datetime",
        tools="pan,wheel_zoom,box_zoom,reset,save"
    )
    fig.line("t", "price", source=source, line_width=2, color="red", alpha=0.8)
    # `circle(size=...)` is deprecated since Bokeh 3.4; `scatter(marker="circle")` replaces it.
    fig.scatter("t", "price", source=source, size=6, marker="circle", color="red", alpha=0.8)
    fig.yaxis.axis_label = "Price ($)"
    fig.xaxis.axis_label = "Time"
    return fig

# Target-lot chart first, then one chart per competitor ordered nearest-first.
# `dist_sorted_lots` gives a stable, meaningful order; iterating the `other_lots` set would
# shuffle the grid between runs (string hashing is randomised per process).
target_viz = delta_window_target.plot(target_price_plotter, sorting_col="t")

comparison_plots = []
for code in dist_sorted_lots:
    comparison_viz = delta_window_other[code].plot(
        # Bind `code` now; a plain closure would only ever see the loop's final value.
        lambda source, code=code: comparison_price_plotter(source, code),
        sorting_col="t"
    )
    comparison_plots.append(comparison_viz)

# Lay the charts out three per row (e.g. 14 charts -> 5 rows, the last one holding 2).
PLOTS_PER_ROW = 3
plots_grid = [target_viz] + comparison_plots
plot_rows = [
    pn.Row(*plots_grid[start:start + PLOTS_PER_ROW])
    for start in range(0, len(plots_grid), PLOTS_PER_ROW)
]

# Final dashboard. Each chart keeps its own time axis (the ranges are not linked), so the
# footer no longer claims synchronised time scales.
dashboard = pn.Column(
    pn.pane.Markdown("# Parking Price Comparison Dashboard - All Lots"),
    pn.pane.Markdown("## Target Lot vs. All Comparison Lots"),
    *plot_rows,
    pn.pane.Markdown("---"),
    pn.pane.Markdown("**Note:** Comparison lots are ordered by distance from the target lot (nearest first); each point is one 3-hour window.")
).servable()

#dashboard
# please uncomment the previous line




In [31]:
# %%capture --no-display
# pw.run()

# Uncomment the two lines above to run the pipeline (`%%capture` must remain the cell's first line).


In [21]:
# Re-routing: for each of the nearest competitors, flag a "reroute" window when
#   * the target lot is crowded (peak occupancy / capacity above the threshold),
#   * the target lot's adjusted price is higher than that competitor's price, and
#   * the competitor is not crowded (its peak occupancy / capacity below the threshold).
# Otherwise the action is "hold", i.e. the target lot should still be preferred.
REROUTE_NEIGHBOURS = 5         # how many of the closest competitors are considered
OCCUPANCY_ALERT_RATIO = 0.7    # occupancy / capacity above which a lot counts as crowded

joint_stats_by_code = {}
alerts_by_code = {}
for code in dist_sorted_lots[:REROUTE_NEIGHBOURS]:
    new_joint_stats = (
        delta_window_target.join(
            delta_window_other[code], pw.left.t == pw.right.t, pw.left.day == pw.right.day
        )
        .select(
            *pw.left,
            **{
                f"occ_max_competitor_{code}": pw.right.occ_max,
                f"cap_competitor_{code}": pw.right.cap,
                f"reroute_price_{code}": pw.right.price,
            }
        )
        .with_columns(
            is_alert=(
                (pw.this.occ / pw.this.cap > OCCUPANCY_ALERT_RATIO)
                & (pw.this.price > pw.this[f"price_competitor_{code}"])
                & (pw.this[f"occ_max_competitor_{code}"] / pw.this[f"cap_competitor_{code}"] < OCCUPANCY_ALERT_RATIO)
            ),
            reroute_code=code
        )
        .with_columns(
            action=pw.if_else(pw.this.is_alert, "reroute", "hold")
        )
    )
    joint_stats_by_code[code] = new_joint_stats
    # Use a shared column name (`reroute_price`) so the per-competitor alert tables have
    # identical schemas and can be concatenated below.
    alerts_by_code[code] = (
        new_joint_stats
        .filter(pw.this.is_alert)
        .select(
            pw.this.day, pw.this.t, pw.this.price, pw.this.action, pw.this.reroute_code,
            reroute_price=pw.this[f"reroute_price_{code}"],
        )
    )

# `alerts` used to be overwritten on every iteration, keeping only the last competitor's
# alerts; combine the alerts from all considered competitors instead.
# (`new_joint_stats` still refers to the last competitor's table, as before.)
alerts = pw.Table.concat_reindex(*alerts_by_code.values())


In [32]:
#alerts    # shows the points where alert is generated
# please uncomment the previous line

In [33]:
#pw.run()
# please uncomment the previous line

In [26]:
import bokeh.models

def stats_plotter(src):
    """Plot the target lot's price over time, colouring each window by its re-routing action.

    ``src`` is the data source handed over by Pathway's ``Table.plot``; it is expected to
    carry ``t`` (window end), ``price`` and ``action`` ("reroute" / "hold") columns.
    Reroute windows are drawn green and hold windows red.
    """
    action_colors = bokeh.models.CategoricalColorMapper(
        factors=["reroute", "hold"],
        palette=("#00ff00", "#ff0000", "#00000000"),
    )

    plot = bokeh.plotting.figure(
        title="Pricing at target_lot",
        height=500,
        width=5000,
        x_axis_type="datetime",
    )

    # Price trajectory first, then one marker per window coloured by its action.
    plot.line("t", "price", source=src)
    plot.scatter(
        "t",
        "price",
        source=src,
        marker="circle",
        size=10,
        legend_field="action",
        color={"field": "action", "transform": action_colors},
    )
    return plot




In [34]:
#new_joint_stats.plot(stats_plotter, sorting_col="t")      # the points where reroute alert is generated are shown in green color, where as "hold" means that the target_lot should be preferred
# please uncomment the previous line

In [35]:
# pw.run()
# please uncomment the previous line