# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [None]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m149.4/149.4 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m69.7/69.7 MB[0m [31m11.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.6/77.6 kB[0m [31m6.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m777.6/777.6 kB[0m [31m47.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.2/139.2 kB[0m [31m12.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m26.5/26.5 MB[0m [31m97.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.5/45.5 kB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [None]:
# Colab-only: open a browser file picker and save the chosen file into the current
# working directory (the next cell reads it back as 'dataset.csv').
# NOTE(review): per the Colab docs `uploaded` is a dict of filename -> file bytes;
# it is not used further in this notebook.
from google.colab import files
uploaded = files.upload()

Saving dataset.csv to dataset.csv


In [None]:
# Load the raw parking dataset uploaded in the previous cell.
# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing
df = pd.read_csv('dataset.csv')

# Fail fast with a clear message if the columns used by the preprocessing and
# export cells below are absent (e.g. the wrong file was uploaded).
REQUIRED_COLUMNS = {'LastUpdatedDate', 'LastUpdatedTime', 'Occupancy', 'Capacity'}
missing_columns = REQUIRED_COLUMNS - set(df.columns)
assert not missing_columns, f"dataset.csv is missing required columns: {sorted(missing_columns)}"

# Show a compact preview rather than the whole frame.
df.head()

Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00
...,...,...,...,...,...,...,...,...,...,...,...,...
18363,18363,Shopping,1920,26.150504,91.733531,1517,truck,average,6,0,19-12-2016,14:30:00
18364,18364,Shopping,1920,26.150504,91.733531,1487,car,low,3,0,19-12-2016,15:03:00
18365,18365,Shopping,1920,26.150504,91.733531,1432,cycle,low,3,0,19-12-2016,15:29:00
18366,18366,Shopping,1920,26.150504,91.733531,1321,car,low,2,0,19-12-2016,16:03:00


In [None]:
# Combine 'LastUpdatedDate' (dd-mm-yyyy) and 'LastUpdatedTime' (HH:MM:SS) into a single
# datetime column. The explicit format avoids day/month guessing, and with pandas'
# default errors='raise' a malformed row fails loudly instead of silently becoming NaT.
#
# Then sort chronologically and reset the index. kind='stable' keeps rows that share a
# timestamp (several parking lots report at the same minute) in their original file
# order; the default quicksort is not stable, so the order of such ties would be arbitrary.
df = (
    df.assign(
        Timestamp=lambda frame: pd.to_datetime(
            frame['LastUpdatedDate'] + ' ' + frame['LastUpdatedTime'],
            format='%d-%m-%Y %H:%M:%S',
        )
    )
    .sort_values('Timestamp', kind='stable')
    .reset_index(drop=True)
)

In [None]:
# Persist just the fields the streaming demo consumes. This file is what the
# replay cell below feeds into Pathway.
# Note: Only three features are used here for simplicity.
# Participants are expected to incorporate additional relevant features in their models.
STREAM_COLUMNS = ["Timestamp", "Occupancy", "Capacity"]
stream_df = df.loc[:, STREAM_COLUMNS]
stream_df.to_csv("parking_stream.csv", index=False)

In [None]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

class ParkingSchema(pw.Schema):
    """Row layout of parking_stream.csv as replayed by pw.demo.replay_csv.

    The field names mirror the CSV header written by the export cell
    ("Timestamp", "Occupancy", "Capacity").
    """
    Timestamp: str   # Observation time as text, e.g. "2016-10-23 11:26:00"; parsed later with strptime
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location


In [None]:
# Replay parking_stream.csv as a simulated live stream: Pathway's replay_csv emits
# the file's rows at a controlled rate to mimic real-time ingestion.
# An input rate of 1000 means roughly 1000 rows per second enter the stream.
REPLAY_ROWS_PER_SECOND = 1000

data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=REPLAY_ROWS_PER_SECOND,
)

In [None]:
# Text layout of the 'Timestamp' strings in parking_stream.csv.
fmt = "%Y-%m-%d %H:%M:%S"

# Build the parse expression once and reuse it for both derived columns:
# - 't'   : the full parsed datetime
# - 'day' : the same instant rendered as a "YYYY-MM-DDT00:00:00" string, i.e. the
#           calendar day with the time zeroed (useful as a day-level aggregation key)
parsed_timestamp = data.Timestamp.dt.strptime(fmt)
data_with_time = data.with_columns(
    t=parsed_timestamp,
    day=parsed_timestamp.dt.strftime("%Y-%m-%dT00:00:00"),
)


# Step 2: Building a Simple Pricing Function

In [None]:
import pathway as pw

# Pricing parameters for the baseline model.
BASE_PRICE = 10.0              # price when occupancy is zero
ALPHA = 2.0                    # weight applied to the occupancy rate (occupancy / capacity)
MIN_PRICE = BASE_PRICE * 0.5   # floor: half the base price
MAX_PRICE = BASE_PRICE * 2     # ceiling: twice the base price

# Baseline pricing UDF: linear in the occupancy rate, then clamped.
@pw.udf
def compute_price(occupancy: int, capacity: int) -> float:
    """Return the baseline parking price for one observation.

    price = BASE_PRICE + ALPHA * (occupancy / capacity), capped at MAX_PRICE and
    then floored at MIN_PRICE. A lot reporting zero capacity gets BASE_PRICE,
    which also avoids a division by zero.
    """
    if capacity != 0:
        occupancy_rate = occupancy / capacity
        unclamped = BASE_PRICE + ALPHA * occupancy_rate
        return max(min(unclamped, MAX_PRICE), MIN_PRICE)
    return BASE_PRICE

# Apply the pricing UDF to every row of the replayed stream. `data` (from the
# replay_csv cell) exposes the ParkingSchema columns `Occupancy` and `Capacity`.
pricing_table = data.with_columns(
    price=compute_price(data.Occupancy, data.Capacity)
)

# Write every priced row to JSON Lines. The `head` inspection cell and the Bokeh
# plot cell both read this file, and it is only populated once pw.run() executes.
pw.io.jsonlines.write(pricing_table, '/tmp/model1_output.jsonl')


In [None]:
# Execute the Pathway dataflow defined above (replayed CSV -> pricing UDF -> JSONL sink).
# This populates /tmp/model1_output.jsonl, which the next cell inspects with `head`.
pw.run()


Output()



In [None]:
!head /tmp/model1_output.jsonl


{"Timestamp":"2016-10-23 11:26:00","Occupancy":600,"Capacity":2937,"price":10.408580183861083,"diff":1,"time":1751798936506}
{"Timestamp":"2016-10-23 13:59:00","Occupancy":280,"Capacity":1200,"price":10.466666666666667,"diff":1,"time":1751798936506}
{"Timestamp":"2016-10-23 09:59:00","Occupancy":185,"Capacity":2803,"price":10.132001427042454,"diff":1,"time":1751798936506}
{"Timestamp":"2016-10-23 11:59:00","Occupancy":313,"Capacity":387,"price":11.617571059431524,"diff":1,"time":1751798936506}
{"Timestamp":"2016-10-22 16:26:00","Occupancy":154,"Capacity":470,"price":10.65531914893617,"diff":1,"time":1751798936506}
{"Timestamp":"2016-10-23 09:26:00","Occupancy":584,"Capacity":3883,"price":10.300798351789853,"diff":1,"time":1751798936506}
{"Timestamp":"2016-10-22 16:26:00","Occupancy":1073,"Capacity":3103,"price":10.69158878504673,"diff":1,"time":1751798936506}
{"Timestamp":"2016-10-23 12:26:00","Occupancy":119,"Capacity":690,"price":10.344927536231884,"diff":1,"time":1751798936506}
{"Ti

# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot in the next cell reads `/tmp/model1_output.jsonl`, so it only shows data after a `pw.run()` cell (the one above or the final cell) has executed and written that file.


In [None]:
import pandas as pd
from bokeh.plotting import figure, show, output_notebook
from bokeh.models import ColumnDataSource

# Enable Bokeh output in notebook
output_notebook()

# Load the JSON Lines output written by the Pathway sink. The file has no rows until a
# pw.run() cell has executed, so stop with an actionable message instead of a KeyError
# on 'Timestamp' further down.
df = pd.read_json('/tmp/model1_output.jsonl', lines=True)
if df.empty:
    raise RuntimeError(
        "/tmp/model1_output.jsonl has no rows yet; run a pw.run() cell before plotting."
    )

# Convert Timestamp to datetime (read_json may already have done so; then this is a no-op)
df['Timestamp'] = pd.to_datetime(df['Timestamp'])

# Sort by time for clean plotting
df = df.sort_values('Timestamp')

# NOTE(review): the exported stream has no parking-lot identifier (only Timestamp,
# Occupancy and Capacity), so rows from different lots that share a timestamp are
# interleaved and the line below zig-zags between lots. Stream a lot ID and draw one
# line per lot to see true per-lot price evolution.
source = ColumnDataSource(df)

# Build the plot
fig = figure(title="Price Evolution Over Time",
             x_axis_type="datetime",
             width=800, height=400)

fig.line(x='Timestamp', y='price', source=source, line_width=2, color='navy', legend_label="Price")
# scatter() is the supported glyph for fixed-size point markers (circle marker by default);
# circle(size=...) is deprecated since Bokeh 3.4.
fig.scatter(x='Timestamp', y='price', source=source, size=6, color='red')

fig.xaxis.axis_label = "Time"
fig.yaxis.axis_label = "Price (USD)"
fig.legend.location = "top_left"

# Show the plot
show(fig)




In [None]:
# Start the Pathway pipeline execution in the background
# - This triggers the real-time data stream processing defined above
# - %%capture --no-display suppresses output in the notebook interface

%%capture --no-display
pw.run()

Output()

