In [0]:
# 01_ingest_ecb_fx_rates
# Bronze: ECB daily FX (USD, JPY, CNY) quoted as TARGET per 1 EUR (EXR/D.<CUR>.EUR.SP00.A)
# Output: fx_impact.bronze_ecb_fx_rates  (managed Delta; partitioned by currency)


In [0]:
# Cell 1 — Imports & repo wiring
# Makes the repo's `scripts` package importable and (re)loads the helper modules
# used by the later cells.
import os
import sys, importlib, inspect
from datetime import date

# Location of the repo that contains the `scripts` package.
# Set the FX_IMPACT_REPO_ROOT environment variable to point somewhere else
# without editing the notebook; when it is unset or empty the original
# workspace path is used.
repo_root = (
    os.environ.get("FX_IMPACT_REPO_ROOT")
    or "/Workspace/Users/chiuyunhan@gmail.com/germany_import_cost_fx_impact"
)
if repo_root not in sys.path:
    sys.path.append(repo_root)

import scripts.api_clients as api_clients
import scripts.utils as utils

# Reload so that edits made to the helper modules are picked up when this cell
# is re-run in an already-running kernel.
importlib.reload(api_clients)
importlib.reload(utils)

# Print the files the modules were loaded from, to confirm the expected copy
# of the repo is the one on sys.path.
print("api_clients:", inspect.getsourcefile(api_clients))
print("utils:", inspect.getsourcefile(utils))


In [0]:
# Cell 2 — Params
# The extraction window opens at the start of 2018 and closes on the day the
# notebook runs, so every re-run extends coverage up to 'today'
# (keeps Silver from missing recent months).
CURRENCIES = ("USD", "JPY", "CNY")
START_DATE = "2018-01-01"
END_DATE = str(date.today())  # str(date) is the ISO form YYYY-MM-DD; recomputed on every run


In [0]:
# Cell 3 — Fetch from ECB (with fallback parser)
# Pulls the daily rates into a pandas frame, validates the columns, and
# normalises it to exactly (date, currency, fx_rate) with proper dtypes.

import pandas as pd
from scripts.api_clients import fetch_ecb_fx_daily

# Pull daily FX
fx_pdf = fetch_ecb_fx_daily(
    symbols=CURRENCIES,
    start=START_DATE,     # ⬅️ use start, not start_date
    end=END_DATE          # ⬅️ use end,   not end_date
)

# Minimal schema check (case-insensitive).
# Lower-casing through str(c) keeps the check from failing with a TypeError
# when a column label is not a string; the result is reused below.
required = {"date","currency","fx_rate"}
lowered_columns = [str(c).lower() for c in fx_pdf.columns]
missing = required - set(lowered_columns)
if missing:
    raise ValueError(f"Unexpected ECB columns; missing: {missing}. Got: {fx_pdf.columns.tolist()}")

# Fail early and clearly on an empty pull; otherwise the min/max date formatting
# in the summary print below breaks on missing dates with an unrelated-looking error.
if fx_pdf.empty:
    raise ValueError(
        f"ECB returned no FX rows for {CURRENCIES} between {START_DATE} and {END_DATE}."
    )

# Normalize columns just in case.
# .copy() makes the column subset an independent frame, so the assignments
# below do not trigger pandas' SettingWithCopyWarning when extra columns exist.
fx_pdf.columns = lowered_columns
fx_pdf = fx_pdf[["date","currency","fx_rate"]].copy()
fx_pdf["date"] = pd.to_datetime(fx_pdf["date"])
# Unparseable rates become NaN instead of raising.
fx_pdf["fx_rate"] = pd.to_numeric(fx_pdf["fx_rate"], errors="coerce")

print(f"Rows fetched: {len(fx_pdf):,}  |  Range: {fx_pdf['date'].min().date()} → {fx_pdf['date'].max().date()}")
fx_pdf.head(3)


In [0]:
# Cell 4 — To Spark + metadata
# Converts the pandas frame to a Spark DataFrame with explicit column types and
# appends ingest metadata via the repo helper `with_ingest_meta`.
from pyspark.sql import functions as F
from scripts.utils import with_ingest_meta

fx_rate_double = F.col("fx_rate").cast("double")

sdf = spark.createDataFrame(fx_pdf)
sdf = (sdf
       .withColumn("date", F.to_date("date"))
       .withColumn("currency", F.col("currency").cast("string"))
       # Missing rates are NaN on the pandas side (to_numeric(..., errors="coerce")).
       # Depending on the pandas→Spark conversion path they can arrive as NaN
       # rather than null. In Spark NaN is not null and compares greater than any
       # number, so it would pass isNotNull() filters and look like an
       # out-of-range value. Normalise NaN to null so "missing" has one
       # representation; values that are already null stay null.
       .withColumn("fx_rate",
                   F.when(F.isnan(fx_rate_double), F.lit(None).cast("double"))
                    .otherwise(fx_rate_double)))
sdf = with_ingest_meta(sdf, src="ECB_SDMX_EXR_D_SP00_A")

display(sdf.limit(10))

In [0]:
# Cell 5 — DQ checks (currency-aware)
# Hard failures: duplicate keys, non-positive rates, rates outside broad
# per-currency bounds. Null rates are tolerated and only profiled.

# 1) No duplicate (date, currency)
dups = sdf.groupBy("date","currency").count().filter("count > 1")
assert dups.count() == 0, "Duplicate (date, currency) rows detected."

# 2) Non-positive values are invalid
assert sdf.filter((F.col("fx_rate") <= 0) & F.col("fx_rate").isNotNull()).count() == 0, "Found non-positive fx_rate."

# 3) Currency-specific sanity ranges (broad, 2018–today)
ranges = {
    "USD": (0.5, 2.0),
    "CNY": (5.0, 12.0),
    "JPY": (80.0, 300.0),
}

# Surface currencies that are present in the data but have no configured range;
# without this they would skip the sanity check silently.
seen_currencies = {row["currency"] for row in sdf.select("currency").distinct().collect()}
unchecked = sorted(str(c) for c in seen_currencies - set(ranges))
if unchecked:
    print(f"⚠️ No sanity range configured for: {unchecked} — these rows were not range-checked.")

# Build one predicate covering every configured currency. A row belongs to a
# single currency, so OR-ing the per-currency conditions selects the same rows
# as filtering per currency and unioning the results, in a single pass.
out_of_range = None
for cur, (lo, hi) in ranges.items():
    cond = (F.col("currency") == cur) & ((F.col("fx_rate") < lo) | (F.col("fx_rate") > hi))
    out_of_range = cond if out_of_range is None else (out_of_range | cond)

# `violations` stays None only when no ranges are configured at all.
violations = None
if out_of_range is not None:
    violations = sdf.filter(F.col("fx_rate").isNotNull() & out_of_range)

# Explicit None check instead of relying on the truthiness of a DataFrame object.
if violations is not None and violations.count() > 0:
    display(violations.orderBy("currency","date").limit(50))
    raise AssertionError("FX rates outside expected ranges. See 'violations' preview above.")

# 4) Nulls are allowed in Bronze (holidays); track null share by currency
profile = (sdf.groupBy("currency")
           .agg(F.count("*").alias("rows"),
                F.sum(F.when(F.col("fx_rate").isNull(), 1).otherwise(0)).alias("nulls"),
                F.min("date").alias("min_date"),
                F.max("date").alias("max_date"),
                F.min("fx_rate").alias("min_fx"),
                F.max("fx_rate").alias("max_fx")))
display(profile)



In [0]:
# Cell 6 — Write managed Delta (no FileStore)
# Ensures the target database exists, then replaces the Bronze table in full.

spark.sql("CREATE DATABASE IF NOT EXISTS fx_impact")

# The dataset is small, so every run rewrites the whole table (data and schema)
# rather than merging; rows are partitioned by currency.
bronze_writer = (
    sdf.write
       .format("delta")
       .partitionBy("currency")
       .option("overwriteSchema", "true")
       .mode("overwrite")
)
bronze_writer.saveAsTable("fx_impact.bronze_ecb_fx_rates")

print("✅ Wrote table: fx_impact.bronze_ecb_fx_rates")

In [0]:
# Print the schema of the table as read back via spark.table (not from `sdf`),
# to confirm what was actually persisted.
bronze_table = spark.table("fx_impact.bronze_ecb_fx_rates")
bronze_table.printSchema()


In [0]:
# Cell 7 — Smoke tests / coverage
# Reads the written table back and summarises date and month coverage.
fx = spark.table("fx_impact.bronze_ecb_fx_rates")

# Per-currency date span and row count.
display(
  fx.groupBy("currency")
    .agg(F.min("date").alias("min_date"),
         F.max("date").alias("max_date"),
         F.count("*").alias("rows"))
    .orderBy("currency")
)

# Months present (useful to check alignment with Comtrade).
# Shown for the whole range rather than only the first 12 months, so gaps after
# the first year are visible. `currencies` counts the distinct currencies with
# at least one row in the month, and `rows` is the number of rows in that month.
month_coverage = (
  fx.groupBy(F.to_date(F.date_trunc("month","date")).alias("month"))
    .agg(F.countDistinct("currency").alias("currencies"),
         F.count("*").alias("rows"))
    .orderBy("month")
)
display(month_coverage)
