# Parky: Dynamic Pricing for Urban Parking Lots

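This notebook implements a dynamic pricing engine for 14 urban parking lots.
The data are replayed as a real-time stream with Pathway, and the resulting
prices are visualized live with Bokeh and Panel. The pricing models are built
from scratch with NumPy and Pandas (scikit-learn's `LinearRegression` is used
only to fit Model 2's demand coefficients). They take into account demand,
competition, and real-time conditions such as queue length and nearby traffic.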

# Step 1: Setup and Imports

In [1]:
!pip install pathway bokeh panel --quiet

In [2]:
# Importing essential libraries
import numpy as np
import pandas as pd
from datetime import datetime
import pathway as pw
import bokeh.plotting as bp
import panel as pn
from bokeh.models import ColumnDataSource, DatetimeTickFormatter
from bokeh.plotting import figure, show
from bokeh.layouts import column
from sklearn.linear_model import LinearRegression
from functools import partial

# Step 2: Load Data and Initial Preprocessing
We load `dataset.csv`. For the sake of simulation with Pathway, we'll treat this static file as our stream source.

In [3]:
# Dataset : https://drive.google.com/file/d/1-60mIjS1sDvBJR8P0Zh4fGdFOqJh0AwY/view?usp=drive_link

file_path = '/content/dataset.csv'
try:
    # Read the entire CSV file into a pandas DataFrame
    df = pd.read_csv(file_path)

    # Display the first 5 rows of the dataset to verify it's loaded correctly.
    print("Successfully loaded the dataset. Here are the first 5 rows:")
    print(df.head())

except FileNotFoundError:
    print(f"Error: The file '{file_path}' was not found.")
except Exception as e:
    print(f"An error occurred while reading the file: {e}")

Successfully loaded the dataset. Here are the first 5 rows:
   ID SystemCodeNumber  Capacity   Latitude  Longitude  Occupancy VehicleType  \
0   0      BHMBCCMKT01       577  26.144536  91.736172         61         car   
1   1      BHMBCCMKT01       577  26.144536  91.736172         64         car   
2   2      BHMBCCMKT01       577  26.144536  91.736172         80         car   
3   3      BHMBCCMKT01       577  26.144536  91.736172        107         car   
4   4      BHMBCCMKT01       577  26.144536  91.736172        150        bike   

  TrafficConditionNearby  QueueLength  IsSpecialDay LastUpdatedDate  \
0                    low            1             0      04-10-2016   
1                    low            1             0      04-10-2016   
2                    low            2             0      04-10-2016   
3                    low            2             0      04-10-2016   
4                    low            2             0      04-10-2016   

  LastUpdatedTime  
0     

In [4]:
# Combine the date and time columns into a single datetime column
df['Timestamp'] = pd.to_datetime(df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
                                 format='%d-%m-%Y %H:%M:%S')
print("\n'LastUpdatedDate' and 'LastUpdatedTime' columns combined into 'Timestamp'.")

# Sort the DataFrame by the new 'Timestamp' column and reset the index

df = df.sort_values('Timestamp').reset_index(drop=True)
print("DataFrame sorted by 'Timestamp' and index reset.")


'LastUpdatedDate' and 'LastUpdatedTime' columns combined into 'Timestamp'.
DataFrame sorted by 'Timestamp' and index reset.


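# Step 3: Feature Engineering Functions (Pure Python/NumPy/Pandas)
These steps select the relevant columns, define the Pathway schema for each incoming record (row), and derive the numerical features used by the pricing models.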

In [5]:
# Define the list of relevant columns based on the problem statement
relevant_columns = [
    "Timestamp",
    "SystemCodeNumber",
    "Capacity",
    "Latitude",
    "Longitude",
    "Occupancy",
    "VehicleType",
    "TrafficConditionNearby",
    "QueueLength",
    "IsSpecialDay"
]

# Select only the relevant columns
df_stream = df[relevant_columns]

# Save the selected columns to a CSV file for streaming or downstream processing
df_stream.to_csv("parking_stream.csv", index=False)

print(f"Selected {len(relevant_columns)} columns saved to 'parking_stream.csv'.")
print("Columns saved:")
for col in relevant_columns:
    print(f"- {col}")
df_stream.head()

Selected 10 columns saved to 'parking_stream.csv'.
Columns saved:
- Timestamp
- SystemCodeNumber
- Capacity
- Latitude
- Longitude
- Occupancy
- VehicleType
- TrafficConditionNearby
- QueueLength
- IsSpecialDay


|   | Timestamp | SystemCodeNumber | Capacity | Latitude | Longitude | Occupancy | VehicleType | TrafficConditionNearby | QueueLength | IsSpecialDay |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2016-10-04 07:59:00 | BHMBCCMKT01 | 577 | 26.144536 | 91.736172 | 61 | car | low | 1 | 0 |
| 1 | 2016-10-04 07:59:00 | BHMNCPHST01 | 1200 | 26.140014 | 91.731 | 237 | bike | low | 2 | 0 |
| 2 | 2016-10-04 07:59:00 | BHMMBMMBX01 | 687 | 20.000035 | 78.000003 | 264 | car | low | 2 | 0 |
| 3 | 2016-10-04 07:59:00 | BHMNCPNST01 | 485 | 26.140048 | 91.730972 | 249 | car | low | 2 | 0 |
| 4 | 2016-10-04 07:59:00 | Shopping | 1920 | 26.150504 | 91.733531 | 614 | cycle | low | 2 | 0 |


In [6]:
print("\n--- Defining Pathway Schema ---")

# Define the schema for the streaming data using Pathway.

class ParkingSchema(pw.Schema):
    Timestamp: str
    SystemCodeNumber: str
    Capacity: int
    Latitude: float
    Longitude: float
    Occupancy: int
    VehicleType: str
    TrafficConditionNearby: str
    QueueLength: int
    IsSpecialDay: int

print("Pathway schema 'ParkingSchema' defined successfully.")


--- Defining Pathway Schema ---
Pathway schema 'ParkingSchema' defined successfully.


In [7]:
print("\n--- Loading Data as Simulated Stream with Pathway ---")

data = pw.demo.replay_csv("parking_stream.csv", schema=ParkingSchema, input_rate=1000)

print("Data loaded as a simulated stream using Pathway's replay_csv.")
print(f"Stream object created: {data}")


--- Loading Data as Simulated Stream with Pathway ---
Data loaded as a simulated stream using Pathway's replay_csv.
Stream object created: <pathway.Table schema={'Timestamp': <class 'str'>, 'SystemCodeNumber': <class 'str'>, 'Capacity': <class 'int'>, 'Latitude': <class 'float'>, 'Longitude': <class 'float'>, 'Occupancy': <class 'int'>, 'VehicleType': <class 'str'>, 'TrafficConditionNearby': <class 'str'>, 'QueueLength': <class 'int'>, 'IsSpecialDay': <class 'int'>}>


In [8]:
# --- Preprocessing for Model Training (Batch) ---
print("\n--- Preprocessing for Model Training (Batch) ---")

# Traffic mapping: Convert categorical traffic levels to numerical values
TRAFFIC_MAP = {
    "low": 0.0,
    "medium": 1.0,
    "high": 2.0
}

# Vehicle type weights: Assign a numerical weight to each vehicle type
VEHICLE_TYPE_WEIGHTS = {
    "car": 1.0,
    "bike": 0.5,
    "truck": 1.5
}

# Create a working copy of the DataFrame for preprocessing and model application
processed_df = df.copy()

# Add numerical features to the working DataFrame for model training
processed_df['TrafficConditionNearby_numeric'] = processed_df['TrafficConditionNearby'].map(TRAFFIC_MAP)
processed_df['VehicleType_weight'] = processed_df['VehicleType'].map(VEHICLE_TYPE_WEIGHTS)
# OccupancyRatio = Occupancy / Capacity, set to 0 where Capacity is not positive
processed_df['OccupancyRatio'] = (processed_df['Occupancy'] / processed_df['Capacity']).where(processed_df['Capacity'] > 0, 0)

print("Numerical features for TrafficConditionNearby and VehicleType added to DataFrame.")


--- Preprocessing for Model Training (Batch) ---
Numerical features for TrafficConditionNearby and VehicleType added to DataFrame.
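The dataset preview above contains a `cycle` vehicle type, which is not a key in `VEHICLE_TYPE_WEIGHTS`. In this batch cell, `.map` turns such values into NaN (those rows are later dropped before the regression and filled with 0 when the normalization bounds are computed), whereas the streaming code further down uses `VEHICLE_TYPE_WEIGHTS.get(x, 1.0)` and falls back to 1.0. The sketch below, on a made-up three-value Series, shows the two behaviors side by side.

```python
import pandas as pd

# Same weights as the notebook; note there is no "cycle" key
VEHICLE_TYPE_WEIGHTS = {"car": 1.0, "bike": 0.5, "truck": 1.5}

vehicle_types = pd.Series(["car", "bike", "cycle"])  # made-up sample values

# Batch path: dictionary mapping leaves unknown categories as NaN
batch_weights = vehicle_types.map(VEHICLE_TYPE_WEIGHTS)

# Streaming path: dict.get with a default of 1.0 for unknown categories
stream_weights = vehicle_types.map(lambda v: VEHICLE_TYPE_WEIGHTS.get(v, 1.0))

print(pd.DataFrame({"type": vehicle_types, "batch": batch_weights, "stream": stream_weights}))
```

If the two paths should agree, one option is to apply the same explicit default in both, for example `.map(VEHICLE_TYPE_WEIGHTS).fillna(1.0)`; which weight a cycle should actually get is a modelling choice the notebook does not make.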


# Step 4: Pricing Model Implementations (Models 1, 2 and 3)

In [9]:
print("\n--- Pricing Model Implementations ---")

# Define constants for the pricing model
BASE_PRICE = 10.0
ALPHA = 5.0  # Alpha parameter for linear price increase
OCCUPANCY_THRESHOLD = 0.95
QUEUE_LENGTH_THRESHOLD = 5
MAX_DISTANCE_KM = 1.0


--- Pricing Model Implementations ---


# Model 1: Baseline Linear Model
A simple model where the next price is a function of the previous price and the current occupancy:

- Linear price increase as occupancy increases
- Acts as a reference point

Example: $Price_{t+1} = Price_t + \alpha \cdot \frac{Occupancy}{Capacity}$

In the code, $\alpha$ is `ALPHA`, and `baseline_linear_price` clips the result to the range [5, 30].
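Because Occupancy/Capacity is never negative, each step of this recurrence can only raise the price. The sketch below assumes a made-up lot that stays at a constant 40% occupancy and reuses the notebook's `ALPHA = 5.0`, `BASE_PRICE = 10.0` and the [5, 30] clip applied by `baseline_linear_price`; `model1_path` is a hypothetical helper name.

```python
import numpy as np

ALPHA = 5.0        # same value as the notebook's ALPHA
BASE_PRICE = 10.0  # same value as the notebook's BASE_PRICE


def model1_path(occupancy_ratios, start=BASE_PRICE, lo=5.0, hi=30.0):
    """Iterate Price_{t+1} = Price_t + ALPHA * ratio_t, clipping to [lo, hi] and rounding to cents."""
    prices = []
    price = start
    for ratio in occupancy_ratios:
        price = round(float(np.clip(price + ALPHA * ratio, lo, hi)), 2)
        prices.append(price)
    return prices


# A hypothetical lot that stays 40% full for twelve consecutive updates
path = model1_path([0.4] * 12)
print(path)
```

With these inputs the price climbs by 2 per update and is pinned at the 30 ceiling from the tenth update onwards, which is worth keeping in mind when comparing Model 1 with the other models.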

### Haversine distance function

In [10]:
# --- Haversine distance function ---
def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Earth radius in km
    phi1, phi2 = np.radians(lat1), np.radians(lat2)
    dphi = np.radians(lat2 - lat1)
    dlambda = np.radians(lon2 - lon1)
    a = np.sin(dphi/2)**2 + np.cos(phi1)*np.cos(phi2)*np.sin(dlambda/2)**2
    return 2*R*np.arcsin(np.sqrt(a))
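A quick sanity check of `haversine`, using coordinates copied from the dataset preview above. The function is duplicated here so the snippet runs on its own; the lot pairs are chosen only for illustration.

```python
import numpy as np


def haversine(lat1, lon1, lat2, lon2):
    R = 6371  # Earth radius in km
    phi1, phi2 = np.radians(lat1), np.radians(lat2)
    dphi = np.radians(lat2 - lat1)
    dlambda = np.radians(lon2 - lon1)
    a = np.sin(dphi / 2) ** 2 + np.cos(phi1) * np.cos(phi2) * np.sin(dlambda / 2) ** 2
    return 2 * R * np.arcsin(np.sqrt(a))


# One degree of latitude along a meridian should be about 111.19 km for R = 6371 km
one_degree = haversine(0.0, 0.0, 1.0, 0.0)

# Pairs of lots taken from the dataset preview
d_near = haversine(26.140014, 91.731, 26.140048, 91.730972)    # BHMNCPHST01 -> BHMNCPNST01
d_mid = haversine(26.144536, 91.736172, 26.140014, 91.731)     # BHMBCCMKT01 -> BHMNCPHST01
d_far = haversine(26.144536, 91.736172, 20.000035, 78.000003)  # BHMBCCMKT01 -> BHMMBMMBX01

print(f"1 degree: {one_degree:.3f} km, near: {d_near:.4f} km, mid: {d_mid:.3f} km, far: {d_far:.0f} km")
```

BHMNCPHST01 and BHMNCPNST01 sit only a few metres apart and BHMBCCMKT01 is roughly 0.7 km away (inside `MAX_DISTANCE_KM = 1.0`), while BHMMBMMBX01 is well over 1,000 km from the other lots in the preview, so it can never receive a rerouting suggestion from those lots.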


### Baseline linear pricing model

In [11]:
# --- Baseline linear pricing model ---
def baseline_linear_price(prev_price, occupancy, capacity):
    occupancy_ratio = occupancy / capacity if capacity > 0 else 0
    next_price = prev_price + ALPHA * occupancy_ratio
    # Keep the price within the bounds [5, 30]
    next_price = np.clip(next_price, 5, 30)
    return round(next_price, 2)

### Rerouting suggestion function

In [12]:
# --- Rerouting suggestion function ---
def suggest_reroute(current_lot_row, all_lots_at_timestamp):
    """
    Suggests rerouting if the current lot is overburdened.
    Considers other available lots within MAX_DISTANCE_KM at the same timestamp.
    """
    # Check if current lot is overburdened based on occupancy or queue length
    if (current_lot_row['Occupancy'] / current_lot_row['Capacity'] >= OCCUPANCY_THRESHOLD) or \
       (current_lot_row['QueueLength'] >= QUEUE_LENGTH_THRESHOLD):

        available_nearby_lots = []
        for idx, lot in all_lots_at_timestamp.iterrows():
            # Skip the current lot itself
            if lot['SystemCodeNumber'] == current_lot_row['SystemCodeNumber']:
                continue

            # Check if the lot has available capacity
            if lot['Occupancy'] < lot['Capacity']:
                # Calculate distance
                dist = haversine(current_lot_row['Latitude'], current_lot_row['Longitude'],
                                 lot['Latitude'], lot['Longitude'])

                # If within max distance, consider it available
                if dist <= MAX_DISTANCE_KM:
                    available_nearby_lots.append((lot['SystemCodeNumber'], dist))

        # Sort available lots by distance
        available_nearby_lots.sort(key=lambda x: x[1])

        # Return list of SystemCodeNumbers for available lots, or a descriptive string
        # Convert list to a comma-separated string for DataFrame assignment
        if available_nearby_lots:
            return ", ".join([lot_id for lot_id, _ in available_nearby_lots])
        else:
            return "No nearby alternatives"
    else:
        return "Not overburdened"

# --- Integration function: apply models to the pandas DataFrame ---
def apply_model_1_batch(input_df):
    """
    Applies the baseline linear pricing model and rerouting logic in a batch fashion
    to the entire pandas DataFrame.
    """
    # Create a copy to avoid modifying the original DataFrame
    df_processed = input_df.copy()

    # Sort data to ensure correct previous price calculation per parking space
    df_processed = df_processed.sort_values(['SystemCodeNumber', 'Timestamp']).reset_index(drop=True)

    # Initialize Price and RerouteSuggestion columns
    df_processed['CalculatedPrice_Model1'] = BASE_PRICE
    df_processed['RerouteSuggestion_Model1'] = "Not overburdened" # Default state
    unique_timestamps = df_processed['Timestamp'].unique()

    for ts in unique_timestamps:
        current_timestamp_data = df_processed[df_processed['Timestamp'] == ts].copy()

        for idx_row, row_data in current_timestamp_data.iterrows():
            system_code = row_data['SystemCodeNumber']

            # Find the previous price for this specific parking lot
            # This requires looking back in the *sorted* DataFrame.
            prev_price = BASE_PRICE
            # Find previous entry for the same SystemCodeNumber
            prev_entries = df_processed[
                (df_processed['SystemCodeNumber'] == system_code) &
                (df_processed['Timestamp'] < ts)
            ].sort_values('Timestamp', ascending=False)

            if not prev_entries.empty:
                prev_price = prev_entries.iloc[0]['CalculatedPrice_Model1']

            # Calculate new price
            occupancy = row_data['Occupancy']
            capacity = row_data['Capacity']
            new_price = baseline_linear_price(prev_price, occupancy, capacity)

            # Update the price in the main DataFrame
            df_processed.loc[idx_row, 'CalculatedPrice_Model1'] = new_price

            # Suggest reroute
            reroute_suggestion = suggest_reroute(row_data, current_timestamp_data)
            df_processed.loc[idx_row, 'RerouteSuggestion_Model1'] = reroute_suggestion

    return df_processed

# Apply Model 1 to the preprocessed DataFrame 'processed_df'
parking_prices_df_model1 = apply_model_1_batch(processed_df)

print("Baseline Linear Model (Model 1) and Rerouting Logic applied in batch mode.")
print("The 'parking_prices_df_model1' DataFrame now contains price predictions and rerouting suggestions.")
print("Sample of 'parking_prices_df_model1':")
print(parking_prices_df_model1.head())

Baseline Linear Model (Model 1) and Rerouting Logic applied in batch mode.
The 'parking_prices_df_model1' DataFrame now contains price predictions and rerouting suggestions.
Sample of 'parking_prices_df_model1':
   ID SystemCodeNumber  Capacity   Latitude  Longitude  Occupancy VehicleType  \
0   0      BHMBCCMKT01       577  26.144536  91.736172         61         car   
1   1      BHMBCCMKT01       577  26.144536  91.736172         64         car   
2   2      BHMBCCMKT01       577  26.144536  91.736172         80         car   
3   3      BHMBCCMKT01       577  26.144536  91.736172        107         car   
4   4      BHMBCCMKT01       577  26.144536  91.736172        150        bike   

  TrafficConditionNearby  QueueLength  IsSpecialDay LastUpdatedDate  \
0                    low            1             0      04-10-2016   
1                    low            1             0      04-10-2016   
2                    low            2             0      04-10-2016   
3                
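`apply_model_1_batch` filters the whole DataFrame again for every row to find the previous price, so its cost grows quadratically with the number of rows. Assuming each lot reports at most once per timestamp, the same recurrence can be computed in a single pass per lot with `groupby`. The sketch below omits rerouting; `model1_prices_fast` and `_model1_step` are hypothetical names, and the demo data are made up.

```python
from itertools import accumulate

import numpy as np
import pandas as pd

BASE_PRICE = 10.0  # same values as the notebook
ALPHA = 5.0


def _model1_step(prev_price, occupancy_ratio):
    """One Model 1 update, mirroring baseline_linear_price: add, clip to [5, 30], round."""
    return round(float(np.clip(prev_price + ALPHA * occupancy_ratio, 5, 30)), 2)


def model1_prices_fast(input_df):
    """Model 1 prices computed lot by lot in timestamp order (rerouting omitted)."""
    df = input_df.sort_values(["SystemCodeNumber", "Timestamp"]).reset_index(drop=True)
    capacity = df["Capacity"]
    # Occupancy ratio, 0 where capacity is not positive
    df["OccupancyRatio"] = (df["Occupancy"] / capacity).where(capacity > 0, 0.0)

    def chain(ratios):
        # accumulate yields BASE_PRICE first; drop it so the prices align with the rows
        prices = list(accumulate(ratios, _model1_step, initial=BASE_PRICE))[1:]
        return pd.Series(prices, index=ratios.index)

    df["CalculatedPrice_Model1"] = df.groupby("SystemCodeNumber")["OccupancyRatio"].transform(chain)
    return df


demo = pd.DataFrame({
    "SystemCodeNumber": ["A", "B", "A", "B", "B", "B", "B"],
    "Timestamp": pd.to_datetime(["2016-10-04 08:00", "2016-10-04 08:00", "2016-10-04 08:30",
                                 "2016-10-04 08:30", "2016-10-04 09:00", "2016-10-04 09:30",
                                 "2016-10-04 10:00"]),
    "Occupancy": [50, 10, 100, 10, 10, 10, 10],
    "Capacity": [100, 10, 100, 10, 10, 10, 10],
})
print(model1_prices_fast(demo)[["SystemCodeNumber", "Timestamp", "CalculatedPrice_Model1"]])
```

Each lot's prices are chained with the same add-clip-round step as `baseline_linear_price`, so the result should match the row-by-row version under the one-report-per-timestamp assumption.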

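# Model 2: Demand-Based Price Function
A more advanced model that combines several real-time signals into a single demand score and derives the price from it:

$Demand = \alpha \cdot \frac{Occupancy}{Capacity} + \beta \cdot QueueLength - \gamma \cdot Traffic + \delta \cdot IsSpecialDay + \epsilon \cdot VehicleTypeWeight$

$NormalizedDemand = \mathrm{clip}\left(\frac{Demand - Demand_{min}}{Demand_{max} - Demand_{min}},\ 0,\ 1\right)$

$Price_t = BasePrice \cdot (1 + \lambda \cdot NormalizedDemand)$

The coefficients are fitted with a linear regression on a demand proxy. The traffic coefficient is learned with its sign included (`DEMAND_GAMMA_RAW` plays the role of $-\gamma$). $Demand_{min}$ and $Demand_{max}$ are the minimum and maximum predicted demand over the dataset, $\lambda$ is `PRICE_LAMBDA`, and the price is clipped to $[0.5, 2] \times BasePrice$.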
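As a worked example, take row 0 of the dataset preview (61 of 577 spaces occupied, queue length 1, low traffic, no special day, car) and the rounded coefficients printed by the regression cell below (Alpha=1.0000, Beta=0.0003, Gamma_Raw=0.0050, Delta=0.0010, Epsilon=0.0010, bounds 0.0050 to 1.0570). The traffic term uses the signed fitted coefficient, as the code does, and because the coefficients are rounded the result is approximate:

$$
\begin{aligned}
Demand &\approx 1.0000 \cdot \tfrac{61}{577} + 0.0003 \cdot 1 + 0.0050 \cdot 0 + 0.0010 \cdot 0 + 0.0010 \cdot 1.0 \approx 0.1070 \\
NormalizedDemand &\approx \frac{0.1070 - 0.0050}{1.0570 - 0.0050} \approx 0.0970 \\
Price &\approx 10 \cdot (1 + 0.5 \cdot 0.0970) \approx 10.485
\end{aligned}
$$

Since $NormalizedDemand \in [0, 1]$ and $\lambda = 0.5$, this formula can only produce prices between 10 and 15 with the current settings, so the [5, 20] clip never binds.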

In [13]:
max_queue_length = processed_df['QueueLength'].max()
scaled_queue_length = processed_df['QueueLength'] / max_queue_length if max_queue_length > 0 else 0

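# Hand-weighted demand proxy used as the regression target. Because it is an exact
# linear combination of the features below, the fitted coefficients essentially
# reproduce these weights (the QueueLength weight becomes 0.005 / max_queue_length).
processed_df['Demand_Proxy'] = (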
    1.0 * processed_df['OccupancyRatio'] +
    0.005 * scaled_queue_length +
    0.005 * processed_df['TrafficConditionNearby_numeric'] +
    0.001 * processed_df['IsSpecialDay'] +
    0.001 * processed_df['VehicleType_weight']
)

# --- Prepare Features (X) and Target (y) for Linear Regression ---
features = [
    'OccupancyRatio',
    'QueueLength', # Use raw QueueLength as feature, scaling is in proxy or handled by regression
    'TrafficConditionNearby_numeric',
    'IsSpecialDay',
    'VehicleType_weight'
]
X = processed_df[features]
y = processed_df['Demand_Proxy']

# Handle potential NaN values in features (e.g., from mapping unknown categories or division by zero)
# Also handle NaN values in the target 'y'
combined_data = pd.concat([X, y], axis=1)
combined_data = combined_data.dropna() # Drop rows where any feature or target is NaN

X = combined_data[features]
y = combined_data['Demand_Proxy']

# --- Train Linear Regression Model ---
demand_model = LinearRegression()
demand_model.fit(X, y)

# --- Extract Learned Coefficients ---
DEMAND_ALPHA = demand_model.coef_[features.index('OccupancyRatio')]
DEMAND_BETA = demand_model.coef_[features.index('QueueLength')]

DEMAND_GAMMA_RAW = demand_model.coef_[features.index('TrafficConditionNearby_numeric')]
DEMAND_DELTA = demand_model.coef_[features.index('IsSpecialDay')]
DEMAND_EPSILON = demand_model.coef_[features.index('VehicleType_weight')]

# Price adjustment coefficient based on normalized demand (still manually set for now)
PRICE_LAMBDA = 0.5

# --- Dynamically Determine Demand Normalization Bounds ---
# Predict demand on the entire dataset using the learned coefficients

processed_df['Predicted_RawDemand'] = demand_model.predict(processed_df[features].fillna(0))

MIN_DEMAND_BOUND = processed_df['Predicted_RawDemand'].min()
MAX_DEMAND_BOUND = processed_df['Predicted_RawDemand'].max()

# Add a small epsilon to avoid division by zero if min == max
if MAX_DEMAND_BOUND - MIN_DEMAND_BOUND < 1e-6:
    MAX_DEMAND_BOUND += 1e-6


# Price bounds for Model 2: not more than 2x or less than 0.5x base price
MIN_MODEL2_PRICE_BOUND = 0.5 * BASE_PRICE # 5.0
MAX_MODEL2_PRICE_BOUND = 2.0 * BASE_PRICE # 20.0

print(f"Learned Demand Coefficients: Alpha={DEMAND_ALPHA:.4f}, Beta={DEMAND_BETA:.4f}, Gamma_Raw={DEMAND_GAMMA_RAW:.4f}, Delta={DEMAND_DELTA:.4f}, Epsilon={DEMAND_EPSILON:.4f}")
print(f"Dynamic Demand Normalization Bounds: Min={MIN_DEMAND_BOUND:.4f}, Max={MAX_DEMAND_BOUND:.4f}")


def calculate_demand_learned(occupancy, capacity, queue_length, traffic_condition, is_special_day, vehicle_type):
    """
    Calculates the raw demand based on the specified formula, using learned coefficients.
    """
    occupancy_ratio = occupancy / capacity if capacity > 0 else 0
    traffic_numeric = TRAFFIC_MAP.get(traffic_condition, 0.0)
    vehicle_type_weight = VEHICLE_TYPE_WEIGHTS.get(vehicle_type, 1.0)

    raw_demand = (
        DEMAND_ALPHA * occupancy_ratio +
        DEMAND_BETA * queue_length +
        DEMAND_GAMMA_RAW * traffic_numeric +
        DEMAND_DELTA * is_special_day +
        DEMAND_EPSILON * vehicle_type_weight
    )
    return raw_demand

def normalize_demand_learned(raw_demand):
    """
    Normalizes the raw demand to a [0, 1] range using learned bounds.
    """
    normalized_demand = (raw_demand - MIN_DEMAND_BOUND) / (MAX_DEMAND_BOUND - MIN_DEMAND_BOUND)
    return np.clip(normalized_demand, 0.0, 1.0)

def demand_based_price_learned(normalized_demand):
    """
    Calculates the price based on normalized demand.
    """
    next_price = BASE_PRICE * (1 + PRICE_LAMBDA * normalized_demand)
    return np.clip(next_price, MIN_MODEL2_PRICE_BOUND, MAX_MODEL2_PRICE_BOUND)

def apply_model_2_batch(input_df):
    """
    Applies the demand-based pricing model and rerouting logic in a batch fashion.
    """
    df_processed = input_df.copy()
    df_processed = df_processed.sort_values(['SystemCodeNumber', 'Timestamp']).reset_index(drop=True)

    df_processed['RawDemand_Model2'] = 0.0
    df_processed['NormalizedDemand_Model2'] = 0.0
    df_processed['CalculatedPrice_Model2'] = BASE_PRICE
    df_processed['RerouteSuggestion_Model2'] = "Not overburdened"

    unique_timestamps = df_processed['Timestamp'].unique()

    for ts in unique_timestamps:
        current_timestamp_data = df_processed[df_processed['Timestamp'] == ts].copy()

        for idx_row, row_data in current_timestamp_data.iterrows():
            raw_demand = calculate_demand_learned(
                row_data['Occupancy'],
                row_data['Capacity'],
                row_data['QueueLength'],
                row_data['TrafficConditionNearby'],
                row_data['IsSpecialDay'],
                row_data['VehicleType']
            )
            normalized_demand = normalize_demand_learned(raw_demand)

            # Calculate price based on demand
            price = demand_based_price_learned(normalized_demand)

            # Update the price and demand in the main DataFrame
            df_processed.loc[idx_row, 'RawDemand_Model2'] = raw_demand
            df_processed.loc[idx_row, 'NormalizedDemand_Model2'] = normalized_demand
            df_processed.loc[idx_row, 'CalculatedPrice_Model2'] = price

            # Suggest reroute (re-using the logic from Model 1, as it's a general requirement)
            reroute_suggestion = suggest_reroute(row_data, current_timestamp_data)
            df_processed.loc[idx_row, 'RerouteSuggestion_Model2'] = reroute_suggestion

    return df_processed

# Apply Model 2 to the pandas DataFrame that already has Model 1 results
parking_prices_df_model2 = apply_model_2_batch(parking_prices_df_model1)

print("Demand-Based Price Function (Model 2) with learned coefficients and Rerouting Logic applied in batch mode.")
print("The 'parking_prices_df_model2' DataFrame now contains price predictions, demand values, and rerouting suggestions.")
print("Sample of 'parking_prices_df_model2':")
print(parking_prices_df_model2.head())

Learned Demand Coefficients: Alpha=1.0000, Beta=0.0003, Gamma_Raw=0.0050, Delta=0.0010, Epsilon=0.0010
Dynamic Demand Normalization Bounds: Min=0.0050, Max=1.0570
Demand-Based Price Function (Model 2) with learned coefficients and Rerouting Logic applied in batch mode.
The 'parking_prices_df_model2' DataFrame now contains price predictions, demand values, and rerouting suggestions.
Sample of 'parking_prices_df_model2':
   ID SystemCodeNumber  Capacity   Latitude  Longitude  Occupancy VehicleType  \
0   0      BHMBCCMKT01       577  26.144536  91.736172         61         car   
1   1      BHMBCCMKT01       577  26.144536  91.736172         64         car   
2   2      BHMBCCMKT01       577  26.144536  91.736172         80         car   
3   3      BHMBCCMKT01       577  26.144536  91.736172        107         car   
4   4      BHMBCCMKT01       577  26.144536  91.736172        150        bike   

  TrafficConditionNearby  QueueLength  IsSpecialDay  ...           Timestamp  \
0         
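`apply_model_2_batch` loops over rows even though a Model 2 price depends only on the current row. Below is a vectorized sketch of the same pricing formula with rerouting omitted. `model2_prices_vectorized` is a hypothetical name, and the coefficient values are the rounded numbers printed above, standing in for the fitted `DEMAND_*` constants. Unknown traffic levels fall back to 0.0 and unknown vehicle types to 1.0, as in `calculate_demand_learned`.

```python
import pandas as pd

# Mappings copied from the notebook
TRAFFIC_MAP = {"low": 0.0, "medium": 1.0, "high": 2.0}
VEHICLE_TYPE_WEIGHTS = {"car": 1.0, "bike": 0.5, "truck": 1.5}

# Stand-ins for the fitted values: the rounded numbers printed by the regression cell
DEMAND_ALPHA, DEMAND_BETA, DEMAND_GAMMA_RAW = 1.0, 0.0003, 0.0050
DEMAND_DELTA, DEMAND_EPSILON = 0.0010, 0.0010
MIN_DEMAND_BOUND, MAX_DEMAND_BOUND = 0.0050, 1.0570
BASE_PRICE, PRICE_LAMBDA = 10.0, 0.5


def model2_prices_vectorized(df):
    """Compute Model 2 demand and price for every row at once (no rerouting)."""
    capacity = df["Capacity"]
    # Occupancy ratio, 0 where capacity is not positive
    ratio = (df["Occupancy"] / capacity).where(capacity > 0, 0.0)
    traffic = df["TrafficConditionNearby"].map(TRAFFIC_MAP).fillna(0.0)
    vehicle_weight = df["VehicleType"].map(VEHICLE_TYPE_WEIGHTS).fillna(1.0)

    raw = (DEMAND_ALPHA * ratio
           + DEMAND_BETA * df["QueueLength"]
           + DEMAND_GAMMA_RAW * traffic
           + DEMAND_DELTA * df["IsSpecialDay"]
           + DEMAND_EPSILON * vehicle_weight)
    normalized = ((raw - MIN_DEMAND_BOUND) / (MAX_DEMAND_BOUND - MIN_DEMAND_BOUND)).clip(0.0, 1.0)
    price = (BASE_PRICE * (1 + PRICE_LAMBDA * normalized)).clip(0.5 * BASE_PRICE, 2.0 * BASE_PRICE)
    return df.assign(RawDemand_Model2=raw,
                     NormalizedDemand_Model2=normalized,
                     CalculatedPrice_Model2=price)


# Two made-up rows: the first mirrors row 0 of the dataset preview
sample = pd.DataFrame({
    "Occupancy": [61, 577],
    "Capacity": [577, 577],
    "QueueLength": [1, 10],
    "TrafficConditionNearby": ["low", "high"],
    "IsSpecialDay": [0, 1],
    "VehicleType": ["car", "cycle"],
})
result = model2_prices_vectorized(sample)
print(result[["RawDemand_Model2", "NormalizedDemand_Model2", "CalculatedPrice_Model2"]])
```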

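# Model 3 (Optional): Competitive Pricing Model
This model simulates competition by adjusting the Model 2 price against a competitor reference price. In this notebook the competitor price is a fixed hypothetical average (`HYPOTHETICAL_COMPETITOR_AVG_PRICE = 12.0`) rather than the prices of nearby lots: a lot that is more than 80% full and priced above the competitor is lowered by 0.5, a lot that is less than 30% full and priced below it is raised by 0.5, and the result is clamped to [0.5, 2] × the base price.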

In [14]:
HYPOTHETICAL_COMPETITOR_AVG_PRICE = 12.0

print("Competitive Pricing Model (Model 3) logic will be applied directly within the unified stream")

Competitive Pricing Model (Model 3) logic will be applied directly within the unified stream
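The Model 3 rule implemented by `calculate_competitive_adjustment_udf` and `clamp_final_price_udf` in the unified stream cell below can be tried as plain Python before it is wired into Pathway. This sketch folds both steps into one hypothetical function, `model3_price`, using the notebook's `BASE_PRICE` and `HYPOTHETICAL_COMPETITOR_AVG_PRICE`; the example inputs are made up.

```python
import numpy as np

BASE_PRICE = 10.0
HYPOTHETICAL_COMPETITOR_AVG_PRICE = 12.0


def model3_price(occupancy_ratio, price_model2, competitor_price=HYPOTHETICAL_COMPETITOR_AVG_PRICE):
    """Adjust a Model 2 price against a competitor price, then clamp to [0.5, 2] x BASE_PRICE."""
    if occupancy_ratio > 0.8 and price_model2 > competitor_price:
        adjustment = -0.5   # busy lot priced above the competitor
    elif occupancy_ratio < 0.3 and price_model2 < competitor_price:
        adjustment = 0.5    # quiet lot priced below the competitor
    else:
        adjustment = 0.0
    return float(np.clip(price_model2 + adjustment, 0.5 * BASE_PRICE, 2.0 * BASE_PRICE))


print(model3_price(0.9, 14.0), model3_price(0.1, 10.5), model3_price(0.5, 13.0))
```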


# Step 5: Real-Time Simulation with Bokeh

### Combining Pricing Models for Unified Stream

In [15]:
occupancy_ratio_expr = pw.this.Occupancy / pw.this.Capacity
traffic_numeric_expr = pw.apply(lambda x: TRAFFIC_MAP.get(x, 0.0), pw.this.TrafficConditionNearby)
vehicle_type_weight_expr = pw.apply(lambda x: VEHICLE_TYPE_WEIGHTS.get(x, 1.0), pw.this.VehicleType)

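# Define a UDF to convert the timestamp string to milliseconds since the Unix epoch.
# The parsed time is pinned to UTC so the result does not depend on the machine's local time zone.
from datetime import timezone

def convert_timestamp_to_ms(timestamp_str):
    # The format must match the strings written to parking_stream.csv (e.g. "2016-10-04 07:59:00")
    dt_object = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
    return dt_object.timestamp() * 1000  # Convert seconds to milliseconds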


timestamp_ms_expr = pw.apply(convert_timestamp_to_ms, pw.this.Timestamp)


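# --- Model 1 (Baseline Linear Model) Price Calculation ---
# Note: this select is evaluated row by row without state, so unlike apply_model_1_batch
# it does not carry the previous price forward. It approximates Model 1 by adding
# ALPHA * occupancy_ratio to BASE_PRICE only when the lot is more than 50% full.
calculated_price_model1_expr = pw.if_else(
    occupancy_ratio_expr > 0.5,  # Simple condition for price increase
    BASE_PRICE + ALPHA * occupancy_ratio_expr,
    BASE_PRICE
)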

# --- Model 2 (Demand-Based Price Function) Price Calculation ---

# Define a UDF for raw demand calculation
def calculate_raw_demand_udf(occupancy_ratio, queue_length, traffic_numeric, is_special_day, vehicle_type_weight,
                             demand_alpha, demand_beta, demand_gamma_raw, demand_delta, demand_epsilon):
    # Raw demand as a weighted sum of the features, using the coefficients passed in as arguments
    return (
        demand_alpha * occupancy_ratio +
        demand_beta * queue_length +
        demand_gamma_raw * traffic_numeric +
        demand_delta * is_special_day +
        demand_epsilon * vehicle_type_weight
    )

# Define a UDF for normalized demand calculation
def normalize_demand_udf(raw_demand, min_demand_bound, max_demand_bound):
    # Handle potential division by zero if max_demand_bound - min_demand_bound is too small
    denominator = max_demand_bound - min_demand_bound
    if abs(denominator) < 1e-6:
        return 0.0
    normalized_demand = (raw_demand - min_demand_bound) / denominator
    return np.clip(normalized_demand, 0.0, 1.0)

# Define a UDF for demand-based price calculation
def demand_based_price_udf(normalized_demand, base_price, price_lambda, min_model2_price_bound, max_model2_price_bound):
    next_price = base_price * (1 + price_lambda * normalized_demand)
    return np.clip(next_price, min_model2_price_bound, max_model2_price_bound)

raw_demand_model2_expr = pw.apply(
    calculate_raw_demand_udf,
    occupancy_ratio_expr,
    pw.this.QueueLength,
    traffic_numeric_expr,
    pw.this.IsSpecialDay,
    vehicle_type_weight_expr,
    DEMAND_ALPHA,
    DEMAND_BETA,
    DEMAND_GAMMA_RAW,
    DEMAND_DELTA,
    DEMAND_EPSILON
)

normalized_demand_model2_expr = pw.apply(
    normalize_demand_udf,
    raw_demand_model2_expr,
    MIN_DEMAND_BOUND,
    MAX_DEMAND_BOUND
)

calculated_price_model2_expr = pw.apply(
    demand_based_price_udf,
    normalized_demand_model2_expr,
    BASE_PRICE,
    PRICE_LAMBDA,
    MIN_MODEL2_PRICE_BOUND,
    MAX_MODEL2_PRICE_BOUND
)


# --- Model 3 (Competitive Pricing Model) Price Calculation ---

def calculate_competitive_adjustment_udf(occupancy_ratio, price_model2_val, hypothetical_competitor_avg_price):
    if (occupancy_ratio > 0.8) and (price_model2_val > hypothetical_competitor_avg_price):
        return -0.5
    elif (occupancy_ratio < 0.3) and (price_model2_val < hypothetical_competitor_avg_price):
        return 0.5
    else:
        return 0.0

def clamp_final_price_udf(final_price_val, base_price):
    min_bound = base_price * 0.5
    max_bound = base_price * 2.0
    return np.clip(final_price_val, min_bound, max_bound)


competitive_adjustment_model3_expr = pw.apply(
    calculate_competitive_adjustment_udf,
    occupancy_ratio_expr,
    calculated_price_model2_expr,
    HYPOTHETICAL_COMPETITOR_AVG_PRICE
)

# Wrap the addition in a UDF to ensure Pathway compatibility
final_price_model3_expr = pw.apply(
    lambda p2_price, adjustment: p2_price + adjustment,
    calculated_price_model2_expr,
    competitive_adjustment_model3_expr
)

clamped_final_price_model3_expr = pw.apply(
    clamp_final_price_udf,
    final_price_model3_expr,
    BASE_PRICE
)


combined_prices_stream = data.select(
    Timestamp=timestamp_ms_expr,
    SystemCodeNumber=pw.this.SystemCodeNumber,
    Capacity=pw.this.Capacity,
    Latitude=pw.this.Latitude,
    Longitude=pw.this.Longitude,
    Occupancy=pw.this.Occupancy,
    VehicleType=pw.this.VehicleType,
    TrafficConditionNearby=pw.this.TrafficConditionNearby,
    QueueLength=pw.this.QueueLength,
    IsSpecialDay=pw.this.IsSpecialDay,
    OccupancyRatio=occupancy_ratio_expr,
    TrafficConditionNearby_numeric=traffic_numeric_expr,
    VehicleType_weight=vehicle_type_weight_expr,
    CalculatedPrice_Model1=calculated_price_model1_expr,
    RawDemand_Model2=raw_demand_model2_expr,
    NormalizedDemand_Model2=normalized_demand_model2_expr,
    CalculatedPrice_Model2=calculated_price_model2_expr,
    CompetitiveAdjustment_Model3=competitive_adjustment_model3_expr,
    CalculatedPrice_Model3=clamped_final_price_model3_expr
)

print("All pricing models combined into a single unified Pathway stream 'combined_prices_stream'.")
print("This stream is now ready for multi-model visualization and further analysis.")

All pricing models combined into a single unified Pathway stream 'combined_prices_stream'.
This stream is now ready for multi-model visualization and further analysis.
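Because the timestamp conversion is a plain Python function, it can be checked outside Pathway. The sketch below uses a standalone copy, `convert_timestamp_to_ms_utc` (a hypothetical name), that pins the parsed time to UTC; calling `.timestamp()` on a naive `datetime` would instead use the machine's local time zone. The assumption is that the dataset's times carry no zone and that Bokeh's datetime axis renders millisecond values as UTC, so pinning to UTC keeps the axis labels equal to the raw clock times.

```python
from datetime import datetime, timezone


def convert_timestamp_to_ms_utc(timestamp_str):
    """Parse a naive 'YYYY-MM-DD HH:MM:SS' string as UTC and return epoch milliseconds."""
    dt = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return dt.timestamp() * 1000


ms = convert_timestamp_to_ms_utc("2016-10-04 07:59:00")  # first timestamp in the preview
round_trip = datetime.fromtimestamp(ms / 1000, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
print(ms, round_trip)
```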


# Real-Time Visualisation

In [16]:
def create_parking_plot(source, parking_spot_id):
    """
    Creates a Bokeh figure for real-time price visualization for a single parking spot.
    """
    fig = bp.figure(
        height=400,
        width=800,
        title=f"Parky: Up-to-the-Minute Pricing for {parking_spot_id}",
        x_axis_type="datetime",
        x_axis_label="Time",
        y_axis_label="Price ($)",
        tools="pan,wheel_zoom,box_zoom,reset,save",
        sizing_mode="scale_width" # Making the plot responsive
    )

    # Adding lines for each pricing model
    # Ensure 'y' values match the column names in combined_prices_stream
    fig.line(x='Timestamp', y='CalculatedPrice_Model1', source=source, legend_label="Model 1 (Baseline)", line_color="blue", line_width=2)
    fig.line(x='Timestamp', y='CalculatedPrice_Model2', source=source, legend_label="Model 2 (Demand-Based)", line_color="green", line_width=2)
    fig.line(x='Timestamp', y='CalculatedPrice_Model3', source=source, legend_label="Model 3 (Competitive)", line_color="red", line_width=2)

    # Add circles for Model 3 points for emphasis
    fig.scatter(x='Timestamp', y='CalculatedPrice_Model3', source=source, marker="circle", size=6, color="orange", alpha=0.6, legend_label="Model 3 Points")


    fig.legend.location = "top_left"
    fig.legend.click_policy = "hide"
    fig.xaxis.formatter = DatetimeTickFormatter(
        seconds="%H:%M:%S",
        minsec="%H:%M:%S",
        minutes="%H:%M",
        hours="%H:%M",
        days="%Y-%m-%d",
        months="%Y-%m",
        years="%Y"
    )
    fig.x_range.range_padding = 0
    return fig

In [17]:
# Get unique parking spot IDs from the initial DataFrame (df)
unique_parking_spots = df['SystemCodeNumber'].unique()

# Create a dictionary to hold a Bokeh plot for each parking spot
parking_spot_plots = {}

# Iterate through each unique parking spot and create a plot
for spot_id in unique_parking_spots:
    # Filter the combined_prices_stream for the current parking spot
    spot_data_stream = combined_prices_stream.filter(pw.this.SystemCodeNumber == spot_id)
    plot = spot_data_stream.plot(partial(create_parking_plot, parking_spot_id=spot_id), sorting_col="Timestamp")
    parking_spot_plots[spot_id] = plot

print(f"Created {len(unique_parking_spots)} Bokeh plots, one for each unique parking spot.")
print("Plots are now ready to be displayed in a Panel dashboard.")



Created 14 Bokeh plots, one for each unique parking spot.
Plots are now ready to be displayed in a Panel dashboard.




In [18]:
all_plots_column = pn.Column(
    "# Parky: Dynamic Parking Pricing Dashboard",
    "## Real-time Price Fluctuations Across Parking Lots",
    *parking_spot_plots.values()
)
print("Attempting to display Panel dashboard inline...")
all_plots_column.servable()

Attempting to display Panel dashboard inline...


In [None]:
%%capture --no-display
# Start the Pathway pipeline. This triggers the real-time data stream processing and
# keeps the Bokeh plots in the dashboard above updating.
# %%capture must be the first line of the cell: it hides this cell's stdout/stderr,
# while --no-display lets rich display output through.
# Note: this cell runs until interrupted.
# Run this cell last, after all Pathway definitions and Bokeh plot setups are complete.
print("Starting Pathway pipeline. Plots will update in the dashboard above...")
pw.run()
