# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [15]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

In [16]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [17]:
# Load the raw parking dataset into a DataFrame and display it.
# You can find the sample dataset here:
# https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing
df = pd.read_csv('/content/dataset.csv')
df

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [18]:
# Join the separate date and time text columns, then parse the result
# (day-month-year order) into a single datetime column named 'Timestamp'.
date_and_time = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(date_and_time, format='%d-%m-%Y %H:%M:%S')

# Put the rows in chronological order and rebuild a fresh 0..n-1 index.
df = df.sort_values(by='Timestamp')
df = df.reset_index(drop=True)

In [19]:
# Columns exported to the stream file, in the order they are written.
selected_columns = [
    "Timestamp",
    "SystemCodeNumber",  # Parking lot ID
    "Occupancy",
    "Capacity",
    "QueueLength",
    "VehicleType",
    "TrafficConditionNearby",
    "IsSpecialDay",
]

# Write only those columns (without the DataFrame index) to the CSV file
# that is later replayed as a stream.
df.to_csv("parking_stream.csv", columns=selected_columns, index=False)

# Note: the baseline pricing model later in this notebook only reads a few of
# these columns (timestamp, occupancy, capacity).
# Participants are expected to incorporate additional relevant features in their models.

In [20]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream
import pathway as pw

class ParkingSchema(pw.Schema):
    # One field per column written to parking_stream.csv; names mirror the CSV columns.
    Timestamp: str                   # Observation time as text; parsed into a datetime in a later cell
    SystemCodeNumber: str            # Parking lot ID
    Occupancy: int                   # Occupied slots
    Capacity: int                    # Total slots
    QueueLength: int                 # Vehicles waiting
    VehicleType: str                 # e.g. car, bike, truck, cycle (values visible in the sample rows)
    TrafficConditionNearby: str      # e.g. low, average (values visible in the sample rows; others may exist)
    IsSpecialDay: int                # 1 or 0


In [21]:
# Replay the exported CSV as a simulated live stream with Pathway.
# Rows are fed in at a controlled rate to mimic real-time arrival:
# input_rate=1000 means approximately 1000 rows per second are ingested.
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)

In [22]:
# strptime pattern describing the text stored in the 'Timestamp' column
fmt = "%Y-%m-%d %H:%M:%S"

# Build the parsed-timestamp expression once and reuse it for both derived columns.
parsed_time = data.Timestamp.dt.strptime(fmt)

# Extend the stream with three columns:
# - 't':      the parsed datetime
# - 'day':    the date rendered as an ISO-style string (time part fixed to midnight),
#             used for grouping by day
# - 'lot_id': the parking lot identifier (SystemCodeNumber), used for grouping by lot
data_with_time = data.with_columns(
    t=parsed_time,
    day=parsed_time.dt.strftime("%Y-%m-%dT00:00:00"),
    lot_id=data.SystemCodeNumber,
)



# Step 2: Making a simple pricing function

In [24]:
import datetime

# Baseline daily model: one row per parking lot per day, priced from that day's
# occupancy swing relative to capacity.
# NOTE: the Step 3 cell assigns `delta_window` again, so that later table is the
# one that ends up bound to the plot.
delta_window = (
    data_with_time.windowby(
        pw.this.t,  # Use parsed timestamp for windowing
        instance=(pw.this.day, pw.this.lot_id),  # Group by both day and lot
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior()
    )
    .reduce(
        t=pw.this._pw_window_end,                        # Timestamp at end of each window
        occ_max=pw.reducers.max(pw.this.Occupancy),      # Daily max occupancy
        occ_min=pw.reducers.min(pw.this.Occupancy),      # Daily min occupancy
        cap=pw.reducers.max(pw.this.Capacity),           # Max capacity (usually constant)
        lot_id=pw.reducers.any(pw.this.lot_id)           # Retain lot ID for plotting
    )
    .with_columns(
        # Lowercase 'price' matches the column name that price_plotter reads
        # and the name used by the Step 3 table.
        price = 10 + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap  # Dynamic price
    )
)

# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [25]:
base_price = 10.0  # price when the occupancy ratio is zero
alpha = 0.5        # weight applied to the occupancy ratio

# Model 1 pricing: price increases linearly with occupancy.

# Stage 1 - cut the stream into tumbling 30-minute windows, keyed by "<lot_id>_<day>".
half_hour_windows = data_with_time.windowby(
    pw.this.t,
    instance=pw.this.lot_id + "_" + pw.this.day,
    window=pw.temporal.tumbling(datetime.timedelta(minutes=30)),
    behavior=pw.temporal.exactly_once_behavior(),
)

# Stage 2 - summarise each window.
window_stats = half_hour_windows.reduce(
    t=pw.this._pw_window_end,                # window end time, used as the x-axis value
    occ=pw.reducers.max(pw.this.Occupancy),  # peak occupancy in the window (max, not last)
    cap=pw.reducers.max(pw.this.Capacity),   # capacity of the lot
    lot_id=pw.reducers.any(pw.this.lot_id),  # keep the lot identifier on the result row
)

# Stage 3 - attach the price: base price plus alpha times the occupancy ratio.
delta_window = window_stats.with_columns(
    price=base_price + alpha * (pw.this.occ / pw.this.cap)
)


In [26]:
# Activate the Panel extension so that Panel objects (such as the plot layout
# built in this cell) can be rendered as interactive visualizations.
pn.extension()

def price_plotter(source):
    """Build the Bokeh figure used to render the streamed price table.

    Args:
        source: Data source handed in by Pathway's ``.plot()``; it must
            expose the ``t`` (time) and ``price`` columns.

    Returns:
        The configured Bokeh figure: a navy line with a red circle marker
        at every data point.
    """
    # Datetime x-axis so the time values are formatted as dates/times.
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",
    )

    # Line showing how the price evolves over time.
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Red circle markers at each data point for better visibility.
    # scatter() is used because calling circle() with a screen-unit `size`
    # is deprecated in recent Bokeh releases.
    fig.scatter("t", "price", source=source, size=6, color="red", marker="circle")

    return fig

# Bind the streaming table (delta_window) to the Bokeh figure via Pathway's .plot():
# - price_plotter builds the figure from the data source
# - sorting_col="t" keeps the plotted points in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel column layout and mark it servable, so the
# interactive plot is displayed when the app is served.
dashboard = pn.Column(viz)
dashboard.servable()



In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()