# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [None]:
!pip install pathway bokeh --quiet

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m5.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m41.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m11.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m67.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [None]:
df = pd.read_csv('dataset.csv')

# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing

In [None]:
# Join the separate date and time strings and parse them as one datetime.
# The source columns use day-month-year dates and 24-hour HH:MM:SS times.
combined_stamp = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(combined_stamp, format='%d-%m-%Y %H:%M:%S')

# Put the rows in chronological order and renumber the index from zero.
df = df.sort_values(by='Timestamp', ignore_index=True)

In [None]:
# Express occupancy and queue length relative to the lot's capacity.
# The tiny offset keeps the denominator non-zero if a capacity of 0 appears.
safe_capacity = df['Capacity'] + 1e-6
df['Occupancy_ratio'] = df['Occupancy'].div(safe_capacity)
df['QueueLength_pressure'] = df['QueueLength'].div(safe_capacity)

In [None]:
# from sklearn.preprocessing import LabelEncoder
# encoder = LabelEncoder()
# df['SystemCodeNumber'] = encoder.fit_transform(df['SystemCodeNumber'])

In [None]:
import pandas as pd

# Day-level baseline: for every parking lot and calendar day, take the values
# of the earliest row and attach them to all rows of that lot and day.

# Make sure the column has a datetime dtype before using the .dt accessor.
df['Timestamp'] = pd.to_datetime(df['Timestamp'])
df['Date'] = df['Timestamp'].dt.date

# Chronological order inside each (lot, day) group, so that 'first' below
# picks up the earliest value.
df = df.sort_values(by=['SystemCodeNumber', 'Date', 'Timestamp'])

base_values = (
    df.groupby(['SystemCodeNumber', 'Date'])
    .agg(
        base_Occupancy_ratio=pd.NamedAgg(column='Occupancy_ratio', aggfunc='first'),
        base_QueueLength_pressure=pd.NamedAgg(column='QueueLength_pressure', aggfunc='first'),
    )
    .reset_index()
)

# Left merge: every original row is kept and gains the two baseline columns.
df = df.merge(base_values, on=['SystemCodeNumber', 'Date'], how='left')


In [None]:
df.head(2)

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp,Occupancy_ratio,QueueLength_pressure,Date,base_Occupancy_ratio,base_QueueLength_pressure
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00,0.105719,0.001733,2016-10-04,0.105719,0.001733
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00,2016-10-04 08:25:00,0.110919,0.001733,2016-10-04,0.105719,0.001733


In [None]:
# 'Date' was only a grouping helper; remove it from the frame in place.
df.drop(columns=["Date"], inplace=True)

In [None]:
# from sklearn.preprocessing import LabelEncoder

# label_encoder = LabelEncoder()
# df['SystemCodeNumber_Encoded'] = label_encoder.fit_transform(df['SystemCodeNumber'])

In [None]:
# Write the columns consumed by the Model 1 stream to CSV (no index column).
model_1_columns = [
    "SystemCodeNumber",
    "base_QueueLength_pressure",
    "base_Occupancy_ratio",
    "Timestamp",
    "Occupancy",
    "Capacity",
    "QueueLength",
    "Occupancy_ratio",
    "QueueLength_pressure",
]
df[model_1_columns].to_csv("model_1.csv", index=False)

# Note: this is a deliberately small feature set.
# Participants are expected to incorporate additional relevant features in their models.

In [None]:
df.head(2)

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp,Occupancy_ratio,QueueLength_pressure,base_Occupancy_ratio,base_QueueLength_pressure
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00,0.105719,0.001733,0.105719,0.001733
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00,2016-10-04 08:25:00,0.110919,0.001733,0.105719,0.001733


In [None]:
df.loc[(df['SystemCodeNumber']=='BHMBCCMKT01') & (df['LastUpdatedDate']=="04-10-2016")].head(2)

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp,Occupancy_ratio,QueueLength_pressure,base_Occupancy_ratio,base_QueueLength_pressure
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00,0.105719,0.001733,0.105719,0.001733
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00,2016-10-04 08:25:00,0.110919,0.001733,0.105719,0.001733


In [None]:
df.head(2)

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime,Timestamp,Occupancy_ratio,QueueLength_pressure,base_Occupancy_ratio,base_QueueLength_pressure
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00,2016-10-04 07:59:00,0.105719,0.001733,0.105719,0.001733
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00,2016-10-04 08:25:00,0.110919,0.001733,0.105719,0.001733


# model_1

In [None]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

class ParkingSchema(pw.Schema):
    # Column layout of model_1.csv as it is replayed into Pathway.
    Timestamp: str   # Observation time as text; parsed later with "%Y-%m-%d %H:%M:%S"
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location
    QueueLength: int # Number of cars waiting in the queue
    SystemCodeNumber: str # Unique identifier for the parking spot
    Occupancy_ratio: float  # Occupancy / (Capacity + 1e-6)
    QueueLength_pressure: float  # QueueLength / (Capacity + 1e-6)
    base_Occupancy_ratio: float  # Occupancy_ratio of the first row for the same lot and day
    base_QueueLength_pressure: float  # QueueLength_pressure of the first row for the same lot and day
# Replay model_1.csv as a simulated live stream.
# input_rate=1000 means approximately 1000 rows per second are ingested.
data = pw.demo.replay_csv("model_1.csv", schema=ParkingSchema, input_rate=1000)

# Layout of the 'Timestamp' strings stored in the CSV.
fmt = "%Y-%m-%d %H:%M:%S"

# Derive two time columns from the text timestamp:
# - 't'   : the parsed datetime, used as event time for windowing
# - 'day' : the same moment formatted as a string with the time set to 00:00:00
data_with_time = data.with_columns(
    t=pw.this.Timestamp.dt.strptime(fmt),
    day=pw.this.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00"),
)

# Parameters of the Model 1 pricing rule.
# `datetime` is re-imported as a module here because the notebook's first
# import cell rebinds the name to the `datetime.datetime` class.
import datetime
import math

base_price = 10.0                    # reference price scaled by the demand adjustment
alpha, beta = 5.0, 10.0              # weights of occupancy ratio and queue pressure
min_price, max_price = 7.0, 20.0     # intended lower / upper price bounds
def custom_smooth_demand(x, p=1.5, q=1.5):
    """Map a demand ratio ``x`` to a relative price adjustment.

    For ``x < 1`` the result is the discount ``-0.5 * (1 - x)**p`` (``-0.5``
    at ``x = 0``).  For ``x >= 1`` it is the surcharge ``1 - 1 / (x + 1)**q``,
    which stays below 1.  The two branches do not meet at ``x = 1``: the
    value steps from just under 0 to ``1 - 1 / 2**q``.

    Args:
        x: demand ratio.
        p: exponent shaping the discount branch.
        q: exponent shaping the surcharge branch.

    Returns:
        The adjustment to add to 1 before multiplying a base price.
    """
    if x < 1:
        shortfall = 1 - x
        return -0.5 * shortfall**p
    growth = (x + 1)**q
    return 1 - 1 / growth
def smooth(demand, base):
    """Return ``base`` scaled by ``1 + custom_smooth_demand(demand)``."""
    adjustment = custom_smooth_demand(demand)
    return base * (1 + adjustment)

# Aggregate the stream into 2-hour tumbling windows per parking lot and
# compute one price per window.
#
# - `windowby` is called on the table itself (it is a Table method, not a
#   method of the object returned by `groupby`).
# - The parking lot is the window instance, so `_pw_instance` really is the
#   SystemCodeNumber that the reduce step stores under that name.
# - The price is clamped to [min_price, max_price]; without the clamp the
#   pricing rule can go as low as 0.5 * base_price.
delta_window = (
    data_with_time.windowby(
        pw.this.t,  # Event time column to use for windowing (parsed datetime)
        instance=pw.this.SystemCodeNumber,  # One independent window sequence per parking lot
        window=pw.temporal.tumbling(datetime.timedelta(hours=2)),  # Fixed-size 2-hour window
        behavior=pw.temporal.exactly_once_behavior(),  # Emit each window's result once
    )
    .reduce(
        SystemCodeNumber=pw.this._pw_instance,  # Parking lot the window belongs to
        t=pw.this._pw_window_end,  # End timestamp of the window
        Occupancy_ratio=pw.reducers.max(pw.this.Occupancy_ratio),  # Highest occupancy ratio in the window
        q_pressure=pw.reducers.max(pw.this.QueueLength_pressure),  # Highest queue pressure in the window
        base_Occupancy_ratio=pw.reducers.max(pw.this.base_Occupancy_ratio),
        base_QueueLength_pressure=pw.reducers.max(pw.this.base_QueueLength_pressure),
    )
    .with_columns(
        # Weighted load in the window and at the day's baseline.
        temp=alpha * pw.this.Occupancy_ratio + beta * pw.this.q_pressure,
        base_temp=alpha * pw.this.base_Occupancy_ratio + beta * pw.this.base_QueueLength_pressure,
    )
    .with_columns(
        # Same small offset as used for the capacity ratios: avoids a
        # division by zero when the baseline load is 0.
        demand=pw.this.temp / (pw.this.base_temp + 1e-6),
    )
    .with_columns(
        price=pw.apply(
            lambda demand: min(max(smooth(demand, base_price), min_price), max_price),
            pw.this.demand,
        )
    )
)
# Activate the Panel extension to enable interactive visualizations
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    """Build the price-over-time figure for a Pathway data source.

    Args:
        source: data source handed over by Pathway's ``plot``; it must expose
            the columns ``t`` and ``price``.

    Returns:
        The Bokeh figure with a line and a point marker per data row.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Format the x-axis ticks as dates/times
    )
    # Line showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Red point markers on top of the line. `scatter` is used because
    # `circle(size=...)` is deprecated in recent Bokeh releases.
    fig.scatter("t", "price", source=source, size=6, color="red")

    return fig

# Bind the windowed price table (delta_window) to the Bokeh plot:
# - 'price_plotter' is the rendering function that receives the data source
# - 'sorting_col="t"' ensures the data is plotted in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel column and mark it as servable so that the
# interactive plot is displayed when the app is served.
# NOTE(review): as the last expression of the cell this presumably also
# renders the plot inline in the notebook - confirm.
pn.Column(viz).servable()
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface




In [None]:
%%capture --no-display
pw.run()



# **Model_2**

In [None]:
import pandas as pd

# Encode the categorical columns as numeric weights for Model 2.
vehicle_weights = {
    'car': 0.2,
    'bike': 0.1,
    'truck': 0.4,
    'cycle': 0.05,
}
traffic_levels = {
    'low': 1,
    'avg': 5,
    'average': 5,    # alternative spelling of the medium level
    'medium': 5,     # alternative spelling of the medium level
    'high': 10,
}

# Only map columns that still hold labels. Mapping an already-encoded
# (numeric) column again would turn every value into NaN, which is what
# happened when this cell was executed twice.
if not pd.api.types.is_numeric_dtype(df['VehicleType']):
    df['VehicleType'] = df['VehicleType'].map(vehicle_weights)
if not pd.api.types.is_numeric_dtype(df['TrafficConditionNearby']):
    df['TrafficConditionNearby'] = df['TrafficConditionNearby'].map(traffic_levels)

# Ensure Timestamp is datetime
df['Timestamp'] = pd.to_datetime(df['Timestamp'])

# Extract date (temporary grouping helper, dropped again below)
df['Date'] = df['Timestamp'].dt.date

# Sort to ensure the first entry per group is truly the earliest
df = df.sort_values(by=['SystemCodeNumber', 'Date', 'Timestamp'])

# Remove baseline columns left over from a previous run of this cell;
# otherwise the merge below would produce '_x'/'_y' duplicates.
df = df.drop(columns=['base_Occupancy', 'base_QueueLength'], errors='ignore')

# Group by SystemCodeNumber and Date, then take the first row
base_values = df.groupby(['SystemCodeNumber', 'Date']).agg(
    base_Occupancy=('Occupancy', 'first'),
    base_QueueLength=('QueueLength', 'first'),
).reset_index()

# Merge back with main DataFrame
df = pd.merge(df, base_values, on=['SystemCodeNumber', 'Date'], how='left')
df.drop("Date", axis=1, inplace=True)

# Change of occupancy and queue length relative to the day's first reading
df['delta_occ'] = df['Occupancy'] - df['base_Occupancy']
df['delta_q'] = df['QueueLength'] - df['base_QueueLength']


In [None]:
# Write the columns consumed by the Model 2 stream to CSV (no index column).
# Every column is listed exactly once: selecting 'Capacity' twice would put
# a duplicated 'Capacity' header into the file.
model_2_columns = [
    "Capacity",
    "delta_occ",
    "delta_q",
    "SystemCodeNumber",
    "Occupancy",
    "VehicleType",
    "TrafficConditionNearby",
    "QueueLength",
    "IsSpecialDay",
    "Timestamp",
    "Occupancy_ratio",
    "QueueLength_pressure",
    "base_Occupancy_ratio",
    "base_QueueLength_pressure",
]
df[model_2_columns].to_csv("model_2.csv", index=False)

In [None]:
class ParkingScheme(pw.Schema):
    # Column layout of model_2.csv as it is replayed into Pathway.
    Timestamp: str   # Observation time as text; parsed later with "%Y-%m-%d %H:%M:%S"
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location
    QueueLength: int # Number of cars waiting in the queue
    SystemCodeNumber: str # Unique identifier for the parking spot
    Occupancy_ratio: float  # Occupancy / (Capacity + 1e-6)
    QueueLength_pressure: float  # QueueLength / (Capacity + 1e-6)
    base_Occupancy_ratio: float  # Occupancy_ratio of the first row for the same lot and day
    base_QueueLength_pressure: float  # QueueLength_pressure of the first row for the same lot and day
    VehicleType: float  # Numeric weight the vehicle label was mapped to (e.g. car -> 0.2)
    TrafficConditionNearby: float  # Numeric level the traffic label was mapped to (1, 5 or 10)
    IsSpecialDay: int  # presumably a 0/1 flag - TODO confirm against the dataset
    delta_occ: int  # Occupancy minus the day's first Occupancy for the lot
    delta_q: int  # QueueLength minus the day's first QueueLength for the lot

# Replay model_2.csv as a simulated live stream (input_rate=1000 rows per second).
data = pw.demo.replay_csv("model_2.csv", schema=ParkingScheme, input_rate=1000)

# Layout of the 'Timestamp' strings stored in the CSV.
fmt = "%Y-%m-%d %H:%M:%S"

# Derive two time columns from the text timestamp:
# - 't'   : the parsed datetime, used as event time for windowing
# - 'day' : the same moment formatted as a string with the time set to 00:00:00
table = data.with_columns(
    t=pw.this.Timestamp.dt.strptime(fmt),
    day=pw.this.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00"),
)

# Batch copy of the exported data, used only to derive normalisation constants.
data2 = pd.read_csv("model_2.csv")

import datetime
from datetime import timedelta # Import timedelta
import pathway as pw # Ensure pathway is imported

# Constants
base_price = 10.0
alpha = 5.0     # weight of the occupancy ratio
beta = 3.0      # weight of the queue pressure
gamma = 1.5     # weight of the nearby traffic level
delta = 2.0     # weight of the special-day flag
epsilon = 1.2   # weight of the vehicle-type weight
lam = 0.1  # moderate price scaling
min_price = 7.0
max_price = 20.0


def _column_max(frame, column, default=1.0):
    """Return the largest non-missing value of ``frame[column]``.

    Falls back to ``default`` when the column is empty or holds only missing
    values, so that a NaN never leaks into the normalisation denominator.
    """
    peak = frame[column].max()
    return default if pd.isna(peak) else peak


# Maximum values are taken from the full batch file rather than from the
# stream. For a truly dynamic streaming approach, normalization should be
# handled differently.
max_occ_ratio = _column_max(data2, 'Occupancy_ratio')
max_q_pressure = _column_max(data2, 'QueueLength_pressure')
max_traffic = _column_max(data2, 'TrafficConditionNearby')
max_is_special = _column_max(data2, 'IsSpecialDay')
max_vehicle = _column_max(data2, 'VehicleType')

# Largest weighted demand the features can produce; used to scale demand.
normalization_denominator = (
    alpha * max_occ_ratio
    + beta * max_q_pressure
    + gamma * max_traffic
    + delta * max_is_special
    + epsilon * max_vehicle
)
if normalization_denominator == 0:
    normalization_denominator = 1.0 # Prevent division by zero

import math

def custom_smooth_demand(x, p=1.5, q=1.5):
    """Map a demand ratio ``x`` to a relative price adjustment.

    For ``x < 1`` the result is the discount ``-0.5 * (1 - x)**p`` (``-0.5``
    at ``x = 0``).  For ``x >= 1`` it is the surcharge ``1 - 1 / (x + 1)**q``,
    which stays below 1.  The two branches do not meet at ``x = 1``: the
    value steps from just under 0 to ``1 - 1 / 2**q``.

    Args:
        x: demand ratio.
        p: exponent shaping the discount branch.
        q: exponent shaping the surcharge branch.

    Returns:
        The adjustment to add to 1 before multiplying a base price.
    """
    if x < 1:
        shortfall = 1 - x
        return -0.5 * shortfall**p
    growth = (x + 1)**q
    return 1 - 1 / growth
def smooth(demand, base):
    """Return ``base`` scaled by ``1 + custom_smooth_demand(demand)``."""
    adjustment = custom_smooth_demand(demand)
    return base * (1 + adjustment)

# Aggregate the stream into 24-minute tumbling windows per parking lot and
# compute one price per window.
#
# - `windowby` is called on the table itself (it is a Table method, not a
#   method of the object returned by `groupby`); the per-lot partitioning is
#   expressed through `instance`.
# - `max_q_pressure` reduces QueueLength_pressure, the same feature that
#   `normalization_denominator` is built from; the raw QueueLength is on a
#   different scale and would push the normalised demand far above 1.
# - The price is clamped to [min_price, max_price]; without the clamp the
#   pricing rule can go as low as 0.5 * base_price.
delta_window = (
    table.windowby(
        pw.this.t,  # Event time column to use for windowing (parsed datetime)
        instance=pw.this.SystemCodeNumber,  # One independent window sequence per parking lot
        window=pw.temporal.tumbling(datetime.timedelta(minutes=24)),  # Fixed-size 24-minute window
        behavior=pw.temporal.exactly_once_behavior(),  # Emit each window's result once
    )
    .reduce(
        SystemCodeNumber=pw.this._pw_instance,  # Parking lot the window belongs to
        t=pw.this._pw_window_end,  # End timestamp of the window
        max_Occupancy_ratio=pw.reducers.max(pw.this.Occupancy_ratio),
        max_q_pressure=pw.reducers.max(pw.this.QueueLength_pressure),
        max_traffic=pw.reducers.max(pw.this.TrafficConditionNearby),
        max_vehicle=pw.reducers.max(pw.this.VehicleType),
        max_is_special=pw.reducers.max(pw.this.IsSpecialDay),
    )
    .with_columns(
        # Weighted sum of the window's peak feature values.
        demand=(
            alpha * pw.this.max_Occupancy_ratio
            + beta * pw.this.max_q_pressure
            + gamma * pw.this.max_traffic
            + delta * pw.this.max_is_special
            + epsilon * pw.this.max_vehicle
        ),
    )
    .with_columns(
        price=pw.apply(
            lambda demand: min(
                max(smooth(demand / normalization_denominator, base_price), min_price),
                max_price,
            ),
            pw.this.demand,
        )
    )
)
# Activate the Panel extension to enable interactive visualizations
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    """Build the price-over-time figure for a Pathway data source.

    Args:
        source: data source handed over by Pathway's ``plot``; it must expose
            the columns ``t`` and ``price``.

    Returns:
        The Bokeh figure with a line and a point marker per data row.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=10000,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Format the x-axis ticks as dates/times
    )
    # Line showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Red point markers on top of the line. `scatter` is used because
    # `circle(size=...)` is deprecated in recent Bokeh releases.
    fig.scatter("t", "price", source=source, size=6, color="red")

    return fig

# Bind the windowed price table (delta_window) to the Bokeh plot:
# - 'price_plotter' is the rendering function that receives the data source
# - 'sorting_col="t"' ensures the data is plotted in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel column and mark it as servable so that the
# interactive plot is displayed when the app is served.
pn.Column(viz).servable()

# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface
# NOTE(review): %%capture is a cell magic and is only accepted as the first
# line of a cell. In this export it follows other code without a cell marker -
# confirm that it lives in its own cell, as it does for Model 1.

%%capture --no-display
pw.run()