# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [1]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m43.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m9.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m75.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [3]:
# Load the raw parking records into a DataFrame and display it.
# NOTE(review): '/content/...' looks like a Colab-style absolute path — adjust it if the CSV lives elsewhere.
df = pd.read_csv('/content/dataset.csv')
df

# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [4]:
# Parse the day-first date string and the time string into a single 'Timestamp' datetime column,
# then order the rows chronologically and give them a fresh 0..n-1 index.
df = (
    df.assign(
        Timestamp=pd.to_datetime(
            df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
            format='%d-%m-%Y %H:%M:%S',
        )
    )
    .sort_values('Timestamp')
    .reset_index(drop=True)
)

In [5]:
# Save the selected columns to a CSV file for streaming or downstream processing.
# The column names written here are the field names ParkingSchema declares, and
# "parking_stream.csv" is the file later replayed by pw.demo.replay_csv.
# NOTE(review): 'SystemCodeNumber' is not exported, so if the DataFrame holds several
# parking lots their rows end up interleaved in one stream — confirm this is intended.
df[["Timestamp", "Occupancy", "Capacity"]].to_csv("parking_stream.csv", index=False)

# Note: Only three features are used here for simplicity.
# Participants are expected to incorporate additional relevant features in their models.

In [6]:

# -------------------------------
# MODEL SELECTION AND DEFINITIONS
# -------------------------------

# Set this to 1, 2, or 3 depending on the model you want to activate.
# compute_price() reads this module-level value on every call and raises
# ValueError for any other value.
MODEL_NUMBER = 1

# Model 1: Baseline Linear Pricing
def model_1_pricing(prev_price, occupancy, capacity):
    """Raise the previous price in proportion to the current occupancy ratio.

    Args:
        prev_price: Price produced for the previous observation.
        occupancy: Number of occupied spots.
        capacity: Total number of spots.

    Returns:
        prev_price + alpha * (occupancy / capacity), with alpha = 2.0.
        A zero (or otherwise falsy) capacity contributes no increase
        instead of raising ZeroDivisionError.
    """
    alpha = 2.0
    # Guard the division so a lot reporting zero capacity cannot crash the pricing step.
    occupancy_ratio = occupancy / capacity if capacity else 0
    return prev_price + alpha * occupancy_ratio

# Model 2: Demand-Based Pricing
def model_2_pricing(base_price, occupancy, capacity, queue, traffic, is_special, vehicle_type):
    """Scale the base price by a clamped linear demand score.

    Args:
        base_price: Reference price that the demand score scales.
        occupancy: Number of occupied spots.
        capacity: Total number of spots.
        queue: Numeric queue length (raises demand).
        traffic: Numeric traffic level (lowers demand).
        is_special: Numeric special-day flag (raises demand).
        vehicle_type: Vehicle type string; looked up case-insensitively,
            unknown types use a weight of 1.0.

    Returns:
        base_price * (1 + 0.4 * demand), where demand is clamped to [-1, 1],
        so the result lies between 0.6 and 1.4 times base_price.
    """
    alpha, beta, gamma, delta, epsilon = 1.2, 0.5, 0.3, 2.0, 1.5
    vehicle_weights = {"car": 1.0, "bike": 0.5, "truck": 1.5}
    vehicle_weight = vehicle_weights.get(vehicle_type.lower(), 1.0)

    # A zero capacity contributes no occupancy pressure instead of raising ZeroDivisionError.
    occupancy_term = (alpha * occupancy / capacity) if capacity else 0.0

    demand = occupancy_term + (beta * queue) - (gamma * traffic) + (delta * is_special) + (epsilon * vehicle_weight)
    demand = max(min(demand, 1), -1)  # Clamp between -1 and 1
    lambda_ = 0.4
    return base_price * (1 + lambda_ * demand)

# Model 3: Competitive Pricing Model
def model_3_pricing(base_price, occupancy, capacity, queue, traffic, is_special, vehicle_type, competitor_price, distance):
    """Adjust the demand-based price using a competitor's price.

    The demand-based price comes from model_2_pricing. Three cases follow:
      * lot is full and the competitor is cheaper: suggest rerouting and return
        max(0.8 * base_price, competitor_price - 1);
      * competitor is more expensive: return min(1.5 * base_price, demand price + 1);
      * otherwise: return the demand price unchanged.

    `distance` is accepted but not used by the current logic.

    Returns:
        (adjusted_price, reroute) where reroute is a bool.
    """
    demand_price = model_2_pricing(base_price, occupancy, capacity, queue, traffic, is_special, vehicle_type)
    lot_is_full = occupancy >= capacity

    # Full lot and a cheaper competitor: undercut within a floor and flag a reroute.
    if lot_is_full and competitor_price < demand_price:
        return max(base_price * 0.8, competitor_price - 1), True

    # Competitor charges more: nudge our price up, capped at 1.5x the base price.
    if competitor_price > demand_price:
        return min(base_price * 1.5, demand_price + 1), False

    # No competitive pressure either way: keep the demand-based price.
    return demand_price, False

# Example unified pricing wrapper
def compute_price(*args, **kwargs):
    """Dispatch to the pricing model selected by the module-level MODEL_NUMBER.

    Only keyword arguments are read (positional arguments are ignored); a
    missing key for the selected model raises KeyError.

    Returns:
        The selected model's result: a price for models 1 and 2, and whatever
        model_3_pricing returns for model 3.

    Raises:
        ValueError: if MODEL_NUMBER is not 1, 2, or 3.
    """
    # Inputs shared, in call order, by the demand-based models (2 and 3).
    demand_keys = ('base_price', 'occupancy', 'capacity',
                   'queue', 'traffic', 'is_special', 'vehicle_type')

    if MODEL_NUMBER == 1:
        return model_1_pricing(*[kwargs[key] for key in ('prev_price', 'occupancy', 'capacity')])
    if MODEL_NUMBER == 2:
        return model_2_pricing(*[kwargs[key] for key in demand_keys])
    if MODEL_NUMBER == 3:
        competitive_keys = demand_keys + ('competitor_price', 'distance')
        return model_3_pricing(*[kwargs[key] for key in competitive_keys])
    raise ValueError("Invalid MODEL_NUMBER. Please choose 1, 2, or 3.")


In [7]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

class ParkingSchema(pw.Schema):
    Timestamp: str   # Observation time, read as text; parsed downstream with "%Y-%m-%d %H:%M:%S"
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location


In [8]:
# Load the data as a simulated stream using Pathway's replay_csv function
# This replays the CSV data at a controlled input rate to mimic real-time streaming
# input_rate=100 means approximately 100 rows per second will be ingested into the stream.

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=100)

In [9]:
# Define the datetime format to parse the 'Timestamp' column
# (the schema reads 'Timestamp' as a plain string)
fmt = "%Y-%m-%d %H:%M:%S"

# Add new columns to the data stream:
# - 't' contains the parsed full datetime
# - 'day' is a string: the parsed datetime re-formatted with the time part fixed to 00:00:00
#   (used below as the per-day instance key of the window)
data_with_time = data.with_columns(
    t = data.Timestamp.dt.strptime(fmt),
    day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)


# Step 2: Making a simple pricing function

In [10]:
# Define a daily tumbling window over the data stream using Pathway
# This block performs temporal aggregation and computes a dynamic price for each day
# Re-import the module here: the imports cell's `from datetime import datetime` rebinds the
# name `datetime` to the class, which has no `timedelta` attribute.
import datetime

delta_window = (
    data_with_time.windowby(
        pw.this.t,  # Event time column to use for windowing (parsed datetime)
        instance=pw.this.day,  # Logical partitioning key: one instance per calendar day
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # Fixed-size daily window
        behavior=pw.temporal.exactly_once_behavior()  # Each window produces exactly one output (see Pathway temporal behaviors docs)
    )
    .reduce(
        t=pw.this._pw_window_end,                        # Assign the end timestamp of each window
        occ_max=pw.reducers.max(pw.this.Occupancy),      # Highest occupancy observed in the window
        occ_min=pw.reducers.min(pw.this.Occupancy),      # Lowest occupancy observed in the window
        cap=pw.reducers.max(pw.this.Capacity),           # Maximum capacity observed (typically constant per spot)
    )
    .with_columns(
        # Compute the price using a simple dynamic pricing formula:
        #
        # Pricing Formula:
        #     price = base_price + demand_fluctuation
        #     where:
        #         base_price = 10 (fixed minimum price)
        #         demand_fluctuation = (occ_max - occ_min) / cap
        #
        # Intuition:
        # - The greater the difference between peak and low occupancy in a day,
        #   the more volatile the demand is, indicating potential scarcity.
        # - Dividing by capacity normalizes the fluctuation (it stays in the [0,1] range
        #   as long as occupancy never exceeds capacity).
        # - This fluctuation is added to the base price of 10 to set the final price.
        # - Example: If occ_max = 90, occ_min = 30, cap = 100
        #            => price = 10 + (90 - 30)/100 = 10 + 0.6 = 10.6

        price=10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
    )
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [11]:
# Per-vehicle-type multipliers looked up by model_2; types missing here fall back to 1.0 there.
vehicle_weights = {"car": 1.0, "bike": 0.5, "truck": 1.5}

def model_1(prev_price, occ, cap):
    """Baseline linear pricing: previous price plus 2.0 times the occupancy ratio.

    A falsy capacity (e.g. 0) yields a ratio of 0, so the previous price is
    returned unchanged rather than dividing by zero.
    """
    alpha = 2.0
    if cap:
        utilisation = occ / cap
    else:
        utilisation = 0
    return prev_price + alpha * utilisation

def model_2(base_price, occ, cap, queue, traffic, is_special, vehicle_type):
    """Demand-based pricing: scale the base price by a clamped demand score.

    Args:
        base_price: Reference price that the demand score scales.
        occ: Number of occupied spots.
        cap: Total number of spots; a falsy value gives an occupancy ratio of 0.
        queue: Numeric queue length (raises demand).
        traffic: Numeric traffic level (lowers demand).
        is_special: Numeric special-day flag (raises demand).
        vehicle_type: Key into the module-level ``vehicle_weights``. Strings are
            matched case-insensitively, as in model_2_pricing; unknown or
            non-string values use a weight of 1.0.

    Returns:
        base_price * (1 + 0.5 * demand), with demand clamped to [-1, 1].
    """
    alpha, beta, gamma, delta, epsilon, lambda_ = 1.0, 0.8, 0.5, 1.2, 1.0, 0.5
    # Normalise case so "Truck" / "BIKE" pick up their weight instead of silently defaulting to 1.0.
    vt_key = vehicle_type.lower() if isinstance(vehicle_type, str) else vehicle_type
    vt_weight = vehicle_weights.get(vt_key, 1.0)
    demand = (alpha * (occ / cap if cap else 0) +
              beta * queue - gamma * traffic +
              delta * is_special + epsilon * vt_weight)
    norm_demand = min(max(demand, -1), 1)
    return base_price * (1 + lambda_ * norm_demand)

def model_3(base_price, occ, cap, queue, traffic, is_special, vehicle_type, comp_price, dist):
    """Competitive pricing: adjust the model_2 price by competitor distance.

    * dist < 0.5: return max(0.8 * base_price, comp_price - 1)
    * dist > 2:   return min(1.5 * base_price, model_2 price + 1)
    * otherwise:  return the model_2 price unchanged

    The model_2 price is always computed first, even when the first branch
    does not use it.
    """
    demand_price = model_2(base_price, occ, cap, queue, traffic, is_special, vehicle_type)

    if dist < 0.5:
        adjusted = max(base_price * 0.8, comp_price - 1)
    elif dist > 2:
        adjusted = min(base_price * 1.5, demand_price + 1)
    else:
        adjusted = demand_price
    return adjusted


In [14]:
def price_plotter(source):
    """Build a Bokeh figure comparing three model price series over time.

    Draws one line per column ("price_model_1" .. "price_model_3") against
    the "t" column of `source` and places the legend in the top-left corner.

    NOTE(review): a later cell defines another `price_plotter`, which replaces
    this one once that cell runs.
    """
    figure_options = dict(
        height=400,
        width=800,
        title="Pricing Models Comparison (Real-Time)",
        x_axis_type="datetime",
        x_axis_label="Time",
        y_axis_label="Price",
    )
    fig = bokeh.plotting.figure(**figure_options)

    # (column, colour, legend label) for each model's series
    series = (
        ("price_model_1", "blue", "Model 1"),
        ("price_model_2", "green", "Model 2"),
        ("price_model_3", "red", "Model 3"),
    )
    for column, colour, label in series:
        fig.line("t", column, source=source, line_width=2, color=colour, legend_label=label)

    fig.legend.location = "top_left"
    return fig


In [15]:
# Activate the Panel extension (`pn` is Panel, imported in the imports cell)
# to enable interactive visualizations in the notebook
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    """Plot the daily price as a line with a marker on every data point.

    Args:
        source: Data source passed in by the caller; it must expose the
            columns "t" (x axis, datetime) and "price" (y axis).

    Returns:
        The configured Bokeh figure.
    """
    # Create a Bokeh figure with datetime x-axis
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Ensure time-based data is properly formatted on the x-axis
    )
    # Plot a line graph showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Overlay red circular markers at each data point for better visibility.
    # scatter() draws circle markers by default; circle(size=...) is deprecated
    # in recent Bokeh releases and emits a deprecation warning.
    fig.scatter("t", "price", source=source, size=6, color="red")

    return fig

# Use Pathway's built-in .plot() method to bind the data stream (delta_window) to the Bokeh plot
# - 'price_plotter' is the rendering function; it receives the data source and returns the figure
# - 'sorting_col="t"' ensures the data is plotted in time order
# The plot is only populated once the pipeline is started with pw.run() in a later cell.
viz = delta_window.plot(price_plotter, sorting_col="t")

# Create a Panel layout and make it servable as a web app
# This line enables the interactive plot to be displayed when the app is served
pn.Column(viz).servable()



In [16]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()

KeyboardInterrupt: 

## 💡 Integrate Model into Real-Time Loop

In [17]:

# Example simulation loop applying the pricing model
# This should be part of your streaming logic

# Two hand-written rows standing in for a stream; each row carries every
# keyword that any of the three models may read.
example_stream = [
    {
        'prev_price': 10, 'base_price': 10,
        'occupancy': 8, 'capacity': 10,
        'queue': 2, 'traffic': 3, 'is_special': 1,
        'vehicle_type': 'car',
        'competitor_price': 9, 'distance': 0.3,
    },
    {
        'prev_price': 11, 'base_price': 10,
        'occupancy': 10, 'capacity': 10,
        'queue': 4, 'traffic': 2, 'is_special': 0,
        'vehicle_type': 'truck',
        'competitor_price': 12, 'distance': 0.5,
    },
]

# Apply selected pricing model; only model 3 returns a (price, reroute) pair
for row in example_stream:
    outcome = compute_price(**row)
    if MODEL_NUMBER != 3:
        price = outcome
        print(f"Model {MODEL_NUMBER} → Price: ${price:.2f}")
        continue
    price, reroute = outcome
    print(f"Model 3 → Price: ${price:.2f}, Reroute: {reroute}")


Model 1 → Price: $11.60
Model 1 → Price: $13.00


## 📈 Real-Time Pricing Visualization with Bokeh

In [18]:

from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource
from bokeh.layouts import layout
import random
import time

output_notebook()

# Use a dedicated, seeded generator so that re-running this cell reproduces
# the same simulated inputs (and therefore the same plotted curve).
SIMULATION_SEED = 42
rng = random.Random(SIMULATION_SEED)

# Sample simulation: feed each step's price back in as the next step's prev_price
time_steps = list(range(10))
prices = []
prev_price = 10

for t in time_steps:
    # simulate changing values
    row = {
        'prev_price': prev_price,
        'base_price': 10,
        'occupancy': rng.randint(4, 10),
        'capacity': 10,
        'queue': rng.randint(0, 5),
        'traffic': rng.randint(1, 4),
        'is_special': rng.randint(0, 1),
        'vehicle_type': rng.choice(['car', 'bike', 'truck']),
        'competitor_price': rng.uniform(8, 14),
        'distance': rng.uniform(0.1, 1.0),
    }

    # Model 3 returns (price, reroute); the reroute flag is not plotted
    if MODEL_NUMBER == 3:
        price, _ = compute_price(**row)
    else:
        price = compute_price(**row)

    prices.append(price)
    prev_price = price

# Plotting
source = ColumnDataSource(data=dict(x=time_steps, y=prices))
p = figure(title="Real-Time Price Prediction", x_axis_label='Time Step', y_axis_label='Price ($)', width=700, height=300)
p.line(x='x', y='y', source=source, line_width=3)

show(p)
