In [3]:
pip install pathway bokeh  # This cell may take a few seconds to execute.

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [5]:
pip install panel

Defaulting to user installation because normal site-packages is not writeable
Collecting panel
  Downloading panel-1.4.5-py3-none-any.whl.metadata (25 kB)
Collecting param<3.0,>=2.1.0 (from panel)
  Downloading param-2.2.1-py3-none-any.whl.metadata (6.6 kB)
Collecting pyviz-comms>=2.0.0 (from panel)
  Downloading pyviz_comms-3.0.6-py3-none-any.whl.metadata (7.7 kB)
Collecting linkify-it-py (from panel)
  Downloading linkify_it_py-2.0.3-py3-none-any.whl.metadata (8.5 kB)
Collecting mdit-py-plugins (from panel)
  Downloading mdit_py_plugins-0.4.2-py3-none-any.whl.metadata (2.8 kB)
Collecting uc-micro-py (from linkify-it-py->panel)
  Downloading uc_micro_py-1.0.3-py3-none-any.whl.metadata (2.0 kB)
Downloading panel-1.4.5-py3-none-any.whl (24.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.7/24.7 MB[0m [31m814.8 kB/s[0m eta [36m0:00:00[0m00:01[0m00:02[0m
[?25hDownloading param-2.2.1-py3-none-any.whl (119 kB)
Downloading pyviz_comms-3.0.6-py3-none-any.whl (84 

In [16]:



import pandas as pd
import pathway as pw

# Load dataset from Google Drive
df = pd.read_csv("dataset.csv")

# Merge Date + Time columns into one
df['Timestamp'] = pd.to_datetime(
    df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
    format='%d-%m-%Y %H:%M:%S'
)

# Select relevant columns & rename for Pathway
df_model1 = df[['Timestamp', 'SystemCodeNumber', 'Occupancy', 'Capacity']].copy()
df_model1.columns = ['timestamp', 'lot_id', 'occupancy', 'capacity']

# Sort by lot_id and timestamp to ensure correct sequential processing
df_model1 = df_model1.sort_values(['lot_id', 'timestamp'])

# Clean data to avoid CSV parsing issues
df_model1['lot_id'] = df_model1['lot_id'].astype(str).str.replace(',', '_')  # Remove commas
df_model1 = df_model1.dropna()  # Remove any NaN values

print(f"Original data shape: {df_model1.shape}")
print(f"Data types:\n{df_model1.dtypes}")

# Pre-process stateful price calculation with pandas
def calculate_stateful_prices(df):
    df = df.copy()
    df['date'] = df['timestamp'].dt.date.astype(str)
    df['price'] = 0.0

    for lot_id in df['lot_id'].unique():
        lot_mask = df['lot_id'] == lot_id
        lot_data = df[lot_mask].copy().sort_values('timestamp')

        prev_price = 10.0
        last_day = None
        alpha = 5.0
        prices = []

        for idx, row in lot_data.iterrows():
            current_date = row['date']
            if current_date != last_day:
                prev_price = 10.0  # Reset price at start of new day

            # Ensure no division by zero
            if row['capacity'] > 0:
                price = prev_price + alpha * (row['occupancy'] / row['capacity'])
            else:
                price = prev_price

            price = round(min(max(price, 5), 20), 2)  # Clamp price between 5 and 20
            prices.append(price)

            prev_price = price
            last_day = current_date

        df.loc[lot_mask, 'price'] = prices

    return df

# Apply stateful calculation
print("Calculating stateful prices...")
df_with_prices = calculate_stateful_prices(df_model1)

# Clean the timestamp format for Pathway
df_with_prices['timestamp'] = df_with_prices['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Save data with prices for Pathway with proper formatting
df_with_prices.to_csv("full_stream_data_with_prices.csv", index=False, quoting=1)  # Quote all fields

print(f"Data with prices shape: {df_with_prices.shape}")
print("Sample data:")
print(df_with_prices.head())

# Define Schema for data with prices
class ParkingWithPriceSchema(pw.Schema):
    timestamp: str
    lot_id: str
    occupancy: int
    capacity: int
    date: str
    price: float

# Read the pre-processed data into Pathway
try:
    input_table = pw.io.csv.read(
        "full_stream_data_with_prices.csv",
        schema=ParkingWithPriceSchema,
        mode="static"
    )

    # Apply transformations in Pathway
    output_table = input_table.select(
        timestamp=input_table.timestamp,
        lot_id=input_table.lot_id,
        occupancy=input_table.occupancy,
        capacity=input_table.capacity,
        date=input_table.date,
        price=input_table.price
    )

    # Write output to CSV with proper formatting
    pw.io.csv.write(output_table, "full_real_time_prices.csv")

    # Run Pathway pipeline
    print("Running Pathway pipeline...")
    pw.run()

    # Load and display results with error handling
    try:
        df_out = pd.read_csv("full_real_time_prices.csv", quoting=1)
        print("\nResults:")
        print(df_out.head())
        print(f"\nTotal records: {len(df_out)}")

        # Price analysis
        print(f"\nPrice Analysis:")
        print(f"Price range: {df_out['price'].min():.2f} - {df_out['price'].max():.2f}")
        print(f"Average price: {df_out['price'].mean():.2f}")
        print(f"Median price: {df_out['price'].median():.2f}")
        print(f"Standard deviation: {df_out['price'].std():.2f}")

        # Price distribution
        print(f"\nPrice Distribution:")
        print(df_out['price'].value_counts().sort_index().head(10))

        print(f"\nUnique lot_ids: {df_out['lot_id'].nunique()}")

        # Check for any data issues
        print(f"\nData quality check:")
        print(f"Missing values: {df_out.isnull().sum().sum()}")
        print(f"Duplicate rows: {df_out.duplicated().sum()}")

    except pd.errors.ParserError as e:
        print(f"Error reading output CSV: {e}")
        # Try reading with different parameters
        try:
            df_out = pd.read_csv("full_real_time_prices.csv", sep=',', quotechar='"', skipinitialspace=True)
            print("Successfully read with alternative parameters")
            print(df_out.head())
        except Exception as e2:
            print(f"Still failed: {e2}")
            # Show raw file content for debugging
            with open("full_real_time_prices.csv", 'r') as f:
                print("First few lines of output file:")
                for i, line in enumerate(f):
                    if i < 10:
                        print(f"Line {i}: {line.strip()}")
                    else:
                        break

except Exception as e:
    print(f"Error in Pathway pipeline: {e}")
    # Fallback: Just use pandas result
    print("Using pandas result as fallback:")
    print(df_with_prices.head())
    print(f"Total records: {len(df_with_prices)}")

    # Price analysis for fallback
    print(f"\nPrice Analysis:")
    print(f"Price range: {df_with_prices['price'].min():.2f} - {df_with_prices['price'].max():.2f}")
    print(f"Average price: {df_with_prices['price'].mean():.2f}")
    print(f"Median price: {df_with_prices['price'].median():.2f}")
    print(f"Standard deviation: {df_with_prices['price'].std():.2f}")

    # Price distribution
    print(f"\nPrice Distribution:")
    print(df_with_prices['price'].value_counts().sort_index().head(10))

    print(f"\nUnique lot_ids: {df_with_prices['lot_id'].nunique()}")
     

# Install required packages
!pip install bokeh panel --quiet

import pandas as pd
import bokeh.plotting
from bokeh.models import ColumnDataSource
from bokeh.palettes import Category20
import panel as pn

# Activate Panel
pn.extension()

# Load CSV robustly
df = pd.read_csv(
    "full_real_time_prices.csv",
    sep=',',
    quotechar='"',
    skipinitialspace=True,
    on_bad_lines='warn'
)

# Convert timestamp to datetime
df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')

# Drop missing timestamps, prices, or lot_ids
df = df.dropna(subset=['timestamp', 'price', 'lot_id'])

# Prepare Bokeh figure
fig = bokeh.plotting.figure(
    height=500,
    width=900,
    title="Daily Parking Prices per Lot (Pathway Output)",
    x_axis_type="datetime",
    tools="pan,wheel_zoom,box_zoom,reset,save"
)

# Use Category20 palette (20 distinct colors)
colors = Category20[20]
lot_ids = sorted(df['lot_id'].unique())  # Sort lot IDs for consistent colors

# Plot each parking lot with a unique color
for idx, lot_id in enumerate(lot_ids):
    lot_data = df[df['lot_id'] == lot_id].sort_values('timestamp')
    source = ColumnDataSource(lot_data)

    fig.line("timestamp", "price", source=source, line_width=2,
             color=colors[idx % len(colors)], legend_label=f"Lot {lot_id}")

    fig.circle("timestamp", "price", source=source, size=5,
               color=colors[idx % len(colors)])

# Final touches
fig.xaxis.axis_label = "Timestamp"
fig.yaxis.axis_label = "Price"
fig.legend.location = "top_left"
fig.legend.click_policy = "hide"  # Allow hiding/showing lots interactively

# Display plot
pn.Column(fig).servable()

     

Original data shape: (18368, 4)
Data types:
timestamp    datetime64[ns]
lot_id               object
occupancy             int64
capacity              int64
dtype: object
Calculating stateful prices...


Output()

Data with prices shape: (18368, 6)
Sample data:
             timestamp       lot_id  occupancy  capacity        date  price
0  2016-10-04 07:59:00  BHMBCCMKT01         61       577  2016-10-04  10.53
1  2016-10-04 08:25:00  BHMBCCMKT01         64       577  2016-10-04  11.08
2  2016-10-04 08:59:00  BHMBCCMKT01         80       577  2016-10-04  11.77
3  2016-10-04 09:32:00  BHMBCCMKT01        107       577  2016-10-04  12.70
4  2016-10-04 09:59:00  BHMBCCMKT01        150       577  2016-10-04  14.00
Running Pathway pipeline...



Results:
             timestamp         lot_id  occupancy  capacity        date  price  \
0  2016-11-11 08:27:00       Shopping        758      1920  2016-11-11  13.53   
1  2016-10-22 08:59:00    BHMNCPHST01        135      1200  2016-10-22  11.37   
2  2016-12-13 13:02:00  Others-CCCPS8        859      1322  2016-12-13  20.00   
3  2016-11-03 16:30:00    BHMNCPNST01        297       485  2016-11-03  20.00   
4  2016-11-23 11:57:00    BHMNCPNST01        329       485  2016-11-23  20.00   

            time  diff  
0  1752096257968     1  
1  1752096257968     1  
2  1752096257968     1  
3  1752096257968     1  
4  1752096257968     1  

Total records: 18368

Price Analysis:
Price range: 10.03 - 20.00
Average price: 18.17
Median price: 20.00
Standard deviation: 3.06

Price Distribution:
price
10.03    2
10.05    1
10.06    3
10.08    1
10.09    1
10.10    6
10.11    2
10.12    6
10.13    1
10.14    2
Name: count, dtype: int64

Unique lot_ids: 14

Data quality check:
Missing values: 0



In [18]:



     

import pathway as pw
from pathway.io import csv

file_path = "dataset.csv"

# Load the CSV with inferred schema
data = csv.read(
    file_path,
    schema=pw.schema_from_csv(file_path),
    mode="static",
)

# Data validation (based on your actual data structure)
data = data.filter(
    (data.Occupancy >= 0) &
    (data.QueueLength >= 0) &
    (data.Capacity > 0) &
    ((data.IsSpecialDay == 0) | (data.IsSpecialDay == 1))
)

# Step 2: Extract hour and create datetime components

def extract_hour_from_time(time_str: str) -> int:
    """Extract hour from time string format HH:MM:SS"""
    try:
        return int(time_str.split(':')[0])
    except:
        return 12  # Default to noon if parsing fails

def create_datetime_string(date_str: str, time_str: str) -> str:
    """Combine date and time strings"""
    return date_str + " " + time_str

data_with_time = data.with_columns(
    datetime_str=pw.apply(create_datetime_string, data.LastUpdatedDate, data.LastUpdatedTime),
    hour=pw.apply(extract_hour_from_time, data.LastUpdatedTime),
    day=data.LastUpdatedDate,  # Use date string directly for now
)

# Step 3: Add vehicle type weights and traffic weights

def get_vehicle_weight(vehicle_type: str) -> float:
    """Assign weights based on vehicle type"""
    if vehicle_type == "truck":
        return 1.5  # Trucks take more space/create more demand
    elif vehicle_type == "car":
        return 1.0
    elif vehicle_type == "bike":
        return 0.5  # Bikes take less space
    else:
        return 1.0  # Default for unknown types

def get_traffic_weight(traffic_condition: str) -> float:
    """Assign weights based on traffic condition"""
    if traffic_condition == "high":
        return 1.3  # High traffic increases demand for parking
    elif traffic_condition == "medium":
        return 1.1
    elif traffic_condition == "low":
        return 1.0
    else:
        return 1.0  # Default

# Add weights to the data
weighted_data = data_with_time.with_columns(
    vehicle_weight=pw.apply(get_vehicle_weight, data_with_time.VehicleType),
    traffic_weight=pw.apply(get_traffic_weight, data_with_time.TrafficConditionNearby),
    occupancy_rate=data_with_time.Occupancy / data_with_time.Capacity
)

# Step 4: Compute demand score (Model 2 implementation)

def calculate_demand(occupancy_rate: float, queue: int, traffic_weight: float,
                    special_day: int, vehicle_weight: float, hour: int) -> float:
    """
    Advanced demand function as per Model 2 requirements
    Demand = α·(Occupancy/Capacity) + β·QueueLength + γ·Traffic + δ·IsSpecialDay + ε·VehicleTypeWeight + ζ·HourFactor
    """
    # Coefficients (you can tune these based on business logic)
    alpha = 0.4    # Occupancy rate weight
    beta = 0.25    # Queue length weight
    gamma = 0.15   # Traffic condition weight
    delta = 0.1    # Special day weight
    epsilon = 0.05 # Vehicle type weight
    zeta = 0.05    # Hour factor weight

    # Peak hours (9-11 AM, 1-3 PM, 5-7 PM) get higher weight
    hour_factor = 1.2 if hour in [9, 10, 11, 13, 14, 15, 17, 18, 19] else 1.0

    demand = (alpha * occupancy_rate +
              beta * queue +
              gamma * traffic_weight +
              delta * special_day +
              epsilon * vehicle_weight +
              zeta * hour_factor)

    return demand

scored = weighted_data.with_columns(
    demand_score=pw.apply(
        calculate_demand,
        weighted_data.occupancy_rate,
        weighted_data.QueueLength,
        weighted_data.traffic_weight,
        weighted_data.IsSpecialDay,
        weighted_data.vehicle_weight,
        weighted_data.hour,
    )
)

# Step 5: Simple normalization without groupby (fallback approach)

step2 = scored.with_columns(
    normalized_demand=scored.demand_score / 2.0
)

# Step 6: Price calculation (Model 2 implementation)

BASE_PRICE = 10
LAMBDA = 0.8

# Ensure price variations are bounded (0.5x to 2x base price as per requirements)
final = step2.with_columns(
    price_multiplier=1 + (LAMBDA * step2.normalized_demand),
    unbounded_price=BASE_PRICE * (1 + LAMBDA * step2.normalized_demand)
).with_columns(
    # Bound the price between 0.5x and 2x base price
    price=pw.if_else(
        pw.this.unbounded_price > 2 * BASE_PRICE,
        2 * BASE_PRICE,
        pw.if_else(
            pw.this.unbounded_price < 0.5 * BASE_PRICE,
            0.5 * BASE_PRICE,
            pw.this.unbounded_price
        )
    )
)

# Step 7: Output comprehensive results

output_data = final.select(
    final.ID,
    final.SystemCodeNumber,
    final.datetime_str,
    final.hour,
    final.day,
    final.Latitude,
    final.Longitude,
    final.Capacity,
    final.Occupancy,
    final.occupancy_rate,
    final.QueueLength,
    final.VehicleType,
    final.TrafficConditionNearby,
    final.IsSpecialDay,
    final.vehicle_weight,
    final.traffic_weight,
    final.demand_score,
    final.normalized_demand,
    final.price_multiplier,
    final.price,
)

# Write to CSV (this defines what Pathway will write at pipeline execution)
csv.write(output_data, "model2_output.csv")

# Run the Pathway pipeline (this step actually processes and writes the CSV)
pw.run()

print("Model 2 processing complete!")
print(f"Base price: ${BASE_PRICE}")
print(f"Price range: ${0.5 * BASE_PRICE:.2f} - ${2 * BASE_PRICE:.2f}")
print("Output saved to model2_output.csv")

     

# Install necessary packages
!pip install bokeh panel --quiet

import pandas as pd
import bokeh.plotting
from bokeh.models import ColumnDataSource
from bokeh.palettes import Category20
import panel as pn

# Activate Panel (for interactive plotting)
pn.extension()

# Load the Pathway Model 2 output
df = pd.read_csv("model2_output.csv")

# Convert datetime string to datetime type for plotting
df['timestamp'] = pd.to_datetime(df['datetime_str'], errors='coerce')

# Drop rows with missing timestamps or prices (to avoid plotting issues)
df = df.dropna(subset=['timestamp', 'price', 'SystemCodeNumber'])

# Create Bokeh figure
fig = bokeh.plotting.figure(
    height=500,
    width=900,
    title="Parking Lot-wise Dynamic Pricing (Model 2 Output)",
    x_axis_type="datetime",
    tools="pan,wheel_zoom,box_zoom,reset,save"
)

# Assign colors to parking lots (up to 20 lots)
colors = Category20[20]
lot_ids = sorted(df['SystemCodeNumber'].unique())  # Unique lots

# lot each lot separately
for idx, lot_id in enumerate(lot_ids):
    lot_data = df[df['SystemCodeNumber'] == lot_id].sort_values('timestamp')
    source = ColumnDataSource(lot_data)

    fig.line(
        "timestamp", "price",
        source=source,
        line_width=2,
        color=colors[idx % len(colors)],
        legend_label=f"Lot {lot_id}"
    )

    fig.circle(
        "timestamp", "price",
        source=source,
        size=5,
        color=colors[idx % len(colors)]
    )

# Final plot settings
fig.xaxis.axis_label = "Timestamp"
fig.yaxis.axis_label = "Predicted Price"
fig.legend.location = "top_left"
fig.legend.click_policy = "hide"  # Allow interactive hide/show per lot

# Display the interactive plot
pn.Column(fig).servable()

     

AttributeError: module 'pathway' has no attribute 'schema_from_csv'