In [0]:
# COMMAND ----------
from __future__ import annotations
from typing import Dict, Any, Iterable, Optional
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, explode, lit
from pyspark.sql.types import StructType

In [0]:
# --- ensure exceptions exist, even if configs hasn't run ---
# Each fallback class is defined only when an earlier notebook (configs) has not
# already put a class of that name in the namespace, so an existing definition wins.
if "ConfigError" not in globals():
    class ConfigError(Exception):
        """Fallback error for configuration / I/O setup failures."""

if "DataQualityError" not in globals():
    class DataQualityError(Exception):
        """Fallback error for data that fails a quality check."""

# --- logger shim (uses 'log' from logging_config if available) ---
# Only build a fallback logger when no `log` is already in the namespace.
if "log" not in globals():
    import logging

    log = logging.getLogger("functions")
    # Avoid stacking duplicate handlers when the cell is re-run.
    if not log.handlers:
        log.addHandler(logging.StreamHandler())
    log.setLevel(logging.INFO)

# DBFS path helpers (Databricks Community Edition, "CE")
def _to_local_dbfs(p: str) -> str:
    return "/dbfs/" + p.split("dbfs:/", 1)[1].lstrip("/") if p.startswith("dbfs:/") else p

def _mkdirs_dbfs_dir(dbfs_dir: str):
    """Create ``dbfs_dir`` with ``dbutils.fs.mkdirs``.

    Raises
    ------
    ConfigError
        Wrapping any exception raised while creating the directory
        (including ``dbutils`` being unavailable).
    """
    try:
        fs = dbutils.fs
        fs.mkdirs(dbfs_dir)
    except Exception as exc:
        reason = f"Cannot create DBFS directory {dbfs_dir}: {exc}"
        raise ConfigError(reason) from exc


In [0]:
# --- I/O helpers -------------------------------------------------------------

def read_json_files(path: str, mode: str = "PERMISSIVE", schema: "StructType | str | None" = None):
    """
    Read JSON file(s) at ``path`` into a Spark DataFrame.

    Parameters
    ----------
    path : str
        File, directory or glob accepted by ``spark.read.json``.
    mode : str
        Corrupt-record handling: 'PERMISSIVE', 'DROPMALFORMED' or 'FAILFAST'
        (case-insensitive).
    schema : StructType or str, optional
        Explicit schema (StructType or DDL string). When omitted, Spark infers
        the schema from the data, which costs an extra pass over the input.

    Returns
    -------
    DataFrame

    Raises
    ------
    ConfigError
        If ``mode`` is not a recognised parse mode, or if Spark fails to read ``path``.
    """
    # Spark falls back to PERMISSIVE for an unknown mode and only logs a warning,
    # so a typo (e.g. "FAIL_FAST") would silently disable fail-fast parsing.
    if not isinstance(mode, str) or mode.upper() not in ("PERMISSIVE", "DROPMALFORMED", "FAILFAST"):
        raise ConfigError(
            f"Unsupported JSON read mode {mode!r}; expected PERMISSIVE, DROPMALFORMED or FAILFAST"
        )
    try:
        reader = (spark.read
                  .option("multiLine", "true")  # one record per file, may span lines
                  .option("mode", mode))        # PERMISSIVE | DROPMALFORMED | FAILFAST
        if schema is not None:
            reader = reader.schema(schema)
        return reader.json(path)
    except Exception as e:
        # Bubble up as ConfigError so your main's except blocks work
        raise ConfigError(f"Failed to read JSON from {path}: {e}") from e

In [0]:
def write_delta(
    df: DataFrame,
    path: str,
    *,
    mode: str = "append",
    merge_schema: bool = True
) -> None:
    """
    Write a DataFrame to Delta.

    Parameters
    ----------
    df : DataFrame
        DataFrame to persist.
    path : str
        Output Delta path.
    mode : str
        Save mode: 'append' or 'overwrite'.
    merge_schema : bool
        Whether to merge schema on write.

    Raises
    ------
    DataQualityError
        If df is empty, to prevent writing bad checkpoints.
    """
    # head(1) runs a LIMIT 1 job. Unlike df.rdd, it is also supported under
    # Spark Connect (shared-access / serverless clusters), where the RDD API is not available.
    if len(df.head(1)) == 0:
        raise DataQualityError(f"Refusing to write empty DataFrame to {path}")
    # `log` is the logger provided by logging_config or by the shim in this file;
    # the previous `logger` name is not defined here.
    log.info("Writing Delta to %s (mode=%s, merge_schema=%s)", path, mode, merge_schema)
    (df.write
       .format("delta")
       .mode(mode)
       .option("mergeSchema", str(merge_schema).lower())
       .save(path))
    log.info("Write complete.")


In [0]:
# --- Transformations (example shapes for NASA NEO) ---------------------------

def _select_neo_fields(pairs: DataFrame) -> DataFrame:
    """Explode a (close_date, objs) frame to one row per NEO object and pull common fields to top level."""
    flattened = pairs.select(col("close_date"), explode(col("objs")).alias("obj"))
    # Example nested: close_approach_data[0].relative_velocity.kilometers_per_second
    # Indexing [0] on an empty array yields null when ANSI mode is off.
    # NOTE(review): with spark.sql.ansi.enabled=true this may raise instead; confirm.
    first = col("obj.close_approach_data")[0]
    # Modify the field list as needed based on your raw schema.
    return flattened.select(
        col("close_date").alias("close_approach_date"),
        col("obj.id").cast("string").alias("neo_id"),
        col("obj.name").alias("name"),
        col("obj.is_potentially_hazardous_asteroid").alias("is_hazardous"),
        col("obj.nasa_jpl_url").alias("reference_url"),
        first["relative_velocity"]["kilometers_per_second"].cast("double").alias("kps"),
        first["miss_distance"]["kilometers"].cast("double").alias("miss_km"),
    )


def flatten_neo_feed(df: DataFrame) -> DataFrame:
    """
    Flatten a NASA NEO 'feed' JSON structure into a row-per-object DataFrame.

    Notes
    -----
    This assumes input like:
      {'near_earth_objects': {'2020-01-01': [ {...}, {...} ], '2020-01-02': [...] }, ...}

    Three input shapes are handled:

    * ``near_earth_objects`` as a struct whose field names are the dates. This is
      what ``spark.read.json`` schema inference produces.
    * ``near_earth_objects`` as a ``map<date, array<struct>>``, e.g. when an
      explicit schema declares it that way.
    * No ``near_earth_objects`` column. The frame is then assumed to already
      have ``close_date`` and ``objs`` (array of NEO structs) columns.

    Parameters
    ----------
    df : DataFrame
        Raw DataFrame loaded from the feed format.

    Returns
    -------
    DataFrame
        Flattened DataFrame with one row per NEO object and columns
        close_approach_date, neo_id, name, is_hazardous, reference_url,
        kps, miss_km.

    Raises
    ------
    DataQualityError
        If ``near_earth_objects`` has an unsupported type, or is a struct
        without any array-of-struct date entries.
    """
    from functools import reduce
    from pyspark.sql.types import ArrayType, MapType, StructType

    log.info("Flattening NEO feed structure")
    neo_map_col = "near_earth_objects"

    if neo_map_col not in df.columns:
        # fallback for already exploded structures: expects close_date / objs columns
        result = _select_neo_fields(df)
    else:
        neo_type = df.schema[neo_map_col].dataType
        if isinstance(neo_type, MapType):
            # explode map into (date, array) rows
            pairs = df.select(explode(col(neo_map_col)).alias("close_date", "objs"))
            result = _select_neo_fields(pairs)
        elif isinstance(neo_type, StructType):
            # Inferred JSON: each date is a struct field. Skip dates whose value is
            # not array<struct> (an empty JSON array is inferred as array<string>).
            dates = [
                f.name
                for f in neo_type.fields
                if isinstance(f.dataType, ArrayType) and isinstance(f.dataType.elementType, StructType)
            ]
            if not dates:
                raise DataQualityError(f"'{neo_map_col}' contains no arrays of NEO objects")
            # Project each date separately, then union. Per-date element structs can
            # be inferred with different field sets, but the projected columns
            # (string/boolean/double) line up.
            per_date = [
                _select_neo_fields(
                    df.select(
                        lit(d).alias("close_date"),
                        col(neo_map_col).getField(d).alias("objs"),
                    )
                )
                for d in dates
            ]
            result = reduce(lambda left, right: left.unionByName(right), per_date)
        else:
            raise DataQualityError(
                f"Unsupported type for '{neo_map_col}': {neo_type.simpleString()}"
            )

    log.info(f"Flattened to {result.count()} rows")
    return result


In [0]:
# Shared data-quality guard; keep expect_non_empty in this functions notebook so every pipeline step can call it.

def expect_non_empty(df, step: str = "dataframe"):
    """
    Raise ``DataQualityError`` when ``df`` is None or holds no rows.

    Works for Spark DataFrames (classic and Spark Connect), pandas / pandas-like
    objects exposing an ``.empty`` attribute, and sized containers.

    Parameters
    ----------
    df : Any
        The object to check.
    step : str
        Label for the pipeline step; used in the error message.

    Raises
    ------
    DataQualityError
        If ``df`` is None or empty. When emptiness cannot be determined at all,
        ``df`` is assumed non-empty (no error) to avoid false failures.
    """
    # None previously fell through every check below and counted as "not empty".
    if df is None:
        raise DataQualityError(f"{step} is None")

    # Detect Spark DF without importing at top-level in case pandas is used
    try:
        from pyspark.sql import DataFrame as SparkDF  # available on Databricks
    except ImportError:
        SparkDF = None
    df_type = type(df)
    # Spark Connect DataFrames (pyspark.sql.connect.dataframe.DataFrame) do not
    # subclass the classic DataFrame on Spark 3.4/3.5, so also match by module and name.
    is_spark = (SparkDF is not None and isinstance(df, SparkDF)) or (
        df_type.__name__ == "DataFrame" and df_type.__module__.startswith("pyspark.sql")
    )

    if is_spark:
        # head(1) runs a LIMIT 1 job: cheap, and unlike df.rdd it also works on Spark Connect.
        empty = len(df.head(1)) == 0
    else:
        # pandas (or pandas-like)
        empty = getattr(df, "empty", None)
        if empty is None:
            try:
                # last-resort check for sequences
                empty = len(df) == 0
            except TypeError:
                # len() unsupported: probe with head(1) if the object offers it
                try:
                    empty = len(df.head(1)) == 0
                except Exception:
                    # If we truly can't check, assume not empty to avoid false failures
                    empty = False

    if empty:
        raise DataQualityError(f"{step} is empty")
