In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType, StringType
from functools import reduce

In [0]:
# ---- Source: full ISD (fixed-width) archive ----
ISD_ROOT = "s3://noaa-isd-pds/data"

# Years to ingest; each entry is appended to ISD_ROOT as a sub-path.
YEARS = ["2016"]

# Stations to ingest, written as "<USAF>-<WBAN>".
#   KSFO – San Francisco International Airport: USAF 724940, WBAN 23234
STATION_IDS = ["724940-23234"]

# ---- Target: fully qualified Bronze table name (<catalog>.<schema>.<table>) ----
CATALOG = "energy_data_platform_project"
SCHEMA = "bronze"
BRONZE_TABLE = ".".join([CATALOG, SCHEMA, "noaa_isd_ksfo_bronze"])

# Session-wide setting: use dynamic partition overwrite mode.
# NOTE(review): presumably intended so an overwrite only replaces the `year`
# partitions being written — confirm against the Bronze write cell.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

In [0]:
# ---- 1. Fixed-width parsing helpers ----

def scale_or_null(col, missing_values, scale=None):
    """
    Turn a raw string column into a double, mapping sentinel codes to NULL.

    The input is trimmed first. A NULL input, or a trimmed value equal to any
    entry of ``missing_values``, yields NULL; every other value is cast to
    DoubleType. When ``scale`` is given, the cast result is divided by it.

    Args:
        col: Column holding the raw string field.
        missing_values: Iterable of strings that mark a value as missing.
        scale: Optional divisor applied after the cast; ``None`` skips scaling.

    Returns:
        A Column expression producing the cleaned (and optionally scaled) double.
    """
    trimmed = F.trim(col)

    def _null_on_code(case_expr, code):
        # Extend the CASE chain with one more "is this the missing code?" branch.
        return case_expr.when(trimmed == code, None)

    # Start with the NULL-input branch, then add one branch per missing code.
    case_expr = reduce(_null_on_code, missing_values, F.when(trimmed.isNull(), None))
    as_double = case_expr.otherwise(trimmed.cast(DoubleType()))

    return as_double if scale is None else as_double / F.lit(scale)


def parse_isd_record(df, year_value: str, source_path: str):
    """
    Project the fixed-width ISD line held in column 'value' into typed columns.

    Field positions (1-based, inclusive) from the ISD format doc:
      USAF:     5-10
      WBAN:     11-15
      DATE:     16-23  (YYYYMMDD)
      TIME:     24-27  (HHMM)
      LAT:      29-34  (deg * 1000, +99999 missing)
      LON:      35-41  (deg * 1000, +999999 missing)
      ELEV:     47-51  (meters, +9999 missing)
      WIND DIR: 61-63  (deg, 999 missing)
      WIND SPD: 66-69  (m/s * 10, 9999 missing)
      CEILING:  71-75  (m, 99999 missing)
      VIS:      79-84  (m, 999999 missing)
      TEMP:     88-92  (°C * 10, +9999 missing)
      DEW:      94-98  (°C * 10, +9999 missing)

    Args:
        df: DataFrame with a string column named 'value' (one raw ISD line per row).
        year_value: Year written to the integer 'year' column for every row
            (taken from this argument, not from the record itself).
        source_path: Written verbatim to the 'source_file' column.

    Returns:
        DataFrame with station identifiers, observation time/date, 'year',
        the scaled measurements, NULL placeholders for 'pressure_hpa' and
        'precip_mm', 'source_file', 'ingest_ts', and the untouched raw 'value'.

    NOTE(review): the timestamp is parsed without an explicit time zone, so it
    follows the Spark session time zone — confirm that matches the ISD data.
    """
    raw_line = F.col("value")

    def field(start, length):
        # Slice `length` characters beginning at 1-based position `start`.
        return F.substring(raw_line, start, length)

    # --- identifiers ---
    usaf_col = field(5, 6).cast(StringType())
    wban_col = field(11, 5).cast(StringType())

    # --- observation time: YYYYMMDD followed by HHMM ---
    obs_time_col = F.to_timestamp(
        F.concat_ws("", field(16, 8), field(24, 4)),
        "yyyyMMddHHmm",
    )

    # Signed 4-digit fields share the same pair of missing codes.
    signed_missing = ["+9999", "9999"]

    # A single projection, listed in the final output column order.
    return df.select(
        F.concat_ws("-", usaf_col, wban_col).alias("station_id"),
        usaf_col.alias("usaf"),
        wban_col.alias("wban"),
        obs_time_col.alias("obs_time"),
        F.to_date(obs_time_col).alias("obs_date"),
        F.lit(year_value).cast("int").alias("year"),
        scale_or_null(field(29, 6), ["+99999", "99999"], scale=1000.0).alias("latitude"),
        scale_or_null(field(35, 7), ["+999999", "999999"], scale=1000.0).alias("longitude"),
        scale_or_null(field(47, 5), signed_missing, scale=None).alias("elevation_m"),
        scale_or_null(field(88, 5), signed_missing, scale=10.0).alias("temperature_c"),
        scale_or_null(field(94, 5), signed_missing, scale=10.0).alias("dewpoint_c"),
        # pressure & precip stay NULL in Bronze; fill in Silver if desired
        F.lit(None).cast(DoubleType()).alias("pressure_hpa"),
        scale_or_null(field(66, 4), ["9999"], scale=10.0).alias("wind_speed_ms"),
        scale_or_null(field(61, 3), ["999"], scale=None).alias("wind_dir_deg"),
        scale_or_null(field(71, 5), ["99999"], scale=None).alias("ceiling_m"),
        scale_or_null(field(79, 6), ["999999"], scale=None).alias("visibility_m"),
        F.lit(None).cast(DoubleType()).alias("precip_mm"),
        F.lit(source_path).alias("source_file"),
        F.current_timestamp().alias("ingest_ts"),
        raw_line,  # raw record for Silver-level parsing
    )

In [0]:
# ---- 2. Enumerate ISD files like SMART-DS ----

# One parsed DataFrame per matching station file, across all configured years.
all_dfs = []

for year in YEARS:
    year_root = f"{ISD_ROOT}/{year}"
    print(f"Listing ISD files under {year_root}")

    for entry in dbutils.fs.ls(year_root):
        file_name = entry.name  # e.g. "724940-23234-2016.gz"

        # The station ID is the first two dash-separated parts: "724940-23234"
        station_key = "-".join(file_name.split("-")[:2])

        # Keep only gzip files that belong to a configured station.
        wanted = file_name.endswith(".gz") and station_key in STATION_IDS
        if wanted:
            print(f"  Using file: {entry.path}")
            all_dfs.append(
                parse_isd_record(
                    spark.read.text(entry.path),
                    year_value=year,
                    source_path=entry.path,
                )
            )

print(f"Total ISD files loaded: {len(all_dfs)}")

# Fail fast: the later cells need at least one DataFrame to union and write.
if len(all_dfs) == 0:
    raise Exception(
        f"No ISD files found for years={YEARS}, stations={STATION_IDS}"
    )


In [0]:
# ---- 3. Union into one Bronze DataFrame ----

def _union_by_name(left, right):
    """Stack two per-file DataFrames, matching columns by name."""
    return left.unionByName(right, allowMissingColumns=True)


# Fold every per-file DataFrame into a single one, left to right.
bronze_df = reduce(_union_by_name, all_dfs)

bronze_row_count = bronze_df.count()
print("ISD Bronze row count:", bronze_row_count)

bronze_sample = bronze_df.limit(5)
display(bronze_sample)


In [0]:
# ---- 4. Write Bronze table ----

# Delta writer in overwrite mode, partitioned by the `year` column.
bronze_writer = bronze_df.write.format("delta")
bronze_writer = bronze_writer.mode("overwrite").partitionBy("year")
bronze_writer.saveAsTable(BRONZE_TABLE)

# Read the table back and show a few rows as a quick sanity check.
bronze_preview = spark.table(BRONZE_TABLE).limit(5)
display(bronze_preview)
