Capstone project SA25

In [5]:
!pip install -q pathway 'bigframes>=2.8.0' --upgrade 'google-cloud-bigquery[bqstorage,pandas]>=3.31.0'

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.4/60.4 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.3/60.3 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.3/60.3 kB[0m [31m3.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.3/60.3 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.3/60.3 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.3/60.3 kB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m60.3/60.3 kB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━

In [6]:
!pip install bokeh --quiet

In [7]:
import pandas as pd
import numpy as np
import pathway as pw
import io
import time
from datetime import datetime
import pathway as pw
from bokeh.plotting import figure, show
from bokeh.models import ColumnDataSource
from bokeh.io import output_notebook, push_notebook

Data Loading

In [11]:
from google.colab import drive
drive.mount('/content/drive')

import os
file_path = '/content/drive/MyDrive/dataset.csv'
if os.path.exists(file_path):
    print("File exists.")
else:
    print("File does not exist.")

df = pd.read_csv('/content/drive/MyDrive/dataset.csv')
df.head()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
File exists.


Unnamed: 0,ID,SystemCodeNumber,Capacity,Latitude,Longitude,Occupancy,VehicleType,TrafficConditionNearby,QueueLength,IsSpecialDay,LastUpdatedDate,LastUpdatedTime
0,0,BHMBCCMKT01,577,26.144536,91.736172,61,car,low,1,0,04-10-2016,07:59:00
1,1,BHMBCCMKT01,577,26.144536,91.736172,64,car,low,1,0,04-10-2016,08:25:00
2,2,BHMBCCMKT01,577,26.144536,91.736172,80,car,low,2,0,04-10-2016,08:59:00
3,3,BHMBCCMKT01,577,26.144536,91.736172,107,car,low,2,0,04-10-2016,09:32:00
4,4,BHMBCCMKT01,577,26.144536,91.736172,150,bike,low,2,0,04-10-2016,09:59:00


Pricing Logic

In [21]:
BASE_PRICE = 10.0
PARAMS = {                                            #Define Model pricing Parameters
    'alpha': 0.4, 'beta': 0.3, 'gamma': 0.15,
    'delta': 0.1, 'epsilon': 0.05, 'lambda': 0.8
}
TRAFFIC_MAP = {'low': 1, 'medium': 2, 'high': 3}
VEHICLE_WEIGHT_MAP = {'car': 1.0, 'bike': 0.5, 'truck': 1.5}


@pw.udf
def calculate_dynamic_price(table: pw.Table) -> pw.Table:
    """
    Pathway User-Defined Function to calculate demand and price for a batch of data.
    It takes a Pathway Table, converts it to a pandas DataFrame, adds new columns,
    and returns a new Pathway Table.
    """

    df = table.to_pandas_dataframe()  # Convert Pathway Table to pandas DataFrame

    # Combine date and time columns
    df['LastUpdated'] = df['LastUpdatedDate'] + ' ' + df['LastUpdatedTime']

    df['occupancy_rate'] = df['Occupancy'] / df['Capacity']


    df['traffic_weight'] = df['TrafficConditionNearby'].map(TRAFFIC_MAP).fillna(1)
    df['vehicle_weight'] = df['VehicleType'].map(VEHICLE_WEIGHT_MAP).fillna(1.0)


    df['demand'] = (PARAMS['alpha'] * df['occupancy_rate'] +
                    PARAMS['beta'] * df['QueueLength'] -
                    PARAMS['gamma'] * df['traffic_weight'] +
                    PARAMS['delta'] * df['IsSpecialDay'] +
                    PARAMS['epsilon'] * df['vehicle_weight'])


    normalized_demand = np.tanh(df['demand'])                                               # Normalize demand
    price_adjustment = (1 + (PARAMS['lambda'] * normalized_demand)).clip(0.5, 2.0)
    df['dynamic_price'] = BASE_PRICE * price_adjustment


    df['timestamp'] = pd.to_datetime(df['LastUpdated'], format='%d-%m-%Y %H:%M:%S', errors='coerce')   # Create a proper timestamp

    return pw.Table.from_pandas_dataframe(df[['SystemCodeNumber', 'timestamp', 'dynamic_price']])

Pathway-Real-Time Data Pipeline

In [28]:
#schema
class ParkingInputStream(pw.Schema):
    SystemCodeNumber: str
    Capacity: int
    Occupancy: int
    LastUpdatedDate: str
    LastUpdatedTime: str
    Latitude: float
    Longitude: float
    VehicleType: str
    TrafficConditionNearby: str
    QueueLength: int
    IsSpecialDay: int



input_stream = pw.io.csv.read(
    file_path,
    schema=ParkingInputStream,
    mode='streaming',
    autocommit_duration_ms=10000,
)


pw.run()

Output()

Visualization

In [29]:

output_notebook()
source = ColumnDataSource(data=dict(time=[], price=[], competitor_price=[]))
p = figure(height=400, width=800, title="Real-Time Dynamic Parking Price",
           x_axis_type="datetime", x_axis_label='Time', y_axis_label='Price (USD)')
p.line(x='time', y='price', source=source, legend_label="Dynamic Price", color="navy", alpha=0.8, line_width=2)
p.line(x='time', y='competitor_price', source=source, legend_label="Competitor Price (Static)", color="firebrick", alpha=0.6, line_width=2, line_dash="dashed")
handle = show(p, notebook_handle=True)



for i in range(30): # Loop for 60 seconds
        time.sleep(2)
        try:
            current_output = output_table.to_pandas()

            if not current_output.empty and 'timestamp' in current_output.columns:

                current_output = current_output.sort_values(by='timestamp')


                current_output['competitor_price'] = BASE_PRICE * 1.25

                source.data = {
                    'time': current_output['timestamp'],
                    'price': current_output['dynamic_price'],
                    'competitor_price': current_output['competitor_price']
                }
                push_notebook(handle=handle)
        except Exception as e:
            print(f"inor read error ({e})")


inor read error (name 'output_table' is not defined)
inor read error (name 'output_table' is not defined)
inor read error (name 'output_table' is not defined)
inor read error (name 'output_table' is not defined)


KeyboardInterrupt: 