# **Silver Layer**

In the Silver layer we transform the raw Bronze records into cleaned, daily station-level climate data and structured metadata tables.

**Key steps:**
- Read all Bronze Parquet data and restrict to years `2010–2025`.
- Switch to RDD-based processing and map each record to a flat dict holding only the fields needed downstream.
- Apply quality filtering:
  - keep only records with empty quality flag (`qflag == ""`);
  - keep only valid/empty ingestion status (`status in ["", "valid"]`);
  - drop missing/sentinel values (`value is not None` and `value != -9999`);
  - keep only the three core elements: **TMAX**, **TMIN**, **PRCP**.
- Deduplicate by `(id, date, element)` and convert the raw GHCN-D units (tenths of °C for temperature, tenths of mm for precipitation):
  - temperatures to °C (`tmax_c`, `tmin_c`, derived `tavg_c`),
  - precipitation to millimetres (`prcp_mm`).
- Aggregate to one row per station–day `(id, date, year)` with:
  - `tmax_c`, `tmin_c`, `tavg_c`, `prcp_mm`.
- Create a Silver DataFrame, parse the `yyyyMMdd` string into a `date` column (`DateType`, shown as `yyyy-MM-dd`), and add the convenience fields `month` and `day`; `year` is carried through as the partition column.
- Write the Silver fact table as Parquet, partitioned by `year`, to `SILVER_PATH`.

**Metadata:**
- Parse `ghcnd-stations.txt` (fixed-width) into a cleaned **stations** table.
- Parse `ghcnd-inventory.txt` (fixed-width) into an **inventory** table.
- Derive a small **coverage** helper table that gives, per station, the earliest first year and the latest last year across its TMAX/TMIN/PRCP inventory entries.
- Store all metadata as Parquet under `silver_meta`
  (`stations.parquet`, `inventory.parquet`, `coverage.parquet`).
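The stations file is fixed width, so most parsing bugs come from off-by-one slices. The following is a minimal plain-Python sketch of the station parser, with no Spark required, so the slicing can be unit-tested on its own. It assumes the same 0-based slice positions as the notebook's metadata cell. It also assumes that `-999.9` marks a missing elevation, as the GHCN-D readme specifies. `parse_station_line`, `station_line`, and the `SYNTH...` IDs are hypothetical and used only for illustration.

```python
from typing import NamedTuple, Optional

ELEV_MISSING = -999.9  # GHCN-D stations-file marker for a missing elevation

class Station(NamedTuple):
    id: str
    lat: Optional[float]
    lon: Optional[float]
    elev: Optional[float]
    state: Optional[str]
    name: Optional[str]
    gsn: Optional[str]
    hcn_crn: Optional[str]
    wmo: Optional[str]

def _num(field: str, missing: Optional[float] = None) -> Optional[float]:
    """Parse a numeric field; blank or the missing-value marker becomes None."""
    field = field.strip()
    if not field:
        return None
    value = float(field)
    return None if (missing is not None and value == missing) else value

def _txt(field: str) -> Optional[str]:
    """Strip a text field; blank becomes None."""
    return field.strip() or None

def parse_station_line(line: str) -> Optional[Station]:
    """Parse one ghcnd-stations.txt line with 0-based fixed-width slices."""
    s = line.rstrip("\n").ljust(85)  # tolerate trimmed trailing blanks
    if not s[0:11].strip():
        return None  # blank line
    return Station(
        id=s[0:11].strip(),
        lat=_num(s[12:20]),
        lon=_num(s[21:30]),
        elev=_num(s[31:37], missing=ELEV_MISSING),
        state=_txt(s[38:40]),
        name=_txt(s[41:71]),
        gsn=_txt(s[72:75]),
        hcn_crn=_txt(s[76:79]),
        wmo=_txt(s[80:85]),
    )

def station_line(sid, lat, lon, elev, state, name) -> str:
    """Build a synthetic fixed-width stations line (illustration only)."""
    return f"{sid:<11} {lat:8.4f} {lon:9.4f} {elev:6.1f} {state:<2} {name:<30}"

st = parse_station_line(station_line("SYNTH000001", 47.25, -122.5, 120.0, "WA", "EXAMPLE STATION"))
no_elev = parse_station_line(station_line("SYNTH000002", 0.0, 0.0, -999.9, "", "NO ELEVATION"))
print(st.name, st.elev)  # EXAMPLE STATION 120.0
print(no_elev.elev)      # None
```

The sketch pads lines with `ljust(85)` instead of dropping short ones. This means a file whose trailing blanks were trimmed still parses, and the optional flag columns simply come back as `None`.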

## 01. Spark Session

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("GHCN-Silver-RDD").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

print("Spark version:", spark.version)
```

```
Spark version: 3.5.0
```


## 02. Data Paths and Setup

```python
# Data paths
BRONZE_PATH = "/home/ubuntu/spark-notebooks/project/data/bronze"
SILVER_PATH = "/home/ubuntu/spark-notebooks/project/data/silver"

# Inclusive year bounds (kept narrow for faster iteration)
YEAR_MIN = 2010
YEAR_MAX = 2025

# Elements kept in this first pass
ELEMENTS = {"TMAX", "TMIN", "PRCP"}

# Number of sample records printed by the debug cells below
VERBOSE_SAMPLES = 5
```

## 03. Read Bronze and switch to RDD

```python
# Check the Bronze data: columns and available year partitions
df_bronze = spark.read.parquet(BRONZE_PATH)
print("Columns:", df_bronze.columns)
df_bronze.select("year").distinct().orderBy("year").show(100)
```

```
Columns: ['station', 'date_str', 'element', 'raw_value', 'mflag', 'qflag', 'sflag', 'obstime', '_ingestion_timestamp', '_source', '_status', '_raw_data', 'year']
+----+
|year|
+----+
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
|2021|
|2022|
|2023|
|2024|
|2025|
+----+
```

```python
from pyspark.sql import functions as F

# Read Bronze (partitioned by year). We filter by year on the DataFrame read so
# Spark can prune partitions, but all cleaning is done with RDD operations.
df_bronze = spark.read.format("parquet").load(BRONZE_PATH)

# Use the 'year' partition column to prune the read for faster iteration.
if "year" in df_bronze.columns:
    df_bronze = df_bronze.where((F.col("year") >= YEAR_MIN) & (F.col("year") <= YEAR_MAX))

print("Bronze columns:", df_bronze.columns)
print("Bronze count:", df_bronze.count())
df_bronze.printSchema()
```

```
Bronze columns: ['station', 'date_str', 'element', 'raw_value', 'mflag', 'qflag', 'sflag', 'obstime', '_ingestion_timestamp', '_source', '_status', '_raw_data', 'year']
Bronze count: 586514274
root
 |-- station: string (nullable = true)
 |-- date_str: string (nullable = true)
 |-- element: string (nullable = true)
 |-- raw_value: string (nullable = true)
 |-- mflag: string (nullable = true)
 |-- qflag: string (nullable = true)
 |-- sflag: string (nullable = true)
 |-- obstime: string (nullable = true)
 |-- _ingestion_timestamp: double (nullable = true)
 |-- _source: string (nullable = true)
 |-- _status: string (nullable = true)
 |-- _raw_data: string (nullable = true)
 |-- year: integer (nullable = true)
```
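Bronze carries both `date_str` and the `year` partition column. A cheap consistency check is therefore that the first four digits of `date_str` equal `year`. The following is a plain-Python sketch of that predicate. `year_matches` is a hypothetical helper. On the cluster it could be applied with something like `df_bronze.rdd.filter(lambda r: not year_matches(r["date_str"], r["year"])).count()` to count mismatches before cleaning.

```python
from typing import Optional

def year_matches(date_str: Optional[str], year: Optional[int]) -> bool:
    """True when an 8-digit yyyyMMdd string starts with the given partition year."""
    if date_str is None or year is None:
        return False
    if len(date_str) != 8 or not date_str.isdigit():
        return False
    return int(date_str[:4]) == year

print(year_matches("20180917", 2018))  # True
print(year_matches("20180917", 2019))  # False
```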



```python
# Switch to the RDD API for all cleaning steps
rdd_raw = df_bronze.rdd
print("First raw record:", rdd_raw.first().asDict())
```

```
First raw record: {'station': 'US1KSDP0016', 'date_str': '20180917', 'element': 'SNOW', 'raw_value': '0', 'mflag': None, 'qflag': None, 'sflag': 'N', 'obstime': '0800', '_ingestion_timestamp': 1762903015.8262305, '_source': 'ghcn_txt', '_status': 'valid', '_raw_data': None, 'year': 2018}
```
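To see what the record mapping in the next section produces, the same logic can be run on a plain dict that stands in for `Row.asDict()`. The input used here is the first raw record printed above. `to_rec_dict` is a hypothetical dict-based twin of the notebook's `to_rec`. Note that `None` flags become empty strings, which is what the later `qflag == ""` filter relies on. This SNOW record would still be dropped by the element filter.

```python
from typing import Any, Dict, Optional

def to_int_safe(x: Any) -> Optional[int]:
    """Parse x as an int; None or non-numeric input gives None."""
    try:
        return int(x)
    except (TypeError, ValueError):
        return None

def to_rec_dict(d: Dict[str, Any]) -> Dict[str, Any]:
    """Dict-based twin of to_rec: takes row.asDict() instead of a Row."""
    return {
        "id": d.get("station"),
        "date": d.get("date_str"),
        "element": d.get("element"),
        "value": to_int_safe(d.get("raw_value")),
        "qflag": d.get("qflag") or "",
        "year": d.get("year"),
        "status": d.get("_status") or "",
    }

first_raw = {'station': 'US1KSDP0016', 'date_str': '20180917', 'element': 'SNOW',
             'raw_value': '0', 'mflag': None, 'qflag': None, 'sflag': 'N',
             'obstime': '0800', '_ingestion_timestamp': 1762903015.8262305,
             '_source': 'ghcn_txt', '_status': 'valid', '_raw_data': None, 'year': 2018}
rec = to_rec_dict(first_raw)
print(rec)
# {'id': 'US1KSDP0016', 'date': '20180917', 'element': 'SNOW', 'value': 0, 'qflag': '', 'year': 2018, 'status': 'valid'}
```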


## 04. RDD Cleaning (MapReduce style)

```python
from typing import Any, Dict, Optional

def to_int_safe(x) -> Optional[int]:
    """Parse x as an int; return None for None or non-numeric strings."""
    try:
        return int(x)
    except (TypeError, ValueError):
        return None

def to_rec(row) -> Dict[str, Any]:
    """Map a Bronze Row to a flat dict with only the fields needed downstream."""
    d = row.asDict()
    return {
        "id": d.get("station"),
        "date": d.get("date_str"),                 # yyyyMMdd string
        "element": d.get("element"),
        "value": to_int_safe(d.get("raw_value")),  # raw GHCN-D value (tenths of a unit)
        "qflag": (d.get("qflag") or ""),           # None -> "" (no quality flag)
        "year": d.get("year"),
        "status": (d.get("_status") or ""),        # None -> ""
    }

# Map to simple dicts
rdd1 = rdd_raw.map(to_rec)

# Metrics before cleaning (RDD-only)
total_in = rdd1.count()
elem_counts_pre = rdd1.map(lambda r: (r["element"], 1)).reduceByKey(lambda a, b: a + b).collect()
print(f"Total rows (bounded): {total_in}")
print("Elements present (pre-clean):", sorted(elem_counts_pre))

# Cleaning
rdd_clean = (
    rdd1
    # keep records that passed GHCN-D QA (empty quality flag)
    .filter(lambda r: r["qflag"] == "")
    # keep valid ingestion status (or no status recorded)
    .filter(lambda r: r["status"] in ("", "valid"))
    # drop unparseable values and the -9999 missing-value sentinel
    .filter(lambda r: r["value"] is not None and r["value"] != -9999)
    # keep the three core elements
    .filter(lambda r: r["element"] in ELEMENTS)
)

# Deduplicate on (id, date, element). reduceByKey gives no ordering guarantee,
# so this keeps an arbitrary one of the duplicates, not necessarily the first.
rdd_unique = (
    rdd_clean
    .map(lambda r: ((r["id"], r["date"], r["element"]), r))
    .reduceByKey(lambda a, b: a)
    .map(lambda kv: kv[1])
)

# Unit conversion: GHCN-D stores TMAX/TMIN in tenths of °C and PRCP in tenths of mm
rdd_si = rdd_unique.map(lambda r: {**r, "value_si": r["value"] / 10.0})

kept = rdd_si.count()
print(f"Kept after QA+status+missing+element filters: {kept} ({kept / total_in * 100:.1f}%)")

# Show a few examples
for rec in rdd_si.take(VERBOSE_SAMPLES):
    print(rec)
```


                                                                                

```
Total rows (bounded): 586514274
Elements present (pre-clean): [('ADPT', 2484948), ('ASLP', 2480218), ('ASTP', 2480218), ('AWBT', 2484948), ('AWDR', 140734), ('AWND', 6236152), ('DAEV', 7), ('DAPR', 1572877), ('DASF', 314), ('DATN', 42209), ('DATX', 40836), ('DAWM', 2), ('DWPR', 124727), ('EVAP', 770845), ('FMTM', 333968), ('MDEV', 5), ('MDPR', 1574733), ('MDSF', 1662), ('MDTN', 42208), ('MDTX', 40836), ('MDWM', 2), ('MNPN', 362678), ('MXPN', 365989), ('PGTM', 2014083), ('PRCP', 170414823), ('PSUN', 18956), ('RHAV', 2496957), ('RHMN', 2498781), ('RHMX', 2498781), ('SN02', 3389), ('SN03', 6), ('SN11', 351), ('SN12', 6693), ('SN13', 1087), ('SN14', 396), ('SN21', 30), ('SN22', 30), ('SN23', 30), ('SN31', 104928), ('SN32', 844656), ('SN33', 78544), ('SN34', 1173), ('SN35', 21705), ('SN36', 9729), ('SN51', 32024), ('SN52', 341801), ('SN53', 42189), ('SN54', 982), ('SN55', 16818), ('SN56', 6628), ('SN57', 31), ('SNOW', 73667319), ('SNWD', 49482209), ('SX02', 3330), ('SX03', 6), ('SX11', 351)
Kept after QA+status+missing+element filters: 321200037 (54.8%)
{'id': 'ASN00091041', 'date': '20180918', 'element': 'PRCP', 'value': 60, 'qflag': '', 'year': 2018, 'status': 'valid', 'value_si': 6.0}
{'id': 'US1PABD0006', 'date': '20181024', 'element': 'PRCP', 'value': 0, 'qflag': '', 'year': 2018, 'status': 'valid', 'value_si': 0.0}
{'id': 'USW00024284', 'date': '20180918', 'element': 'PRCP', 'value': 0, 'qflag': '', 'year': 2018, 'status': 'valid', 'value_si': 0.0}
{'id': 'USS0020L10S', 'date': '20181025', 'element': 'TMIN', 'value': -11, 'qflag': '', 'year': 2018, 'status': 'valid', 'value_si': -1.1}
{'id': 'SPE00156180', 'date': '20181025', 'element': 'TMIN', 'value': 115, 'qflag': '', 'year': 2018, 'status': 'valid', 'value_si': 11.5}
```
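The cleaning chain above can be replayed in plain Python on a handful of hand-made records to check each predicate. The following is a minimal sketch that mirrors the four filters and the `/ 10.0` conversion. There is one deliberate difference: `reduceByKey(lambda a, b: a)` keeps an arbitrary duplicate, whereas this sketch keeps the first record seen per `(id, date, element)` in list order. `passes_qc`, `clean_records`, `make_rec`, the station IDs `S1`/`S2`, and the `"invalid"` status value are hypothetical.

```python
from typing import Any, Dict, Iterable, List, Tuple

ELEMENTS = {"TMAX", "TMIN", "PRCP"}
MISSING = -9999  # GHCN-D missing-value sentinel

def passes_qc(r: Dict[str, Any]) -> bool:
    """Same conditions as the four RDD filters."""
    return (
        r["qflag"] == ""
        and r["status"] in ("", "valid")
        and r["value"] is not None
        and r["value"] != MISSING
        and r["element"] in ELEMENTS
    )

def clean_records(records: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Filter, deduplicate (first occurrence wins) and convert tenths to units."""
    seen: Dict[Tuple[str, str, str], Dict[str, Any]] = {}
    for r in records:
        if not passes_qc(r):
            continue
        key = (r["id"], r["date"], r["element"])
        if key not in seen:
            seen[key] = {**r, "value_si": r["value"] / 10.0}  # tenths -> °C or mm
    return list(seen.values())

def make_rec(id_, date, element, value, qflag="", status="valid"):
    """Record shaped like the output of to_rec."""
    return {"id": id_, "date": date, "element": element, "value": value,
            "qflag": qflag, "year": int(date[:4]), "status": status}

records = [
    make_rec("S1", "20180918", "TMAX", 215),
    make_rec("S1", "20180918", "TMAX", 215),            # duplicate -> dropped
    make_rec("S1", "20180918", "TMIN", -9999),          # missing sentinel -> dropped
    make_rec("S1", "20180918", "PRCP", 60, qflag="I"),  # non-empty quality flag -> dropped
    make_rec("S1", "20180918", "SNOW", 0),              # not a core element -> dropped
    make_rec("S2", "20180918", "PRCP", 0),
]
for r in clean_records(records):
    print(r["id"], r["element"], r["value_si"])
# S1 TMAX 21.5
# S2 PRCP 0.0
```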


                                                                                

## 05. Combine to daily rows

```python
# Combine to one row per (id, date, year): each record contributes {element: value},
# and the per-key dicts are merged so every station-day holds up to TMAX, TMIN and PRCP.
rdd_by_day = (
    rdd_si
    .map(lambda r: ((r["id"], r["date"], r["year"]), {r["element"]: r["value_si"]}))
    .reduceByKey(lambda a, b: {**a, **b})
)

def to_daily(kv):
    """Flatten ((id, date, year), {element: value}) into a daily tuple."""
    (id_, date, year), bag = kv
    tmax = bag.get("TMAX")
    tmin = bag.get("TMIN")
    prcp = bag.get("PRCP")
    # tavg is only defined when both extremes are present
    tavg = (tmax + tmin) / 2.0 if (tmax is not None and tmin is not None) else None
    return (id_, date, year, tmax, tmin, tavg, prcp)

rdd_daily = rdd_by_day.map(to_daily)

print("Sample daily rows:")
for row in rdd_daily.take(VERBOSE_SAMPLES):
    print(row)
print("Daily row count:", rdd_daily.count())
```


```
Sample daily rows:
('GME00130738', '20210719', 2021, 24.2, 11.1, 17.65, 0.3)
('RMW00040604', '20230521', 2023, 31.1, 26.7, 28.9, 4.1)
('USS0020H12S', '20211103', 2021, 10.9, 2.3, 6.6, 0.0)
('CA003073730', '20140916', 2014, 11.1, 5.5, 8.3, 0.0)
('US1ILDP0117', '20210523', 2021, None, None, None, 0.0)
Daily row count: 185907675
```
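The per-day merge is a keyed reduce followed by a flatten. The sketch below replays it in plain Python, with a dict playing the role of `reduceByKey`. The inputs are reconstructed from two rows of the Silver preview: the `MXN00024111` row (25.6 / 5.6 / 0.0) and a precipitation-only station. The sketch also adds an optional `ndigits` argument that rounds `tavg`. This argument is a hypothetical addition, not part of the notebook. It targets the float noise that averaging two tenths-derived values can produce, such as the `15.600000000000001` visible in the preview.

```python
from typing import Any, Dict, Iterable, Optional, Tuple

Key = Tuple[str, str, int]  # (id, date, year)

def merge_by_day(records: Iterable[Dict[str, Any]]) -> Dict[Key, Dict[str, float]]:
    """Plain-Python stand-in for .map(...) + .reduceByKey(lambda a, b: {**a, **b})."""
    by_day: Dict[Key, Dict[str, float]] = {}
    for r in records:
        key = (r["id"], r["date"], r["year"])
        by_day[key] = {**by_day.get(key, {}), r["element"]: r["value_si"]}
    return by_day

def to_daily(kv: Tuple[Key, Dict[str, float]], ndigits: Optional[int] = None):
    """Same flattening as the notebook; ndigits (hypothetical) optionally rounds tavg."""
    (id_, date, year), bag = kv
    tmax, tmin, prcp = bag.get("TMAX"), bag.get("TMIN"), bag.get("PRCP")
    tavg = (tmax + tmin) / 2.0 if (tmax is not None and tmin is not None) else None
    if tavg is not None and ndigits is not None:
        tavg = round(tavg, ndigits)
    return (id_, date, year, tmax, tmin, tavg, prcp)

def si(id_, date, element, raw):
    """Cleaned record as produced by rdd_si (raw value in tenths)."""
    return {"id": id_, "date": date, "year": int(date[:4]),
            "element": element, "value_si": raw / 10.0}

records = [
    si("MXN00024111", "20100101", "TMAX", 256),
    si("MXN00024111", "20100101", "TMIN", 56),
    si("MXN00024111", "20100101", "PRCP", 0),
    si("US1INWK0005", "20100101", "PRCP", 0),  # precipitation-only station
]
days = merge_by_day(records)
key = ("MXN00024111", "20100101", 2010)
raw = to_daily((key, days[key]))
rounded = to_daily((key, days[key]), ndigits=2)
prcp_only = to_daily((("US1INWK0005", "20100101", 2010), days[("US1INWK0005", "20100101", 2010)]))
print(rounded)    # ('MXN00024111', '20100101', 2010, 25.6, 5.6, 15.6, 0.0)
print(prcp_only)  # ('US1INWK0005', '20100101', 2010, None, None, None, 0.0)
```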


                                                                                

## 06. Convert to DataFrame (write only) and Save Silver

```python
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Schema of the tuples produced by to_daily
schema = T.StructType([
    T.StructField("id", T.StringType(), False),
    T.StructField("yyyymmdd", T.StringType(), False),
    T.StructField("year", T.IntegerType(), True),
    T.StructField("tmax_c", T.DoubleType(), True),
    T.StructField("tmin_c", T.DoubleType(), True),
    T.StructField("tavg_c", T.DoubleType(), True),
    T.StructField("prcp_mm", T.DoubleType(), True),
])

df_silver = spark.createDataFrame(rdd_daily, schema)

# Parse the yyyyMMdd string into a DateType column and add convenience fields
df_silver = (df_silver
    .withColumn("date", F.to_date(F.col("yyyymmdd"), "yyyyMMdd"))
    .withColumn("month", F.month("date"))
    .withColumn("day", F.dayofmonth("date"))
    .drop("yyyymmdd")
)

# Write Silver as Parquet partitioned by year
# (Delta Lake is an alternative, but it requires the delta-spark package to be configured)
(df_silver
 .write
 .mode("overwrite")
 .partitionBy("year")
 .format("parquet")
 .save(SILVER_PATH))

print("Silver written to:", SILVER_PATH)
```


```
25/11/12 13:53:02 WARN MemoryManager: Total allocation exceeds 95.00% (994,652,967 bytes) of heap memory
Scaling row group sizes to 92.63% for 8 writers

Silver written to: /home/ubuntu/spark-notebooks/project/data/silver
```
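For spot-checking a few rows outside Spark, the `F.to_date(..., "yyyyMMdd")` plus `month`/`dayofmonth` step can be mirrored with `datetime.strptime`. The following is a minimal sketch. `date_fields` is a hypothetical helper that returns `None` for strings that are not valid calendar dates. This should match what `to_date` yields for malformed input under Spark's default non-ANSI settings, but treat that equivalence as an assumption to verify on your Spark version.

```python
from datetime import date, datetime
from typing import NamedTuple, Optional

class DateFields(NamedTuple):
    date: date
    month: int
    day: int
    year: int

def date_fields(yyyymmdd: Optional[str]) -> Optional[DateFields]:
    """Parse a GHCN-D yyyyMMdd string; None when it is not a valid calendar date."""
    try:
        d = datetime.strptime(yyyymmdd, "%Y%m%d").date()
    except (TypeError, ValueError):
        return None
    return DateFields(d, d.month, d.day, d.year)

print(date_fields("20100101"))
print(date_fields("20230229"))  # 2023 is not a leap year -> None
```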


## 07. Validation & sanity checks

```python
# Read back and verify partitioning and counts
df_check = spark.read.parquet(SILVER_PATH)

print("Silver partitions (years):")
if "year" in df_check.columns:
    df_check.select("year").distinct().orderBy("year").show(40, truncate=False)

print("Silver schema:")
df_check.printSchema()

print("Row count:", df_check.count())

print("Preview:")
df_check.orderBy("date").show(10, truncate=False)
```


```
Silver partitions (years):
+----+
|year|
+----+
|2010|
|2011|
|2012|
|2013|
|2014|
|2015|
|2016|
|2017|
|2018|
|2019|
|2020|
|2021|
|2022|
|2023|
|2024|
|2025|
+----+

Silver schema:
root
 |-- id: string (nullable = true)
 |-- tmax_c: double (nullable = true)
 |-- tmin_c: double (nullable = true)
 |-- tavg_c: double (nullable = true)
 |-- prcp_mm: double (nullable = true)
 |-- date: date (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- year: integer (nullable = true)

Row count: 185907675
Preview:
+-----------+------+------+------------------+-------+----------+-----+---+----+
|id         |tmax_c|tmin_c|tavg_c            |prcp_mm|date      |month|day|year|
+-----------+------+------+------------------+-------+----------+-----+---+----+
|USC00262497|11.1  |0.0   |5.55              |0.0    |2010-01-01|1    |1  |2010|
|SWE00139202|NULL  |NULL  |NULL              |4.1    |2010-01-01|1    |1  |2010|
|US1INWK0005|NULL  |NULL  |NULL              |0.0    |2010-01-01|1    |1  |2010|
|US1NDCS0018|NULL  |NULL  |NULL              |0.0    |2010-01-01|1    |1  |2010|
|US1COEG0025|NULL  |NULL  |NULL              |0.0    |2010-01-01|1    |1  |2010|
|MXN00024111|25.6  |5.6   |15.600000000000001|0.0    |2010-01-01|1    |1  |2010|
|TX000038392|6.8   |NULL  |NULL              |NULL   |2010-01-01|1    |1  |2010|
|US1UTSL0006|NULL  |NULL  |NULL              |0.3    |2010-01-01|1    |1  |2010|
|USC00422578|-6.1  |-15.0 |-10.55            |0.0    |2010-01-01|1    |1  |2010|
|FIE00146688|-14.6 |-24.4 |-
```
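Beyond partitions and counts, a few per-row plausibility checks can catch unit or parsing mistakes. The following is a plain-Python sketch that flags three kinds of problem: `tmin > tmax`, negative precipitation, and temperatures outside a wide band. The band of -90 °C to 60 °C is an assumption chosen to include polar and desert extremes; it is not a GHCN-D rule. `row_issues` is a hypothetical helper that operates on the `(id, date, year, tmax, tmin, tavg, prcp)` tuples produced by `to_daily`. On the cluster, something like `rdd_daily.filter(row_issues).take(20)` would list offending rows, because a non-empty list is truthy.

```python
from typing import List, Optional, Tuple

DailyRow = Tuple[str, str, int, Optional[float], Optional[float], Optional[float], Optional[float]]

TEMP_MIN_C, TEMP_MAX_C = -90.0, 60.0  # assumed plausibility band

def row_issues(row: DailyRow) -> List[str]:
    """Return the names of the plausibility checks this daily row fails."""
    _id, _date, _year, tmax, tmin, _tavg, prcp = row
    issues = []
    if tmax is not None and tmin is not None and tmin > tmax:
        issues.append("tmin_gt_tmax")
    for name, t in (("tmax", tmax), ("tmin", tmin)):
        if t is not None and not (TEMP_MIN_C <= t <= TEMP_MAX_C):
            issues.append(f"{name}_out_of_range")
    if prcp is not None and prcp < 0:
        issues.append("negative_prcp")
    return issues

rows = [
    ("USC00422578", "20100101", 2010, -6.1, -15.0, -10.55, 0.0),  # from the preview
    ("SYNTH000001", "20100101", 2010, 5.0, 7.0, 6.0, -0.1),        # synthetic, two problems
]
for r in rows:
    print(r[0], row_issues(r))
# USC00422578 []
# SYNTH000001 ['tmin_gt_tmax', 'negative_prcp']
```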

                                                                                

## 08. Metadata

```python
# === Silver: parse & persist NOAA metadata (stations + inventory) ===
import os
from pyspark.sql import functions as F, types as T

STATIONS_TXT  = "/home/ubuntu/spark-notebooks/project/data/meta/ghcnd-stations.txt"
INVENTORY_TXT = "/home/ubuntu/spark-notebooks/project/data/meta/ghcnd-inventory.txt"

OUT_DIR = "/home/ubuntu/spark-notebooks/project/data/silver_meta"
STATIONS_PQ  = f"{OUT_DIR}/stations.parquet"
INVENTORY_PQ = f"{OUT_DIR}/inventory.parquet"
COVERAGE_PQ  = f"{OUT_DIR}/coverage.parquet"

os.makedirs(OUT_DIR, exist_ok=True)  # local filesystem path

def _float_or_nan(field):
    """Fixed-width float field; blank -> NaN (turned into null after createDataFrame)."""
    field = field.strip()
    return float(field) if field else float("nan")

def _int_or_none(field):
    """Fixed-width integer field; blank -> None rather than a fake year 0."""
    field = field.strip()
    return int(field) if field else None

# --- Parse ghcnd-stations.txt (fixed width; 0-based slices of the 85-character layout) ---
stn_schema = T.StructType([
    T.StructField("id",      T.StringType(), False),
    T.StructField("lat",     T.DoubleType(), True),
    T.StructField("lon",     T.DoubleType(), True),
    T.StructField("elev",    T.DoubleType(), True),
    T.StructField("state",   T.StringType(), True),
    T.StructField("name",    T.StringType(), True),
    T.StructField("gsn",     T.StringType(), True),
    T.StructField("hcn_crn", T.StringType(), True),
    T.StructField("wmo",     T.StringType(), True),
])

stn_rdd = (spark.read.text(STATIONS_TXT).rdd
    .map(lambda r: r[0].ljust(85))            # pad in case trailing blanks were trimmed
    .filter(lambda s: s[0:11].strip() != "")  # skip blank lines
    .map(lambda s: (
        s[0:11].strip(),             # ID
        _float_or_nan(s[12:20]),     # LATITUDE
        _float_or_nan(s[21:30]),     # LONGITUDE
        _float_or_nan(s[31:37]),     # ELEVATION (m, -999.9 = missing)
        s[38:40].strip() or None,    # STATE
        s[41:71].strip() or None,    # NAME
        s[72:75].strip() or None,    # GSN FLAG
        s[76:79].strip() or None,    # HCN/CRN FLAG
        s[80:85].strip() or None,    # WMO ID
    ))
)
stations_df_raw = spark.createDataFrame(stn_rdd, stn_schema).dropDuplicates(["id"])

# Light cleaning: NaN and the elevation sentinel -> null; drop rows with
# out-of-range lat/lon; normalize text
stations_df = (stations_df_raw
    .withColumn("lat",  F.when(F.isnan("lat"), None).otherwise(F.col("lat")))
    .withColumn("lon",  F.when(F.isnan("lon"), None).otherwise(F.col("lon")))
    .withColumn("elev", F.when(F.isnan("elev") | (F.col("elev") == -999.9), None)
                         .otherwise(F.col("elev")))
    .filter(F.col("lat").isNull() | ((F.col("lat") >= -90) & (F.col("lat") <= 90)))
    .filter(F.col("lon").isNull() | ((F.col("lon") >= -180) & (F.col("lon") <= 180)))
    .withColumn("state", F.upper(F.col("state")))
    .withColumn("name",  F.trim(F.col("name")))
)

stations_df.write.mode("overwrite").parquet(STATIONS_PQ)

# --- Parse ghcnd-inventory.txt (fixed width; 0-based slices of the 45-character layout) ---
inv_schema = T.StructType([
    T.StructField("id",         T.StringType(),  False),
    T.StructField("lat",        T.DoubleType(),  True),
    T.StructField("lon",        T.DoubleType(),  True),
    T.StructField("element",    T.StringType(),  False),
    T.StructField("first_year", T.IntegerType(), True),
    T.StructField("last_year",  T.IntegerType(), True),
])

inv_rdd = (spark.read.text(INVENTORY_TXT).rdd
    .map(lambda r: r[0].ljust(45))
    .filter(lambda s: s[0:11].strip() != "")
    .map(lambda s: (
        s[0:11].strip(),             # ID
        _float_or_nan(s[12:20]),     # LATITUDE
        _float_or_nan(s[21:30]),     # LONGITUDE
        s[31:35].strip(),            # ELEMENT
        _int_or_none(s[36:40]),      # FIRSTYEAR
        _int_or_none(s[41:45]),      # LASTYEAR
    ))
)
inventory_df_raw = spark.createDataFrame(inv_rdd, inv_schema)

inventory_df = (inventory_df_raw
    # F.col is required here: a bare string in otherwise() is a literal, not a column
    .withColumn("lat", F.when(F.isnan("lat"), None).otherwise(F.col("lat")))
    .withColumn("lon", F.when(F.isnan("lon"), None).otherwise(F.col("lon")))
    # keep commonly used elements (adjust as needed)
    .filter(F.col("element").isin("TMAX", "TMIN", "PRCP", "TAVG", "SNOW", "SNWD"))
)

inventory_df.write.mode("overwrite").parquet(INVENTORY_PQ)

# --- Small helper table used often in Gold: per-station coverage for the core elements ---
# min_first_year / max_last_year span the union of TMAX/TMIN/PRCP years,
# not the period in which all three elements overlap.
coverage_df = (inventory_df
    .filter(F.col("element").isin("TMAX", "TMIN", "PRCP"))
    .groupBy("id")
    .agg(
        F.min("first_year").alias("min_first_year"),
        F.max("last_year").alias("max_last_year"),
    )
)

coverage_df.write.mode("overwrite").parquet(COVERAGE_PQ)

print("Saved:")
print("  stations  ->", STATIONS_PQ)
print("  inventory ->", INVENTORY_PQ)
print("  coverage  ->", COVERAGE_PQ)
```




```
Saved:
  stations  -> /home/ubuntu/spark-notebooks/project/data/silver_meta/stations.parquet
  inventory -> /home/ubuntu/spark-notebooks/project/data/silver_meta/inventory.parquet
  coverage  -> /home/ubuntu/spark-notebooks/project/data/silver_meta/coverage.parquet
```
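The inventory slices and the coverage aggregation can be checked the same way in plain Python. `parse_inventory_line`, `coverage`, `inv_line`, and the `SYNTH...` IDs are hypothetical. The sketch assumes the 0-based slice positions used in the metadata cell. Blank year fields become `None` so that a missing year can never win the `min`. As in `coverage_df`, the result is the union span across TMAX/TMIN/PRCP, not the period in which all three overlap.

```python
from typing import Dict, Iterable, NamedTuple, Optional, Tuple

CORE = ("TMAX", "TMIN", "PRCP")

class InvRow(NamedTuple):
    id: str
    lat: Optional[float]
    lon: Optional[float]
    element: str
    first_year: Optional[int]
    last_year: Optional[int]

def _float_or_none(field: str) -> Optional[float]:
    field = field.strip()
    return float(field) if field else None

def _int_or_none(field: str) -> Optional[int]:
    field = field.strip()
    return int(field) if field else None

def parse_inventory_line(line: str) -> Optional[InvRow]:
    """Parse one ghcnd-inventory.txt line using 0-based fixed-width slices."""
    s = line.rstrip("\n").ljust(45)  # tolerate trimmed trailing blanks
    if not s[0:11].strip():
        return None
    return InvRow(
        id=s[0:11].strip(),
        lat=_float_or_none(s[12:20]),
        lon=_float_or_none(s[21:30]),
        element=s[31:35].strip(),
        first_year=_int_or_none(s[36:40]),
        last_year=_int_or_none(s[41:45]),
    )

def coverage(rows: Iterable[InvRow]) -> Dict[str, Tuple[Optional[int], Optional[int]]]:
    """Per station: (earliest first_year, latest last_year) over the core elements."""
    span: Dict[str, Tuple[Optional[int], Optional[int]]] = {}
    for r in rows:
        if r.element not in CORE:
            continue
        lo, hi = span.get(r.id, (None, None))
        if r.first_year is not None:
            lo = r.first_year if lo is None else min(lo, r.first_year)
        if r.last_year is not None:
            hi = r.last_year if hi is None else max(hi, r.last_year)
        span[r.id] = (lo, hi)
    return span

def inv_line(sid, lat, lon, element, first, last) -> str:
    """Build a synthetic fixed-width inventory line (illustration only)."""
    return f"{sid:<11} {lat:8.4f} {lon:9.4f} {element:<4} {first:>4} {last:>4}"

lines = [
    inv_line("SYNTH000001", 10.0, 20.0, "TMAX", 1990, 2024),
    inv_line("SYNTH000001", 10.0, 20.0, "PRCP", 1985, 2025),
    inv_line("SYNTH000001", 10.0, 20.0, "SNOW", 1950, 2025),  # not a core element
    inv_line("SYNTH000002", -5.5, 100.25, "TMIN", "", 2020),  # blank first year
]
rows = [r for r in map(parse_inventory_line, lines) if r is not None]
print(coverage(rows))
# {'SYNTH000001': (1985, 2025), 'SYNTH000002': (None, 2020)}
```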


                                                                                