In [None]:
!pip install pathway --quiet
from google.colab import drive
drive.mount('/content/drive')
import pandas as pd
import pathway as pw

# Load the raw parking dataset from Google Drive.
df = pd.read_csv("/content/drive/MyDrive/Data_sets/dataset.csv")

# Merge the separate date and time columns into one datetime column.
df['Timestamp'] = pd.to_datetime(
    df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime'],
    format='%d-%m-%Y %H:%M:%S'
)

# Keep only the columns the pricing model needs, renamed for Pathway.
df_model1 = df[['Timestamp', 'SystemCodeNumber', 'Occupancy', 'Capacity']].copy()
df_model1.columns = ['timestamp', 'lot_id', 'occupancy', 'capacity']

# Sort by lot_id and timestamp so each lot's readings are in sequence.
df_model1 = df_model1.sort_values(['lot_id', 'timestamp'])

# Drop incomplete rows BEFORE stringifying lot_id: astype(str) turns a missing
# lot_id into the literal string "nan", which dropna() would then keep as a
# bogus lot.
df_model1 = df_model1.dropna()
# Replace commas inside lot ids so they cannot be mistaken for CSV separators.
df_model1['lot_id'] = df_model1['lot_id'].astype(str).str.replace(',', '_')

print(f"Original data shape: {df_model1.shape}")
print(f"Data types:\n{df_model1.dtypes}")

# Pre-process stateful price calculation with pandas
def calculate_stateful_prices(df, *, base_price=10.0, alpha=5.0,
                              min_price=5.0, max_price=20.0):
    """Compute a cumulative, occupancy-driven price for every row.

    Each lot is processed independently in timestamp order. The price is
    reset to ``base_price`` at the first reading of each calendar day. Each
    reading then adds ``alpha * occupancy / capacity``; when capacity is not
    positive, the price is carried over unchanged. After every step the
    price is clamped to ``[min_price, max_price]`` and rounded to 2
    decimals. That rounded value is the starting point for the lot's next
    reading on the same day.

    Args:
        df: DataFrame with ``timestamp`` (datetime-like, accessed via
            ``.dt``), ``lot_id``, ``occupancy`` and ``capacity`` columns.
            It is not modified.
        base_price: Price each lot starts from at the beginning of a day.
        alpha: Weight of the occupancy ratio added per reading.
        min_price: Lower clamp bound.
        max_price: Upper clamp bound.

    Returns:
        A copy of ``df`` with two added columns: ``date`` (calendar day of
        ``timestamp`` as a string) and ``price`` (float64). Rows whose
        ``lot_id`` is missing are not priced and keep 0.0.
    """
    df = df.copy()
    df['date'] = df['timestamp'].dt.date.astype(str)

    # Write prices back by row position rather than through a boolean mask.
    # A mask assigns values in the frame's own row order, which differs from
    # the timestamp order they are computed in whenever the input is not
    # already time-sorted within each lot. Positional labels also make the
    # write-back independent of duplicate index labels.
    positional = df.reset_index(drop=True)
    price_values = [0.0] * len(positional)

    for _, lot_rows in positional.groupby('lot_id', sort=False):
        lot_rows = lot_rows.sort_values('timestamp', kind='stable')
        prev_price = base_price
        last_day = None
        for pos, day, occupancy, capacity in zip(
            lot_rows.index,
            lot_rows['date'],
            lot_rows['occupancy'],
            lot_rows['capacity'],
        ):
            if day != last_day:
                prev_price = base_price  # reset at the start of a new day

            # Guard against division by zero / non-positive capacity.
            if capacity > 0:
                price = prev_price + alpha * (occupancy / capacity)
            else:
                price = prev_price

            price = round(min(max(price, min_price), max_price), 2)
            price_values[pos] = price

            prev_price = price
            last_day = day

    # Route through a float64 Series so an empty input still gets a float column.
    df['price'] = pd.Series(price_values, dtype='float64').to_numpy()
    return df

import csv

# Run the sequential pricing model over the cleaned readings.
print("Calculating stateful prices...")
df_with_prices = calculate_stateful_prices(df_model1)

# Serialise timestamps as plain 'YYYY-MM-DD HH:MM:SS' strings; the Pathway
# schema reads this column as str.
df_with_prices['timestamp'] = df_with_prices['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')

# Quote every field (csv.QUOTE_ALL rather than the bare magic number 1) so
# values containing separators survive the round trip through Pathway.
df_with_prices.to_csv("full_stream_data_with_prices.csv", index=False, quoting=csv.QUOTE_ALL)

print(f"Data with prices shape: {df_with_prices.shape}")
print("Sample data:")
print(df_with_prices.head())

# Pathway input schema for "full_stream_data_with_prices.csv" (the pre-priced
# pandas output). Columns are presumably matched to the CSV header by name, and
# each value is parsed into the annotated type — confirm against the Pathway
# CSV connector docs if the header changes.
class ParkingWithPriceSchema(pw.Schema):
    timestamp: str  # kept as a 'YYYY-MM-DD HH:MM:SS' string, not a datetime
    lot_id: str  # lot identifier with any commas replaced by '_'
    occupancy: int
    capacity: int
    date: str  # calendar day of the timestamp, as a string
    price: float  # price computed by calculate_stateful_prices

import csv
import itertools

PATHWAY_INPUT_CSV = "full_stream_data_with_prices.csv"
PATHWAY_OUTPUT_CSV = "full_real_time_prices.csv"


def _print_price_summary(frame):
    """Print price statistics, counts for the 10 lowest prices, and lot count.

    Shared by the Pathway-output path and the pandas fallback so both report
    the same summary. ``frame`` must have ``price`` and ``lot_id`` columns.
    """
    prices = frame['price']
    print("\nPrice Analysis:")
    print(f"Price range: {prices.min():.2f} - {prices.max():.2f}")
    print(f"Average price: {prices.mean():.2f}")
    print(f"Median price: {prices.median():.2f}")
    print(f"Standard deviation: {prices.std():.2f}")

    print("\nPrice Distribution:")
    print(prices.value_counts().sort_index().head(10))

    print(f"\nUnique lot_ids: {frame['lot_id'].nunique()}")


def _print_pandas_fallback(reason):
    """Print ``reason`` and then summarise the pandas-computed prices instead."""
    print(reason)
    print("Using pandas result as fallback:")
    print(df_with_prices.head())
    print(f"Total records: {len(df_with_prices)}")
    _print_price_summary(df_with_prices)


def _dump_output_head(path, max_lines=10):
    """Print the first ``max_lines`` raw lines of ``path`` for debugging."""
    with open(path, 'r', encoding='utf-8') as f:
        print("First few lines of output file:")
        for i, line in enumerate(itertools.islice(f, max_lines)):
            print(f"Line {i}: {line.strip()}")


# Stage 1: run the Pathway pipeline. Only failures here are reported as
# Pathway errors; the pipeline is best-effort, so any failure falls back to
# the pandas result.
try:
    input_table = pw.io.csv.read(
        PATHWAY_INPUT_CSV,
        schema=ParkingWithPriceSchema,
        mode="static",
    )

    # Identity projection of every column: the place where Pathway-side
    # transformations would go.
    output_table = input_table.select(
        timestamp=input_table.timestamp,
        lot_id=input_table.lot_id,
        occupancy=input_table.occupancy,
        capacity=input_table.capacity,
        date=input_table.date,
        price=input_table.price,
    )

    pw.io.csv.write(output_table, PATHWAY_OUTPUT_CSV)

    print("Running Pathway pipeline...")
    pw.run()
except Exception as e:
    _print_pandas_fallback(f"Error in Pathway pipeline: {e}")
else:
    # Stage 2: load the Pathway output. Errors here are no longer mislabeled
    # as pipeline failures.
    try:
        df_out = pd.read_csv(PATHWAY_OUTPUT_CSV, quoting=csv.QUOTE_ALL)
    except pd.errors.ParserError as e:
        print(f"Error reading output CSV: {e}")
        # Retry with more lenient parsing before giving up.
        try:
            df_out = pd.read_csv(PATHWAY_OUTPUT_CSV, sep=',', quotechar='"', skipinitialspace=True)
        except Exception as e2:
            print(f"Still failed: {e2}")
            _dump_output_head(PATHWAY_OUTPUT_CSV)
        else:
            print("Successfully read with alternative parameters")
            print(df_out.head())
    except Exception as e:
        # e.g. the output file was never written: keep the best-effort fallback.
        _print_pandas_fallback(f"Error reading Pathway output: {e}")
    else:
        print("\nResults:")
        print(df_out.head())
        print(f"\nTotal records: {len(df_out)}")

        _print_price_summary(df_out)

        print("\nData quality check:")
        print(f"Missing values: {df_out.isnull().sum().sum()}")
        print(f"Duplicate rows: {df_out.duplicated().sum()}")

In [None]:
# Install required packages
!pip install bokeh panel --quiet

import pandas as pd
import bokeh.plotting
from bokeh.models import ColumnDataSource
from bokeh.palettes import Category20
import panel as pn

# Activate Panel's extension so the layout renders in the notebook.
pn.extension()

# Load the Pathway output robustly: malformed lines are skipped with a warning.
df = pd.read_csv(
    "full_real_time_prices.csv",
    sep=',',
    quotechar='"',
    skipinitialspace=True,
    on_bad_lines='warn'
)

# Parse timestamps; unparseable values become NaT and are dropped below.
df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')

# Drop rows missing anything the plot needs.
df = df.dropna(subset=['timestamp', 'price', 'lot_id'])

fig = bokeh.plotting.figure(
    height=500,
    width=900,
    title="Daily Parking Prices per Lot (Pathway Output)",
    x_axis_type="datetime",
    tools="pan,wheel_zoom,box_zoom,reset,save"
)

# Category20 has 20 distinct colors; they cycle if there are more lots.
colors = Category20[20]
lot_ids = sorted(df['lot_id'].unique())  # sorted so each lot keeps a stable color

for idx, lot_id in enumerate(lot_ids):
    color = colors[idx % len(colors)]
    label = f"Lot {lot_id}"
    lot_data = df[df['lot_id'] == lot_id].sort_values('timestamp')
    source = ColumnDataSource(lot_data)

    fig.line("timestamp", "price", source=source, line_width=2,
             color=color, legend_label=label)

    # scatter(marker="circle") replaces circle(size=...), deprecated in Bokeh
    # 3.4. Sharing legend_label merges the markers into the line's legend
    # item, so clicking the legend hides both glyphs, not just the line.
    fig.scatter("timestamp", "price", source=source, size=5,
                marker="circle", color=color, legend_label=label)

fig.xaxis.axis_label = "Timestamp"
fig.yaxis.axis_label = "Price"
if lot_ids:  # a legend only exists once at least one glyph has a legend_label
    fig.legend.location = "top_left"
    fig.legend.click_policy = "hide"  # click a lot in the legend to hide/show it

# Display the plot (last expression so the notebook renders it).
pn.Column(fig).servable()
