1. Imports and setup

In [1]:
# Databricks notebook source
from datetime import datetime, timezone

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, lit

# Obtain the Spark session used by every cell below.
spark = SparkSession.builder.getOrCreate()

# === Configuration ===
# Endpoint the flood records are pulled from.
API_URL = "https://environment.data.gov.uk/flood-monitoring/id/floods"

# The three parts of the destination table name: catalog, schema, table.
CATALOG, SCHEMA, TABLE = "flood_dev", "bronze", "floods_raw"

# None for full ingestion, or an integer limit for testing.
MAX_RECORDS = 100

# Fully qualified name of the Bronze table: <catalog>.<schema>.<table>.
bronze_table = ".".join((CATALOG, SCHEMA, TABLE))


PySparkRuntimeError: [CONNECT_URL_NOT_SET] Cannot create a Spark Connect session because the Spark Connect remote URL has not been set. Please define the remote URL by setting either the 'spark.remote' option or the 'SPARK_REMOTE' environment variable.

Fetch all (or limited) records with pagination

In [None]:
def fetch_flood_data(api_url: str, limit_per_page: int = 500, max_records: int | None = None):
    """
    Fetches flood-monitoring data from the UK Environment Agency API.
    Handles Hydra pagination and optional record limits.
    """
    url = f"{api_url}?_limit={limit_per_page}"
    all_items = []

    while url:
        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        data = resp.json()

        items = data.get("items", [])
        all_items.extend(items)

        # Respect optional record cap
        if max_records and len(all_items) >= max_records:
            all_items = all_items[:max_records]
            break

        # Hydra-style pagination
        next_link = None
        if "pagination" in data and "next" in data["pagination"]:
            next_link = data["pagination"]["next"]
        elif "@next" in data:
            next_link = data["@next"]

        url = next_link

    return all_items


# Pull the records once for this run; MAX_RECORDS caps the volume when it is set.
records = fetch_flood_data(api_url=API_URL, max_records=MAX_RECORDS)
print(f"Fetched {len(records)} records from API")


Convert to Spark DataFrame and add metadata

In [None]:
# Stop here when the API returned nothing rather than building an empty DataFrame.
if not records:
    raise ValueError("No data returned from API.")

# One identifier per run, derived from the current UTC time (YYYYMMDDHHMMSS).
# datetime.utcnow() is deprecated; the timezone-aware call formats identically.
ingestion_id = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")

# NOTE(review): the schema is inferred from the Python dicts. Nested or
# inconsistently typed fields may not infer cleanly - consider passing an
# explicit schema once the payload shape is confirmed.
df = (
    spark.createDataFrame(records)
    # Ingestion metadata appended to every row of this batch.
    .withColumn("ingestion_time", current_timestamp())
    .withColumn("source_url", lit(API_URL))
    .withColumn("ingestion_id", lit(ingestion_id))
)


Create and merge into Bronze Delta table

In [None]:
# Register the batch as a temp view first: both the CREATE TABLE and the MERGE
# below read from it.
df.createOrReplaceTempView("source")

# Ensure the Bronze table exists and is CDF-enabled.
# The empty CTAS (WHERE false) gives the table the batch's columns without
# writing any rows, so the MERGE can resolve target.id and apply
# UPDATE SET * / INSERT *. Creating it from a single placeholder column would
# leave the MERGE with no matching columns on the first run.
# NOTE(review): IF NOT EXISTS leaves an already-existing table untouched; a
# table created earlier with only a placeholder column must be dropped by hand.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {bronze_table}
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)
AS SELECT * FROM source WHERE false
""")

# Rows are matched on 'id', which is treated as the stable record key.
# NOTE(review): confirm the DataFrame really has an 'id' column (the API may
# expose the identifier under a different name such as '@id') and that it is
# unique within a batch - MERGE rejects multiple source rows per target row.
merge_condition = "target.id = source.id"

# Upsert: overwrite rows that already exist, insert the ones that do not.
spark.sql(f"""
MERGE INTO {bronze_table} AS target
USING source
ON {merge_condition}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

# The check mark was mis-encoded (UTF-8 bytes rendered as cp1252); restored.
print(f"✅ Bronze ingestion complete: {df.count()} records processed.")


Verify ingestion

In [None]:
# Show a small sample of the Bronze table to confirm that rows landed.
bronze_preview = spark.table(bronze_table).limit(10)
display(bronze_preview)