# Inroduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [3]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.
!pip install panel



In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn


# Step 1: Importing and Preprocessing the Data

In [6]:
df = pd.read_csv('dataset.csv')
df
df.isna().sum()  # Check for missing values
df.VehicleType.unique()  # Check unique vehicle types
df.TrafficConditionNearby.unique()  # Check unique traffic conditions
df['VehicleType']= df['VehicleType'].replace({'car': 2, 'bike':1 , 'truck': 3, 'cycle': 0})
df['TrafficConditionNearby'] = df['TrafficConditionNearby'].replace({'low': 0, 'average': 1, 'high': 2})
df['Occupancy'] = df['Occupancy'].astype(float)
df['Capacity'] = df['Capacity'].astype(float)
df['QueueLength'] = df['QueueLength'].astype(float)
df['TrafficConditionNearby'] = df['TrafficConditionNearby'].astype(float)
df['IsSpecialDay'] = df['IsSpecialDay'].astype(float)
df['VehicleType'] = df['VehicleType'].astype(float)


  df['VehicleType']= df['VehicleType'].replace({'car': 2, 'bike':1 , 'truck': 3, 'cycle': 0})
  df['TrafficConditionNearby'] = df['TrafficConditionNearby'].replace({'low': 0, 'average': 1, 'high': 2})


In [7]:
# Combine the 'LastUpdatedDate' and 'LastUpdatedTime' columns into a single datetime column
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                  format='%d-%m-%Y %H:%M:%S')

# Sort the DataFrame by the new 'Timestamp' column and reset the index
df = df.sort_values('Timestamp').reset_index(drop=True)



In [8]:
# Save the selected columns to a CSV file for streaming or downstream processing
df[["Timestamp", "Occupancy", "Capacity","QueueLength","TrafficConditionNearby","IsSpecialDay","VehicleType"]].to_csv("parking_stream.csv", index=False)

# Note: Only three features are used here for simplicity.
# Participants are expected to incorporate additional relevant features in their models.

In [9]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

class ParkingSchema(pw.Schema):
    Timestamp: str   # Timestamp of the observation (should ideally be in ISO format)
    Occupancy: float   # Number of occupied parking spots
    Capacity: float
    QueueLength: float  # Length of the queue for parking spots
    TrafficConditionNearby: float # Traffic condition nearby (0: low, 1: medium, 2: high)
    IsSpecialDay: float  # Indicator for special days (0: no, 1: yes)
    VehicleType: float  # Type of vehicle 

In [10]:
# Load the data as a simulated stream using Pathway's replay_csv function
# This replays the CSV data at a controlled input rate to mimic real-time streaming
# input_rate=1000 means approximately 1000 rows per second will be ingested into the stream.

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=1000)

In [11]:
# Define the datetime format to parse the 'Timestamp' column
fmt = "%Y-%m-%d %H:%M:%S"

# Add new columns to the data stream:
# - 't' contains the parsed full datetime
# - 'day' extracts the date part and resets the time to midnight (useful for day-level aggregations)
data_with_time = data.with_columns(
    t = data.Timestamp.dt.strptime(fmt),
    day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)


# Step 2: Making a simple pricing function

In [13]:
# Define a daily tumbling window over the data stream using Pathway
# This block performs temporal aggregation and computes a dynamic price for each day
import datetime

delta_window = (
    data_with_time.windowby(
        pw.this.t,  # Event time column to use for windowing (parsed datetime)
        instance=pw.this.day,  # Logical partitioning key: one instance per calendar day
        window=pw.temporal.tumbling(datetime.timedelta(hours = 1)),  # Fixed-size daily window
        behavior=pw.temporal.exactly_once_behavior()  # Guarantees exactly-once processing semantics
    )
    .reduce(
        t=pw.this._pw_window_end,                        
        occ_max=pw.reducers.max(pw.this.Occupancy),      
        occ_min=pw.reducers.min(pw.this.Occupancy),      
        cap=pw.reducers.max(pw.this.Capacity),           
        queue_length_max=pw.reducers.max(pw.this.QueueLength),  
        traffic_condition_max=pw.reducers.max(pw.this.TrafficConditionNearby),  
        is_special_day=pw.reducers.max(pw.this.IsSpecialDay),  
        vehicle_type_max=pw.reducers.max(pw.this.VehicleType)  
    )
    .with_columns(
        # Model-2
        price = 10 + ((pw.this.occ_max) / pw.this.cap) + (pw.this.traffic_condition_max) + (pw.this.queue_length_max * 0.2)  + (pw.this.is_special_day) + (pw.this.vehicle_type_max*0.5)
    )
)


# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [16]:
# Activate the Panel extension to enable interactive visualizations
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    # Create a Bokeh figure with datetime x-axis
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # Ensure time-based data is properly formatted on the x-axis
    )
    # Plot a line graph showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Overlay red circles at each data point for better visibility
    fig.circle("t", "price", source=source, size=6, color="red")

    return fig

# Use Pathway's built-in .plot() method to bind the data stream (delta_window) to the Bokeh plot
# - 'price_plotter' is the rendering function
# - 'sorting_col="t"' ensures the data is plotted in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Create a Panel layout and make it servable as a web app
# This line enables the interactive plot to be displayed when the app is served
pn.Column(viz).servable()



In [17]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface


pw.run()

Output()