In [0]:
import json
import re
from datetime import datetime

from pyspark.sql import functions as F, types as T

# Free-text job parameters; each one defaults to the empty string.
for _widget_name in ("dt", "bucket", "bronze_prefix"):
    dbutils.widgets.text(_widget_name, "")

# Selects which silver output(s) this run builds ("all" is the default).
_TARGET_CHOICES = ["all", "chain_daily_state", "active_locks_daily"]
dbutils.widgets.dropdown("target", "all", _TARGET_CHOICES)
target = dbutils.widgets.get("target")

# Debug switch: the widget value is compared against the string "true"
# to obtain a real boolean.
dbutils.widgets.dropdown("debug", "false", ["false", "true"])
debug = "true" == dbutils.widgets.get("debug")


In [0]:
# Static variables
# Decimal types shared by the transforms below.
DEC_UOPT = T.DecimalType(precision=38, scale=0)
DEC_SHARES = T.DecimalType(precision=38, scale=18)

# Output schema of the chain daily state row; only `dt` is non-nullable.
CHAIN_DAILY_SCHEMA = (
    T.StructType()
    .add("dt", T.StringType(), nullable=False)
    .add("height", T.LongType(), nullable=True)
    .add("snapshot_ts", T.TimestampType(), nullable=True)
    .add("total_supply_uopt", DEC_UOPT, nullable=True)
    .add("total_locked_uopt", DEC_UOPT, nullable=True)
    .add("bonded_uopt", DEC_UOPT, nullable=True)
    .add("not_bonded_uopt", DEC_UOPT, nullable=True)
    # dataset name -> run_id
    .add("source_run_ids", T.MapType(T.StringType(), T.StringType()), nullable=True)
)


In [0]:
# validating input variables
# Read the widget values. All three are whitespace-trimmed so a stray space or
# newline pasted into a job parameter cannot leak into the S3 path.
dt = dbutils.widgets.get("dt").strip()
bucket = dbutils.widgets.get("bucket").strip()
bronze_prefix = dbutils.widgets.get("bronze_prefix").strip().rstrip("/")

if not dt:
    raise ValueError("dt is required (YYYY-MM-DD)")
# fullmatch + explicit ASCII digits: `re.match` with `$` would accept a trailing
# newline, and `\d` would accept non-ASCII digits.
if not re.fullmatch(r"[0-9]{4}-[0-9]{2}-[0-9]{2}", dt):
    raise ValueError(f"dt must be YYYY-MM-DD, got: {dt}")
# Reject well-formed but impossible dates (e.g. 2024-13-45) here, instead of
# letting them surface later as a confusing "manifest missing" error.
try:
    datetime.strptime(dt, "%Y-%m-%d")
except ValueError as exc:
    raise ValueError(f"dt is not a valid calendar date: {dt}") from exc
if not bucket:
    raise ValueError("bucket is required")
if not bronze_prefix:
    raise ValueError("bronze_prefix is required (e.g., raw/optio)")

# Bronze location for this dt and the manifest file expected directly under it.
bronze_dt = f"s3://{bucket}/{bronze_prefix}/dt={dt}"
manifest_path = f"{bronze_dt}/_MANIFEST.json"


In [0]:
# Helper functions

def read_manifest_or_raise(manifest_path: str, dt_prefix: str) -> dict:
    """
    Read and parse the JSON manifest for one dt.

    Args:
        manifest_path: Full path of the manifest file.
        dt_prefix: The dt directory that should contain the manifest; only used
            to produce a more specific error message when the read fails.

    Returns:
        The parsed manifest as a dict.

    Raises:
        FileNotFoundError: The manifest could not be read. The message says
            whether dt_prefix itself could be listed.
        ValueError: The manifest was read but is not a valid JSON object.
    """
    max_bytes = 1000000  # byte limit passed to dbutils.fs.head

    try:
        txt = dbutils.fs.head(manifest_path, max_bytes)
    except Exception as e:
        # Troubleshooting detail: is it only the manifest that is missing, or
        # the whole dt prefix? Listing is done inline so this error path does
        # not depend on any helper defined outside this function.
        try:
            dbutils.fs.ls(dt_prefix)
            prefix_listed = True
        except Exception:
            prefix_listed = False

        if prefix_listed:
            raise FileNotFoundError(
                f"Manifest missing at {manifest_path}, but dt prefix exists: {dt_prefix}. "
                f"Possible partial run or wrong manifest name."
            ) from e
        raise FileNotFoundError(
            f"dt prefix does not exist (or cannot be listed): {dt_prefix}. "
            f"Check dt/bucket/bronze_prefix."
        ) from e

    # Parse outside the read's try-block so a malformed manifest is reported as
    # such rather than as a missing file.
    try:
        manifest = json.loads(txt)
    except json.JSONDecodeError as e:
        raise ValueError(
            f"Manifest at {manifest_path} is not valid JSON "
            f"(or is larger than {max_bytes} bytes and was truncated): {e}"
        ) from e

    if not isinstance(manifest, dict):
        raise ValueError(
            f"Manifest at {manifest_path} must be a JSON object, got {type(manifest).__name__}"
        )
    return manifest

def dataset_run_path(bucket: str, bronze_prefix: str, dt: str, manifest: dict, dataset: str) -> str:
    """
    Resolve the bronze run directory of `dataset` for `dt` from the manifest.

    Args:
        bucket: S3 bucket name.
        bronze_prefix: Key prefix under the bucket (no trailing slash).
        dt: Partition date string used in the `dt=` path segment.
        manifest: Parsed manifest; expected to hold a "datasets" mapping of
            dataset name -> {"status": ..., "run_id": ...}.
        dataset: Dataset name to look up.

    Returns:
        s3://{bucket}/{bronze_prefix}/dt={dt}/dataset={dataset}/run_id={run_id}

    Raises:
        ValueError: The dataset is absent from the manifest, its status is not
            "SUCCESS", or it has no run_id.
    """
    # Tolerate a manifest without a "datasets" section: report it as a missing
    # dataset instead of a bare KeyError.
    datasets = manifest.get("datasets") or {}
    info = datasets.get(dataset)
    if not info:
        raise ValueError(f"Dataset missing from manifest: {dataset}")
    if info.get("status") != "SUCCESS":
        raise ValueError(f"Dataset not SUCCESS in manifest: {dataset} => {info}")

    # A SUCCESS entry without a run_id cannot be turned into a path.
    run_id = info.get("run_id")
    if not run_id:
        raise ValueError(f"Dataset has no run_id in manifest: {dataset} => {info}")
    return f"s3://{bucket}/{bronze_prefix}/dt={dt}/dataset={dataset}/run_id={run_id}"

def read_bronze_jsonl_gz(run_path: str):
    """Read all `part-*.jsonl.gz` files directly under `run_path` with spark.read.json."""
    part_files_glob = f"{run_path}/part-*.jsonl.gz"
    return spark.read.json(part_files_glob)


def as_dec_uopt(col):
    """Return the column named `col` cast to DEC_UOPT."""
    source_column = F.col(col)
    return source_column.cast(DEC_UOPT)

def as_ts(col):
    """Return the column named `col` converted with F.to_timestamp."""
    source_column = F.col(col)
    return F.to_timestamp(source_column)

def as_date(col):
    """Return the column named `col` converted with F.to_date."""
    source_column = F.col(col)
    return F.to_date(source_column)

def json_get_dec(raw_json_col: str, *json_paths: str):
    """
    Build a Column holding the first non-null JSON-path match, cast to DEC_UOPT.

    The column `raw_json_col` is serialized with F.to_json, each path in
    `json_paths` is looked up with F.get_json_object (in the given order), the
    lookups are coalesced, and the winner is cast to DEC_UOPT.

    NOTE(review): this assumes `raw_json_col` has a type F.to_json accepts;
    confirm against the bronze schema if it can ever be a plain string column.
    """
    serialized = F.to_json(F.col(raw_json_col))

    lookups = []
    for path in json_paths:
        lookups.append(F.get_json_object(serialized, path).cast("string"))

    first_match = F.coalesce(*lookups)
    return first_match.cast(DEC_UOPT)

def extract_uopt_from_supply_df(df_supply):
    """
    Pull the uOPT coin(s) out of the bank total-supply payload.

    Expected payload shape in `raw_json`:
        {"supply": [{"amount": "...", "denom": "..."}, ...], ...}

    Returns a DataFrame with columns `height` (bigint, from `snapshot_height`),
    `snapshot_ts` (timestamp, from `extracted_at`) and `total_supply_uopt`
    (DEC_UOPT), one row per supply entry whose denom equals "uOPT".
    """
    # Only the fields needed from the payload are declared.
    coin_type = T.StructType([
        T.StructField("amount", T.StringType(), True),
        T.StructField("denom", T.StringType(), True),
    ])
    supply_schema = T.StructType([
        T.StructField("supply", T.ArrayType(coin_type), True),
    ])

    # Round-trip raw_json through to_json/from_json to get a typed struct.
    with_parsed = df_supply.withColumn(
        "raw_parsed",
        F.from_json(F.to_json(F.col("raw_json")), supply_schema),
    )

    # One row per coin in the supply array.
    exploded = with_parsed.withColumn("coin", F.explode(F.col("raw_parsed.supply")))
    per_coin = exploded.select(
        F.col("snapshot_height").cast("bigint").alias("height"),
        F.to_timestamp("extracted_at").alias("snapshot_ts"),
        F.col("coin.denom").alias("denom"),
        F.col("coin.amount").alias("amount_str"),
    )

    only_uopt = per_coin.where(F.col("denom") == F.lit("uOPT"))
    return only_uopt.select(
        "height",
        "snapshot_ts",
        F.col("amount_str").cast("string").cast(DEC_UOPT).alias("total_supply_uopt"),
    )


def require_single_row(df, label: str = "DataFrame"):
    """
    Return the only row of `df`, or fail if it has zero or several rows.

    Args:
        df: Object exposing `limit(n)` and `collect()` (a Spark DataFrame).
        label: Name used in the error message. Defaults to "DataFrame" so a
            call that omits it still produces a usable error instead of a
            TypeError.

    Returns:
        The single collected row.

    Raises:
        ValueError: `df` does not contain exactly one row.
    """
    # Two rows are enough to tell "exactly one" from "more than one" without
    # collecting the whole DataFrame.
    rows = df.limit(2).collect()
    if len(rows) != 1:
        # Because of limit(2) the exact count is unknown when there are several.
        found = "0" if not rows else "at least 2"
        raise ValueError(f"{label} expected exactly 1 row, found {found}")
    return rows[0]


In [0]:
# Transform functions
def build_silver_active_locks_daily(bucket: str, bronze_prefix: str, dt: str, manifest: dict):
    """
    Build the silver active-locks DataFrame for one dt from the
    `lockup_active_locks` bronze dataset.

    Args:
        bucket: S3 bucket name.
        bronze_prefix: Key prefix of the bronze layer.
        dt: Partition date string; written to the `dt` column.
        manifest: Parsed manifest for this dt.

    Returns:
        DataFrame with columns dt, height, snapshot_ts, address, unlock_date,
        denom, amount_uopt, source_run_id and lock_id, de-duplicated on
        (dt, lock_id).

    Raises:
        ValueError: Propagated from dataset_run_path when the dataset is
            missing from the manifest or not marked SUCCESS.
    """
    dataset = "lockup_active_locks"
    # Validate the manifest entry first, so a missing or failed dataset raises
    # the descriptive ValueError rather than a KeyError on the run_id lookup.
    run_path = dataset_run_path(bucket, bronze_prefix, dt, manifest, dataset)
    run_id = manifest["datasets"][dataset]["run_id"]

    bronze = read_bronze_jsonl_gz(run_path)

    # Create deterministic lock_id from stable business fields
    # NOTE(review): concat_ws skips NULL inputs, so rows that differ only in
    # which field is NULL can hash to the same lock_id; and dropDuplicates
    # collapses rows that are identical in every hashed field. Confirm both
    # are acceptable for this dataset.
    df = (bronze
        .select(
            F.lit(dt).alias("dt"),
            F.col("snapshot_height").cast("bigint").alias("height"),
            as_ts("extracted_at").alias("snapshot_ts"),
            F.col("address").cast("string").alias("address"),
            as_date("unlock_date").alias("unlock_date"),
            F.col("amount_denom").cast("string").alias("denom"),
            as_dec_uopt("amount_uopt").alias("amount_uopt"),
            F.lit(run_id).alias("source_run_id"),
        )
        .withColumn(
            "lock_id",
            F.sha2(
                F.concat_ws("|",
                    F.col("dt"),
                    F.col("height").cast("string"),
                    F.col("address"),
                    F.col("unlock_date").cast("string"),
                    F.col("denom"),
                    F.col("amount_uopt").cast("string"),
                ),
                256
            )
        )
        .dropDuplicates(["dt", "lock_id"])
    )
    return df


def build_silver_chain_daily_state(bucket: str, bronze_prefix: str, dt: str, manifest: dict):
    """
    Build the one-row silver chain daily state DataFrame for one dt.

    Combines three bronze datasets, each expected to hold exactly one row:
    `bank_total_supply`, `lockup_total_locked` and `staking_pool`.

    Args:
        bucket: S3 bucket name.
        bronze_prefix: Key prefix of the bronze layer.
        dt: Partition date string; written to the `dt` column.
        manifest: Parsed manifest for this dt.

    Returns:
        A single-row DataFrame conforming to CHAIN_DAILY_SCHEMA.

    Raises:
        ValueError: A dataset is missing or not SUCCESS in the manifest, a
            dataset or projection does not yield exactly one row, a numeric
            value resolved to null, or total supply is smaller than the bonded
            or locked amount.

    NOTE(review): `height` and `snapshot_ts` are taken from the supply dataset
    only; nothing here checks that the other two datasets were captured at the
    same height.
    """
    ds_supply = "bank_total_supply"
    ds_locked = "lockup_total_locked"
    ds_pool = "staking_pool"

    p_supply = dataset_run_path(bucket, bronze_prefix, dt, manifest, ds_supply)
    p_locked = dataset_run_path(bucket, bronze_prefix, dt, manifest, ds_locked)
    p_pool = dataset_run_path(bucket, bronze_prefix, dt, manifest, ds_pool)

    df_supply = read_bronze_jsonl_gz(p_supply)
    df_locked = read_bronze_jsonl_gz(p_locked)
    df_pool = read_bronze_jsonl_gz(p_pool)

    # Guardrails: each source must be a single snapshot row.
    require_single_row(df_supply, "bank_total_supply")
    require_single_row(df_locked, "lockup_total_locked")
    require_single_row(df_pool, "staking_pool")

    uopt_supply_df = extract_uopt_from_supply_df(df_supply)

    # Guardrail: should find exactly one uOPT row
    r_supply = require_single_row(uopt_supply_df, "bank_total_supply uOPT coin row")

    # Several candidate JSON paths are tried in order; the first non-null wins.
    r_locked = require_single_row(
        df_locked.select(
            json_get_dec(
                "raw_json",
                "$.amount.amount",
                "$.amount",
                "$.total_locked.amount",
                "$.total_locked",
                "$.coin.amount",
            ).alias("total_locked_uopt")
        ),
        "lockup_total_locked projection",
    )

    r_pool = require_single_row(
        df_pool.select(
            json_get_dec("raw_json", "$.bonded_tokens", "$.pool.bonded_tokens").alias("bonded_uopt"),
            json_get_dec("raw_json", "$.not_bonded_tokens", "$.pool.not_bonded_tokens").alias("not_bonded_uopt"),
        ),
        "staking_pool projection",
    )

    # The entries were already validated by dataset_run_path above, so direct
    # indexing is safe here.
    source_run_ids = {
        ds_supply: manifest["datasets"][ds_supply]["run_id"],
        ds_locked: manifest["datasets"][ds_locked]["run_id"],
        ds_pool: manifest["datasets"][ds_pool]["run_id"],
    }

    # Debug: show extracted values before building DF
    if debug:
        print("r_supply:", dict(r_supply.asDict()))
        print("r_locked:", dict(r_locked.asDict()))
        print("r_pool:", dict(r_pool.asDict()))

    row_dict = {
        "dt": dt,
        "height": int(r_supply["height"]) if r_supply["height"] is not None else None,
        "snapshot_ts": r_supply["snapshot_ts"],
        "total_supply_uopt": r_supply["total_supply_uopt"],
        "total_locked_uopt": r_locked["total_locked_uopt"],
        "bonded_uopt": r_pool["bonded_uopt"],
        "not_bonded_uopt": r_pool["not_bonded_uopt"],
        "source_run_ids": source_run_ids,
    }

    # Fail fast if any critical numeric came back null. The values are already
    # on the driver, so check them directly instead of collecting a DataFrame.
    for k in ["total_supply_uopt", "total_locked_uopt", "bonded_uopt", "not_bonded_uopt"]:
        if row_dict[k] is None:
            raise ValueError(f"{k} resolved to null. Check raw_json shapes in bronze datasets for dt={dt}.")

    # Sanity checks raised as real exceptions: `assert` is stripped under -O.
    if row_dict["total_supply_uopt"] < row_dict["bonded_uopt"]:
        raise ValueError(
            f"Supply should be >= bonded (supply={row_dict['total_supply_uopt']}, "
            f"bonded={row_dict['bonded_uopt']}, dt={dt})"
        )
    if row_dict["total_supply_uopt"] < row_dict["total_locked_uopt"]:
        raise ValueError(
            f"Supply should be >= locked (supply={row_dict['total_supply_uopt']}, "
            f"locked={row_dict['total_locked_uopt']}, dt={dt})"
        )

    return spark.createDataFrame([row_dict], schema=CHAIN_DAILY_SCHEMA)


In [0]:
# Execution with gating based on target input

# Load the manifest once; both targets resolve their bronze paths from it.
manifest = read_manifest_or_raise(manifest_path, bronze_dt)
if debug:
    display(manifest)


# --- active_locks_daily -------------------------------------------------
if target == "all" or target == "active_locks_daily":
    df_locks = build_silver_active_locks_daily(bucket, bronze_prefix, dt, manifest)
    if debug:
        locks_total = df_locks.count()
        print("rows:", locks_total)
        locks_distinct = df_locks.select("lock_id").distinct().count()
        print("distinct lock_id:", locks_distinct)
        display(df_locks.limit(10))

    target_table = "optio_warehouse.silver.silver_active_locks_daily"
    # Overwrite restricted by replaceWhere to this dt's rows.
    locks_writer = df_locks.write.format("delta").mode("overwrite")
    locks_writer = locks_writer.option("replaceWhere", f"dt = '{dt}'")
    locks_writer.saveAsTable(target_table)

# --- chain_daily_state --------------------------------------------------
if target == "all" or target == "chain_daily_state":
    df_chain = build_silver_chain_daily_state(bucket, bronze_prefix, dt, manifest)
    if debug:
        display(df_chain)
        chain_total = df_chain.count()
        print("rows:", chain_total)

    target_table_chain = "optio_warehouse.silver.silver_chain_daily_state"
    # Overwrite restricted by replaceWhere to this dt's rows.
    chain_writer = df_chain.write.format("delta").mode("overwrite")
    chain_writer = chain_writer.option("replaceWhere", f"dt = '{dt}'")
    chain_writer.saveAsTable(target_table_chain)
