# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [1]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.8/60.8 MB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m45.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m70.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.3/135.3 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m244.6/244.6 kB[0m [31m17.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [6]:
# You can find the sample dataset here:
# https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing
#
# The file is read through a relative path, matching the cells further down that
# write and replay "parking_stream.csv", so the notebook is not tied to an
# absolute /content directory.
# NOTE(review): a later cell writes its output to this same relative path, so a
# second run reads the already-processed file. Consider keeping the raw download
# under a different name.
df = pd.read_csv('parking_stream.csv')
df

Unnamed: 0,SystemCodeNumber,Capacity,Occupancy,IsSpecialDay,VehicleType,Latitude,Longitude,TrafficConditionNearby,QueueLength,Timestamp,LastUpdatedDate,LastUpdatedTime
0,0,577,61,0,2,28.5,77.15,3,2,2016-10-04 07:59:42,04-10-2016,07:59:42
1,0,577,64,0,2,28.5,77.15,1,2,2016-10-04 08:25:42,04-10-2016,08:25:42
2,0,577,80,0,2,28.5,77.15,3,2,2016-10-04 08:59:42,04-10-2016,08:59:42
3,0,577,107,0,2,28.5,77.15,3,3,2016-10-04 09:32:46,04-10-2016,09:32:46
4,0,577,150,0,2,28.5,77.15,3,3,2016-10-04 09:59:48,04-10-2016,09:59:48
...,...,...,...,...,...,...,...,...,...,...,...,...
1307,0,577,309,0,1,28.5,77.15,1,5,2016-12-19 14:30:33,19-12-2016,14:30:33
1308,0,577,300,0,2,28.5,77.15,3,4,2016-12-19 15:03:34,19-12-2016,15:03:34
1309,0,577,274,0,4,28.5,77.15,3,3,2016-12-19 15:29:33,19-12-2016,15:29:33
1310,0,577,230,0,3,28.5,77.15,3,2,2016-12-19 16:03:35,19-12-2016,16:03:35


In [7]:
# Build one datetime column from the separate date and time strings.
# The dates are day-first (e.g. "04-10-2016" is 4 October 2016), so the format is explicit.
date_and_time = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']
df['Timestamp'] = pd.to_datetime(date_and_time, format='%d-%m-%Y %H:%M:%S')

# Put the rows in chronological order and renumber the index from 0
df = df.sort_values(by='Timestamp', ignore_index=True)

In [8]:
from sklearn.preprocessing import LabelEncoder

# Categorical columns that are replaced by integer codes
cols_to_encode = ['SystemCodeNumber', 'VehicleType', 'TrafficConditionNearby']

# Re-code every listed column in place.
# LabelEncoder numbers its classes from 0, so each code is shifted by 1 to start at 1.
for col in cols_to_encode:
    le = LabelEncoder().fit(df[col])
    df[col] = le.transform(df[col]) + 1

    # Print which original value received which (shifted) code, for reference
    shifted_codes = le.transform(le.classes_) + 1
    mapping = dict(zip(le.classes_, shifted_codes))
    print(f"Mapping for {col}: {mapping}")


Mapping for SystemCodeNumber: {np.int64(0): np.int64(1)}
Mapping for VehicleType: {np.int64(1): np.int64(1), np.int64(2): np.int64(2), np.int64(3): np.int64(3), np.int64(4): np.int64(4)}
Mapping for TrafficConditionNearby: {np.int64(1): np.int64(1), np.int64(2): np.int64(2), np.int64(3): np.int64(3)}


In [9]:
# Columns written to the CSV that is replayed as a stream further down.
# The order of this list is the column order of the file.
stream_columns = [
    'SystemCodeNumber', 'Capacity', 'Occupancy',
    'LastUpdatedDate', 'LastUpdatedTime', 'IsSpecialDay', 'VehicleType',
    'Latitude', 'Longitude', 'TrafficConditionNearby', 'QueueLength',
    'Timestamp',
]

# Write the selection without the pandas index column
df[stream_columns].to_csv("parking_stream.csv", index=False)

# Note: the baseline in this notebook is deliberately simple.
# Participants are expected to incorporate additional relevant features in their models.

In [10]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

class ParkingSchema(pw.Schema):
    # Row layout of the replayed parking stream. The field names correspond to
    # the columns of the CSV written earlier in this notebook.
    Timestamp: str   # Observation time as text; parsed later with "%Y-%m-%d %H:%M:%S"
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location
    QueueLength: int # Length of the queue for parking spots
    SystemCodeNumber: str # Identifier of the parking location (label-encoded upstream, read here as text)
    LastUpdatedDate: str # Date when the data was last updated, day-first text (dd-mm-YYYY)
    LastUpdatedTime: str # Time when the data was last updated, text (HH:MM:SS)
    IsSpecialDay: int  # presumably a 0/1 special-day flag; TODO confirm the value range
    VehicleType: int  # Vehicle category as an integer code starting at 1 (label-encoded upstream)
    Latitude: float  # Latitude of the parking location
    Longitude: float  # Longitude of the parking location
    TrafficConditionNearby: int  # Traffic level as an integer code starting at 1 (label-encoded upstream)


In [11]:
# Replay the CSV file as a simulated live stream with Pathway's replay_csv.
# `input_rate` is the replay speed: roughly 1000 rows per second are fed into
# the stream here, which mimics data arriving in real time.
data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=1000,
)

In [12]:
# Layout of the text stored in the 'Timestamp' column
fmt = "%Y-%m-%d %H:%M:%S"

# Parse the timestamp once and derive both new columns from that expression
parsed_time = data.Timestamp.dt.strptime(fmt)

# New columns on the stream:
# - 't'   : the full parsed datetime
# - 'day' : the date rendered as text with the time fixed to midnight
#           (one value per calendar day, useful for day-level aggregations)
data_with_time = data.with_columns(
    t=parsed_time,
    day=parsed_time.dt.strftime("%Y-%m-%dT00:00:00"),
)


# Step 2: Making a simple pricing function

In [13]:
# Re-bind `datetime` to the module: the import cell's `from datetime import datetime`
# shadowed it, and the windowing code below needs `datetime.timedelta`.
import datetime

# Weights of the linear demand function
alpha = 1.0     # weight of the occupancy rate
beta = 0.2      # weight of the average queue length
gamma = 0.1     # weight of the average nearby traffic (this term is subtracted)
delta = 0.3     # weight of the special-day average
epsilon = 0.15  # weight of the average vehicle-type code

# How strongly the normalized demand scales the base price
lambda_ = 0.7
base_price = 10

# Range used to normalize demand
demand_min, demand_max = 0.0, 5.0  # fixed guesses; you may want to compute these from data

# Price bounds: half of and twice the base price
min_price = base_price / 2
max_price = base_price * 2

# Step 1: Group the stream into one-day tumbling windows and aggregate each window.
# `day` is passed as the window instance, and exactly_once_behavior() requests
# that each window's result is calculated a single time.
one_day = datetime.timedelta(days=1)
daily_windows = data_with_time.windowby(
    pw.this.t,
    instance=pw.this.day,
    window=pw.temporal.tumbling(one_day),
    behavior=pw.temporal.exactly_once_behavior(),
)
delta_window = daily_windows.reduce(
    t=pw.this._pw_window_end,  # keep a 't' column (window end); the plot uses it as x value and sort key
    occ_sum=pw.reducers.sum(pw.this.Occupancy),
    cap_sum=pw.reducers.sum(pw.this.Capacity),
    queue_avg=pw.reducers.avg(pw.this.QueueLength),
    traffic_avg=pw.reducers.avg(pw.this.TrafficConditionNearby),
    special_avg=pw.reducers.avg(pw.this.IsSpecialDay),
    veh_avg=pw.reducers.avg(pw.this.VehicleType),
    n=pw.reducers.count(),
)

# Conditions used to clamp the price into [min_price, max_price]
below_floor = pw.this.raw_price < min_price
above_cap = pw.this.raw_price > max_price

# Steps 2-6 are chained; every with_columns call can read the columns added by
# the calls before it.
delta_window = (
    delta_window
    # Step 2: occupancy rate of the day
    .with_columns(occ_rate=pw.this.occ_sum / pw.this.cap_sum)
    # Step 3: linear demand score (nearby traffic lowers it, everything else raises it)
    .with_columns(
        demand=alpha * pw.this.occ_rate
        + beta * pw.this.queue_avg
        - gamma * pw.this.traffic_avg
        + delta * pw.this.special_avg
        + epsilon * pw.this.veh_avg
    )
    # Step 4: rescale demand with the configured [demand_min, demand_max] range
    .with_columns(
        norm_demand=(pw.this.demand - demand_min) / (demand_max - demand_min)
    )
    # Step 5: unbounded price derived from the normalized demand
    .with_columns(raw_price=base_price * (1 + lambda_ * pw.this.norm_demand))
    # Step 6: final price, bounded below by min_price and above by max_price
    .with_columns(
        price=pw.if_else(
            below_floor,
            min_price,
            pw.if_else(above_cap, max_price, pw.this.raw_price),
        )
    )
)

# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell will only be generated after you run the `pw.run()` cell (i.e., the final cell).


In [14]:
# Activate the Panel extension so Panel objects can render as interactive
# visualizations in the notebook output
pn.extension()

# Define a custom Bokeh plotting function that takes a data source (from Pathway) and returns a figure
def price_plotter(source):
    """Build the Bokeh figure that shows the daily parking price over time.

    Args:
        source: Bokeh data source handed in by Pathway's ``Table.plot``. It
            must provide the columns ``t`` (x axis) and ``price`` (y axis).

    Returns:
        The configured Bokeh figure.
    """
    # Datetime x-axis so the time-based data is formatted properly
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",
    )
    # A figure should be readable on its own, so label both axes
    fig.xaxis.axis_label = "Date"
    fig.yaxis.axis_label = "Price"

    # Line showing how the price evolves over time
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Red circular markers at each data point for better visibility.
    # `scatter` (default marker: circle) is used because `circle(size=...)`
    # is deprecated since Bokeh 3.4.
    fig.scatter("t", "price", source=source, size=6, color="red")

    return fig

# Bind the data stream (delta_window) to the Bokeh plot with Pathway's built-in .plot() method
# - 'price_plotter' is the rendering function; it receives the data source to draw from
# - 'sorting_col="t"' ensures the data is plotted in time order
viz = delta_window.plot(price_plotter, sorting_col="t")

# Wrap the plot in a Panel layout and mark it servable so it can be shown when served as a web app.
# The call is kept as the final expression of the cell (presumably so the layout
# is also displayed as the cell's output; verify in your environment).
pn.Column(viz).servable()



In [15]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()

