# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [1]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m12.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m41.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m9.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m59.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [3]:
# Colab-only step: files.upload() opens the browser upload dialog; its return
# value is kept in `uploaded` but is not used again in this cell.
from google.colab import files
uploaded = files.upload()
# The file is read from a fixed path, so this assumes the upload was saved as
# /content/dataset.csv (i.e. the uploaded file is named dataset.csv).
df = pd.read_csv('/content/dataset.csv')
df

# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing

Saving dataset.csv to dataset.csv


Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [4]:
# Join the separate date and time strings, then parse them as one day-first datetime.
date_and_time = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(date_and_time, format='%d-%m-%Y %H:%M:%S')

# Order rows by lot first and chronologically within each lot, renumbering the index from 0.
df = df.sort_values(by=['SystemCodeNumber', 'Timestamp'], ignore_index=True)

In [5]:

# Make sure Timestamp is a datetime column (a no-op when it was already parsed).
df["Timestamp"] = pd.to_datetime(df["Timestamp"])

# Pre-map Traffic
# The dataset stores this column in lowercase ("low", "average", ...), so the
# lookup keys must be lowercase as well; with capitalised keys every lookup
# missed and the whole feature collapsed to the 0 fallback. Labels are
# normalised (trimmed, lower-cased) before the lookup, and "avg" is accepted as
# an alias of "average".
traffic_map = {"low": 0.0, "average": 0.5, "avg": 0.5, "high": 1.0}
df["TrafficValue"] = (
    df["TrafficConditionNearby"]
    .astype(str)
    .str.strip()
    .str.lower()
    .map(traffic_map)
    .fillna(0)
)

# Pre-map Vehicle Type (unknown types fall back to the "car" weight of 0.1)
vehicle_weight_map = {"bike": 0.05, "cycle": 0.02, "car": 0.1, "truck": 0.2}
df["VehicleWeight"] = df["VehicleType"].map(vehicle_weight_map).fillna(0.1)

# Clean SpecialDay (missing flags are treated as "not a special day")
df["SpecialDay"] = df["IsSpecialDay"].fillna(0).astype(int)

# Normalising QueueLength to [0, 1] by the largest observed queue.
# Series.max() ignores missing values; the guard avoids 0/0 when no queue was ever recorded.
max_queue = df["QueueLength"].max()
df["Queue"] = df["QueueLength"] / max_queue if max_queue else 0.0

# Write only the columns the streaming schema expects, then read the file back
# as a quick sanity check of what the stream will contain.
df[["Timestamp", "Occupancy", "Capacity", "SystemCodeNumber", "Queue", "TrafficValue", "VehicleWeight", "SpecialDay"]].to_csv(
    "parking_stream_model2.csv", index=False
)
d = pd.read_csv("parking_stream_model2.csv")
print("Columns in CSV:", d.columns.tolist())
print("\nFirst few rows:")
print(d.head())




Columns in CSV: ['Timestamp', 'Occupancy', 'Capacity', 'SystemCodeNumber', 'Queue', 'TrafficValue', 'VehicleWeight', 'SpecialDay']

First few rows:
             Timestamp  Occupancy  Capacity SystemCodeNumber     Queue  \
0  2016-10-04 07:59:00         61       577      BHMBCCMKT01  0.066667   
1  2016-10-04 08:25:00         64       577      BHMBCCMKT01  0.066667   
2  2016-10-04 08:59:00         80       577      BHMBCCMKT01  0.133333   
3  2016-10-04 09:32:00        107       577      BHMBCCMKT01  0.133333   
4  2016-10-04 09:59:00        150       577      BHMBCCMKT01  0.133333   

   TrafficValue  VehicleWeight  SpecialDay  
0           0.0           0.10           0  
1           0.0           0.10           0  
2           0.0           0.10           0  
3           0.0           0.10           0  
4           0.0           0.05           0  


## Setting all the parameters for Model 2

In [29]:
# Initialise Panel's notebook extension before any Panel objects are built.
pn.extension()

# Parameters for model 2
# Price = base_price * (1 + lambda_ * normalised demand); the weights below
# scale the individual terms of the raw demand formula.
base_price = 10
lambda_ = 2
alpha = 1      # weight of the occupancy rate (Occupancy/Capacity)
beta = 1     # weight of the normalised queue length
gamma = 0.75   # weight of the traffic value (this term is subtracted in the demand formula)
delta = 0.25    # weight of the special-day flag
epsilon = 0.75  # weight of the vehicle-type weight

# Define Schema
# Column names and types of parking_stream_model2.csv, handed to Pathway when
# the file is read as a stream.
class ParkingSchema(pw.Schema):
    Timestamp: str  # kept as text here; parsed into a datetime later in the pipeline
    Occupancy: int
    Capacity: int
    Queue: float  # QueueLength divided by its maximum during preprocessing
    SystemCodeNumber: str  # parking-lot identifier
    TrafficValue: float  # numeric encoding of TrafficConditionNearby
    VehicleWeight: float  # numeric encoding of VehicleType
    SpecialDay: int  # integer-cast IsSpecialDay flag

## Making the Price Function

In [30]:
# Re-import the module: the earlier `from datetime import datetime` rebound the
# name `datetime` to the class, and `datetime.timedelta` below needs the module.
import datetime
# Load stream
# Replays the prepared CSV as a Pathway table using ParkingSchema.
# NOTE(review): input_rate is presumably rows per second — confirm in the Pathway docs.
data = pw.demo.replay_csv("parking_stream_model2.csv", schema=ParkingSchema, input_rate=1000)

# Parse timestamp and day
# `t` is the parsed timestamp; `day` is the same timestamp re-formatted as a
# midnight string, used only as part of the window instance key below.
fmt = "%Y-%m-%d %H:%M:%S"
data_with_time = data.with_columns(
    t = data.Timestamp.dt.strptime(fmt),
    day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)

# 1 Day tumbling window per lot per day
# The instance key concatenates the day string and the lot id, so each
# (day, lot) pair is aggregated on its own.
windowed_stats = (
    data_with_time
    .windowby(
        pw.this.t,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        instance=pw.this.day + pw.this.SystemCodeNumber
    )
    .reduce(
        # presumably the end of the one-day window; used as the x value in the plots
        t = pw.this._pw_window_end,
        # every row of an instance carries the same lot id, so min() just recovers it
        lot_id = pw.reducers.min(pw.this.SystemCodeNumber),
        occ_sum = pw.reducers.sum(pw.this.Occupancy),
        cap_max = pw.reducers.max(pw.this.Capacity),
        queue_sum = pw.reducers.sum(pw.this.Queue),
        traffic_sum = pw.reducers.sum(pw.this.TrafficValue),
        vehicle_sum = pw.reducers.sum(pw.this.VehicleWeight),
        # 1 if any row in the window is flagged as a special day
        special_day = pw.reducers.max(pw.this.SpecialDay),
        n = pw.reducers.count()
    )
)

# Calculate averages and occupancy rate
# Sums are divided by the row count n; occupancy_rate is the mean occupancy
# divided by the window's maximum capacity.
base_stats = windowed_stats.with_columns(
    occ_avg = pw.this.occ_sum / pw.this.n,
    queue_avg = pw.this.queue_sum / pw.this.n,
    traffic_avg = pw.this.traffic_sum / pw.this.n,
    vehicle_avg = pw.this.vehicle_sum / pw.this.n,
    occupancy_rate = pw.this.occ_sum / (pw.this.n * pw.this.cap_max)
)

# Calculate demand using the formula :
# Demand = α·(Occupancy/Capacity) + β·QueueLength - γ·Traffic + δ·IsSpecialDay + ε·VehicleTypeWeight
# All inputs are the per-window aggregates computed above.
demand_stats = base_stats.with_columns(
    demand_raw = (
        alpha * pw.this.occupancy_rate +
        beta * pw.this.queue_avg -
        gamma * pw.this.traffic_avg +
        delta * pw.this.special_day +
        epsilon * pw.this.vehicle_avg
    )
)

# Simple normalization for streaming data
# Assuming demand typically ranges from -2 to 4 based on the formula
# No clipping is applied: a demand_raw outside [-2, 4] yields a normalized
# value outside [0, 1] and a correspondingly out-of-range price.
final_stats = demand_stats.with_columns(
    # Normalize demand to [0, 1] range
    normalized_demand = (pw.this.demand_raw + 2) / 6,  # Assuming range [-2, 4]

    # Calculate price using: Price_t = BasePrice * (1 + λ * NormalizedDemand)
    # The normalisation expression is repeated here rather than reusing the
    # normalized_demand column defined in this same with_columns call.
    price = base_price * (1 + lambda_ * ((pw.this.demand_raw + 2) / 6))
)

## Plotting data for all 14 parking lots

In [31]:
# Visualization function
def price_plotter(source, lot_name=""):
    """Build the Bokeh figure showing one lot's price over time.

    Args:
        source: Data source passed in by ``Table.plot``; the glyphs read its
            ``t`` and ``price`` columns.
        lot_name: Lot identifier shown in the figure title. Passing it
            explicitly keeps the function independent of notebook globals.

    Returns:
        The configured Bokeh figure (a line with point markers).
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title=f"Pathway: Model 2 Dynamic Pricing for {lot_name}",
        x_axis_type="datetime"
    )
    fig.line("t", "price", source=source, line_width=2, color="navy")
    fig.circle("t", "price", source=source, size=6, color="red")
    return fig

lot_ids = df["SystemCodeNumber"].unique()

viz_list = []

for lot in lot_ids:
    filtered = final_stats.filter(pw.this.lot_id == lot)
    # Bind the current lot as a default argument so the title is fixed per plot
    # and never depends on the loop variable's value at call time.
    viz = filtered.plot(
        lambda source, lot_name=lot: price_plotter(source, lot_name),
        sorting_col="t",
    )
    viz_list.append(pn.Column(f"## {lot}", viz))

pn.Column(*viz_list).servable()




In [32]:
# Run Pathway
%%capture --no-display
pw.run()

Output()



## Function to calculate nearest parking lot

In [33]:
!pip install haversine --quiet

In [34]:
from haversine import haversine, Unit


meta_df = pd.read_csv("/content/dataset.csv")

# One (latitude, longitude) pair per lot. Building the dict straight from the
# columns keeps the row-by-row semantics (the last row of a lot wins, keys stay
# in first-appearance order) without materialising a Series per observation
# the way iterrows() does.
coords = dict(
    zip(
        meta_df["SystemCodeNumber"],
        zip(meta_df["Latitude"], meta_df["Longitude"]),
    )
)

# For every lot, find the geographically nearest other lot.
# haversine() is called with its default unit; only the ordering of the
# distances matters here.
competitor_map = {}

for lot_i, coord_i in coords.items():
    min_dist = float("inf")
    competitor = None  # stays None when there is no other lot to compare against
    for lot_j, coord_j in coords.items():
        if lot_i == lot_j:
            continue
        dist = haversine(coord_i, coord_j)
        # Strict comparison: on a tie the lot encountered first is kept.
        if dist < min_dist:
            min_dist = dist
            competitor = lot_j
    competitor_map[lot_i] = competitor

print(competitor_map)

{'BHMBCCMKT01': 'BHMBCCTHL01', 'BHMBCCTHL01': 'BHMBCCMKT01', 'BHMEURBRD01': 'BHMBCCMKT01', 'BHMMBMMBX01': 'BHMNCPNST01', 'BHMNCPHST01': 'BHMNCPNST01', 'BHMNCPNST01': 'BHMNCPHST01', 'Broad Street': 'BHMBCCTHL01', 'Others-CCCPS105a': 'Others-CCCPS135a', 'Others-CCCPS119a': 'Others-CCCPS8', 'Others-CCCPS135a': 'Others-CCCPS202', 'Others-CCCPS202': 'Others-CCCPS135a', 'Others-CCCPS8': 'Others-CCCPS119a', 'Others-CCCPS98': 'Others-CCCPS202', 'Shopping': 'BHMEURBRD01'}


## Plotting competitor vs parking lot comparison

In [35]:
import panel as pn
import bokeh.plotting

pn.extension()

plots = []

for lot_i, lot_j in competitor_map.items():
    # Price series of the lot itself and of its nearest neighbour.
    our_data = final_stats.filter(pw.this.lot_id == lot_i)
    comp_data = final_stats.filter(pw.this.lot_id == lot_j)

    # Inner join on the window timestamp: only times present in both series survive.
    joined = our_data.join(
        comp_data, pw.left.t == pw.right.t, how=pw.JoinMode.INNER
    ).select(
        our_t=pw.left.t,
        our_price=pw.left.price,
        comp_price=pw.right.price,
    )

    def dual_price_plotter(source, lot_i=lot_i, lot_j=lot_j):
        """Draw both price series of one lot/competitor pair on a shared time axis.

        The lot names are bound as default arguments so each figure keeps the
        pair it was created for.
        """
        fig = bokeh.plotting.figure(
            title=f"Pricing Comparison: {lot_i} vs {lot_j}",
            x_axis_type="datetime",
            height=400,
            width=800,
        )
        # (column, colour, legend text, extra line styling), drawn in this order.
        series = (
            ("our_price", "navy", f"{lot_i} (Our Price)", {}),
            ("comp_price", "red", f"{lot_j} (Competitor Price)", {"line_dash": "dashed"}),
        )
        for column, colour, label, line_style in series:
            fig.line("our_t", column, source=source, line_width=2, color=colour, legend_label=label, **line_style)
            fig.circle("our_t", column, source=source, size=4, color=colour)
        fig.legend.location = "top_left"
        return fig

    viz = joined.plot(dual_price_plotter, sorting_col="our_t")
    plots.append(pn.Column(f"### {lot_i} vs {lot_j}", viz))

# Display every comparison panel (one per entry in competitor_map)
pn.Column(*plots).servable()




In [36]:
# Run Pathway
%%capture --no-display
pw.run()

Output()

