# Inroduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [1]:
!pip install pathway bokeh --quiet # This cell may take a few seconds to execute.

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import datetime
from datetime import datetime
import pathway as pw
import bokeh.plotting
import panel as pn

# Step 1: Importing and Preprocessing the Data

In [3]:
df = pd.read_csv('Modified - modified.csv')
df

# You can find the sample dataset here: https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing

Unnamed: 0.1,Unnamed: 0,SystemCodeNumber,Capacity,Occupancy,LastUpdatedDate,LastUpdatedTime,IsSpecialDay,VehicleType,Latitude,Longitude,TrafficConditionNearby,QueueLength
0,0,BHMBCCMKT01,577,61,04-10-2016,07:59:42,0,car,28.5,77.15,low,2
1,1,BHMBCCMKT01,577,64,04-10-2016,08:25:42,0,car,28.5,77.15,average,2
2,2,BHMBCCMKT01,577,80,04-10-2016,08:59:42,0,car,28.5,77.15,low,2
3,3,BHMBCCMKT01,577,107,04-10-2016,09:32:46,0,car,28.5,77.15,low,3
4,4,BHMBCCMKT01,577,150,04-10-2016,09:59:48,0,car,28.5,77.15,low,3
...,...,...,...,...,...,...,...,...,...,...,...,...
1307,1307,BHMBCCMKT01,577,309,19-12-2016,14:30:33,0,bike,28.5,77.15,average,5
1308,1308,BHMBCCMKT01,577,300,19-12-2016,15:03:34,0,car,28.5,77.15,low,4
1309,1309,BHMBCCMKT01,577,274,19-12-2016,15:29:33,0,truck,28.5,77.15,low,3
1310,1310,BHMBCCMKT01,577,230,19-12-2016,16:03:35,0,cycle,28.5,77.15,low,2


In [4]:
# Combine the 'LastUpdatedDate' and 'LastUpdatedTime' columns into a single datetime column
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                  format='%d-%m-%Y %H:%M:%S')

# Sort the DataFrame by the new 'Timestamp' column and reset the index
df = df.sort_values('Timestamp').reset_index(drop=True)

In [5]:
# Save the selected columns to a CSV file for streaming or downstream processing
df[["Timestamp", "Occupancy", "Capacity"]].to_csv("parking_stream.csv", index=False)

# Note: Only three features are used here for simplicity.
# Participants are expected to incorporate additional relevant features in their models.

In [6]:
# Define the schema for the streaming data using Pathway
# This schema specifies the expected structure of each data row in the stream

class ParkingSchema(pw.Schema):
    Timestamp: str   # Timestamp of the observation (should ideally be in ISO format)
    Occupancy: int   # Number of occupied parking spots
    Capacity: int    # Total parking capacity at the location


In [7]:
# Load the data as a simulated stream using Pathway's replay_csv function
# This replays the CSV data at a controlled input rate to mimic real-time streaming
# input_rate=1000 means approximately 1000 rows per second will be ingested into the stream.

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=100)

In [8]:
# Define the datetime format to parse the 'Timestamp' column
fmt = "%Y-%m-%d %H:%M:%S"

# Add new columns to the data stream:
# - 't' contains the parsed full datetime
# - 'day' extracts the date part and resets the time to midnight (useful for day-level aggregations)
data_with_time = data.with_columns(
    t = data.Timestamp.dt.strptime(fmt),
    day = data.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)


# Step 2: Making a simple pricing function

# Model 1

In [None]:
# @title Default title text
BASE_PRICE = 10.0
ALPHA = 2.0

# from datetime import timedelta
# import datetime

# base_price = 10.0

# # Step 1: Daily window aggregation
# daily_stats = (
#     data_with_time.windowby(
#         pw.this.t,
#         instance=pw.this.t.dt.floor("D"),  # Corrected
#         window=pw.temporal.tumbling(datetime.timedelta(days=1)),
#         behavior=pw.temporal.exactly_once_behavior()
#     )
#     .reduce(
#         t=pw.this._pw_window_end,
#         occ_max=pw.reducers.max(pw.this.Occupancy),
#         occ_min=pw.reducers.min(pw.this.Occupancy),
#         cap=pw.reducers.max(pw.this.Capacity),
#     )
#     .with_columns(
#         day=pw.this.t.dt.floor("D")  # Also corrected here
#     )
# )


# # Step 2: Shift table forward by one day for previous price reference
# previous_price_table = (
#     daily_stats.select(
#         day=(pw.this.day + datetime.timedelta(days=1)),
#         prev_price=pw.this.price,
#     )
# )

# # Step 3: Join with previous day’s price and compute current price
# daily_with_price = (
#     daily_stats
#     .join(previous_price_table, on=pw.this.day == pw.left.day)
#     .with_columns(
#         price=pw.this.prev_price + 0.5 * (pw.this.occ_max - pw.this.occ_min) / pw.this.cap
#     )
# )

# # Step 4: Handle first day (no previous price)
# final_pricing = daily_with_price.with_columns(
#     price=pw.if_else(
#         pw.this.prev_price.is_none(),
#         base_price + 0.5 * (pw.this.occ_max - pw.this.occ_min) / pw.this.cap,
#         pw.this.price
#     )
# )
from datetime import timedelta

BASE_PRICE = 10.0
ALPHA = 2.0

# Step 1: Stateless pricing function
@pw.udf
def linear_price_update(occupancy, capacity):
    if capacity == 0:
        return BASE_PRICE
    return round(BASE_PRICE + ALPHA * (occupancy / capacity), 2)

# Step 2: Tumbling window using average values
windowed = data_with_time.windowby(
    pw.this.t,
    window=pw.temporal.tumbling(duration=timedelta(minutes=30)),
    behavior=pw.temporal.exactly_once_behavior(),
).reduce(
    timestamp=pw.this._pw_window_end,
    occupancy=pw.reducers.avg(pw.this.Occupancy),
    capacity=pw.reducers.avg(pw.this.Capacity),
)

# Step 3: Apply pricing logic (stateless)
price_updates = windowed.select(
    t=windowed.timestamp,
    price=linear_price_update(windowed.occupancy, windowed.capacity),
)

# Step 4: Output (optional)
# Write price updates to a CSV file
pw.io.csv.write(
    price_updates,"output_prices.csv"
)


# Step 5: Run pipeline
pw.run()


Output()

    https://beartype.readthedocs.io/en/latest/api_roar/#pep-585-deprecations
  warn(


# Model 2

In [9]:
df

Unnamed: 0.1,Unnamed: 0,SystemCodeNumber,Capacity,Occupancy,LastUpdatedDate,LastUpdatedTime,IsSpecialDay,VehicleType,Latitude,Longitude,TrafficConditionNearby,QueueLength,Timestamp
0,0,BHMBCCMKT01,577,61,04-10-2016,07:59:42,0,car,28.5,77.15,low,2,2016-10-04 07:59:42
1,1,BHMBCCMKT01,577,64,04-10-2016,08:25:42,0,car,28.5,77.15,average,2,2016-10-04 08:25:42
2,2,BHMBCCMKT01,577,80,04-10-2016,08:59:42,0,car,28.5,77.15,low,2,2016-10-04 08:59:42
3,3,BHMBCCMKT01,577,107,04-10-2016,09:32:46,0,car,28.5,77.15,low,3,2016-10-04 09:32:46
4,4,BHMBCCMKT01,577,150,04-10-2016,09:59:48,0,car,28.5,77.15,low,3,2016-10-04 09:59:48
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1307,1307,BHMBCCMKT01,577,309,19-12-2016,14:30:33,0,bike,28.5,77.15,average,5,2016-12-19 14:30:33
1308,1308,BHMBCCMKT01,577,300,19-12-2016,15:03:34,0,car,28.5,77.15,low,4,2016-12-19 15:03:34
1309,1309,BHMBCCMKT01,577,274,19-12-2016,15:29:33,0,truck,28.5,77.15,low,3,2016-12-19 15:29:33
1310,1310,BHMBCCMKT01,577,230,19-12-2016,16:03:35,0,cycle,28.5,77.15,low,2,2016-12-19 16:03:35


In [10]:
df.groupby('TrafficConditionNearby').count()

Unnamed: 0_level_0,Unnamed: 0,SystemCodeNumber,Capacity,Occupancy,LastUpdatedDate,LastUpdatedTime,IsSpecialDay,VehicleType,Latitude,Longitude,QueueLength,Timestamp
TrafficConditionNearby,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
average,446,446,446,446,446,446,446,446,446,446,446,446
high,282,282,282,282,282,282,282,282,282,282,282,282
low,584,584,584,584,584,584,584,584,584,584,584,584


In [11]:
df['TrafficConditionNearby'] = pd.Categorical(df['TrafficConditionNearby']).codes

In [12]:
df['TrafficConditionNearby']

Unnamed: 0,TrafficConditionNearby
0,2
1,0
2,2
3,2
4,2
...,...
1307,0
1308,2
1309,2
1310,2


In [13]:
df.groupby('TrafficConditionNearby').count()

Unnamed: 0_level_0,Unnamed: 0,SystemCodeNumber,Capacity,Occupancy,LastUpdatedDate,LastUpdatedTime,IsSpecialDay,VehicleType,Latitude,Longitude,QueueLength,Timestamp
TrafficConditionNearby,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
0,446,446,446,446,446,446,446,446,446,446,446,446
1,282,282,282,282,282,282,282,282,282,282,282,282
2,584,584,584,584,584,584,584,584,584,584,584,584


In [14]:
df[["Timestamp", "Occupancy", "Capacity","QueueLength","TrafficConditionNearby","IsSpecialDay","VehicleType"]].to_csv("model2.csv", index=False)

In [15]:
import pathway as pw
from datetime import timedelta

# Define schema for input CSV
class ParkingSchema(pw.Schema):
    Timestamp: str
    Occupancy: int
    Capacity: int
    QueueLength: int
    TrafficConditionNearby: int
    IsSpecialDay: int           # 0 or 1
    VehicleType: str            # car, bike, truck

# Load streaming data
data = pw.demo.replay_csv("model2.csv", schema=ParkingSchema, input_rate=100)

# Convert timestamp to usable datetime
fmt = "%Y-%m-%d %H:%M:%S"
data_with_time = data.select(
    t = data.Timestamp.dt.strptime(fmt),
    Occupancy = data.Occupancy,
    Capacity = data.Capacity,
    QueueLength = data.QueueLength,
    TrafficConditionNearby = data.TrafficConditionNearby,
    IsSpecialDay = data.IsSpecialDay,
    VehicleType = data.VehicleType,
)

# Windowed aggregation: 30-min tumbling
# Tumbling window without vehicle_type
windowed = data_with_time.windowby(
    pw.this.t,
    window=pw.temporal.tumbling(duration=timedelta(minutes=30)),
    behavior=pw.temporal.exactly_once_behavior(),
).reduce(
    timestamp=pw.this._pw_window_end,
    occupancy=pw.reducers.avg(pw.this.Occupancy),
    capacity=pw.reducers.avg(pw.this.Capacity),
    queue_length=pw.reducers.avg(pw.this.QueueLength),
    traffic=pw.reducers.avg(pw.this.TrafficConditionNearby),
    special_day=pw.reducers.avg(pw.this.IsSpecialDay),
    t_floor = pw.this._pw_window_end,
)

data_floored = data_with_time.select(
    t_floor = pw.this.t.dt.floor("30min"),
    vehicle_type = pw.this.VehicleType)

joined = windowed.join(
    data_floored,
    windowed.t_floor == data_floored.t_floor,
)


# Map vehicle type to numeric weight
@pw.udf
def vehicle_weight(vtype: str) -> float:
    return {"car": 1.0, "bike": 0.5, "truck": 1.5}.get(vtype.lower(), 1.0)

# Add vehicle weight column
joined_with_weight = joined.select(
    t = joined.timestamp,
    occupancy =joined.occupancy,
    capacity = joined.capacity,
    queue = joined.queue_length,
    traffic = joined.traffic,
    special = joined.special_day,
    v_weight = vehicle_weight(joined.vehicle_type)
)

# Define demand function
ALPHA, BETA, GAMMA, DELTA, EPSILON, LAMBDA = 2, 0.5, 1.0, 1.5, 1.0, 0.6
BASE_PRICE = 10.0

@pw.udf
def compute_price(occ, cap, queue, traffic, special, v_weight):
    if cap == 0:
        return BASE_PRICE
    demand = (
        ALPHA * (occ / cap)
        + BETA * queue
        - GAMMA * traffic
        + DELTA * special
        + EPSILON * v_weight
    )
    norm_demand = max(0, min(1, demand / 10))  # Normalize to [0,1]
    price = BASE_PRICE * (1 + LAMBDA * norm_demand)
    return round(price, 2)

# Apply pricing
price_stream = joined_with_weight.select(
    t = joined_with_weight.t,
    price = compute_price(
        joined_with_weight.occupancy,
        joined_with_weight.capacity,
        joined_with_weight.queue,
        joined_with_weight.traffic,
        joined_with_weight.special,
        joined_with_weight.v_weight
    )
)

# Output (to file or use pw.io.print(price_stream) to test)
pw.io.csv.write(price_stream, "model2_prices.csv")

pw.run()


Output()

    https://beartype.readthedocs.io/en/latest/api_roar/#pep-585-deprecations
  warn(


# Model 3

In [16]:
df

Unnamed: 0.1,Unnamed: 0,SystemCodeNumber,Capacity,Occupancy,LastUpdatedDate,LastUpdatedTime,IsSpecialDay,VehicleType,Latitude,Longitude,TrafficConditionNearby,QueueLength,Timestamp
0,0,BHMBCCMKT01,577,61,04-10-2016,07:59:42,0,car,28.5,77.15,2,2,2016-10-04 07:59:42
1,1,BHMBCCMKT01,577,64,04-10-2016,08:25:42,0,car,28.5,77.15,0,2,2016-10-04 08:25:42
2,2,BHMBCCMKT01,577,80,04-10-2016,08:59:42,0,car,28.5,77.15,2,2,2016-10-04 08:59:42
3,3,BHMBCCMKT01,577,107,04-10-2016,09:32:46,0,car,28.5,77.15,2,3,2016-10-04 09:32:46
4,4,BHMBCCMKT01,577,150,04-10-2016,09:59:48,0,car,28.5,77.15,2,3,2016-10-04 09:59:48
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1307,1307,BHMBCCMKT01,577,309,19-12-2016,14:30:33,0,bike,28.5,77.15,0,5,2016-12-19 14:30:33
1308,1308,BHMBCCMKT01,577,300,19-12-2016,15:03:34,0,car,28.5,77.15,2,4,2016-12-19 15:03:34
1309,1309,BHMBCCMKT01,577,274,19-12-2016,15:29:33,0,truck,28.5,77.15,2,3,2016-12-19 15:29:33
1310,1310,BHMBCCMKT01,577,230,19-12-2016,16:03:35,0,cycle,28.5,77.15,2,2,2016-12-19 16:03:35


In [17]:
df[["Timestamp","Latitude","Longitude", "Occupancy", "Capacity","QueueLength","TrafficConditionNearby","IsSpecialDay","VehicleType"]].to_csv("model3.csv", index=False)

In [18]:
import math


# --- Step 1: Schema ---
class LotSchema(pw.Schema):
    Timestamp: str
    Latitude: float
    Longitude: float
    Occupancy: int
    Capacity: int
    QueueLength: int
    TrafficConditionNearby: float
    IsSpecialDay: int
    VehicleType: str

# --- Step 2: Load data in static mode (no hang) ---
data = pw.demo.replay_csv("model3.csv", schema=LotSchema, input_rate=100)
# --- Step 2: Parse time ---
fmt = "%Y-%m-%d %H:%M:%S"
data_with_time = data.select(
    t = data.Timestamp.dt.strptime(fmt),
    lat = data.Latitude,
    lon = data.Longitude,
    occ = data.Occupancy,
    cap = data.Capacity,
    queue = data.QueueLength,
    traffic = data.TrafficConditionNearby,
    special = data.IsSpecialDay,
    vtype = data.VehicleType
)

# --- Step 3: Add t_floor for 30-min windows ---
floored = data_with_time.select(
    t_floor = pw.this.t.dt.floor("30min"),
    **{col: pw.this[col] for col in data_with_time.schema if col != "t"}
)

# --- Step 4: Demand Price (Model 2 core) ---
@pw.udf
def vtype_weight(v):
    return {"car": 1.0, "bike": 0.5, "truck": 1.5}.get(v.lower(), 1.0)

@pw.udf
def demand_price(occ, cap, queue, traffic, special, w):
    base = 10.0
    if cap == 0:
        return base
    demand = 2 * (occ / cap) + 0.5 * queue - 1 * traffic + 1.5 * special + 1 * w
    demand = max(0, min(1, demand / 10))
    return round(base * (1 + 0.6 * demand), 2)

with_price = floored.select(
    t = pw.this.t_floor,
    LotID = pw.this.LotID,
    lat = pw.this.lat,
    lon = pw.this.lon,
    base_price = demand_price(
        pw.this.occ,
        pw.this.cap,
        pw.this.queue,
        pw.this.traffic,
        pw.this.special,
        vtype_weight(pw.this.vtype)
    )
)

# --- Step 5: Compute Nearby Lot Prices ---
RADIUS_KM = 0.5

@pw.udf
def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Earth radius km
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat/2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2)**2
    return R * 2 * math.asin(math.sqrt(a))

# Self-join to find nearby competitors
competitors = with_price.join(
    with_price,
    how="inner",
    on=with_price.t == with_price.t,
).select(
    lot = pw.left.LotID,
    t = pw.left.t,
    self_price = pw.left.base_price,
    neighbor_id = pw.right.LotID,
    neighbor_price = pw.right.base_price,
    dist = haversine(pw.left.lat, pw.left.lon, pw.right.lat, pw.right.lon)
).filter(
    (pw.this.lot != pw.this.neighbor_id) & (pw.this.dist <= RADIUS_KM)
)

# --- Step 6: Adjust price based on competition ---
@pw.udf
def adjust_price(self_price, neighbor_prices):
    if not neighbor_prices:
        return self_price
    avg_neighbor = sum(neighbor_prices) / len(neighbor_prices)
    if self_price > avg_neighbor:
        return round(self_price - 1.0, 2)  # reduce if overpriced
    elif self_price < avg_neighbor:
        return round(self_price + 0.5, 2)  # raise if underpriced
    return self_price

# Group competitor prices per lot and timestamp
adjusted = competitors.groupby(pw.this.lot, pw.this.t).reduce(
    lot = pw.this.lot,
    t = pw.this.t,
    self_price = pw.this.self_price.first(),
    competitor_prices = pw.reducers.collect(pw.this.neighbor_price)
).select(
    lot = pw.this.lot,
    t = pw.this.t,
    final_price = adjust_price(pw.this.self_price, pw.this.competitor_prices)
)

# --- Step 7: Write output ---
pw.io.csv.write(adjusted, "model3_prices.csv")
pw.run()


KeyError: 0