# Introduction


This sample notebook demonstrates how to process live data streams using Pathway. The dataset used here is a subset of the one provided — specifically, it includes data for only a single parking spot. You are expected to implement your model across all parking spots.

Please note that the pricing model used in this notebook is a simple baseline. You are expected to design and implement a more advanced and effective model.


In [1]:
!pip install pathway bokeh panel --quiet

print("Hello")

^C
[31mERROR: Operation cancelled by user[0m[31m
[0mHello


In [1]:
# Standard library
import datetime
from datetime import datetime  # NOTE: rebinds `datetime` to the class, shadowing the module imported on the line above

# Third-party
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt  # TODO: not used by the cells visible here; remove if unused
import pathway as pw
import bokeh.plotting
import panel as pn
from bokeh.io import output_notebook, show  # TODO: not used by the cells visible here; remove if unused

  __import__("pkg_resources").declare_namespace(__name__)  # type: ignore


Hello


# Step 1: Importing and Preprocessing the Data

In [2]:
# Load the raw dataset (replace with your actual file if needed). The sample is at
# https://drive.google.com/file/d/1D479FLjp9aO3Mg8g6Lpj9oRViWacurA6/view?usp=sharing
RAW_CSV_PATH = "dataset.csv"
STREAM_CSV_PATH = "parking_stream.csv"
# Text format of Timestamp in the streaming CSV; the Pathway parsing step uses this exact format.
STREAM_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

df = pd.read_csv(RAW_CSV_PATH)

# Build a single datetime column from the separate date/time columns when present,
# otherwise parse an existing Timestamp column.
if 'LastUpdatedDate' in df.columns and 'LastUpdatedTime' in df.columns:
    df['Timestamp'] = pd.to_datetime(
        df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
        format='%d-%m-%Y %H:%M:%S',
    )
else:
    df['Timestamp'] = pd.to_datetime(df['Timestamp'])

# Chronological order for replay. A stable sort keeps rows that share a timestamp
# (e.g. readings from different lots) in file order, so every replay is identical.
df = df.sort_values('Timestamp', kind='stable').reset_index(drop=True)

# Ensure a lot identifier column (LotID) exists.
# NOTE(review): the dataset also has a `SystemCodeNumber` column, which looks like the
# lot's own identifier; consider using it instead of the latitude/longitude pair.
if 'LotID' not in df.columns:
    df['LotID'] = df['Latitude'].astype(str) + '_' + df['Longitude'].astype(str)

# Save a streaming-ready CSV for Pathway. `date_format` pins the timestamp text so it
# always matches STREAM_TIME_FORMAT (no sub-second parts that strptime would reject).
df[["Timestamp", "Occupancy", "Capacity", "LotID"]].to_csv(
    STREAM_CSV_PATH, index=False, date_format=STREAM_TIME_FORMAT
)

df.columns

Hello


Index(['ID', 'SystemCodeNumber', 'Capacity', 'Latitude', 'Longitude',
       'Occupancy', 'VehicleType', 'TrafficConditionNearby', 'QueueLength',
       'IsSpecialDay', 'LastUpdatedDate', 'LastUpdatedTime', 'Timestamp',
       'LotID'],
      dtype='object')

In [3]:
# Schema of the streaming data. Column names and types must match the header of
# parking_stream.csv, which the preprocessing step writes.
class ParkingSchema(pw.Schema):
    """Row layout of the parking stream consumed by ``pw.demo.replay_csv``."""

    Timestamp: str  # timestamp text; parsed into a datetime in a later step
    Occupancy: int  # occupancy count reported at this timestamp
    Capacity: int   # lot capacity (expected to be constant per lot)
    LotID: str      # lot identifier built during preprocessing

Hello World


In [4]:
# Load the data as a simulated stream using Pathway's replay_csv function.
# Rows are replayed at a controlled rate to mimic real-time arrival;
# INPUT_RATE is the approximate number of rows ingested per second.
INPUT_RATE = 100

data = pw.demo.replay_csv(
    "parking_stream.csv",
    schema=ParkingSchema,
    input_rate=INPUT_RATE,
)

Hello


In [5]:
# Datetime format of the 'Timestamp' strings in the stream.
fmt = "%Y-%m-%d %H:%M:%S"

# Parse Timestamp once into `t` (event time), then derive `day` (the calendar-day
# key used to partition the daily windows) from the already-parsed value.
data_with_time = data.with_columns(
    t=data.Timestamp.dt.strptime(fmt),
).with_columns(
    day=pw.this.t.dt.strftime("%Y-%m-%dT00:00:00"),
)

Hello


# Step 2: Making a simple pricing function

In [6]:
# Daily tumbling window over the data stream: aggregate each day's occupancy and
# turn it into a price.
#
# Pricing formula (baseline):
#     price = BASE_PRICE + (occ_max - occ_min) / cap
# - A larger gap between the day's peak and lowest occupancy signals more volatile
#   demand, so the price rises.
# - Dividing by capacity normalizes the fluctuation to [0, 1] when occupancy never
#   exceeds capacity.
# - Example: occ_max = 90, occ_min = 30, cap = 100
#       => price = 10 + (90 - 30) / 100 = 10.6
#
# NOTE(review): windows are partitioned only by `day`, so if the stream contains
# several lots their rows share one daily window. Partition by LotID for per-lot prices.
# NOTE: a window whose capacity is 0 would divide by zero.
import datetime  # re-import the module: the import cell rebinds `datetime` to the datetime class

BASE_PRICE = 10  # fixed minimum price added to every daily price

delta_window = (
    data_with_time.windowby(
        pw.this.t,                 # event-time column used for windowing
        instance=pw.this.day,      # one window instance per calendar day
        window=pw.temporal.tumbling(datetime.timedelta(days=1)),  # fixed 1-day windows
        behavior=pw.temporal.exactly_once_behavior(),  # each non-empty window emits exactly one result
    )
    .reduce(
        t=pw.this._pw_window_end,                    # end timestamp of the window
        occ_max=pw.reducers.max(pw.this.Occupancy),  # highest occupancy in the window
        occ_min=pw.reducers.min(pw.this.Occupancy),  # lowest occupancy in the window
        cap=pw.reducers.max(pw.this.Capacity),       # capacity (expected constant per lot)
    )
    .with_columns(
        price=BASE_PRICE + (pw.this.occ_max - pw.this.occ_min) / pw.this.cap,
    )
)


Hello




# Step 3: Visualizing Daily Price Fluctuations with a Bokeh Plot

**Note:** The Bokeh plot created in the next cell receives data only after you run the `pw.run()` cell (i.e., the final cell), which starts the stream that feeds it.


In [7]:
# Activate Panel's notebook extension so Panel/Bokeh objects render inline.
pn.extension()


def price_plotter(source):
    """Build the Bokeh figure for the live daily-price chart.

    Args:
        source: Bokeh data source supplied by Pathway's ``Table.plot``; it exposes
            the plotted table's ``t`` and ``price`` columns.

    Returns:
        A ``bokeh.plotting.figure`` with a price line and a marker at each point.
    """
    fig = bokeh.plotting.figure(
        height=400,
        width=800,
        title="Pathway: Daily Parking Price",
        x_axis_type="datetime",  # `t` holds window-end datetimes
    )
    fig.xaxis.axis_label = "Time"
    fig.yaxis.axis_label = "Price"

    # Line showing how the price evolves over time.
    fig.line("t", "price", source=source, line_width=2, color="navy")

    # Red markers at each data point for visibility. `scatter` replaces
    # `circle(size=...)`, which is deprecated since Bokeh 3.4.
    fig.scatter("t", "price", source=source, size=6, color="red")

    return fig


# Bind the live `delta_window` table to the plot; rows are drawn in `t` order.
viz = delta_window.plot(price_plotter, sorting_col="t")

# Keep the Panel layout as the cell's last expression so Jupyter displays it;
# `.servable()` also registers it when the notebook is served as a Panel app.
pn.Column(viz).servable()


Hello




In [None]:


# Start the Pathway pipeline: replay the CSV stream, compute the windowed prices,
# and push them to the live plot. The plot receives data only once this runs.
# To hide Pathway's run-time output, make `%%capture --no-display` the very first
# line of this cell. A cell magic placed after comments is not recognised.
pw.run()



Output()