In [8]:
# ===================== Phase 0 — CONFIG & IMPORTS =====================

# Destination table in the Lakehouse
TARGET_TABLE = "tbl_bronze_weather_raw"

# Daily variables to be requested from Open-Meteo
DAILY_VARS = "temperature_2m_max,temperature_2m_min,precipitation_sum,wind_speed_10m_max"
TZ         = "Europe/Lisbon"

# HTTP / retries
HTTP_TIMEOUT = 300
MAX_WORKERS  = 2
RETRY_TRIES  = 6
RETRY_BASE_S = 1.0
JITTER_S     = 0.5

# Batching & logging
BATCH_SIZE     = 24
COOLDOWN_S     = 2
PROGRESS_EVERY = 5

# Precision for lat/lon (aligned with dim_location_silver)
DECIMAL_SCALE     = 5
# up to 3 integer digits (e.g. 180) + decimal places
DECIMAL_PRECISION = 3 + DECIMAL_SCALE

# Canonical column order before creating the Spark DataFrame
COL_ORDER = [
    "date",
    "temperature_2m_max",
    "temperature_2m_min",
    "precipitation_sum",
    "wind_speed_10m_max",
    "latitude",
    "longitude",
    "parish"
]

# --- imports ---
from pyspark.sql import functions as F, types as T
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests, pandas as pd, numpy as np, math, time, random
from requests.adapters import HTTPAdapter
try:
    from urllib3.util import Retry
except Exception:
    from urllib3.util.retry import Retry
from datetime import datetime


def ensure_col_order_and_types(pdf: pd.DataFrame) -> pd.DataFrame:
    """Ensure the DataFrame has all columns in COL_ORDER with proper basic types."""
    if "date" in pdf.columns:
        pdf["date"] = pd.to_datetime(pdf["date"], errors="coerce").dt.date

    for c in COL_ORDER:
        if c not in pdf.columns:
            pdf[c] = np.nan

    return pdf[COL_ORDER]


StatementMeta(, ec19c72b-9cbb-4050-9871-321cfb1811ea, 10, Finished, Available, Finished)

In [9]:
# ===================== Phase 1 — Date window (DIM_DATE) =====================
dim_date = (
    spark.read.table("dim_date_silver")
    .select(F.col("Registration Date").cast("date").alias("date"))
)

minmax = (
    dim_date
    .agg(F.min("date").alias("start"), F.max("date").alias("end"))
    .collect()[0]
)

if minmax["start"] is None or minmax["end"] is None:
    raise ValueError("dim_date_silver does not contain any values in 'Registration Date'.")

start_date = minmax["start"].strftime("%Y-%m-%d")
end_date   = minmax["end"].strftime("%Y-%m-%d")

N_DAYS = (
    datetime.strptime(end_date, "%Y-%m-%d")
    - datetime.strptime(start_date, "%Y-%m-%d")
).days + 1

print(f"Date window in dim_date_silver: {start_date} → {end_date} (inclusive) | N_DAYS={N_DAYS}")


StatementMeta(, ec19c72b-9cbb-4050-9871-321cfb1811ea, 11, Finished, Available, Finished)

Date window in dim_date_silver: 2020-01-01 → 2024-12-31 (inclusive) | N_DAYS=1827


In [10]:
# ===================== Phase 2 — Parish coordinates (centroids) =====================
dim_loc_raw = spark.read.table("dim_location_silver")

# Schema validation (1 row per parish with these fields)
cols = {field.name for field in dim_loc_raw.schema.fields}
required = {"Parish Name", "Latitude_Centroid", "Longitude_Centroid"}

if not required.issubset(cols):
    raise ValueError(
        "dim_location_silver must contain the columns: "
        "'Parish Name', 'Latitude_Centroid', 'Longitude_Centroid'."
    )

# Normalisation and typing
dl = (
    dim_loc_raw
    .select(
        F.upper(F.trim(F.col("Parish Name"))).alias("parish"),
        F.col("Latitude_Centroid").cast("double").alias("lat"),
        F.col("Longitude_Centroid").cast("double").alias("lon")
    )
    .dropna(subset=["parish", "lat", "lon"])
    .dropDuplicates(["parish"])
)

# DISTINCT list of parishes
TARGET_PARISHES = [
    row["parish"] for row in dl.select("parish").dropDuplicates().collect()
]
print(f"Detected parishes: {len(TARGET_PARISHES)}")

# Optional whitelist
# WHITELIST = ["ALVALADE", "ARROIOS", ...]
# TARGET_PARISHES = [
#     p for p in TARGET_PARISHES
#     if p in {x.upper().strip() for x in WHITELIST}
# ]
dl = dl.where(F.col("parish").isin(TARGET_PARISHES))

# Prepare list of coordinates
coords = [(row["lat"], row["lon"], row["parish"]) for row in dl.collect()]

if not coords:
    raise ValueError(
        "dim_location_silver returned no coordinates "
        "(check parish field names and values)."
    )

print(
    f"Parishes to process: {len(coords)} | "
    f"Expected rows ≈ {len(coords) * N_DAYS:,}"
)


StatementMeta(, ec19c72b-9cbb-4050-9871-321cfb1811ea, 12, Finished, Available, Finished)

Detected parishes: 24
Parishes to process: 24 | Expected rows ≈ 43,848


In [11]:
# ===================== Phase 3 — HTTP session + Open-Meteo fetch =====================
session = requests.Session()
retry_cfg = Retry(
    total=RETRY_TRIES,
    backoff_factor=RETRY_BASE_S,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["GET"],
    respect_retry_after_header=True
)
adapter = HTTPAdapter(
    max_retries=retry_cfg,
    pool_connections=MAX_WORKERS,
    pool_maxsize=MAX_WORKERS
)
session.mount("https://", adapter)
session.headers.update({"User-Agent": "fabric-notebook-open-meteo/1.0"})

BASE_URL = "https://archive-api.open-meteo.com/v1/archive"


def fetch_daily(lat: float, lon: float, parish: str) -> pd.DataFrame:
    """
    Calls Open-Meteo for a given (lat, lon) and returns a pandas DataFrame
    with columns in COL_ORDER.
    """
    if lat is None or lon is None or math.isnan(lat) or math.isnan(lon):
        raise ValueError("Invalid coordinates.")

    params = dict(
        latitude=lat,
        longitude=lon,
        daily=DAILY_VARS,
        start_date=start_date,
        end_date=end_date,
        timezone=TZ
    )

    for i in range(RETRY_TRIES):
        try:
            # Small random jitter to avoid thundering herd
            time.sleep(JITTER_S * random.random())

            r = session.get(BASE_URL, params=params, timeout=HTTP_TIMEOUT)

            # Handle rate limiting with respect to Retry-After header
            if r.status_code == 429:
                ra = r.headers.get("Retry-After")
                wait = (
                    int(ra) if ra and ra.isdigit()
                    else (RETRY_BASE_S * (2 ** i))
                ) + JITTER_S * random.random()
                time.sleep(wait)
                continue

            # Retry on 5xx
            if r.status_code >= 500:
                raise RuntimeError(f"HTTP {r.status_code}: {r.text[:300]}")

            # Treat any non-200 (other than 429 above) as an error
            if r.status_code != 200:
                raise RuntimeError(f"HTTP {r.status_code}: {r.text[:300]}")

            j = r.json()
            if "daily" not in j or "time" not in j["daily"]:
                raise RuntimeError("Response does not contain 'daily.time'.")

            d = j["daily"]
            pdf = pd.DataFrame(d)

            if pdf.empty:
                pdf = pd.DataFrame(columns=COL_ORDER)
            else:
                pdf["date"] = pd.to_datetime(pdf["time"], errors="coerce").dt.date
                pdf.drop(columns=["time"], inplace=True, errors="ignore")

                pdf["latitude"] = float(lat)
                pdf["longitude"] = float(lon)
                pdf["parish"] = parish

                for c in [
                    "temperature_2m_max",
                    "temperature_2m_min",
                    "precipitation_sum",
                    "wind_speed_10m_max"
                ]:
                    if c in pdf.columns:
                        pdf[c] = pd.to_numeric(pdf[c], errors="coerce")

            return ensure_col_order_and_types(pdf)

        except Exception as e:
            if i < RETRY_TRIES - 1:
                # Exponential backoff with jitter
                time.sleep(RETRY_BASE_S * (2 ** i) + JITTER_S * random.random())
            else:
                raise RuntimeError(
                    f"Open-Meteo request failed after {RETRY_TRIES} attempts | "
                    f"parish={parish}, lat={lat}, lon={lon}, "
                    f"start={start_date}, end={end_date} | error={e}"
                )


StatementMeta(, ec19c72b-9cbb-4050-9871-321cfb1811ea, 13, Finished, Available, Finished)

In [12]:
# ===================== Phase 4 — Batched execution with progress =====================
frames = []
total = len(coords)
completed = 0
rows_done = 0
expected_rows = total * N_DAYS

print(
    f"Start: {total} coordinates | threads={MAX_WORKERS} | "
    f"batch={BATCH_SIZE} | timeout={HTTP_TIMEOUT}s"
)
t0 = time.time()

for start in range(0, total, BATCH_SIZE):
    batch = coords[start:start + BATCH_SIZE]
    failures = []

    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as ex:
        futures = {
            ex.submit(fetch_daily, lat, lon, parish): (lat, lon, parish)
            for (lat, lon, parish) in batch
        }

        for fut in as_completed(futures):
            lat, lon, parish = futures[fut]
            try:
                pdf = fut.result()
                if not pdf.empty:
                    frames.append(pdf)
                    # Approximately N_DAYS rows per coordinate
                    rows_done += len(pdf)
            except Exception as e:
                failures.append((lat, lon, parish, str(e)))

            completed += 1
            if completed % PROGRESS_EVERY == 0:
                elapsed = int(time.time() - t0)
                pct_coords = 100.0 * completed / total
                pct_rows = (
                    100.0 * rows_done / expected_rows
                    if expected_rows
                    else 0.0
                )
                eta_s = int(
                    elapsed * (total / max(1, completed) - 1)
                )
                print(
                    f"[progress] coords {completed}/{total} "
                    f"({pct_coords:.1f}%) | "
                    f"rows {rows_done}/{expected_rows} "
                    f"({pct_rows:.1f}%) | "
                    f"batch_failures={len(failures)} | "
                    f"elapsed={elapsed}s | eta~{eta_s}s"
                )

    time.sleep(COOLDOWN_S)

if not frames:
    raise RuntimeError(
        "No data returned by Open-Meteo "
        "(check date window and coordinates)."
    )

all_pdf = pd.concat(frames, ignore_index=True)
print(f"Total records retrieved: {len(all_pdf):,}")


StatementMeta(, ec19c72b-9cbb-4050-9871-321cfb1811ea, 14, Finished, Available, Finished)

Start: 24 coordinates | threads=2 | batch=24 | timeout=300s
[progress] coords 5/24 (20.8%) | rows 9135/43848 (20.8%) | batch_failures=0 | elapsed=1s | eta~3s
[progress] coords 10/24 (41.7%) | rows 18270/43848 (41.7%) | batch_failures=0 | elapsed=2s | eta~2s
[progress] coords 15/24 (62.5%) | rows 27405/43848 (62.5%) | batch_failures=0 | elapsed=32s | eta~19s
[progress] coords 20/24 (83.3%) | rows 36540/43848 (83.3%) | batch_failures=0 | elapsed=33s | eta~6s
Total records retrieved: 43,848


In [13]:
# ===================== Phase 5 — FULL LOAD (staging, without surrogate keys) =====================
schema = T.StructType([
    T.StructField("date",               T.DateType(),   False),
    T.StructField("temperature_2m_max", T.DoubleType(), True),
    T.StructField("temperature_2m_min", T.DoubleType(), True),
    T.StructField("precipitation_sum",  T.DoubleType(), True),
    T.StructField("wind_speed_10m_max", T.DoubleType(), True),
    T.StructField("latitude",           T.DoubleType(), True),
    T.StructField("longitude",          T.DoubleType(), True),
    T.StructField("parish",             T.StringType(), True)
])

# Align columns and create base DataFrame
all_pdf = ensure_col_order_and_types(all_pdf.copy())

df_out = (
    spark.createDataFrame(all_pdf[COL_ORDER], schema=schema)
         .withColumn("parish", F.upper(F.trim(F.col("parish"))))
         # Store rounded lat/lon for audit/troubleshooting; they are not keys
         .withColumn(
             "latitude",
             F.round(F.col("latitude"), DECIMAL_SCALE).cast(
                 T.DecimalType(DECIMAL_PRECISION, DECIMAL_SCALE)
             )
         )
         .withColumn(
             "longitude",
             F.round(F.col("longitude"), DECIMAL_SCALE).cast(
                 T.DecimalType(DECIMAL_PRECISION, DECIMAL_SCALE)
             )
         )
         .dropna(subset=["date", "parish"])
         .dropDuplicates(["date", "parish"])
)

# Full overwrite (no partitions). Surrogate keys and joins are handled later in the Dataflow.
(df_out.write
     .format("delta")
     .mode("overwrite")
     .option("overwriteSchema", "true")
     .saveAsTable(TARGET_TABLE))

print(f"FULL LOAD completed: {TARGET_TABLE} (rows: {df_out.count():,})")

# Optional: logical clustering by date to speed up temporal filters
spark.sql(f"OPTIMIZE {TARGET_TABLE} ZORDER BY (date)")


StatementMeta(, ec19c72b-9cbb-4050-9871-321cfb1811ea, 15, Finished, Available, Finished)

FULL LOAD completed: tbl_bronze_weather_raw (rows: 43,848)


DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,numFilesUpdatedWithoutRewrite:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesUpdatedWithoutRewrite:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemovedBreakdown:array<struct<reason:string,metrics:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>>>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,