# Download Mobi Bike Share Data

- Mobi makes several years of trip data available [on their website](https://www.mobibikes.ca/en/system-data).
- They also adhere to the [GBFS standard](https://github.com/MobilityData/gbfs) and provide an API for retrieving the live status of stations on the network.
- We will also scrape the [Mobi website](https://www.mobibikes.ca) in case the data is useful for enriching our product later on.

In this notebook, we will:
1. Download and process available trip history (2018-2025, ~7.6M trips).
2. Create bronze tables for trips, stations and site data with minimal data manipulation.
3. Examine the contents of the bronze tables and cleanse as appropriate to create silver tables.

Runs on Databricks Unity Catalog (Serverless or cluster).

In [0]:
%pip install requests pandas pyarrow beautifulsoup4 openpyxl mlflow markdownify loguru

%restart_python

In [None]:
# Notebook bootstrap: autoreload, src path, imports
%reload_ext autoreload
%autoreload 2

In [0]:
import sys
from pathlib import Path
from urllib.parse import urlparse

# Add the repository's src directory using a relative path
src_path = Path.cwd() / "src"
if str(src_path) not in sys.path:
    sys.path.insert(0, str(src_path))

import mlflow
import pandas as pd
import re
import requests
from bs4 import BeautifulSoup
from loguru import logger
from pyspark.sql import Window
from pyspark.sql import functions as F

from src.mobi import (
    BasicSiteScraper,
    combine_trip_data,
    download_all_trip_data,
    fetch_station_info_from_gbfs,
    fetch_station_status_from_gbfs,
    save_to_parquet,
)

CONFIG = mlflow.models.ModelConfig(development_config="config.yaml")

CATALOG = CONFIG.get("catalog")
SCHEMA = CONFIG.get("schema")
RAW_VOLUME = CONFIG.get("raw_data_vol")
SHUFFLE_PARTITIONS = CONFIG.get("shuffle_partitions")

logger.remove()
logger.add(
    sink=sys.stderr,
    level="INFO",
    format="{time:YYYY-MM-DD HH:mm:ss} | {level:<8} | {message}",
)
logger = logger.bind(notebook="01_data")

In [0]:
# Project config & UC paths

# Ensure catalog, schema, and volume exist; create if missing
# TODO: add check for catalog
spark.sql(f"CREATE SCHEMA IF NOT EXISTS `{CATALOG}`.`{SCHEMA}`")
spark.sql(f"CREATE VOLUME IF NOT EXISTS `{CATALOG}`.`{SCHEMA}`.`{RAW_VOLUME}`")

# UC Volume root (used by pandas and Spark)
volume_root = Path("/Volumes") / CATALOG / SCHEMA / RAW_VOLUME
trip_data_dir = volume_root / "trip_data"
site_data_dir = volume_root / "mobi_site"

# Spark tuning (shared across all writes)
spark.conf.set("spark.sql.shuffle.partitions", SHUFFLE_PARTITIONS)

logger.info("Using Unity Catalog volume storage")
logger.info(f"catalog.schema = {CATALOG}.{SCHEMA}")
logger.info(f"volume = {RAW_VOLUME}")
logger.info(f"spark.sql.shuffle.partitions = {SHUFFLE_PARTITIONS}")
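
This notebook reads seven keys from `config.yaml`: four in the cells above and three in the scraper cell further down. A missing key only surfaces when the cell that needs it runs, so a small pre-flight check can save a long rerun. The sketch below uses a plain dictionary in place of the loaded config; `missing_config_keys` is a hypothetical helper and the example values are placeholders, not the project's real settings.

```python
# Keys this notebook looks up via CONFIG.get(...)
REQUIRED_KEYS = (
    "catalog",
    "schema",
    "raw_data_vol",
    "shuffle_partitions",
    "scrape_url",
    "scrape_delay",
    "scrape_max_depth",
)


def missing_config_keys(config: dict) -> list:
    """Return the required keys that are absent or empty in `config`."""
    return [key for key in REQUIRED_KEYS if config.get(key) in (None, "")]


# Placeholder values for illustration only
example = {"catalog": "main", "schema": "mobi", "raw_data_vol": "raw"}
print(missing_config_keys(example))
```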

## Download Trip Data
- Download the trip data to the UC volume you created and combine into a single file.
- We'll use this to create bronze tables.

In [0]:
# Download all files
raw_trip_dir = trip_data_dir / "raw"
logger.info(f"Downloading trip data to {raw_trip_dir}")
files = download_all_trip_data(raw_trip_dir)
logger.success(f"Downloaded {len(files)} files")

# Process and combine
logger.info("Combining downloaded trip extracts")
trips_df = combine_trip_data(files)
logger.success(f"Total trips processed: {len(trips_df):,}")

# Save
trips_output = trip_data_dir / "mobi_trips.parquet"
logger.info(f"Writing trips parquet to {trips_output}")
save_to_parquet(trips_df, trips_output)
logger.success("Trips parquet saved")

## Download Station Data

We now turn our attention to the station data using GBFS (General Bikeshare Feed Specification), the gold standard for bikeshare system info. 🚲

**What's happening here?**
- We'll fetch live station details (think: locations, capacities, and real-time availability) straight from the GBFS feeds.
- By merging inventory and status, we get both *where* stations are and *what's going on* at each one.
- We save it all into our Unity Catalog volume for downstream analytics magic.

**Data cleansing is a critical piece of any ML initiative - some of these files won't load as expected. It's up to you to decide if and what to do about that!**

_Hint: are we sure every file is a csv? 🤔_
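
One way to act on the hint is to look at each file's leading bytes before choosing a reader, rather than trusting the extension. The sketch below is an illustration, not part of `src.mobi`: `sniff_format` is a hypothetical helper, and it only separates ZIP-based files (which is how `.xlsx` workbooks are stored) from everything else.

```python
from pathlib import Path

# Signature found at the start of ZIP archives, which includes .xlsx workbooks
ZIP_MAGIC = b"PK\x03\x04"


def sniff_format(path: Path) -> str:
    """Guess the container format from the first bytes, ignoring the extension."""
    with open(path, "rb") as handle:
        head = handle.read(len(ZIP_MAGIC))
    return "xlsx" if head == ZIP_MAGIC else "csv"
```

A loader could then route files reported as `"xlsx"` to `pd.read_excel` and the rest to `pd.read_csv`.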

In [0]:
# Fetch from GBFS API
logger.info("Fetching station inventory and status from GBFS")
stations = fetch_station_info_from_gbfs()
status = fetch_station_status_from_gbfs()

# Combine
stations = stations.merge(status, on="station_id", how="left")
logger.success(f"Merged station details: {len(stations)} rows")

# Save
stations_parquet = trip_data_dir / "mobi_stations.parquet"
stations_csv = trip_data_dir / "mobi_stations.csv"
logger.info(f"Writing stations parquet to {stations_parquet}")
save_to_parquet(stations, stations_parquet)
logger.info(f"Exporting stations CSV to {stations_csv}")
stations.to_csv(stations_csv, index=False)
logger.success("Station datasets updated")
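
For readers who have not worked with GBFS, the sketch below shows the shape of that left merge using plain dictionaries. It assumes the standard GBFS layout, in which both `station_information` and `station_status` nest their records under `data.stations` and share a `station_id`. The sample values are made up, and the `fetch_*` helpers in `src.mobi` may return different columns.

```python
def merge_station_feeds(info_feed: dict, status_feed: dict) -> list:
    """Left-join status onto inventory by station_id, keeping every inventory row."""
    status_by_id = {s["station_id"]: s for s in status_feed["data"]["stations"]}
    merged = []
    for station in info_feed["data"]["stations"]:
        status = status_by_id.get(station["station_id"], {})
        # Inventory fields win if both feeds carry the same key
        merged.append({**status, **station})
    return merged


# Invented sample payloads in the GBFS shape
info = {"data": {"stations": [
    {"station_id": "0001", "name": "Example St & 1st Ave", "capacity": 16},
    {"station_id": "0002", "name": "Example St & 2nd Ave", "capacity": 20},
]}}
status = {"data": {"stations": [
    {"station_id": "0001", "num_bikes_available": 5, "num_docks_available": 11},
]}}

rows = merge_station_feeds(info, status)
```

Station `0002` has no status record, so it is kept without availability fields. The pandas `merge(..., how="left")` call behaves the same way, except that it fills the missing columns with nulls.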

## Scrape the Mobi Website
- Our users may want to ask detailed questions about how to use Mobi bike share. We can scrape their website to get the best and most current available info about the system.
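
`BasicSiteScraper` lives in `src.mobi` and its internals are not shown in this notebook. As a rough mental model only, a depth-limited, same-site crawl can be sketched as a breadth-first traversal. In the sketch, `crawl` and `get_links` are hypothetical names, and `get_links` stands in for fetching a page and extracting its links.

```python
import time
from collections import deque
from urllib.parse import urlparse


def crawl(start_url, get_links, max_depth, delay=0.0):
    """Breadth-first crawl limited to the start URL's host and to max_depth hops."""
    host = urlparse(start_url).netloc
    seen = {start_url}
    order = []
    queue = deque([(start_url, 0)])
    while queue:
        url, depth = queue.popleft()
        order.append(url)
        if depth >= max_depth:
            continue  # pages at the depth limit are recorded but not expanded
        for link in get_links(url):
            if urlparse(link).netloc == host and link not in seen:
                seen.add(link)
                queue.append((link, depth + 1))
        time.sleep(delay)  # pause between expansions to stay polite
    return order


# Tiny in-memory "site" so the sketch runs without network access
SITE = {
    "https://example.org/": ["https://example.org/a", "https://other.example/x"],
    "https://example.org/a": ["https://example.org/b"],
    "https://example.org/b": [],
}
visited = crawl("https://example.org/", lambda u: SITE.get(u, []), max_depth=1)
```

With `max_depth=1` the crawl records the start page and `/a`, skips the off-site link, and never reaches `/b`.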

In [0]:
# Configure scraper
BASE_URL = CONFIG.get("scrape_url")
START_URL = BASE_URL
SCRAPE_DELAY = float(CONFIG.get("scrape_delay"))
SCRAPE_MAX_DEPTH = int(CONFIG.get("scrape_max_depth"))

scraper = BasicSiteScraper(
    base_url=BASE_URL,
    delay=SCRAPE_DELAY,
    max_depth=SCRAPE_MAX_DEPTH,
)

logger.info(f"Scraping mobi site starting at {START_URL}")
pages = scraper.scrape_recursive(START_URL)
logger.success(f"Pages scraped: {len(pages)}")
preview_urls = list(pages.keys())[: min(10, len(pages))]
logger.info(f"Previewing first {len(preview_urls)} URLs: {preview_urls}")


In [0]:
# Save markdown content per page
raw_site_data_dir = site_data_dir / "raw"
raw_site_data_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Persisting raw site markdown to {raw_site_data_dir}")

def url_to_filename(url: str) -> str:
    parsed = urlparse(url)
    path = parsed.path.rstrip("/") or "index"
    safe = re.sub(r"[^a-zA-Z0-9._-]+", "_", path)
    if not safe.endswith(".md"):
        safe += ".md"
    return safe

count = 0
for url, data in pages.items():
    filepath = raw_site_data_dir / url_to_filename(url)
    filepath.write_text(data.get("content", ""), encoding="utf-8")
    count += 1

logger.success(f"Wrote {count} markdown files to {raw_site_data_dir}")

# Save to parquet
records = []
for url, data in pages.items():
    md = data.get("metadata", {})
    records.append({
        "url": url,
        "title": md.get("title", ""),
        "description": md.get("description", ""),
        "main_heading": md.get("main_heading", ""),
        "scraped_at": md.get("scraped_at", None),
        "status": data.get("status", ""),
        "content_md": data.get("content", ""),
    })

df = pd.DataFrame.from_records(records)
output_path = site_data_dir / "mobibikes_ca_content.parquet"
logger.info(f"Saving site content parquet to {output_path}")
save_to_parquet(df, output_path)
logger.success("Site content parquet saved")
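
`url_to_filename` keys only on the URL path. If the scraper ever returns URLs that differ only in their query string, they would map to the same file and the later write would win. Whether that matters depends on the site. If it does, one option is to append a short hash of the full URL; `url_to_unique_filename` below is a hypothetical variant, not what the cell above does.

```python
import hashlib
import re
from urllib.parse import urlparse


def url_to_unique_filename(url: str) -> str:
    """Build a filesystem-safe name from the path plus a short hash of the full URL."""
    path = urlparse(url).path.strip("/") or "index"
    safe = re.sub(r"[^a-zA-Z0-9._-]+", "_", path)
    # Hash the whole URL so query strings and fragments still produce distinct names
    digest = hashlib.sha1(url.encode("utf-8")).hexdigest()[:8]
    return f"{safe}-{digest}.md"


a = url_to_unique_filename("https://example.org/en/faq?page=1")
b = url_to_unique_filename("https://example.org/en/faq?page=2")
```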

## Load and Explore

In [0]:
# Load
trips = pd.read_parquet(trip_data_dir / "mobi_trips.parquet")
stations = pd.read_parquet(trip_data_dir / "mobi_stations.parquet")
mobi_site = pd.read_parquet(site_data_dir / "mobibikes_ca_content.parquet")

logger.success(f"Trips loaded: {len(trips):,}")
logger.success(f"Stations loaded: {len(stations)}")
logger.success(f"Site pages available: {len(mobi_site)}")

## Create Bronze Tables

In [0]:
trips_pq = str(trip_data_dir / "mobi_trips.parquet")
stations_pq = str(trip_data_dir / "mobi_stations.parquet")
site_pq = str(site_data_dir / "mobibikes_ca_content.parquet")

(
    spark.read.parquet(trips_pq)
    .write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable(f'`{CATALOG}`.`{SCHEMA}`.`bronze_trips`')
)

(
    spark.read.parquet(stations_pq)
    .write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable(f'`{CATALOG}`.`{SCHEMA}`.`bronze_stations`')
)

(
    spark.read.parquet(site_pq)
    .write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable(f'`{CATALOG}`.`{SCHEMA}`.`bronze_site`')
)

## Create Silver Trips

Our bronze trips table mirrors the raw CSVs, so it still contains duplicate columns, misspellings, and inconsistent station labels. In the next steps we will transform that raw feed into a clean silver layer that downstream analysts can join to stations and sites without extra work.

Key goals for the silver pipeline:
- **Normalize columns**: coalesce duplicate names (for example `membership_type` and `memebership_type`), convert metrics to numeric types, and ensure timestamps are cast correctly.
- **Derive trusted flags**: consolidate the electric-bike indicators into a single boolean field.
- **Standardize station names**: strip numeric prefixes, collapse whitespace, and attach station IDs for both departure and return locations.
- **Assign primary keys**: add a `trip_id` column so every trip row has a unique identifier. The code below uses `F.monotonically_increasing_id()`, which yields unique but non-consecutive IDs that are not guaranteed to be stable across runs.

> **Reminder:** Each transformation cell that follows displays a sample of the intermediate DataFrame so you can confirm the changes before writing the silver tables.
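
The station-name goal is easiest to see on a concrete string. The sketch below mirrors, in plain Python, the regular expressions used by the Spark helpers later in this notebook. The sample label is invented for illustration.

```python
import re


def clean_station_label(label: str) -> str:
    """Strip a leading 4-digit prefix and collapse runs of whitespace."""
    no_prefix = re.sub(r"^\s*\d{4}[\s\-:]*", "", label.strip())
    return re.sub(r"\s+", " ", no_prefix)


def station_key(label: str) -> str:
    """Lowercase, single-spaced key used to match trips to stations."""
    return re.sub(r"\s+", " ", label.strip()).lower()


raw = "0001  10th &  Cambie "
name = clean_station_label(raw)  # "10th & Cambie"
key = station_key(name)          # "10th & cambie"
```

Both sides of the later join need to go through the same cleaning, otherwise a prefixed name on one side will never match an unprefixed name on the other.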

In [None]:
# Inspect bronze tables before silver transforms

def summarize_columns(df: pd.DataFrame, label: str) -> pd.DataFrame:
    """Return a sorted DataFrame detailing column names, dtypes, and null counts."""
    summary = pd.DataFrame(
        {
            "column": df.columns,
            "dtype": df.dtypes.astype(str),
            "null_count": df.isna().sum(),
        }
    )
    logger.info(f"{label} has {len(df):,} rows")
    return summary.sort_values("column").reset_index(drop=True)

bronze_trips_summary = summarize_columns(trips, "bronze_trips")
bronze_stations_summary = summarize_columns(stations, "bronze_stations")
bronze_site_summary = summarize_columns(mobi_site, "bronze_site")

display(bronze_trips_summary)
display(bronze_stations_summary)
display(bronze_site_summary)


In [None]:
# Helper utilities for silver preparation

def clean_station_label_col(column: F.Column) -> F.Column:
    """Return a Spark column with 4-digit numeric prefixes removed and whitespace normalized."""
    return F.regexp_replace(
        F.regexp_replace(F.trim(column), r"^\s*\d{4}[\s\-:]*", ""), r"\s+", " "
    )


def normalize_station_key_col(column: F.Column) -> F.Column:
    """Create a lowercase, single-space station key suitable for joining."""
    return F.lower(F.regexp_replace(F.trim(column), r"\s+", " "))


def coerce_flag_col(column: F.Column) -> F.Column:
    """Convert yes/no style strings to a nullable boolean Spark column.

    TODO: Extend with additional localized values if they appear in new extracts.
    """
    normalized = F.lower(F.trim(column))
    truthy = ["yes", "true", "1", "y"]
    falsy = ["no", "false", "0", "n"]
    return (
        F.when(normalized.isin(*truthy), F.lit(True))
        .when(normalized.isin(*falsy), F.lit(False))
        .otherwise(F.lit(None))
    )
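
A plain-Python twin of `coerce_flag_col` can be handy for unit-testing the mapping without a Spark session. This is a sketch that assumes the same truthy and falsy vocabulary as the helper above; `coerce_flag` is a hypothetical name.

```python
from typing import Optional

TRUTHY = {"yes", "true", "1", "y"}
FALSY = {"no", "false", "0", "n"}


def coerce_flag(value: Optional[str]) -> Optional[bool]:
    """Map yes/no style strings to True/False, and anything else to None."""
    if value is None:
        return None
    normalized = value.strip().lower()
    if normalized in TRUTHY:
        return True
    if normalized in FALSY:
        return False
    return None
```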


In [None]:
# Prepare silver_stations DataFrame using Spark
bronze_stations_sdf = spark.table(f'`{CATALOG}`.`{SCHEMA}`.`bronze_stations`')

silver_stations = (
    bronze_stations_sdf
    .withColumn("station_name", clean_station_label_col(F.col("name")))
    # Build the key from the cleaned name so it matches the trip-side key used in Step 7
    .withColumn("station_key", normalize_station_key_col(F.col("station_name")))
    .withColumn("station_pk", F.col("station_id"))
    .dropDuplicates(["station_id"])
)
# TODO: capture additional station metadata (e.g., capacity) if required downstream.
logger.success(f"Prepared silver_stations with {silver_stations.count()} rows")
display(silver_stations.limit(5))


In [None]:
# Load bronze trips as the starting point for the silver transformation
bronze_trips_sdf = spark.table(f'`{CATALOG}`.`{SCHEMA}`.`bronze_trips`')
silver_trips = bronze_trips_sdf
logger.info(f"Loaded bronze_trips rows: {silver_trips.count()}")
display(silver_trips.limit(5))

In [None]:
# Step 1: Coalesce duplicate membership columns
if "memebership_type" in silver_trips.columns:  # check the working frame so the cell can be re-run
    if "membership_type" in silver_trips.columns:
        silver_trips = silver_trips.withColumn(
            "membership_type",
            F.coalesce(F.col("membership_type"), F.col("memebership_type")),
        )
    else:
        silver_trips = silver_trips.withColumn("membership_type", F.col("memebership_type"))
    silver_trips = silver_trips.drop("memebership_type")

logger.info(f"After membership cleanup: {silver_trips.count()} rows")
display(silver_trips.select("membership_type").limit(5))


In [None]:
# Step 2: Normalize stopover duration values
if "stopover_duration" in silver_trips.columns:
    if "stopover_duration_sec" in silver_trips.columns:
        silver_trips = silver_trips.withColumn(
            "stopover_duration_sec",
            F.coalesce(F.col("stopover_duration_sec"), F.col("stopover_duration")),
        )
    else:
        silver_trips = silver_trips.withColumn(
            "stopover_duration_sec", F.col("stopover_duration").cast("double")
        )
    silver_trips = silver_trips.drop("stopover_duration")

logger.info(f"After stopover normalization: {silver_trips.count()} rows")
display(silver_trips.select("stopover_duration_sec").limit(5))


In [None]:
# Step 3: Apply numeric casts and derived distance
numeric_casts = {
    "bike_id": "bigint",
    "covered_distance_m": "double",
    "duration_sec": "double",
    "stopover_duration_sec": "double",
    "number_of_stopovers": "double",
    "departure_slot": "double",
    "return_slot": "double",
    "lock_duration_sec": "double",
    "number_of_bike_locks": "double",
    "number_of_bike_stopovers": "double",
    "departure_battery_voltage_mv": "double",
    "return_battery_voltage_mv": "double",
}
for column, dtype in numeric_casts.items():
    if column in silver_trips.columns:
        silver_trips = silver_trips.withColumn(column, F.col(column).cast(dtype))

if "covered_distance_m" in silver_trips.columns:
    silver_trips = silver_trips.withColumn(
        "covered_distance_km", F.col("covered_distance_m") / F.lit(1000.0)
    )

logger.info(f"After numeric coercion: {silver_trips.count()} rows")
display(silver_trips.select(*[c for c in ["covered_distance_m", "covered_distance_km", "duration_sec"] if c in silver_trips.columns]).limit(5))


In [None]:
# Step 4: Cast departure and return timestamps (source data already hourly)
# Handle slash and dash formats plus minute-only timestamps without seconds

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import TimestampType


# Pandas UDF fallback
@pandas_udf(TimestampType())
def infer_ts(col: pd.Series) -> pd.Series:
    """Fallback parser using pandas.to_datetime() with format inference."""
    # infer_datetime_format is deprecated in pandas 2.x, where inference is the default
    return pd.to_datetime(col, errors="coerce")


# Fast native parse
def fast_timestamp(column: F.Column) -> F.Column:
    """Lightweight parser for most rows."""
    c = F.trim(column)
    c = F.regexp_replace(c, r"[./]", "-")  # normalize separators
    # Reorder M-D-Y -> Y-M-D. Spark uses Java regex, so groups are written $1, not \1
    c = F.regexp_replace(c, r"^([0-9]{1,2})-([0-9]{1,2})-([0-9]{4})", "$3-$1-$2")
    return F.try_to_timestamp(c)


for raw, target in [("departure", "departure_time"), ("return", "return_time")]:
    # Extracts may carry the raw column name, the target name, or both
    sources = [c for c in (raw, target) if c in silver_trips.columns]
    if not sources:
        continue
    candidates = []
    for name in sources:
        as_text = F.col(name).cast("string")
        # Native parse first; the pandas fallback fills values it could not parse
        candidates += [fast_timestamp(as_text), infer_ts(as_text)]
    silver_trips = silver_trips.withColumn(target, F.coalesce(*candidates))
    if raw in sources:
        silver_trips = silver_trips.drop(raw)

# Add date based partitioning columns for later
if "departure_time" in silver_trips.columns:
    silver_trips = (
        silver_trips
        .withColumn("departure_year",  F.year("departure_time"))
        .withColumn("departure_month", F.month("departure_time"))
    )

display(silver_trips.select("departure_time", "return_time", "departure_year", "departure_month").limit(5))
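
To make the separator and ordering logic concrete, here is a Spark-free sketch of the same idea: normalize separators, move a leading month-day-year date to year-month-day, then try a few explicit formats. The format list and sample strings are assumptions for illustration; the real extracts may use other layouts. Note that Python's `re.sub` writes group references as `\3`, whereas Spark's `regexp_replace` follows Java and expects `$3`.

```python
import re
from datetime import datetime
from typing import Optional

# Assumed layouts: with and without seconds
FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M")


def parse_trip_timestamp(text: str) -> Optional[datetime]:
    """Parse a trip timestamp, returning None when no known layout matches."""
    value = re.sub(r"[/.]", "-", text.strip())  # normalize separators
    # Reorder M-D-Y to Y-M-D when the string starts with a 4-digit year last
    value = re.sub(r"^(\d{1,2})-(\d{1,2})-(\d{4})", r"\3-\1-\2", value)
    for fmt in FORMATS:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    return None
```

Returning `None` instead of raising mirrors what `try_to_timestamp` does in the Spark cell, which makes it easy to count unparsed rows afterwards.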

In [None]:
# Step 5: Consolidate electric bike indicators (coalesce known boolean columns)
flag_columns = [c for c in ["electric_bike", "electric"] if c in silver_trips.columns]

if flag_columns:
    silver_trips = silver_trips.withColumn(
        "is_electric_bike",
        F.coalesce(*[F.col(c).cast("boolean") for c in flag_columns]),
    )
else:
    silver_trips = silver_trips.withColumn("is_electric_bike", F.lit(None).cast("boolean"))

for column in flag_columns:
    silver_trips = silver_trips.drop(column)

logger.info(f"After electric flag normalization: {silver_trips.count()} rows")
display(silver_trips.select("is_electric_bike").limit(5))


In [None]:
# Step 6: Clean station names (strip numeric prefixes, collapse whitespace)
for prefix in ["departure", "return"]:
    raw_column = f"{prefix}_station"
    if raw_column in silver_trips.columns:
        cleaned = clean_station_label_col(F.col(raw_column))
        silver_trips = silver_trips.withColumn(f"{prefix}_station_name", cleaned)
        silver_trips = silver_trips.drop(raw_column)

logger.info(f"After station name cleanup: {silver_trips.count()} rows")
display(
    silver_trips.select(
        *[
            c
            for c in [
                "departure_station_name",
                "return_station_name",
            ]
            if c in silver_trips.columns
        ]
    ).limit(5)
)


In [None]:
# Step 7: Attach station IDs using normalized names
dep_lookup = silver_stations.select(
    F.col("station_key").alias("dep_station_key"),
    F.col("station_id").alias("departure_station_id"),
)
ret_lookup = silver_stations.select(
    F.col("station_key").alias("ret_station_key"),
    F.col("station_id").alias("return_station_id"),
)

silver_trips = silver_trips.join(
    dep_lookup,
    normalize_station_key_col(F.col("departure_station_name")) == dep_lookup["dep_station_key"],
    "left",
).drop("dep_station_key")

silver_trips = silver_trips.join(
    ret_lookup,
    normalize_station_key_col(F.col("return_station_name")) == ret_lookup["ret_station_key"],
    "left",
).drop("ret_station_key")

logger.info(f"After station ID join: {silver_trips.count()} rows")
display(
    silver_trips.select(
        *[
            c
            for c in [
                "departure_station_name",
                "departure_station_id",
                "return_station_name",
                "return_station_id",
            ]
            if c in silver_trips.columns
        ]
    ).limit(5)
)
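
A left join keeps every trip, but it can also add rows if a station key appears more than once in the lookup. The sketch below imitates that behaviour with plain lists so the effect is easy to see. The data and the `left_join` helper are invented for illustration.

```python
def left_join(trips, lookup):
    """Emulate a SQL left join of trips onto (key, station_id) lookup pairs."""
    out = []
    for trip in trips:
        matches = [sid for key, sid in lookup if key == trip["key"]]
        # Unmatched trips are kept with a null station_id
        for sid in matches or [None]:
            out.append({**trip, "station_id": sid})
    return out


trips = [
    {"trip": 1, "key": "10th & cambie"},
    {"trip": 2, "key": "unknown stop"},
]
unique_lookup = [("10th & cambie", "0001")]
dup_lookup = unique_lookup + [("10th & cambie", "0999")]

print(len(left_join(trips, unique_lookup)))  # same row count as trips
print(len(left_join(trips, dup_lookup)))     # one extra row from the duplicate key
```

Comparing the row count logged before and after the join is a cheap way to spot this, and counting null `departure_station_id` values shows how many names failed to match.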


In [None]:
# Step 8: Finalize trip_id and column ordering (simple fast version)

# Generate a unique ID per row
silver_trips = silver_trips.withColumn("trip_id", F.monotonically_increasing_id())

# Preserve original column ordering logic
trip_columns_order = [
    "trip_id",
    "departure_time",
    "return_time",
    "bike",
    "is_electric_bike",
    "membership_type",
    "covered_distance_m",
    "covered_distance_km",
    "duration_sec",
    "stopover_duration_sec",
    "number_of_stopovers",
    "number_of_bike_stopovers",
    "departure_slot",
    "return_slot",
    "lock_duration_sec",
    "number_of_bike_locks",
    "departure_temperature_c",
    "return_temperature_c",
    "departure_battery_voltage_mv",
    "return_battery_voltage_mv",
    "departure_station_name",
    "departure_station_id",
    "return_station_name",
    "return_station_id",
    "source_file",
    "formula",
    "manager",
]

existing_trip_columns = [c for c in trip_columns_order if c in silver_trips.columns]
remaining_columns = [c for c in silver_trips.columns if c not in existing_trip_columns]
silver_trips = silver_trips.select(*(existing_trip_columns + remaining_columns))

logger.success(f"Prepared silver_trips with {silver_trips.count()} rows")
display(silver_trips.limit(5))
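
`F.monotonically_increasing_id()` guarantees unique, increasing IDs but not consecutive ones: per the Spark documentation, the partition index goes in the upper 31 bits and the row's position within the partition in the lower 33 bits. The first function below reproduces that arithmetic. Because those IDs depend on how the data happens to be partitioned, they can change between runs. The second function sketches one hypothetical alternative, a hash of fields assumed to identify a trip; the choice of fields is an assumption and would need checking against the data.

```python
import hashlib


def monotonic_id(partition_index: int, row_in_partition: int) -> int:
    """Reproduce the documented bit layout of monotonically_increasing_id."""
    return (partition_index << 33) + row_in_partition


def stable_trip_id(departure_time: str, bike: str, departure_station: str) -> str:
    """Hypothetical run-independent ID: a truncated SHA-256 of identifying fields."""
    raw = "|".join([departure_time, bike, departure_station])
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]


print(monotonic_id(0, 0))  # first row of the first partition
print(monotonic_id(1, 0))  # first row of the second partition jumps by 2**33
```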


In [None]:
# Prepare silver_site DataFrame and assign primary key
bronze_site_sdf = spark.table(f'`{CATALOG}`.`{SCHEMA}`.`bronze_site`')

site_window = Window.orderBy("url")
silver_site = (
    bronze_site_sdf
    .withColumn("scraped_at", F.to_timestamp("scraped_at"))
    .dropDuplicates(["url"])
    .withColumn("site_page_id", F.row_number().over(site_window).cast("bigint"))
)
# TODO: revisit primary key strategy if additional natural keys become available.

logger.success(f"Prepared silver_site with {silver_site.count()} pages")
display(silver_site.limit(5))


In [None]:
# Persist silver tables to Unity Catalog Delta

logger.info("Repartitioning silver_trips by year and month")
silver_trips = silver_trips.repartition("departure_year", "departure_month")

# Drop existing tables (if any) to avoid slow schema reconciliation
for t in ["silver_trips", "silver_stations", "silver_site"]:
    spark.sql(f"DROP TABLE IF EXISTS `{CATALOG}`.`{SCHEMA}`.`{t}`")

logger.info(f"Writing silver_trips to `{CATALOG}`.`{SCHEMA}`.silver_trips")
(
    silver_trips.write
    .format("delta")
    .partitionBy("departure_year", "departure_month")
    .saveAsTable(f"`{CATALOG}`.`{SCHEMA}`.`silver_trips`")
)

logger.info(f"Writing silver_stations to `{CATALOG}`.`{SCHEMA}`.silver_stations")
(
    silver_stations.write
    .format("delta")
    .saveAsTable(f"`{CATALOG}`.`{SCHEMA}`.`silver_stations`")
)

logger.info(f"Writing silver_site to `{CATALOG}`.`{SCHEMA}`.silver_site")
(
    silver_site.write
    .format("delta")
    .saveAsTable(f"`{CATALOG}`.`{SCHEMA}`.`silver_site`")
)

logger.success("Silver tables materialized")

silver_trips.printSchema()
silver_stations.printSchema()
silver_site.printSchema()