# Real-Time Taxi Supply & Repositioning System (Interim)

### Objective:
#### Analyze Singapore cab availability data to identify high-demand areas, busiest times, and optimize driver allocation using Postgres for data management and Python for analysis and visualization.

### Data Source:
##### [1. API - LTA Data Mall (Taxi Availability)](https://datamall.lta.gov.sg/content/datamall/en/search_datasets.html?searchText=taxi)
##### [2. Excel - Singapore City Geo-Coordinates](https://www.kaggle.com/datasets/shymammoth/singapore-city-geo-coordinates-more-reliable?resource=download)

In [1]:
!pip install psycopg2-binary sqlalchemy ipython-sql folium



In [1]:
import pandas as pd
import requests
from datetime import datetime
import math
import time
import folium
from IPython.display import display, clear_output, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
from sqlalchemy import create_engine, inspect, text, types as satypes
import sqlite3
import urllib.parse

#### Function 1: Extract live taxi availability data

In [2]:
lta_api_key = '4f302dbbf247989be1536d500b3fe0e4' # Store LTA API key (replace if needed)
taxi_api_url = 'https://api.data.gov.sg/v1/transport/taxi-availability' # Store API URL to get real-time taxi availability data

def fetch_taxi_availability():
    """
    Fetch real-time taxi locations from the Data.gov.sg Taxi Availability API.

    Overview
    --------
    Sends a GET request to the public endpoint defined in the module-level
    constant `taxi_api_url`, using the LTA account key stored in
    `lta_api_key` (passed in the 'AccountKey' header for authorisation).
    The JSON response is parsed to extract taxi coordinates and a local
    timestamp is attached to every row.

    Returns
    -------
    pandas.DataFrame
        A DataFrame with one row per taxi at the time of the request, with
        columns:
            - 'longitude' : float
            - 'latitude'  : float
            - 'timestamp' : pandas.Timestamp (client-side time from pd.Timestamp.now())

        If the HTTP request fails (status != 200) or the response structure is not
        as expected, an **empty** DataFrame is returned and an explanatory message
        is printed.

    Behaviour
    ---------
    - Performs a single network call; no retry or backoff logic is included.
    - Uses client machine time for the 'timestamp' column.
    - Does not write to any database or files; it only returns a DataFrame.
    - Depends on the module-level constants `lta_api_key` and `taxi_api_url`.

    Examples
    --------
    >>> df = fetch_taxi_availability()
    >>> df.head()
       longitude   latitude                  timestamp
    0  103.8xxx    1.29xxx   2025-08-21 12:34:56.789012
    """

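    headers = {'AccountKey': lta_api_key} # Set the headers to include API key for authorisation

    # Send a GET request to the API. The timeout stops a stalled connection from blocking the live loop,
    # and network errors (DNS failure, refused connection, timeout) return an empty DataFrame instead of raising
    try:
        response = requests.get(taxi_api_url, headers=headers, timeout=10)
    except requests.RequestException as e:
        print(f"Request failed: {e}")
        return pd.DataFrame()

    # Any status other than 200 (success) is treated as a failed request:
    # print the error code and return an empty DataFrame
    if response.status_code != 200:
        print(f"API Error: {response.status_code}")
        return pd.DataFrame()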

    data = response.json() # Convert API response to JSON format

    timestamp = pd.Timestamp.now() # Get the current date and time (used as a timestamp for the data)

    # Try to extract the taxi coordinates from the API response
    try:

        # Navigate through JSON structure to get coordinates by:
        # 1. Accessing the 'features' list from the JSON,
        # 2. Taking the first item [0] (which holds the taxi data),
        # 3. Going into its 'geometry' section,
        # 4. Then retrieving the list of 'coordinates' (longitude, latitude pairs)
        coords = data['features'][0]['geometry']['coordinates']

        taxi_data = pd.DataFrame(coords, columns=['longitude', 'latitude']) # Each coordinate represents a taxi's location
        taxi_data['timestamp'] = timestamp # Adds a new column called timestamp, to record when the data was fetched
        return taxi_data # Return needed for data to be used outside of this function

    # Handle the case where the expected data structure is not found
    except (KeyError, IndexError, TypeError) as e:

        print(f"Data structure error: {e}. Check the API response format.") # Missing keys, an empty 'features' list, or an unexpected type
        return pd.DataFrame() # Return an empty DataFrame if the expected data is not found
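
Function 1's JSON navigation can be checked offline, without calling the API. The sketch below assumes a trimmed payload in the GeoJSON shape the code expects (`features[0]['geometry']['coordinates']` holding `[longitude, latitude]` pairs); the sample values and the helper name `extract_coordinates` are hypothetical, and only the standard library is used.

```python
# Hypothetical, trimmed payload in the shape fetch_taxi_availability() expects:
# a GeoJSON FeatureCollection whose first feature holds a list of [lon, lat] pairs.
sample_payload = {
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "geometry": {
                "type": "MultiPoint",
                "coordinates": [[103.8198, 1.3521], [103.8500, 1.2900]],
            },
        }
    ],
}


def extract_coordinates(payload):
    """Return a list of (longitude, latitude) tuples, or [] if the shape is unexpected."""
    try:
        coords = payload["features"][0]["geometry"]["coordinates"]
    except (KeyError, IndexError, TypeError):
        # Missing keys, an empty 'features' list, or a non-dict payload all end up here
        return []
    # GeoJSON orders each pair as [longitude, latitude], matching the column order in Function 1
    return [(float(lon), float(lat)) for lon, lat in coords]


print(extract_coordinates(sample_payload))   # [(103.8198, 1.3521), (103.85, 1.29)]
print(extract_coordinates({"features": []}))  # []
```

Because GeoJSON stores longitude first, the column order `['longitude', 'latitude']` in Function 1 matters: swapping it would place every taxi outside Singapore's bounding boxes.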

#### Function 2: Extract Singapore geo-coordinates data

In [3]:
geolocations_csv_path = "singapore_city_coordinates.csv"

def load_geolocations(csv_path):
    """
    Load the geolocations CSV file into a pandas DataFrame.

    Parameters
    ----------
    csv_path : str
        The file path to a CSV containing place metadata and bounding boxes.
        Typical columns include:
          ['Place', 'City', 'Area', 'latitude', 'longitude',
           'bounding_box_1', 'bounding_box_2', 'bounding_box_3', 'bounding_box_4'].

    Returns
    -------
    pandas.DataFrame
        The raw geolocations data as read from disk. No cleaning or validation
        is performed here; downstream steps (e.g., clean_dataframe) handle that.

    Notes
    -----
    • This function simply reads the file; it does not modify global state.
    • `csv_path` has no default; pass the module-level `geolocations_csv_path` to load the standard file.
    """

    geolocations_data = pd.read_csv(csv_path) # Read and store csv data
    return geolocations_data # Return needed for data to be used outside of this function
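
Since `load_geolocations` performs no validation, a quick header check right after loading can catch a mismatched CSV before Function 4 fails on a missing `bounding_box_*` column. Below is a minimal standard-library sketch; the helper name `missing_columns` and the sample CSV text are hypothetical, and the required list mirrors the columns Function 4 reads.

```python
import csv
import io

# Columns that Function 4 reads from the geolocations data
REQUIRED_COLUMNS = ['Place', 'latitude', 'longitude',
                    'bounding_box_1', 'bounding_box_2', 'bounding_box_3', 'bounding_box_4']


def missing_columns(csv_file, required=REQUIRED_COLUMNS):
    """Return the required column names absent from the CSV header (names are whitespace-trimmed)."""
    reader = csv.reader(csv_file)
    header = next(reader, [])  # an empty file yields an empty header
    present = {name.strip() for name in header}
    return [col for col in required if col not in present]


# Hypothetical CSV text standing in for singapore_city_coordinates.csv
sample = io.StringIO(
    "Place, City,Area,latitude,longitude,bounding_box_1,bounding_box_2,bounding_box_3,bounding_box_4\n"
    "Bishan,Singapore,Central,1.35,103.85,1.34,1.36,103.83,103.86\n"
)
print(missing_columns(sample))  # []
```

With the real file, the same check would read `with open(geolocations_csv_path, newline='') as f: print(missing_columns(f))`.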

#### Function 3: Clean data

In [4]:
def clean_dataframe(df):
    """
    Clean a pandas DataFrame for downstream use.

    Steps performed:
      1) Drop any rows that contain missing values (NaN/None/NaT) in any column.
      2) Trim leading and trailing whitespace from the column names only.
         (Note: cell values are not trimmed.)
      3) Reset the index to a simple 0..n-1 range after rows are dropped.

    Parameters
    ----------
    df : pandas.DataFrame
        The input table to clean.

    Returns
    -------
    pandas.DataFrame
        A new DataFrame with the above cleaning steps applied. The original
        DataFrame passed in is not modified.
    """

    # Null policy: drop rows with any missing values across the row
    df = df.dropna()

    # Strip whitespace from column names only (not from the cell values)
    new_columns = []
    for col in df.columns:
        col_clean = col.strip()
        new_columns.append(col_clean)

    df.columns = new_columns

    # Reset index after dropping rows so it runs from 0 to n-1
    df = df.reset_index(drop=True)

    return df

#### Function 4: Data Visualisation (Live service)

In [5]:
def plot_taxi_availability_live(
    taxi_data_func,           # Takes in Function 1
    geolocations_df,          # Takes in output from Function 3
    refresh_count=10,
    sleep_seconds=3,
    include_unknown=True,
    map_width_percent=60,     # Map takes 60% of the width
    table_max_height_px=600,
    nearby_km=5.0,            # Threshold for what's considered near in kilometres
    low_count_threshold=5     # A Place is 'low' when its current_count is below this value
):

    """
    Function 4 — Render a live view of taxi availability and produce batch data.

    Purpose
    -------
    Repeatedly fetches live taxi locations using the callable passed in (typically
    Function 1: `fetch_taxi_availability`), assigns each taxi to a Place based on
    bounding boxes from `geolocations_df` (cleaned via Function 3), renders a
    Folium map alongside a table of counts, and builds:
      • A final snapshot table after the last refresh in this batch.
      • A raw history DataFrame containing all taxi rows observed this batch.

    Parameters
    ----------
    taxi_data_func : callable
        A zero-argument function that returns a DataFrame with columns
        ['longitude', 'latitude', 'timestamp'] (e.g., Function 1).
    geolocations_df : pandas.DataFrame
        Cleaned geolocation/bounding-box data (e.g., output of Function 3) with
        columns ['Place', 'latitude', 'longitude', 'bounding_box_1'..'bounding_box_4'].
    refresh_count : int, default 10
        Number of live refresh cycles to run for this batch.
    sleep_seconds : int or float, default 3
        Seconds to pause between refreshes (to avoid hammering the API/UI).
    include_unknown : bool, default True
        If True, taxis that do not fall into any bounding box are labelled 'Unknown'
        and included in counts; otherwise they are filtered out.
    map_width_percent : int, default 60
        Percentage width allocated to the map in the split layout (min 50, max 90).
    table_max_height_px : int, default 600
        Maximum height for the right-hand table panel (scrolls if exceeded).
    nearby_km : float, default 5.0
        Distance threshold (kilometres) for “nearby place” recommendations.
    low_count_threshold : int, default 5
        If a Place’s current_count is below this threshold, a nearby alternative
        with more taxis is suggested.

    Returns
    -------
    final_output : pandas.DataFrame
        Snapshot after the last refresh with columns:
        ['Place', 'current_count', 'prev_count', 'recommendation', 'timestamp'].
    taxis_raw_history : pandas.DataFrame
        All raw taxi rows captured across all refreshes this batch with columns:
        ['longitude', 'latitude', 'timestamp', 'Place', 'refresh_index'].

    Notes
    -----
    • Designed for use in a Jupyter environment: it clears and re-renders HTML output.
    • This function performs no database I/O; it only prepares DataFrames and UI.
    • The “recommendation” column suggests a nearby Place with a higher count when
      the current Place’s count is low.
    """

    # --------------------------- Inner/nested functions: Only usable within Function 4 ---------------------------

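    # Inner Function 4.1 (not called below; build_counts uses the vectorised bbox_df instead)
    # - Checks whether a taxi at (lat, lon) falls inside the bounding box of a Place (e.g., Toa Payoh, Bishan) by scanning each row of places_df
    # - bounding_box_1/2 are treated as latitude bounds and bounding_box_3/4 as longitude bounds; min/max handles either storage order
    # - Returns the first matching Place, or 'Unknown' if no box contains the point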
    def assign_place(lat, lon, places_df):
        for _, row in places_df.iterrows():
            lat_min = min(row['bounding_box_1'], row['bounding_box_2'])
            lat_max = max(row['bounding_box_1'], row['bounding_box_2'])
            lon_min = min(row['bounding_box_3'], row['bounding_box_4'])
            lon_max = max(row['bounding_box_3'], row['bounding_box_4'])
            if (lat_min <= lat <= lat_max) and (lon_min <= lon <= lon_max):
                return row['Place']
        return 'Unknown'

    # Inner Function 4.2
    # - Compute the great-circle distance (shortest path over the Earth’s surface) between two latitude/longitude points, and return it in kilometres
    # - Used to compare the current place’s centroid and each candidate’s centroid to power the ‘nearby place’ recommendation
    def haversine_km(lat1, lon1, lat2, lon2):
        R = 6371.0
        phi1 = math.radians(lat1)
        phi2 = math.radians(lat2)
        dphi = math.radians(lat2 - lat1)
        dlmb = math.radians(lon2 - lon1)
        a = math.sin(dphi/2)**2 + math.cos(phi1)*math.cos(phi2)*math.sin(dlmb/2)**2
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return R * c

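    # Pre-compute one centroid (latitude/longitude) per Place; used by the distance-based recommendations
    centroids = (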
        geolocations_df[['Place', 'latitude', 'longitude']]
        .dropna(subset=['Place', 'latitude', 'longitude'])
        .drop_duplicates(subset=['Place'])
        .rename(columns={'latitude': 'place_lat', 'longitude': 'place_lon'})
    )

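    # Inner Function 4.3
    # - Label each taxi with the Place whose bounding box contains it ('Unknown' if none), count taxis per Place,
    #   and stamp the counts with this refresh's timestamp
    # - Returns (counts, labelled taxi_df); counts supplies current_count now and prev_count on the next refresh
    # - places_df is currently unused: the bounding boxes come from bbox_df, which is precomputed before the loop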
    def build_counts(taxi_df, places_df, allow_unknown):
        taxi_df = taxi_df.copy()
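        # Vectorised place assignment using the precomputed bbox_df (faster than a per-row .apply)
        taxi_df['Place'] = 'Unknown'
        # Loop over Places rather than taxis: each pass labels every still-unassigned taxi inside that box,
        # so where boxes overlap, the first matching Place in bbox_df wins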
        for row in bbox_df.itertuples(index=False):
            mask = (
                (taxi_df['Place'] == 'Unknown') &
                (taxi_df['latitude'].between(row.lat_min, row.lat_max)) &
                (taxi_df['longitude'].between(row.lon_min, row.lon_max))
            )
            taxi_df.loc[mask, 'Place'] = row.Place

        if allow_unknown:
            counts = taxi_df.groupby('Place', dropna=False).size().reset_index(name='current_count')
        else:
            counts = (
                taxi_df[taxi_df['Place'] != 'Unknown']
                .groupby('Place', dropna=False).size().reset_index(name='current_count')
            )
        counts['timestamp'] = taxi_df['timestamp'].iloc[0]
        return counts, taxi_df

    # Inner Function 4.4
    # Purpose:
    # - Fill a new column called 'recommendation' for each Place.
    # - If a Place has a low taxi count (current_count < low_threshold), suggest a Place with a higher count:
    #   the nearest one within nearby_threshold_km or, if none is that close, the nearest higher-count Place at any distance
    # - Places that are not low, have no centroid (e.g., 'Unknown'), or have no higher-count alternative get '-'
    def add_recommendations(merged_df, centroids_df, nearby_threshold_km, low_threshold):
        candidates = (
            merged_df[['Place', 'current_count']]
            .merge(centroids_df, on='Place', how='left')
            .dropna(subset=['place_lat', 'place_lon'])
            .copy()
        )
        recommendations = []
        for _, row in merged_df.iterrows():
            this_count = int(row['current_count'])
            this_lat   = row.get('place_lat', None)
            this_lon   = row.get('place_lon', None)
            suggestion = '-'
            if pd.notna(this_lat) and pd.notna(this_lon):
                higher = candidates[candidates['current_count'] > this_count].copy()
                if not higher.empty:
                    # Great-circle distance from this Place's centroid to each higher-count candidate
                    lat2 = higher['place_lat'].to_numpy()
                    lon2 = higher['place_lon'].to_numpy()
                    dists = [haversine_km(this_lat, this_lon, la, lo) for la, lo in zip(lat2, lon2)]
                    higher = higher.assign(dist_km=dists)

                    within = higher[higher['dist_km'] <= nearby_threshold_km]
                    if not within.empty:
                        best = within.sort_values(['dist_km','current_count'], ascending=[True,False]).iloc[0]
                    else:
                        best = higher.sort_values('dist_km').iloc[0]
                    suggestion = f"{best['Place']} ({int(best['current_count'])}) ~{best['dist_km']:.1f} km"
            recommendations.append(suggestion if this_count < low_threshold and suggestion != '-' else '-')
        merged_df = merged_df.copy()
        merged_df['recommendation'] = recommendations
        return merged_df

    # --------------------------- Initialise loop state and pre-compute bounding boxes ---------------------------

    map_width_percent = max(50, min(map_width_percent, 90))
    prev_counts = pd.DataFrame(columns=['Place', 'current_count'])
    total_rows_shown = 0

    raw_history_rows = []  # raw data per-taxi rows per refresh
    final_output = pd.DataFrame(columns=['Place','current_count','prev_count','recommendation','timestamp'])

    # Precompute bbox mins/maxes once to avoid doing it every refresh
    bbox_df = geolocations_df.copy()
    bbox_df['lat_min'] = bbox_df[['bounding_box_1','bounding_box_2']].min(axis=1)
    bbox_df['lat_max'] = bbox_df[['bounding_box_1','bounding_box_2']].max(axis=1)
    bbox_df['lon_min'] = bbox_df[['bounding_box_3','bounding_box_4']].min(axis=1)
    bbox_df['lon_max'] = bbox_df[['bounding_box_3','bounding_box_4']].max(axis=1)
    bbox_df = bbox_df[['Place','lat_min','lat_max','lon_min','lon_max']].dropna()

    # ----------------------------------------------- Main refresh loop --------------------------------------------

    # Step 1 - Run refresh_count refresh cycles, numbered from 1
    for refresh_index in range(1, refresh_count + 1):

        # Step 2 - Fetch live taxi data
        taxi_df = taxi_data_func()
        if taxi_df is None or taxi_df.empty:
            clear_output(wait=True)
            display(HTML("<p><strong>No taxi data returned. Trying again…</strong></p>"))
            time.sleep(sleep_seconds)
            continue

        # Step 3 - Aggregate taxi counts by Place for this refresh
        counts, taxi_df = build_counts(taxi_df, geolocations_df, include_unknown)

        # Step 4 - Keep the non-aggregated rows (one per taxi) for the raw history
        raw = taxi_df[['longitude', 'latitude', 'timestamp', 'Place']].copy()
        if not include_unknown:
            raw = raw[raw['Place'] != 'Unknown']
        raw['refresh_index'] = refresh_index       # label each raw row with the refresh number
        raw_history_rows.append(raw)               # store for stitching together at the end

        # Step 5 - Join current counts with previous counts and prepare table columns
        merged = counts.merge(prev_counts, on='Place', how='left', suffixes=('', '_prev'))
        merged.rename(columns={'current_count_prev': 'prev_count'}, inplace=True)
        merged['prev_count'] = (
            pd.to_numeric(merged['prev_count'], errors='coerce')  # convert safely to numeric
            .fillna(0)                                            # no previous value → 0
            .astype('int64')                                      # make it an integer column
        )

        # Step 6 - Add centroids and compute the recommendation text for low-count Places
        merged = merged.merge(centroids, on='Place', how='left')
        merged = add_recommendations(merged, centroids, nearby_km, low_count_threshold)

        # Step 7 - Keep only the columns needed for display/return in the desired order
        merged = merged[['Place', 'current_count', 'prev_count', 'recommendation', 'timestamp']]

        # Step 8 - Save this refresh’s final table and update status counters
        final_output = merged.copy()                 # snapshot to return after the loop finishes
        total_rows_shown += len(raw)                 # used only for the status line on the right

        # Step 9 - Build left panel (map)
        m = folium.Map(location=[1.3521, 103.8198], zoom_start=12)
        # One small circle per taxi (itertuples is faster than iterrows)
        for r in taxi_df.itertuples(index=False):
            folium.CircleMarker(
                location=[r.latitude, r.longitude],
                radius=2, color='blue', fill=True, fill_opacity=0.6
            ).add_to(m)

        # Step 10 - Build right panel (status + table)
        table_df = (
            merged[['Place','current_count','prev_count','recommendation']]
            .sort_values('current_count', ascending=True)
            .reset_index(drop=True)
        )
        status_html = (
            "<div style='font-size:12px;color:#aaa;margin:0 0 8px 0;'>"
            f"Refresh: <strong>{refresh_index}</strong> of <strong>{refresh_count}</strong> &nbsp;|&nbsp; "
            # Time comes from taxi_df: merged can be empty when include_unknown=False and no taxi falls in a Place
            f"Time: <strong>{taxi_df['timestamp'].iloc[0]}</strong> &nbsp;|&nbsp; "
            f"Rows stored (cumulative): <strong>{total_rows_shown}</strong> &nbsp;|&nbsp; "
            f"Total taxis this refresh: <strong>{len(taxi_df)}</strong>"
            "</div>"
        )
        map_html = m._repr_html_()
        table_html = "<h3 style='margin:0 0 8px 0;'>Taxi availability by Place</h3>" + status_html + table_df.to_html(index=False, border=0)
        split_html = f"""
        <div style="display:flex; gap:12px; align-items:flex-start;">
            <div style="flex:0 0 {map_width_percent}%; min-width:400px; border:1px solid #ddd; border-radius:6px; overflow:hidden;">
                {map_html}
            </div>
            <div style="flex:1; border:1px solid #ddd; border-radius:6px; padding:12px; overflow:auto; max-height:{int(table_max_height_px)}px;">
                {table_html}
            </div>
        </div>
        """
        clear_output(wait=True)
        display(HTML(split_html))

        prev_counts = counts[['Place','current_count']].copy()
        time.sleep(sleep_seconds)

    # ------------------------------ Save accumulated data into a single DataFrame ---------------------------------------

    # Raw data saved to variable
    taxis_raw_history = pd.concat(raw_history_rows, ignore_index=True) if raw_history_rows else \
        pd.DataFrame(columns=['longitude','latitude','timestamp','Place','refresh_index'])

    return final_output, taxis_raw_history
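
The Place-labelling rule shared by `assign_place` and `build_counts` (normalise each box with min/max, take the first box that contains the taxi, otherwise `'Unknown'`) can be exercised without pandas or the live API. This standard-library sketch uses made-up boxes and taxi positions, and follows the notebook's assumption that `bounding_box_1/2` are latitudes and `bounding_box_3/4` are longitudes. The second taxi sits where the two boxes overlap, so it goes to the earlier row, as it would under the `Place == 'Unknown'` mask in `build_counts`.

```python
from collections import Counter

# Hypothetical boxes: (Place, bounding_box_1, bounding_box_2, bounding_box_3, bounding_box_4)
# bounding_box_1/2 are latitude bounds and 3/4 longitude bounds, stored in either order.
BOXES = [
    ("Bishan",    1.360, 1.340, 103.860, 103.830),
    ("Toa Payoh", 1.345, 1.325, 103.840, 103.860),
]


def label_taxi(lat, lon, boxes=BOXES):
    """Return the first Place whose box contains (lat, lon), else 'Unknown'."""
    for place, b1, b2, b3, b4 in boxes:
        # min/max normalises boxes whose corners are stored in either order
        if min(b1, b2) <= lat <= max(b1, b2) and min(b3, b4) <= lon <= max(b3, b4):
            return place
    return "Unknown"


def count_by_place(taxis, include_unknown=True):
    """Count taxis per Place, optionally dropping 'Unknown' (mirrors include_unknown)."""
    counts = Counter(label_taxi(lat, lon) for lat, lon in taxis)
    if not include_unknown:
        counts.pop("Unknown", None)
    return dict(counts)


# (lat, lon) pairs: inside Bishan only, inside both boxes, outside every box
taxis = [(1.350, 103.845), (1.342, 103.850), (1.300, 103.800)]
print(count_by_place(taxis))                         # {'Bishan': 2, 'Unknown': 1}
print(count_by_place(taxis, include_unknown=False))  # {'Bishan': 2}
```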

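The recommendation rule in `add_recommendations` pairs `haversine_km` with a two-stage choice. Among Places with a higher count, it prefers the nearest one within `nearby_km` and otherwise falls back to the nearest higher-count Place at any distance. Only Places below `low_count_threshold` receive a suggestion. The standard-library sketch below restates that rule on hypothetical centroids and counts; `recommend` is an illustrative name, not part of the notebook.

```python
import math


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres on a sphere of radius 6371 km."""
    R = 6371.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))


def recommend(place, places, nearby_km=5.0, low_threshold=5):
    """Suggest a higher-count Place for a low-count one; places maps name -> (lat, lon, count)."""
    lat, lon, count = places[place]
    if count >= low_threshold:
        return "-"
    # (distance, -count, name, count) for every Place with a strictly higher count
    higher = [
        (haversine_km(lat, lon, plat, plon), -pcount, name, pcount)
        for name, (plat, plon, pcount) in places.items()
        if pcount > count
    ]
    if not higher:
        return "-"
    within = [h for h in higher if h[0] <= nearby_km]
    # Nearest first; on equal distance the higher count wins (smaller -count)
    dist, _, name, pcount = min(within or higher)
    return f"{name} ({pcount}) ~{dist:.1f} km"


# Hypothetical centroids and counts
places = {
    "Bishan":    (1.3510, 103.8480, 2),
    "Toa Payoh": (1.3343, 103.8563, 12),
    "Changi":    (1.3644, 103.9915, 30),
}
print(recommend("Bishan", places))          # Toa Payoh (12) ~2.1 km
print(round(haversine_km(0, 0, 1, 0), 3))   # 111.195 (one degree of latitude)
```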
#### Function 5: Transform data

In [6]:
import pandas as pd

def transform_data(df):
    """
    Prepare two derived DataFrames from the input geolocations DataFrame and
    return a copy of the input with tidy column names.

    Produces two module-level globals (used later by the database loader):
      1) transformed_places_df
         - One row per unique place (identified by Place, City, Area, latitude, longitude).
         - Adds a local surrogate key 'place_id' (starting at 1) for easy joining.

      2) transformed_bounding_boxes_df
         - LONG shape (one row per corner per place).
         - Columns: ['place_id', 'corner_no', 'coordinate'] where corner_no is 1..4.

    Returns:
      df_out: a copy of the original DataFrame with column names trimmed,
              lower-cased, and pluralised (to keep downstream code consistent).

    Notes:
      • This function does not create any database tables. It only prepares
        in-memory DataFrames; actual database writes happen elsewhere.
      • The 'place_id' created here is a local, in-memory identifier to help
        with joins. Your database may maintain its own primary keys.
    """

    # Work on a copy so the caller’s DataFrame is not altered
    df_input = df.copy()

    # --------------------------- Prepare columns needed to represent places and bounding boxes --------------------------

    # Ensure text columns exist (use empty strings if missing) so selection/joins do not fail
    if 'City' not in df_input.columns:
        df_input['City'] = ''
    if 'Area' not in df_input.columns:
        df_input['Area'] = ''

    # Ensure all columns that define a unique place exist; fill sensible defaults if missing
    needed_for_places = ['Place', 'City', 'Area', 'latitude', 'longitude']
    for col in needed_for_places:
        if col not in df_input.columns:
            df_input[col] = '' if col in ['Place', 'City', 'Area'] else None

    # --------------------------- Build the 'places' derived DataFrame (one row per unique place) ----------------------

    # Keep only identifying columns, drop exact duplicates, then add a local surrogate key
    places_df = (
        df_input[needed_for_places]
        .drop_duplicates()
        .reset_index(drop=True)
    )
    places_df['place_id'] = places_df.index + 1  # local, in-memory ID for joining

    # Attach the local place_id back to each original row using the identifying columns
    df_merged = df_input.merge(
        places_df,
        on=['Place', 'City', 'Area', 'latitude', 'longitude'],
        how='left'
    )

    # ---------------------- Build the LONG 'bounding_boxes' derived DataFrame (one row per corner) -------------------

    # Guarantee the four bounding-box columns exist so iteration is safe
    bbox_cols = ['bounding_box_1', 'bounding_box_2', 'bounding_box_3', 'bounding_box_4']
    for col in bbox_cols:
        if col not in df_merged.columns:
            df_merged[col] = None

    # For each row, emit up to four records: (place_id, corner_no, coordinate)
    bounding_boxes = []
    for _, row in df_merged.iterrows():
        for i, col in enumerate(bbox_cols, start=1):
            if pd.notna(row[col]):
                bounding_boxes.append({
                    'place_id': int(row['place_id']),
                    'corner_no': i,
                    'coordinate': row[col]
                })

    bounding_boxes_df = pd.DataFrame(bounding_boxes, columns=['place_id', 'corner_no', 'coordinate'])

    # ---------------------------- Expose the two derived DataFrames as module-level globals ---------------------------

    # These are read by the database-loading function to write into PostgreSQL.
    global transformed_places_df
    global transformed_bounding_boxes_df
    transformed_places_df = places_df
    transformed_bounding_boxes_df = bounding_boxes_df

    # --------------------------- Return a normalised copy of the input with tidy headers -----------------------

    # Trim whitespace, lower-case names, and add a trailing 's' if not present (simple pluralisation)
    df_out = df.copy()
    new_columns = []
    for col in df_out.columns:
        col_trans = str(col).strip().lower()
        if not col_trans.endswith('s'):
            col_trans += 's'
        new_columns.append(col_trans)
    df_out.columns = new_columns

    return df_out
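
Two reshaping rules in `transform_data` are easy to misread. First, bounding boxes go from four wide columns to one row per corner keyed by `place_id`. Second, output headers get a naive trailing `s`, so `bounding_box_1` becomes `bounding_box_1s`. Below is a standard-library sketch of both, run on a hypothetical two-place input. `to_long_corners` and `pluralise_headers` are illustrative names, and `None` stands in for pandas' missing values.

```python
BBOX_COLS = ['bounding_box_1', 'bounding_box_2', 'bounding_box_3', 'bounding_box_4']


def to_long_corners(rows):
    """Turn wide rows (each with a local place_id) into (place_id, corner_no, coordinate) records.

    Missing corners (None here, NaN in pandas) are skipped, as the pd.notna check does.
    """
    records = []
    for row in rows:
        for corner_no, col in enumerate(BBOX_COLS, start=1):
            value = row.get(col)
            if value is not None:
                records.append({'place_id': row['place_id'], 'corner_no': corner_no, 'coordinate': value})
    return records


def pluralise_headers(columns):
    """Trim, lower-case and append 's' unless the name already ends in 's'."""
    out = []
    for col in columns:
        name = str(col).strip().lower()
        out.append(name if name.endswith('s') else name + 's')
    return out


wide = [
    {'place_id': 1, 'bounding_box_1': 1.34, 'bounding_box_2': 1.36, 'bounding_box_3': 103.83, 'bounding_box_4': 103.86},
    {'place_id': 2, 'bounding_box_1': 1.32, 'bounding_box_2': 1.34, 'bounding_box_3': None, 'bounding_box_4': 103.86},
]
print(len(to_long_corners(wide)))  # 7 (4 corners for place 1, 3 for place 2)
print(pluralise_headers([' Place', 'Area', 'latitude', 'bounding_box_1']))
# ['places', 'areas', 'latitudes', 'bounding_box_1s']
```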

#### Function 6: Create Schema

In [7]:
def ensure_schema():
    """
    Create the PostgreSQL tables required by the pipeline if they do not already exist.

    Behaviour:
      • Uses a single transaction (`engine.begin()`) so either all tables are ensured or none are changed.
      • Uses `CREATE TABLE IF NOT EXISTS` → idempotent (safe to call many times).
      • Does not alter existing table shapes or migrate data; it only creates tables when missing.

    Tables and relationships (as defined below):
      1) places
         - Columns: place_id (SERIAL, PK), "Place" (UNIQUE, NOT NULL), "City", "Area", latitude, longitude.
         - "Place" acts as a natural key for upserts in the loader.

      2) bounding_boxes  (per-corner / LONG form)
         - Columns: place_id, corner_no (1..4), coordinate.
         - Composite primary key (place_id, corner_no) → at most one coordinate per corner per place.
         - Foreign key place_id → places(place_id) with ON DELETE CASCADE.
         - Relationship: places 1 → many bounding_boxes (one row per corner).

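      3) taxis  (raw history)
         - Columns: taxi_id (BIGSERIAL, PK), longitude, latitude, timestamp, place, refresh_index.
         - Foreign key place → places("Place") with ON DELETE SET NULL (keeps taxi history even if a place is removed).
         - Every non-NULL place must already exist in places("Place"), so taxis labelled 'Unknown' by Function 4
           are rejected unless they are stored as NULL (or an 'Unknown' row exists in places).
         - Relationship: places 1 → many taxis via the place name.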
    """

    engine = create_engine('postgresql://postgres:8wnryjaG@localhost:5432/SGCabs')

    # Use a single transaction for all DDL; commits automatically on success.
    with engine.begin() as conn:
        # 1) places: master list of known places (natural key = "Place")
        conn.execute(text("""
        CREATE TABLE IF NOT EXISTS places (
            place_id SERIAL PRIMARY KEY,
            "Place" VARCHAR(128) UNIQUE NOT NULL,
            "City"  VARCHAR(128),
            "Area"  VARCHAR(128),
            latitude  DOUBLE PRECISION,
            longitude DOUBLE PRECISION
        );
        """))

        # 2) bounding_boxes: one row per corner (LONG form), linked to places by place_id
        conn.execute(text("""
        CREATE TABLE IF NOT EXISTS bounding_boxes (
            place_id   INT NOT NULL,
            corner_no  INT NOT NULL,
            coordinate DOUBLE PRECISION,
            CONSTRAINT pk_bounding_boxes PRIMARY KEY (place_id, corner_no),
            CONSTRAINT fk_bbox_place FOREIGN KEY (place_id)
                REFERENCES places(place_id)
                ON DELETE CASCADE
        );
        """))

        # 3) taxis: append-only observations; linked to places by place name
        conn.execute(text("""
        CREATE TABLE IF NOT EXISTS taxis (
            taxi_id       BIGSERIAL PRIMARY KEY,
            longitude     DOUBLE PRECISION,
            latitude      DOUBLE PRECISION,
            timestamp     TIMESTAMP,
            place         VARCHAR(128),
            refresh_index INT,
            CONSTRAINT fk_taxis_place FOREIGN KEY (place)
                REFERENCES places("Place")
                ON DELETE SET NULL
        );
        """))

    print("Schema ensured: places, bounding_boxes, taxis")
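
The `taxis.place` foreign key means every non-NULL place must already exist in `places`, so rows that Function 4 labels `'Unknown'` need handling before loading. The sketch below reproduces the constraint with Python's built-in `sqlite3` as a stand-in for PostgreSQL, with simplified column types and foreign keys switched on via `PRAGMA foreign_keys`. Mapping `'Unknown'` to `NULL` is one assumed option, not necessarily what the loader does.

```python
import sqlite3

# In-memory SQLite database standing in for PostgreSQL (types simplified)
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite only enforces foreign keys when this is on
conn.execute('CREATE TABLE places (place_id INTEGER PRIMARY KEY, "Place" TEXT UNIQUE NOT NULL)')
conn.execute(
    'CREATE TABLE taxis (taxi_id INTEGER PRIMARY KEY, latitude REAL, longitude REAL, '
    'place TEXT REFERENCES places("Place") ON DELETE SET NULL)'
)
conn.execute('INSERT INTO places ("Place") VALUES (?)', ("Bishan",))
conn.commit()

rows = [(1.350, 103.845, "Bishan"), (1.300, 103.800, "Unknown")]
insert_sql = "INSERT INTO taxis (latitude, longitude, place) VALUES (?, ?, ?)"

# Loading 'Unknown' as-is: no such Place exists, so the foreign key rejects the batch
try:
    with conn:  # commits on success, rolls back the whole batch on error
        conn.executemany(insert_sql, rows)
    fk_error = None
except sqlite3.IntegrityError as e:
    fk_error = str(e)
print(fk_error)  # FOREIGN KEY constraint failed

# One possible fix (an assumption): store 'Unknown' as NULL, which the foreign key allows
cleaned = [(lat, lon, None if place == "Unknown" else place) for lat, lon, place in rows]
with conn:
    conn.executemany(insert_sql, cleaned)
print(conn.execute("SELECT COUNT(*), COUNT(place) FROM taxis").fetchone())  # (2, 1)
```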

#### Function 7: Load Postgres
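
Function 7 upserts places with `INSERT ... ON CONFLICT ("Place") DO UPDATE`, so re-running a load updates existing rows instead of failing on the `UNIQUE` constraint. SQLite 3.24 and later accept the same clause, which makes the behaviour easy to check locally with the built-in `sqlite3` module. This is a stand-in sketch with made-up rows, not the PostgreSQL code path.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    'CREATE TABLE places (place_id INTEGER PRIMARY KEY, "Place" TEXT UNIQUE NOT NULL, '
    '"City" TEXT, "Area" TEXT, latitude REAL, longitude REAL)'
)

# Same statement shape as upsert_sql_places in load_postgres(), with named parameters
upsert_sql = '''
    INSERT INTO places ("Place", "City", "Area", latitude, longitude)
    VALUES (:Place, :City, :Area, :latitude, :longitude)
    ON CONFLICT ("Place") DO UPDATE
    SET "City" = excluded."City",
        "Area" = excluded."Area",
        latitude = excluded.latitude,
        longitude = excluded.longitude
'''

first_load = [{"Place": "Bishan", "City": "Singapore", "Area": "Central", "latitude": 1.35, "longitude": 103.85}]
second_load = [
    {"Place": "Bishan", "City": "Singapore", "Area": "Central", "latitude": 1.351, "longitude": 103.848},
    {"Place": "Toa Payoh", "City": "Singapore", "Area": "Central", "latitude": 1.334, "longitude": 103.856},
]

with conn:
    conn.executemany(upsert_sql, first_load)
with conn:
    conn.executemany(upsert_sql, second_load)  # Bishan is updated in place, Toa Payoh is inserted

rows = conn.execute('SELECT place_id, "Place", latitude FROM places ORDER BY place_id').fetchall()
print(rows)  # [(1, 'Bishan', 1.351), (2, 'Toa Payoh', 1.334)]
```

One difference to keep in mind: PostgreSQL evaluates the `SERIAL` default before it detects the conflict, so conflicting upserts still consume sequence values and `place_id` can have gaps there. This is one reason to read the `Place → place_id` mapping back from the database rather than assuming consecutive ids.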

In [8]:
def load_postgres():
    """
    Load pre-prepared DataFrames into PostgreSQL.

    Assumptions:
      • Tables (places, bounding_boxes, taxis) already exist and constraints were
        created elsewhere (e.g., in ensure_schema()).
      • The following globals were set earlier:
          - transformed_places_df               : one row per place (local place_id added), set by transform_data()
          - transformed_bounding_boxes_df       : LONG shape; one row per corner (1..4) per place, set by transform_data()
          - transformed_taxis_raw_history       : raw taxi rows collected during the batch (not produced by
                                                  transform_data(); assigned separately, e.g., from Function 4's taxis_raw_history)

    What this function does:
      1) places
         - Upsert by natural key "Place" (so no local place_id is written here).
         - If a place already exists, its city/area/coords are updated.

      2) bounding_boxes  (LONG/per-corner)
         - Join local place_id → Place (from transformed_places_df).
         - Resolve Place → database place_id (by reading from places).
         - Upsert per corner using (place_id, corner_no) as the conflict target.

      3) taxis
         - Append raw history rows as-is.
    """

    engine = create_engine('postgresql://postgres:8wnryjaG@localhost:5432/SGCabs')
    insp = inspect(engine)

    # --------------------------------------- 1) UPDATE/INSERT into places ---------------------------------------

    # Use "Place" as the unique natural key
    if 'transformed_places_df' in globals() and isinstance(transformed_places_df, pd.DataFrame):
        df_places = transformed_places_df.copy()

        # Keep only the columns that actually belong in the 'places' table
        needed_cols = ['Place', 'City', 'Area', 'latitude', 'longitude']
        df_places = df_places[[c for c in needed_cols if c in df_places.columns]]

        # Avoid redundant work by de-duplicating on "Place"
        if 'Place' in df_places.columns:
            df_places = df_places.drop_duplicates(subset=['Place'])
        else:
            df_places = pd.DataFrame(columns=needed_cols)

        if df_places.empty:
            print("Nothing to write for 'places'.")
        else:
            # ON CONFLICT ensures duplicates on "Place" will update rather than fail
            rows = df_places.to_dict(orient='records')
            upsert_sql_places = text("""
                INSERT INTO places ("Place","City","Area",latitude,longitude)
                VALUES (:Place, :City, :Area, :latitude, :longitude)
                ON CONFLICT ("Place") DO UPDATE
                SET "City" = EXCLUDED."City",
                    "Area" = EXCLUDED."Area",
                    latitude = EXCLUDED.latitude,
                    longitude = EXCLUDED.longitude;
            """)
            # Use a transaction so all rows succeed or none do
            with engine.begin() as conn:
                conn.execute(upsert_sql_places, rows)
            print(f"Upserted {len(rows)} place(s) into 'places'.")

        # After upserting places, read back the authoritative mapping
        # Place → database place_id for later joins
        with engine.connect() as conn:
            id_map = pd.read_sql(
                text('SELECT place_id, "Place" FROM places'),
                conn
            )
            # Rename for clarity in subsequent merges
            id_map.rename(columns={'place_id': 'place_id_db'}, inplace=True)

    else:
        print("Variable 'transformed_places_df' not found or not a DataFrame. Skipping 'places'.")
        id_map = pd.DataFrame(columns=['place_id_db', 'Place'])

    # --------------------------------- 2) UPDATE/INSERT into bounding_boxes -------------------------------------

    # Build LONG/per-corner rows with the correct database place_id:
    #   Step A: local place_id → Place (from transformed_places_df)
    #   Step B: Place → DB place_id (from id_map)
    #   Step C: ensure unique (place_id, corner_no), then upsert
    if 'transformed_bounding_boxes_df' in globals() and isinstance(transformed_bounding_boxes_df, pd.DataFrame):
        if transformed_bounding_boxes_df.empty:
            print("Nothing to write for 'bounding_boxes'.")
        elif id_map.empty:
            print("No Place → place_id mapping available; skipping 'bounding_boxes'.")
        elif 'transformed_places_df' not in globals() or not isinstance(transformed_places_df, pd.DataFrame) or transformed_places_df.empty:
            print("No transformed_places_df available to attach Place names; skipping 'bounding_boxes'.")
        else:
            # Step A: attach the Place name to each bounding box row using the local mapping
            #   transformed_bounding_boxes_df has: place_id (local), corner_no, coordinate
            #   transformed_places_df has:        place_id (local), Place
            local_map = transformed_places_df[['place_id', 'Place']].drop_duplicates()
            df_bbox = transformed_bounding_boxes_df.merge(
                local_map, on='place_id', how='left'
            )

            # Step B: resolve database place_id via Place
            df_bbox = df_bbox.merge(
                id_map[['place_id_db', 'Place']], on='Place', how='inner'
            )

            # Step C: select final columns, drop incomplete/duplicate keys and upsert
            df_bbox_final = df_bbox[['place_id_db', 'corner_no', 'coordinate']].copy()
            df_bbox_final.rename(columns={'place_id_db': 'place_id'}, inplace=True)
            df_bbox_final = df_bbox_final.dropna(subset=['place_id', 'corner_no'])
            df_bbox_final = df_bbox_final.drop_duplicates(subset=['place_id', 'corner_no'])

            if df_bbox_final.empty:
                print("Nothing to write for 'bounding_boxes' after mapping.")
            else:
                rows = df_bbox_final.to_dict(orient='records')
                upsert_sql_bbox = text("""
                    INSERT INTO bounding_boxes (place_id, corner_no, coordinate)
                    VALUES (:place_id, :corner_no, :coordinate)
                    ON CONFLICT (place_id, corner_no) DO UPDATE
                    SET coordinate = EXCLUDED.coordinate;
                """)
                with engine.begin() as conn:
                    conn.execute(upsert_sql_bbox, rows)
                print(f"Upserted {len(rows)} row(s) into 'bounding_boxes'.")
    else:
        print("Variable 'transformed_bounding_boxes_df' not found. Skipping 'bounding_boxes'.")

    # --------------------------------------- 3) APPEND into taxis ----------------------------------------

    # 'taxis' is an append-only raw history table. Standardise columns, coerce timestamp, then append
    if 'transformed_taxis_raw_history' in globals() and isinstance(transformed_taxis_raw_history, pd.DataFrame):
        df_taxi = transformed_taxis_raw_history.copy()

        # Standardise expected names (harmless if already correct)
        df_taxi.rename(columns={
            'longitudes': 'longitude',
            'latitudes': 'latitude',
            'timestamps': 'timestamp',
            'places': 'place',
            'refresh_indexs': 'refresh_index',
        }, inplace=True)

        # Keep only the columns that belong in the 'taxis' table
        taxi_cols = ['longitude', 'latitude', 'timestamp', 'place', 'refresh_index']
        df_taxi = df_taxi[[c for c in taxi_cols if c in df_taxi.columns]]

        # Ensure the timestamp is a proper datetime
        if 'timestamp' in df_taxi.columns:
            df_taxi['timestamp'] = pd.to_datetime(df_taxi['timestamp'], errors='coerce')

        if df_taxi.empty:
            print("Nothing to write for 'taxis'.")
        else:
            # Append rows; foreign key to places("Place") is enforced by the database
            df_taxi.to_sql('taxis', engine, if_exists='append', index=False)
            print(f"Appended {len(df_taxi)} row(s) into 'taxis'.")
    else:
        print("No transformed taxis data found. Skipping 'taxis'.")
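The two-step key resolution (local `place_id` → `Place` → database `place_id`) and the idempotent upserts in `load_postgres()` can be tried without a Postgres server. The following is a minimal sketch. It assumes an in-memory SQLite database (through the already-imported `sqlite3`) stands in for Postgres. SQLite 3.24+ accepts the same `ON CONFLICT ... DO UPDATE ... excluded` syntax. The simplified tables, `load_once()` and the toy rows are hypothetical and are not the schema created by `ensure_schema()`.

```python
import sqlite3

import pandas as pd

# Assumption: in-memory SQLite stands in for Postgres; SQLite 3.24+ accepts the
# same INSERT ... ON CONFLICT (...) DO UPDATE SET col = excluded.col syntax.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE places (
        place_id  INTEGER PRIMARY KEY AUTOINCREMENT,
        "Place"   TEXT NOT NULL UNIQUE,
        latitude  REAL,
        longitude REAL
    );
    CREATE TABLE bounding_boxes (
        place_id   INTEGER NOT NULL REFERENCES places(place_id),
        corner_no  INTEGER NOT NULL,
        coordinate TEXT,
        PRIMARY KEY (place_id, corner_no)
    );
""")

UPSERT_PLACES = """
    INSERT INTO places ("Place", latitude, longitude)
    VALUES (:Place, :latitude, :longitude)
    ON CONFLICT ("Place") DO UPDATE
    SET latitude = excluded.latitude, longitude = excluded.longitude
"""
UPSERT_BBOX = """
    INSERT INTO bounding_boxes (place_id, corner_no, coordinate)
    VALUES (:place_id, :corner_no, :coordinate)
    ON CONFLICT (place_id, corner_no) DO UPDATE
    SET coordinate = excluded.coordinate
"""

# Hypothetical toy inputs: local place_ids 10/11 deliberately differ from DB ids
places_df = pd.DataFrame({
    "place_id": [10, 11],
    "Place": ["Place A", "Place B"],
    "latitude": [1.30, 1.28],
    "longitude": [103.85, 103.84],
})
bbox_df = pd.DataFrame({
    "place_id": [10, 10, 11],
    "corner_no": [1, 2, 1],
    "coordinate": ["a1", "a2", "b1"],
})


def load_once():
    # 1) Upsert places in one transaction (commit on success, rollback on error)
    with conn:
        conn.executemany(
            UPSERT_PLACES,
            places_df[["Place", "latitude", "longitude"]].to_dict(orient="records"),
        )
    # 2) Read back the authoritative Place -> DB place_id mapping
    id_map = pd.read_sql_query(
        'SELECT place_id AS place_id_db, "Place" FROM places', conn
    )
    # 3) Step A: local place_id -> Place; Step B: Place -> DB place_id
    mapped = (
        bbox_df.merge(places_df[["place_id", "Place"]], on="place_id", how="left")
               .merge(id_map, on="Place", how="inner")
    )
    rows = (
        mapped[["place_id_db", "corner_no", "coordinate"]]
        .rename(columns={"place_id_db": "place_id"})
        .drop_duplicates(subset=["place_id", "corner_no"])
        .to_dict(orient="records")
    )
    # 4) Upsert per-corner rows keyed on (place_id, corner_no)
    with conn:
        conn.executemany(UPSERT_BBOX, rows)


load_once()
load_once()  # a second "batch" updates existing rows instead of adding duplicates

print(conn.execute("SELECT COUNT(*) FROM places").fetchone()[0])          # 2
print(conn.execute("SELECT COUNT(*) FROM bounding_boxes").fetchone()[0])  # 3
```

Because both tables are keyed on natural keys (`"Place"` and `(place_id, corner_no)`), re-running the load on every batch updates rows in place. As a result, `places` and `bounding_boxes` do not grow with each batch, while the append-only `taxis` table does.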

#### Function 8: Orchestrating function

In [9]:
def run_taxi_pipeline(num_batches=50, refresh_count=5, sleep_seconds=5, include_unknown=True):
    """
    Orchestrates the full taxi data pipeline by reusing previously defined functions.

    Functions used/reused in order of appearance:
      • Function 6: ensure_schema()
          Ensures the required PostgreSQL tables and relationships exist before any data is processed.
      • Function 2: load_geolocations(csv_path)
          Reads the geolocations CSV from disk.
      • Function 3: clean_dataframe(df)
          Applies simple cleaning (drop NA rows, trim headers, reset index).
      • Function 4: plot_taxi_availability_live(taxi_data_func, geolocations_df, ...)
          Repeatedly fetches live taxi locations using Function 1, renders an interactive map/table,
          and returns (a) a final snapshot DataFrame and (b) all raw taxi rows gathered this batch.
      • Function 5: transform_data(df)
          Prepares in-memory “derived” tables (places, bounding_boxes) and a normalised copy of the input.
      • Function 7: load_postgres()
          Writes the transformed DataFrames into PostgreSQL (upserts places/bounding_boxes; appends taxis).

    Parameters:
      num_batches     – Number of batches to run.
      refresh_count   – Number of live refreshes per batch (passed to Function 4).
      sleep_seconds   – Delay in seconds between refreshes within Function 4 (each batch also ends with a fixed 5-second pause).
      include_unknown – Whether taxis outside any bounding box should be kept as 'Unknown'.
    """

    # ----------------------------------------------- Step 1 -----------------------------------------------

    # Function 6: ensure that PostgreSQL schema exists before any I/O happens
    ensure_schema()

    # Declare globals so that Function 7 (load_postgres) can access these DataFrames
    global geolocations_data
    global cleaned_geolocations_data
    global transformed_geolocations_data
    global transformed_taxis_raw_history

    # ----------------------------------------------- Step 2 -----------------------------------------------

    # Function 2 & 3: load the geolocations dataset from CSV and clean it once at the start
    geolocations_data = load_geolocations(geolocations_csv_path)
    cleaned_geolocations_data = clean_dataframe(geolocations_data)

    # Placeholders for the most recent batch outputs (useful if the function is later extended to return them)
    last_final_output = pd.DataFrame()
    last_transformed_taxis_raw_history = pd.DataFrame()

    # ----------------------------------------------- Step 3 -----------------------------------------------

    # Function 1 & 4: iterate over batches, fetching live data and rendering the map/table each time
    for batch_number in range(1, num_batches + 1):
        print(f"Starting batch {batch_number} of {num_batches}...")

        # Collect taxi availability repeatedly (refresh_count times), render live output, and obtain:
        #   - final_output:      snapshot after the last refresh of this batch
        #   - taxis_raw_history: all raw taxi rows collected during this batch
        final_output, taxis_raw_history = plot_taxi_availability_live(
            taxi_data_func=fetch_taxi_availability,
            geolocations_df=cleaned_geolocations_data,
            refresh_count=refresh_count,
            sleep_seconds=sleep_seconds,
            include_unknown=include_unknown
        )

        # ----------------------------------------------- Step 4 -----------------------------------------------

        # Function 5: transform both the per-batch taxi rows and the geolocations data
        # (Calling transform_data on geolocations again each batch keeps the flow explicit and consistent.)
        transformed_taxis_raw_history = transform_data(taxis_raw_history.copy())
        transformed_geolocations_data = transform_data(cleaned_geolocations_data.copy())

        # Keep the latest results so they are available after the loop (optional)
        last_final_output = final_output.copy()
        last_transformed_taxis_raw_history = transformed_taxis_raw_history.copy()

        # ----------------------------------------------- Step 5 -----------------------------------------------

        # Function 7: write transformed DataFrames into PostgreSQL
        # - Upserts places/bounding_boxes based on the transformed geolocations
        # - Appends the taxi history for this batch
        load_postgres()

        # Batch summary:
        # - Number of taxi rows processed in this batch
        # - The most recent snapshot timestamp
        loaded_rows = len(transformed_taxis_raw_history) if isinstance(transformed_taxis_raw_history, pd.DataFrame) else 0
        if not final_output.empty and 'timestamp' in final_output.columns:
            latest_time = str(final_output['timestamp'].max())
        else:
            latest_time = "N/A"

        print(f"Batch {batch_number} completed. Taxi rows loaded (this batch): {loaded_rows}. Latest snapshot time: {latest_time}.")

        # Short pause before starting the next batch to keep logs readable
        time.sleep(5)
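`run_taxi_pipeline()` depends on the live API and a Postgres connection, so the batch loop itself is hard to test in isolation. The sketch below is one way to exercise the same fetch → transform → load → summarise sequence offline. It assumes the stages can be passed in as callables. `run_batches`, `fake_fetch` and `fake_load` are hypothetical stand-ins, not functions from this notebook.

```python
import itertools
import time
from typing import Callable, Optional

import pandas as pd


def run_batches(
    fetch: Callable[[], pd.DataFrame],
    transform: Callable[[pd.DataFrame], pd.DataFrame],
    load: Callable[[pd.DataFrame], int],
    num_batches: int = 3,
    refresh_count: int = 2,
    sleep_seconds: float = 0.0,
) -> list:
    """Hypothetical dry-run of the batch loop: fetch -> transform -> load -> summarise."""
    summaries = []
    for batch_number in range(1, num_batches + 1):
        # Gather refresh_count snapshots (plays the role of Function 4's refresh loop)
        snapshots = []
        for i in range(refresh_count):
            snapshots.append(fetch())
            if i < refresh_count - 1:
                time.sleep(sleep_seconds)  # pause only between refreshes
        raw_history = pd.concat(snapshots, ignore_index=True)

        transformed = transform(raw_history)  # plays the role of Function 5
        loaded = load(transformed)            # plays the role of Function 7

        # Latest snapshot time = maximum timestamp seen in this batch
        latest: Optional[pd.Timestamp] = (
            transformed["timestamp"].max() if not transformed.empty else None
        )
        summaries.append({"batch": batch_number, "rows": loaded, "latest": latest})
    return summaries


# Hypothetical stand-ins: a fake API returning two taxis per call with a
# one-second clock tick, and a fake loader that just records what it receives.
clock = itertools.count()


def fake_fetch() -> pd.DataFrame:
    ts = pd.Timestamp("2025-01-01") + pd.Timedelta(seconds=next(clock))
    return pd.DataFrame({"longitude": [103.80, 103.90],
                         "latitude": [1.30, 1.35],
                         "timestamp": [ts, ts]})


sink = []


def fake_load(df: pd.DataFrame) -> int:
    sink.append(df)
    return len(df)


summaries = run_batches(fake_fetch, lambda df: df.copy(), fake_load)
for s in summaries:
    print(f"Batch {s['batch']}: {s['rows']} rows, latest {s['latest']}")
```

Injecting the stages keeps this loop free of globals. By contrast, the real pipeline shares DataFrames with `load_postgres()` through `global` declarations. Taking `.max()` of the timestamps reports the latest snapshot even when rows arrive unsorted.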

In [10]:
run_taxi_pipeline()

Place,current_count,prev_count,recommendation
Admiralty,1,1,Woodlands East (5) ~1.6 km
Toh Tuck,1,1,West Coast (20) ~1.6 km
Rochor Canal,1,1,Bugis (4) ~0.7 km
People's Park,1,1,Cecil (38) ~0.9 km
Nicoll,1,1,Bugis (4) ~0.9 km
Mandai Estate,1,1,Yew Tee (5) ~1.7 km
Lim Chu Kang,1,1,Kranji (2) ~5.3 km
Ghim Moh,1,2,Ulu Pandan (30) ~1.1 km
Farrer Court,1,1,Selegie (5) ~1.2 km
Yuhua East,1,0,Lakeside (Business) (4) ~1.2 km


Upserted 314 place(s) into 'places'.
Upserted 1256 row(s) into 'bounding_boxes'.
Appended 10540 row(s) into 'taxis'.
Batch 7 completed. Taxi rows loaded (this batch): 10540. Latest snapshot time: 2025-08-27 17:02:23.783509.


KeyboardInterrupt: 