In [None]:
#  1. Setup & Installation
%pip install pandas numpy pathway bokeh geopy --quiet

from math import radians, cos, sin, asin, sqrt

import numpy as np
import pandas as pd
from bokeh.palettes import Category20
from bokeh.plotting import figure, show, output_notebook
# Configure Bokeh so that show() renders plots inline in the notebook output.
output_notebook()




In [None]:
# 2. Load & Prepare Data
df = pd.read_csv("dataset.csv")

# Fuse the separate date and time text columns into one parsed Timestamp column.
timestamp_text = df["LastUpdatedDate"] + " " + df["LastUpdatedTime"]
df["Timestamp"] = pd.to_datetime(timestamp_text, format="%d-%m-%Y %H:%M:%S")

# Put the records in chronological order.
df.sort_values("Timestamp", inplace=True)

# Each distinct (Latitude, Longitude) pair is numbered and labelled "Lot_<n>".
lot_numbers = df.groupby(["Latitude", "Longitude"]).ngroup()
df["LotID"] = lot_numbers.apply("Lot_{}".format)



In [None]:
#  3. Feature Encoding & Prep
# Demand weight per vehicle type; types not listed here fall back to 1.0.
vehicle_weight_map = {"car": 1.0, "bike": 0.7, "truck": 1.3}

# Numeric encoding of the nearby traffic condition; unknown labels fall back to 3.
traffic_level_map = {"low": 1, "moderate": 3, "high": 5}


def preprocess(row):
    """Return a copy of ``row`` extended with the derived pricing features.

    Args:
        row: Mapping-like record (dict or pandas Series) providing
            ``Occupancy``, ``Capacity`` and ``VehicleType``, and optionally
            ``TrafficConditionNearby``.

    Returns:
        A copy of ``row`` with three extra entries:
        ``OccupancyRate`` (``Occupancy / Capacity``), ``VehicleWeight``
        (looked up in ``vehicle_weight_map``, default 1.0) and ``Traffic``
        (looked up in ``traffic_level_map``, default 3).
    """
    # Work on a copy so the caller's record is never modified as a side effect.
    row = row.copy()
    row["OccupancyRate"] = row["Occupancy"] / row["Capacity"]

    # Normalise case/whitespace so "Truck" and "truck" get the same weight,
    # matching how the traffic label is normalised below.
    vehicle_type = str(row["VehicleType"]).strip().lower()
    row["VehicleWeight"] = vehicle_weight_map.get(vehicle_type, 1.0)

    # Encode traffic condition; a missing or unrecognised label maps to 3.
    traffic_label = str(row.get("TrafficConditionNearby", "")).strip().lower()
    row["Traffic"] = traffic_level_map.get(traffic_label, 3)

    return row



In [None]:
# 4. Pricing Models

# Model 1: Baseline
def baseline_price(prev_price, occupancy, capacity, alpha=0.1):
    """Linear baseline model: nudge the previous price up by the lot's fill level.

    Returns ``prev_price + alpha * (occupancy / capacity)``.
    """
    fill_ratio = occupancy / capacity
    increment = alpha * fill_ratio
    return prev_price + increment

# Model 2: Demand-Based
def compute_demand(row, alpha=1, beta=0.5, gamma=0.3, delta=0.2, epsilon=0.4):
    """Weighted linear demand score for one record.

    Occupancy rate, queue length, special-day flag and vehicle weight add to
    the score; the numeric traffic level subtracts from it. Each coefficient
    scales the correspondingly named term below.
    """
    occupancy_term = alpha * row["OccupancyRate"]
    queue_term = beta * row["QueueLength"]
    traffic_term = gamma * row["Traffic"]
    special_day_term = delta * row["IsSpecialDay"]
    vehicle_term = epsilon * row["VehicleWeight"]
    return occupancy_term + queue_term - traffic_term + special_day_term + vehicle_term

def demand_based_price(base_price, demand, lambda_=0.6):
    """Scale ``base_price`` by a bounded function of ``demand``.

    ``demand / 10`` is clipped to the range [0, 2] and the result is
    ``base_price * (1 + lambda_ * clipped)``.
    """
    scaled = demand / 10
    bounded = np.clip(scaled, 0, 2)
    multiplier = 1 + lambda_ * bounded
    return base_price * multiplier

# Model 3: Competitive Adjustments
def haversine(lat1, lon1, lat2, lon2):
    """Great-circle distance between two points using the haversine formula.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        The distance in kilometres, using an Earth radius of 6371 km.
    """
    R = 6371  # Earth radius in km
    dlat, dlon = radians(lat2 - lat1), radians(lon2 - lon1)
    a = sin(dlat / 2)**2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2)**2
    # Floating-point rounding can leave `a` a hair above 1 for (near-)antipodal
    # points, which would make asin() raise ValueError. Only the upper bound is
    # clamped so that NaN coordinates still propagate as NaN.
    if a > 1.0:
        a = 1.0
    return R * 2 * asin(sqrt(a))

def adjust_for_competition(current_price, queue_length, competitors,
                           radius_km=2, queue_threshold=5,
                           discount=0.95, markup=1.05):
    """Adjust a price according to the prices of nearby competitors.

    Args:
        current_price: Price to adjust.
        queue_length: Current queue length at this lot.
        competitors: Iterable of dicts with ``"price"`` and ``"distance"`` keys.
        radius_km: Only competitors with ``distance < radius_km`` are considered.
        queue_threshold: The discount applies only when ``queue_length``
            exceeds this value.
        discount: Multiplier used when the queue is long and a nearby
            competitor is cheaper.
        markup: Multiplier used otherwise, when a nearby competitor is
            more expensive.

    Returns:
        ``current_price * discount``, ``current_price * markup`` or
        ``current_price`` unchanged, checked in that order.
    """
    # Filter by distance once, then compare prices against ours.
    nearby_prices = [c["price"] for c in competitors if c["distance"] < radius_km]
    has_cheaper = any(price < current_price for price in nearby_prices)
    has_pricier = any(price > current_price for price in nearby_prices)

    if queue_length > queue_threshold and has_cheaper:
        return current_price * discount
    if has_pricier:
        return current_price * markup
    return current_price


In [None]:
# 5. Pricing Logic
def pricing_logic(row, prev_price, competitors=None):
    """Compute the next price for one lot record.

    The record is passed through ``preprocess`` and ``compute_demand``, turned
    into a demand-based price from a base price of 10, optionally adjusted for
    competitors, blended with the previous price, and bounded.

    Args:
        row: Record for the lot at the current time step.
        prev_price: The lot's previous price.
        competitors: Optional list of ``{"price", "distance"}`` dicts. ``None``
            or an empty list skips the competition adjustment.

    Returns:
        The new price, clipped to the range [5, 20] and rounded to 2 decimals.
    """
    row = preprocess(row)
    demand = compute_demand(row)
    price = demand_based_price(base_price=10, demand=demand)

    # `None` (the default) and `[]` both mean there is nothing to compare against.
    if competitors:
        price = adjust_for_competition(price, row["QueueLength"], competitors)

    # Smooth price transition: 70% previous price, 30% newly computed price.
    new_price = 0.7 * prev_price + 0.3 * price
    return round(np.clip(new_price, 5, 20), 2)


In [None]:
#  6. Run Simulation
# Maps each LotID to the list of prices computed for it, in chronological order.
price_history = {}

# groupby yields the time slices in ascending timestamp order and keeps the
# rows of each slice in their original order, without re-scanning the whole
# frame for every timestamp.
for timestamp, time_slice in df.groupby("Timestamp", sort=True):
    # Materialise the slice once instead of re-running iterrows() per row.
    slice_rows = [record for _, record in time_slice.iterrows()]

    for row in slice_rows:
        lot = row["LotID"]
        # A lot seen for the first time starts from a price of 10.
        prev_price = price_history.get(lot, [10])[-1]

        # Simulate competitors nearby (for demo, use the other lots in this slice).
        # The competitor price is the latest one recorded so far, so lots already
        # processed in this time step contribute their freshly updated price.
        competitors = [
            {
                "price": price_history.get(other["LotID"], [10])[-1],
                "distance": haversine(row["Latitude"], row["Longitude"],
                                      other["Latitude"], other["Longitude"]),
            }
            for other in slice_rows
            if other["LotID"] != lot
        ]

        new_price = pricing_logic(row, prev_price, competitors)
        price_history.setdefault(lot, []).append(new_price)



In [None]:
#  7. Bokeh Visualization
p = figure(title="Real-Time Parking Prices", x_axis_label='Time Steps', y_axis_label='Price ($)', width=800)

# Bokeh does not cycle colours on its own: without an explicit colour every
# lot would be drawn in the same default colour and the legend would be
# useless. Colours repeat if there are more lots than palette entries.
palette = Category20[20]

for position, (lot, prices) in enumerate(price_history.items()):
    p.line(
        list(range(len(prices))),
        prices,
        legend_label=lot,
        line_width=2,
        line_color=palette[position % len(palette)],
    )


p.legend.title = "Parking Lots"
show(p)