In [None]:
# 📦 Install necessary packages
!pip install pathway bokeh panel --quiet

# 📥 Import libraries
import pathway as pw
import datetime
import pandas as pd
import numpy as np
import bokeh.plotting
import panel as pn
import sys
import contextlib
pn.extension()

In [None]:
# Load and enrich original data
df = pd.read_csv("dataset.csv")
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'], format='%d-%m-%Y %H:%M:%S')

# Add necessary demand features
def encode_traffic(x): return {"low": 0, "average": 0.5, "high": 1}.get(x, 0.5)
def encode_vehicle(x): return {"bike": 0.5, "car": 1.0, "truck": 1.5, "cycle": 0.2}.get(x, 1.0)

df['TrafficLevel'] = df['TrafficConditionNearby'].apply(encode_traffic)
df['VehicleWeight'] = df['VehicleType'].apply(encode_vehicle)
df['OccupancyRatio'] = df['Occupancy'] / df['Capacity']
df = df.sort_values("Timestamp")

# Save as simulated stream
df[['Timestamp', 'SystemCodeNumber', 'OccupancyRatio', 'QueueLength', 'TrafficLevel', 'IsSpecialDay', 'VehicleWeight']].to_csv("realtime_stream.csv", index=False)

In [None]:
# Define schema
class StreamSchema(pw.Schema):
    Timestamp: str
    SystemCodeNumber: str
    OccupancyRatio: float
    QueueLength: int
    TrafficLevel: float
    IsSpecialDay: int
    VehicleWeight: float

# Ingest with delay to simulate real-time
stream = pw.demo.replay_csv(
    "realtime_stream.csv",
    schema=StreamSchema,
    input_rate=100  # stream speed
)

In [None]:
# Parse timestamp
fmt = "%Y-%m-%d %H:%M:%S"
stream = stream.with_columns(
    t = stream.Timestamp.dt.strptime(fmt),
    day = stream.Timestamp.dt.strptime(fmt).dt.strftime("%Y-%m-%dT00:00:00")
)

# Weighted demand calculation
alpha, beta, gamma, delta, epsilon = 0.1, 0.05, 0.02, 0.02, 0.03
base_price = 10
lambda_scale = 1.5

windowed = (
    stream.windowby(
        time_expr=pw.this.t,
        instance=pw.this.SystemCodeNumber + pw.this.day,
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),
        behavior=pw.temporal.exactly_once_behavior()
    )
    .reduce(
        SystemCodeNumber = pw.reducers.any(pw.this.SystemCodeNumber),
        day = pw.reducers.any(pw.this.day),
        t = pw.this._pw_window_end,
        sum_demand = pw.reducers.sum(
            alpha * pw.this.OccupancyRatio +
            beta * pw.this.QueueLength +
            gamma * pw.this.TrafficLevel +
            delta * pw.this.IsSpecialDay +
            epsilon * pw.this.VehicleWeight
        ),
        count = pw.reducers.count()
    )
)

windowed = windowed.with_columns(
    demand = pw.this.sum_demand / pw.this.count,
    demand_normalized = pw.apply(lambda d: max(0, min(1, d)), pw.this.sum_demand / pw.this.count),
    price = pw.apply(lambda d: base_price * (1 + lambda_scale * d), pw.this.demand_normalized)
)

In [None]:
# Extract list of lots
lots = pd.read_csv("realtime_stream.csv")["SystemCodeNumber"].unique().tolist()

def plot_price_for_lot(lot):
    df = windowed.select(
        t = pw.this.t,
        price = pw.this.price,
        SystemCodeNumber = pw.this.SystemCodeNumber
    ).filter(pw.this.SystemCodeNumber == lot)

    def make_plot(source):
        fig = bokeh.plotting.figure(
            title=f"Live Price – {lot}",
            x_axis_type="datetime", height=300, width=700
        )
        fig.line("t", "price", source=source, line_width=2)
        fig.circle("t", "price", source=source, size=6)
        return fig

    return df.plot(make_plot, sorting_col="t")

# Create panel layout of 14 plots
plots = [plot_price_for_lot(lot) for lot in lots]
pn.Column("## Demand-Based Pricing for All Parking Lots", *plots).servable()

In [None]:
# Save results as JSON and run
pw.io.jsonlines.write(
    windowed.select(t = pw.this.t, SystemCodeNumber = pw.this.SystemCodeNumber, price = pw.this.price),
    "streaming_output_model2.jsonl"
)

# Execute
with contextlib.redirect_stdout(sys.stdout), contextlib.redirect_stderr(sys.stderr):
    pw.run().await_termination()