This cell may take a few seconds to execute.


In [1]:
!pip install pathway bokeh --quiet

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [3]:
df = pd.read_csv('dataset.csv')
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [4]:
df.columns

Index(['ID', 'SystemCodeNumber', 'Capacity', 'Latitude', 'Longitude',
       'Occupancy', 'VehicleType', 'TrafficConditionNearby', 'QueueLength',
       'IsSpecialDay', 'LastUpdatedDate', 'LastUpdatedTime'],
      dtype='object')

In [5]:
df.SystemCodeNumber.unique()

array(['BHMBCCMKT01', 'BHMBCCTHL01', 'BHMEURBRD01', 'BHMMBMMBX01',
       'BHMNCPHST01', 'BHMNCPNST01', 'Broad Street', 'Others-CCCPS105a',
       'Others-CCCPS119a', 'Others-CCCPS135a', 'Others-CCCPS202',
       'Others-CCCPS8', 'Others-CCCPS98', 'Shopping'], dtype=object)

In [6]:
df.Latitude.unique()

array([26.14453614, 26.14449459, 26.14901995, 20.0000347 , 26.14001386,
       26.14004753, 26.13795775, 26.14747299, 26.14754061, 26.14749943,
       26.14749053, 26.14754886, 26.14749998, 26.15050395])

In [7]:
# Combine the 'LastUpdatedDate' and 'LastUpdatedTime' columns into a single datetime column
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                  format='%d-%m-%Y %H:%M:%S')

# Sort the DataFrame by the new 'Timestamp' column and reset the index
df = df.sort_values('Timestamp').reset_index(drop=True)

## Now we will add the other columns also so that we can make a better price model

In [8]:
# Save the selected columns to a CSV file for streaming or downstream processing
df[["Timestamp", "Occupancy", "Capacity","IsSpecialDay","VehicleType","TrafficConditionNearby","QueueLength","SystemCodeNumber"]].to_csv("parking_stream.csv", index=False)


In [9]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

class ParkingSchema(pw.Schema):
    Timestamp: str               # Timestamp of the observation (should ideally be in ISO format)
    Occupancy: int               # Number of occupied parking spots
    Capacity: int                # Total parking capacity at the location
    IsSpecialDay: int            # 0 for normal day and 1 for any event or holiday
    VehicleType : str            # ex car,bike,truck
    TrafficConditionNearby : str # ex low,average,high
    QueueLength : int            # vehicles in queue
    SystemCodeNumber : str       # lot id of 14 different lots

In [10]:
# Load the data as a simulated stream using Pathway's replay_csv function
# This replays the CSV data at a controlled input rate to mimic real-time streaming
# input_rate=1000 means approximately 1000 rows per second will be ingested into the stream.

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=1000)



In [11]:
# Define the datetime format to parse the 'Timestamp' column
fmt = "%Y-%m-%d %H:%M:%S"


# Add new columns to the data stream:
# - 't' contains the parsed full datetime
# - 'day' extracts the date part and resets the time to midnight (useful for day-level aggregations)
data_with_time = data.with_columns(
     t = data.Timestamp.dt.strptime(fmt),
     day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)



In [12]:
df2 = pd.read_csv("parking_stream.csv")
print(df2.shape)

(18368, 8)


# Step 2: Making a simple pricing function

This is our model 1

In [13]:
# Define a daily tumbling window over the data stream using Pathway
# This block performs temporal aggregation and computes a dynamic price for each day
import datetime


delta_window = (
    data_with_time.windowby(
        pw.this.t,
        instance=(pw.this.SystemCodeNumber, pw.this.day),
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior()
    )
    .reduce(
        t=pw.this._pw_window_end,
        SystemCodeNumber=pw.this._pw_instance[0],
        occ_max=pw.reducers.max(pw.this.Occupancy),
        occ_min=pw.reducers.min(pw.this.Occupancy),
        cap=pw.reducers.max(pw.this.Capacity)
    )
    .with_columns(
        price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
    )
)

# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [14]:
# Activate the Panel extension to enable interactive visualizations
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    # Create a Bokeh figure with datetime x-axis
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Ensure time-based data is properly formatted on the x-axis
    )
    # Plot a line graph showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Overlay red circles at each data point for better visibility
    fig.circle("t", "price", source=source, size=6, color="red")

    return fig

# Use Pathway's built-in .plot() method to bind the data stream (delta_window) to the Bokeh plot
# - 'price_plotter' is the rendering function
# - 'sorting_col="t"' ensures the data is plotted in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Create a Panel layout and make it servable as a web app
# This line enables the interactive plot to be displayed when the app is served
pn.Column(viz).servable()



In [15]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()



## Now we will make the model 2
In this model we will be using multiple features to generate the demand.
We will construct a mathematical demand function using key features:
– Occupancy rate
– Queue length
– Traffic level
– Special day
– Vehicle type

In [28]:
# UDF to map traffic condition strings to numerical weights
@pw.udf
def map_traffic_weight(level: str) -> float:
    if level == "low":
        return 0.0
    elif level == "average":
        return 0.5
    elif level == "high":
        return 1.0
    return 0.0  # default fallback

# UDF to map vehicle type strings to numerical weights
@pw.udf
def map_vehicle_weight(vtype: str) -> float:
    if vtype == "bike":
        return 0.2
    elif vtype == "car":
        return 0.5
    elif vtype == "truck":
        return 1.0
    return 0.5  # default fallback

# UDF to clip the raw demand score between 0 and 2.5
@pw.udf
def clip_demand(x: float) -> float:
    return max(0, min(x, 2.5))


In [29]:
import datetime

step1 = data_with_time.windowby(
    pw.this.t,
    instance=(pw.this.SystemCodeNumber, pw.this.day),  # one window per lot per day
    window=pw.temporal.tumbling(datetime.timedelta(days=1)),
    behavior=pw.temporal.exactly_once_behavior()
).reduce(
    t = pw.this._pw_window_end,
    SystemCodeNumber = pw.this._pw_instance[0],
    occ_avg = pw.reducers.avg(pw.this.Occupancy),
    cap = pw.reducers.max(pw.this.Capacity),
    queue_avg = pw.reducers.avg(pw.this.QueueLength),
    is_special = pw.reducers.max(pw.this.IsSpecialDay),
    traffic_mode = pw.reducers.max(pw.this.TrafficConditionNearby),
    vehicle_mode = pw.reducers.max(pw.this.VehicleType)
)

step1 = step1.with_columns(

    # Convert traffic_mode to numerical weight

    traffic_weight = pw.if_else(
        pw.this.traffic_mode == "low", 0.0,
        pw.if_else(
            pw.this.traffic_mode == "average", 0.5,
            pw.if_else(
                pw.this.traffic_mode == "high", 1.0,
                0.0  # default fallback
            )
        )
    ),

    # Convert vehicle_mode to numerical weight
    vehicle_weight = pw.if_else(
        pw.this.vehicle_mode == "bike", 0.2,
        pw.if_else(
            pw.this.vehicle_mode == "car", 0.5,
            pw.if_else(
                pw.this.vehicle_mode == "truck", 1.0,
                0.5  # default fallback
            )
        )
    )
)

In [30]:
step2 = step1.with_columns(
    raw_demand = (
        (pw.this.occ_avg / pw.this.cap) +
        0.2 * pw.this.queue_avg -
        0.3 * pw.this.traffic_weight +
        0.5 * pw.this.is_special +
        0.2 * pw.this.vehicle_weight
    )
)


In [31]:
step3 = step2.with_columns(
    normalized_demand = clip_demand(pw.this.raw_demand)
)


In [32]:
delta_window2 = step3.with_columns(
    price = 10 * (1 + 0.2* pw.this.normalized_demand)
)


In [33]:
# Activate the Panel extension to enable interactive visualizations
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    print("[DEBUG] price_plotter called with", len(source.data["t"]), "rows")
    # Create a Bokeh figure with datetime x-axis
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Ensure time-based data is properly formatted on the x-axis
    )
    # Plot a line graph showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Overlay red circles at each data point for better visibility
    fig.circle("t", "price", source=source, size=6, color="red")

    return fig

# Use Pathway's built-in .plot() method to bind the data stream (delta_window) to the Bokeh plot
# - 'price_plotter' is the rendering function
# - 'sorting_col="t"' ensures the data is plotted in time order
viz2 = delta_window2.plot(price_plotter, sorting_col="t")

# Create a Panel layout and make it servable as a web app
# This line enables the interactive plot to be displayed when the app is served
pn.Column(viz2).servable()

[DEBUG] price_plotter called with 0 rows




In [34]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()

