# ConversionCentral Managed Profiling
Run this notebook from a Databricks Repo so backend deployments control profiling logic.

In [None]:
# Collect parameters passed by the FastAPI backend
# Every widget is registered with an empty default before any value is read,
# so Databricks jobs can safely supply overrides for the ones they need.
for widget_name in (
    "table_group_id",
    "profile_run_id",
    "data_quality_schema",
    "payload_path",
    "payload_base_path",
    "callback_url",
    "callback_base_url",
    "callback_token",
    "payload_storage",
    "callback_behavior",
    "catalog",
    "schema_name",
    "connection_id",
    "connection_name",
    "system_id",
    "project_key",
    "http_path",
):
    dbutils.widgets.text(widget_name, "")

from datetime import datetime
import json
import requests
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Required identifiers. They are trimmed like every other widget so that a
# whitespace-only value is rejected by the validation below instead of being
# used later as a lookup key that can never match.
table_group_id = (dbutils.widgets.get("table_group_id") or "").strip()
profile_run_id = (dbutils.widgets.get("profile_run_id") or "").strip()

# Schema (optionally catalog-qualified) holding the dq_* metadata tables.
dq_schema = (dbutils.widgets.get("data_quality_schema") or "").strip()

# Optional settings: blank widget values are normalised to None.
raw_payload_path = (dbutils.widgets.get("payload_path") or "").strip()
payload_path = raw_payload_path or None
payload_base_path = (dbutils.widgets.get("payload_base_path") or "").strip() or None
callback_url = (dbutils.widgets.get("callback_url") or "").strip() or None
callback_base_url = (dbutils.widgets.get("callback_base_url") or "").strip() or None
callback_token = (dbutils.widgets.get("callback_token") or "").strip() or None

# Connection defaults; kept as (possibly empty) strings rather than None.
connection_catalog = (dbutils.widgets.get("catalog") or "").strip()
connection_schema = (dbutils.widgets.get("schema_name") or "").strip()

# Fail fast before any Spark work when a required parameter is missing.
if not table_group_id or not profile_run_id:
    raise ValueError("Required widgets missing: table_group_id/profile_run_id")
if not dq_schema:
    raise ValueError("Data quality schema widget is required for profiling runs.")

In [None]:
# Profile the tables registered for this table group and build the result payload.
from datetime import datetime
import re
from contextlib import suppress
from typing import Iterable

import datetime as dt
import hashlib
import json
import math

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, BinaryType, MapType, StructType
from pyspark.sql import types as T
from pyspark.sql.utils import AnalysisException

# Upper bound on how many columns _select_profile_columns returns per table.
MAX_COLUMNS_TO_PROFILE = 25
# A column whose null ratio is >= this value gets a "null_ratio" anomaly...
NULL_RATIO_ALERT_THRESHOLD = 0.5
# ...reported with "high" severity at or above this ratio, "medium" below it.
HIGH_NULL_RATIO_THRESHOLD = 0.9
# Maximum number of most-frequent values collected per column.
VALUE_DISTRIBUTION_LIMIT = 25
# Numeric/date columns only get a value distribution when their approximate
# distinct count is at or below this value.
VALUE_DISTRIBUTION_DISTINCT_THRESHOLD = 1000
# Tables with more rows than this never get value distributions.
VALUE_DISTRIBUTION_MAX_ROWS = 5_000_000
# Rendered values longer than this are truncated and suffixed with "...".
MAX_VALUE_DISPLAY_LENGTH = 256

# Column order used when selecting per-column metric rows before they are
# written with insertInto (which matches columns by position, not by name).
# The names mirror the fields of PROFILE_COLUMNS_SCHEMA one-for-one.
PROFILE_COLUMN_FIELDS = [
    "profile_run_id",
    "schema_name",
    "table_name",
    "column_name",
    "qualified_name",
    "data_type",
    "general_type",
    "ordinal_position",
    "row_count",
    "null_count",
    "non_null_count",
    "distinct_count",
    "min_value",
    "max_value",
    "avg_value",
    "stddev_value",
    "median_value",
    "p95_value",
    "true_count",
    "false_count",
    "min_length",
    "max_length",
    "avg_length",
    "non_ascii_ratio",
    "min_date",
    "max_date",
    "date_span_days",
    "metrics_json",
    "generated_at",
]

# Explicit Spark schema for the per-column metric rows handed to
# spark.createDataFrame. Only profile_run_id, table_name and column_name are
# declared non-nullable; every metric field may be null.
PROFILE_COLUMNS_SCHEMA = T.StructType(
    [
        T.StructField("profile_run_id", T.StringType(), False),
        T.StructField("schema_name", T.StringType(), True),
        T.StructField("table_name", T.StringType(), False),
        T.StructField("column_name", T.StringType(), False),
        T.StructField("qualified_name", T.StringType(), True),
        T.StructField("data_type", T.StringType(), True),
        T.StructField("general_type", T.StringType(), True),
        T.StructField("ordinal_position", T.IntegerType(), True),
        T.StructField("row_count", T.LongType(), True),
        T.StructField("null_count", T.LongType(), True),
        T.StructField("non_null_count", T.LongType(), True),
        T.StructField("distinct_count", T.LongType(), True),
        # min/max are stored as strings (see _stringify_metric_value).
        T.StructField("min_value", T.StringType(), True),
        T.StructField("max_value", T.StringType(), True),
        T.StructField("avg_value", T.DoubleType(), True),
        T.StructField("stddev_value", T.DoubleType(), True),
        T.StructField("median_value", T.DoubleType(), True),
        T.StructField("p95_value", T.DoubleType(), True),
        T.StructField("true_count", T.LongType(), True),
        T.StructField("false_count", T.LongType(), True),
        T.StructField("min_length", T.IntegerType(), True),
        T.StructField("max_length", T.IntegerType(), True),
        T.StructField("avg_length", T.DoubleType(), True),
        T.StructField("non_ascii_ratio", T.DoubleType(), True),
        T.StructField("min_date", T.DateType(), True),
        T.StructField("max_date", T.DateType(), True),
        T.StructField("date_span_days", T.IntegerType(), True),
        # JSON-encoded extra metadata, or null when there is none.
        T.StructField("metrics_json", T.StringType(), True),
        T.StructField("generated_at", T.TimestampType(), True),
    ]
)

# Column order used when selecting value-distribution rows before they are
# written with insertInto (positional). The names mirror the fields of
# PROFILE_COLUMN_VALUES_SCHEMA one-for-one.
PROFILE_COLUMN_VALUES_FIELDS = [
    "profile_run_id",
    "schema_name",
    "table_name",
    "column_name",
    "value",
    "value_hash",
    "frequency",
    "relative_freq",
    "rank",
    "bucket_label",
    "bucket_lower_bound",
    "bucket_upper_bound",
    "generated_at",
]

# Explicit Spark schema for the value-distribution rows handed to
# spark.createDataFrame. Only profile_run_id, table_name and column_name are
# declared non-nullable.
PROFILE_COLUMN_VALUES_SCHEMA = T.StructType(
    [
        T.StructField("profile_run_id", T.StringType(), False),
        T.StructField("schema_name", T.StringType(), True),
        T.StructField("table_name", T.StringType(), False),
        T.StructField("column_name", T.StringType(), False),
        # Display form of the value (possibly truncated) and a SHA-256 hex
        # digest of its full text (see _render_value_display_and_hash).
        T.StructField("value", T.StringType(), True),
        T.StructField("value_hash", T.StringType(), True),
        T.StructField("frequency", T.LongType(), True),
        T.StructField("relative_freq", T.DoubleType(), True),
        T.StructField("rank", T.IntegerType(), True),
        # The bucket_* fields are always written as null by
        # _collect_value_distribution_rows.
        T.StructField("bucket_label", T.StringType(), True),
        T.StructField("bucket_lower_bound", T.DoubleType(), True),
        T.StructField("bucket_upper_bound", T.DoubleType(), True),
        T.StructField("generated_at", T.TimestampType(), True),
    ]
)

def _split_identifier(value: str | None) -> list[str]:
    cleaned = (value or "").replace("`", "").strip()
    if not cleaned:
        return []
    return [segment.strip() for segment in cleaned.split(".") if segment.strip()]


def _catalog_component(value: str | None) -> str | None:
    """Return the leading segment of a dotted identifier, or None.

    A catalog is only reported when the identifier has at least two segments;
    a single-segment value is treated as having no catalog.
    """
    segments = _split_identifier(value)
    return segments[0] if len(segments) > 1 else None


def _schema_component(value: str | None) -> str | None:
    """Return the last segment of a dotted identifier, or None when it is empty."""
    segments = _split_identifier(value)
    return segments[-1] if segments else None


def _qualify(*parts: Iterable[str | None]) -> str:
    tokens: list[str] = []
    for part in parts:
        if isinstance(part, (list, tuple)):
            tokens.extend([token for token in part if token])
        elif part:
            tokens.append(part)
    if not tokens:
        raise ValueError("Cannot build a fully qualified identifier with no parts.")
    return ".".join(f"`{token}`" for token in tokens)


# Resolve where the dq_* metadata tables live from the data quality schema
# setting, which may be "schema" or "catalog.schema".
metadata_catalog = _catalog_component(dq_schema)
metadata_schema = _schema_component(dq_schema)
if metadata_schema is None:
    raise ValueError("Unable to resolve schema portion of the data quality schema setting.")

# The `catalog` widget carries a catalog name on its own (for example "main").
# _catalog_component only reports a catalog for dotted values, so using it
# here discarded every plain catalog name; take the leading segment instead.
_connection_catalog_tokens = _split_identifier(connection_catalog)
connection_catalog_clean = _connection_catalog_tokens[0] if _connection_catalog_tokens else None
connection_schema_clean = _schema_component(connection_schema)

if metadata_catalog is None:
    # Fall back to the connection catalog, then to the session's current
    # catalog. The latter is best-effort: if it cannot be read,
    # metadata_catalog stays None and tables are qualified by schema only.
    fallback_catalog = connection_catalog_clean
    if fallback_catalog:
        metadata_catalog = fallback_catalog
    else:
        with suppress(Exception):
            metadata_catalog = spark.catalog.currentCatalog()


def _metadata_table(name: str) -> str:
    """Return the quoted, qualified name of a metadata table.

    The catalog is included only when one was resolved for the metadata schema.
    """
    if not metadata_catalog:
        return _qualify(metadata_schema, name)
    return _qualify(metadata_catalog, metadata_schema, name)


def _compile_patterns(mask: str | None) -> list[re.Pattern[str]]:
    if not mask:
        return []
    tokens = [token.strip() for token in re.split(r"[\n,]+", mask) if token.strip()]
    compiled: list[re.Pattern[str]] = []
    for token in tokens:
        escaped = re.escape(token).replace("\\*", ".*").replace("\\%", ".*")
        compiled.append(re.compile(f"^{escaped}$", re.IGNORECASE))
    return compiled


def _matches_pattern(patterns: list[re.Pattern[str]], schema_name: str | None, table_name: str) -> bool:
    if not patterns:
        return False
    candidate_full = ".".join(filter(None, [(schema_name or "").lower(), table_name.lower()])).strip(".")
    short_name = table_name.lower()
    for pattern in patterns:
        if pattern.match(candidate_full) or pattern.match(short_name):
            return True
    return False


def _qualify_data_table(raw_schema: str | None, table_name: str) -> str:
    """Build the quoted name used to read a data table.

    Resolution order:
      1. A table name that is already dotted is used as-is (re-quoted).
      2. A dotted schema value (for example ``catalog.schema``) is combined
         with the table name.
      3. Otherwise the connection catalog is combined with the single-segment
         schema value, or with the connection schema when no schema was given.

    Raises:
        ValueError: Propagated from _qualify when no segment is available.
    """
    table_tokens = _split_identifier(table_name)
    if len(table_tokens) >= 2:
        return _qualify(table_tokens)

    schema_tokens = _split_identifier(raw_schema)
    if len(schema_tokens) >= 2:
        return _qualify(schema_tokens + table_tokens)

    # At this point schema_tokens holds zero or one segment; the previous
    # `elif schema_tokens:` fallback could never execute and has been removed.
    schema_part = schema_tokens[0] if schema_tokens else connection_schema_clean

    return _qualify(connection_catalog_clean, schema_part, table_tokens[0] if table_tokens else table_name)


def _select_profile_columns(df) -> list[str]:
    """Pick the columns to profile, in schema order.

    Binary, map, array and struct columns are skipped. Collection stops as
    soon as MAX_COLUMNS_TO_PROFILE names have been gathered.
    """
    unsupported_types = (BinaryType, MapType, ArrayType, StructType)
    selected: list[str] = []
    for field in df.schema.fields:
        if not isinstance(field.dataType, unsupported_types):
            selected.append(field.name)
            if len(selected) >= MAX_COLUMNS_TO_PROFILE:
                break
    return selected


def _record_anomaly(buffer: list[dict[str, str]], table_name: str, column_name: str | None, anomaly_type: str, severity: str, description: str, detected_at: str) -> None:
    buffer.append(
        {
            "table_name": table_name,
            "column_name": column_name,
            "anomaly_type": anomaly_type,
            "severity": severity,
            "description": description,
            "detected_at": detected_at,
        }
    )


def _infer_general_type(data_type: T.DataType | None) -> str:
    """Map a Spark data type to a one-letter category code.

    "B" for boolean, "A" for string, "D" for date/timestamp, "N" for the
    numeric types listed below, and "X" for anything else (including None).
    """
    categories = (
        ((T.BooleanType,), "B"),
        ((T.StringType,), "A"),
        ((T.DateType, T.TimestampType), "D"),
        (
            (
                T.ByteType,
                T.ShortType,
                T.IntegerType,
                T.LongType,
                T.FloatType,
                T.DoubleType,
                T.DecimalType,
            ),
            "N",
        ),
    )
    for spark_types, code in categories:
        if isinstance(data_type, spark_types):
            return code
    return "X"


def _stringify_metric_value(value: object) -> str | None:
    if value is None:
        return None
    if isinstance(value, (dt.datetime, dt.date)):
        return value.isoformat()
    if isinstance(value, bytes):
        return value.hex()
    return str(value)


def _safe_float(value: object) -> float | None:
    if value is None:
        return None
    try:
        numeric_value = float(value)
    except (TypeError, ValueError):
        return None
    if math.isnan(numeric_value) or math.isinf(numeric_value):
        return None
    return numeric_value


def _approx_distinct_count(df, column_name: str) -> int | None:
    """Return the approximate number of distinct values in a column.

    Best-effort: any failure is reported with ``print`` and yields None.
    """
    try:
        result = df.select(F.approx_count_distinct(F.col(column_name)).alias("distinct_count")).collect()[0]
        # pyspark Row is a tuple subclass with no dict-style .get(); calling it
        # raised AttributeError, which the handler below swallowed, so this
        # function always returned None. Index by field name instead.
        value = result["distinct_count"]
        return int(value) if value is not None else None
    except Exception as exc:  # noqa: BLE001
        print(f"approx_count_distinct failed for column {column_name}: {exc}")
        return None


def _numeric_stats(df, column_name: str) -> tuple[str | None, str | None, float | None, float | None, float | None, float | None]:
    """Collect summary statistics for a numeric column.

    The column is cast to double before aggregating, so the returned min and
    max are the string form of a double.

    Returns:
        ``(min_value, max_value, avg_value, stddev_value, median_value,
        p95_value)``. Every element is None when the Spark query fails; the
        failure is reported with ``print``.
    """
    numeric_col = F.col(column_name).cast("double")
    try:
        # Convert the Row to a dict straight away: Row has no .get() method,
        # so the lookups below previously raised AttributeError.
        stats = df.select(
            F.min(numeric_col).alias("min_value"),
            F.max(numeric_col).alias("max_value"),
            F.avg(numeric_col).alias("avg_value"),
            F.stddev_pop(numeric_col).alias("stddev_value"),
            F.percentile_approx(numeric_col, [0.5, 0.95], 1000).alias("percentiles"),
        ).collect()[0].asDict()
    except Exception as exc:  # noqa: BLE001
        print(f"Numeric metric collection failed for column {column_name}: {exc}")
        return (None, None, None, None, None, None)

    min_value = _stringify_metric_value(stats.get("min_value"))
    max_value = _stringify_metric_value(stats.get("max_value"))
    avg_value = _safe_float(stats.get("avg_value"))
    stddev_value = _safe_float(stats.get("stddev_value"))
    # percentiles holds [median, p95]; it may be null when there is no data.
    percentiles = stats.get("percentiles") or []
    median_value = _safe_float(percentiles[0]) if len(percentiles) > 0 else None
    p95_value = _safe_float(percentiles[1]) if len(percentiles) > 1 else None
    return (min_value, max_value, avg_value, stddev_value, median_value, p95_value)


def _string_length_stats(df, column_name: str, row_count: int) -> tuple[int | None, int | None, float | None, float | None]:
    """Collect length statistics and the non-ASCII ratio for a string column.

    Args:
        df: DataFrame containing the column.
        column_name: Column to measure.
        row_count: Total table row count; used as the denominator of the
            non-ASCII ratio.

    Returns:
        ``(min_length, max_length, avg_length, non_ascii_ratio)``. Every
        element is None when the Spark query fails; the failure is reported
        with ``print``.
    """
    try:
        length_col = F.length(F.col(column_name))
        # Convert the Row to a dict straight away: Row has no .get() method,
        # so the lookups below previously raised AttributeError.
        stats = df.select(
            F.min(length_col).alias("min_length"),
            F.max(length_col).alias("max_length"),
            F.avg(length_col.cast("double")).alias("avg_length"),
            F.sum(F.when(F.col(column_name).cast("string").rlike(r"[^\u0000-\u007F]"), 1).otherwise(0)).alias("non_ascii_count"),
        ).collect()[0].asDict()
    except Exception as exc:  # noqa: BLE001
        print(f"String metric collection failed for column {column_name}: {exc}")
        return (None, None, None, None)

    min_length = stats.get("min_length")
    max_length = stats.get("max_length")
    avg_length = _safe_float(stats.get("avg_length"))
    non_ascii_count = stats.get("non_ascii_count")
    non_ascii_ratio = None
    if non_ascii_count is not None and row_count:
        non_ascii_ratio = float(non_ascii_count) / float(row_count)
    return (
        int(min_length) if min_length is not None else None,
        int(max_length) if max_length is not None else None,
        avg_length,
        non_ascii_ratio,
    )


def _boolean_stats(df, column_name: str) -> tuple[int | None, int | None]:
    """Count the True and False values in a column (cast to boolean).

    Returns:
        ``(true_count, false_count)``. Both are None when the Spark query
        fails; the failure is reported with ``print``.
    """
    bool_col = F.col(column_name).cast("boolean")
    try:
        # Convert the Row to a dict straight away: Row has no .get() method,
        # so the lookups below previously raised AttributeError.
        stats = df.select(
            F.sum(F.when(bool_col == F.lit(True), 1).otherwise(0)).alias("true_count"),
            F.sum(F.when(bool_col == F.lit(False), 1).otherwise(0)).alias("false_count"),
        ).collect()[0].asDict()
    except Exception as exc:  # noqa: BLE001
        print(f"Boolean metric collection failed for column {column_name}: {exc}")
        return (None, None)

    true_count = stats.get("true_count")
    false_count = stats.get("false_count")
    return (
        int(true_count) if true_count is not None else None,
        int(false_count) if false_count is not None else None,
    )


def _date_stats(df, column_name: str) -> tuple[dt.date | None, dt.date | None, int | None]:
    """Find the earliest and latest date in a column and the span between them.

    The column is converted with ``to_date`` before aggregating.

    Returns:
        ``(min_date, max_date, date_span_days)``. Every element is None when
        the Spark query fails; the failure is reported with ``print``. The
        span is None unless both dates are present.
    """
    date_col = F.to_date(F.col(column_name))
    try:
        # Convert the Row to a dict straight away: Row has no .get() method,
        # so the lookups below previously raised AttributeError.
        stats = df.select(
            F.min(date_col).alias("min_date"),
            F.max(date_col).alias("max_date"),
        ).collect()[0].asDict()
    except Exception as exc:  # noqa: BLE001
        print(f"Date metric collection failed for column {column_name}: {exc}")
        return (None, None, None)

    min_date = stats.get("min_date")
    max_date = stats.get("max_date")
    # Normalise datetimes to plain dates so the subtraction below is
    # date - date.
    if isinstance(min_date, dt.datetime):
        min_date = min_date.date()
    if isinstance(max_date, dt.datetime):
        max_date = max_date.date()
    date_span_days = None
    if min_date and max_date:
        date_span_days = (max_date - min_date).days
    return (min_date, max_date, int(date_span_days) if date_span_days is not None else None)


def _should_capture_value_distribution(general_type: str, row_count: int, distinct_count: int | None) -> bool:
    if not row_count or row_count > VALUE_DISTRIBUTION_MAX_ROWS:
        return False
    if general_type in {"A", "B"}:
        return True
    if general_type in {"N", "D"} and distinct_count is not None and distinct_count <= VALUE_DISTRIBUTION_DISTINCT_THRESHOLD:
        return True
    return False


def _render_value_display_and_hash(value: object) -> tuple[str | None, str]:
    if value is None:
        sentinel = "__NULL__"
        digest = hashlib.sha256(sentinel.encode("utf-8")).hexdigest()
        return ("NULL", digest)
    if isinstance(value, bytes):
        full_text = value.hex()
    elif isinstance(value, (dt.datetime, dt.date)):
        full_text = value.isoformat()
    else:
        full_text = str(value)
    display_text = full_text
    if len(display_text) > MAX_VALUE_DISPLAY_LENGTH:
        display_text = f"{display_text[: MAX_VALUE_DISPLAY_LENGTH - 3]}..."
    digest = hashlib.sha256(full_text.encode("utf-8")).hexdigest()
    return (display_text, digest)


def _collect_value_distribution_rows(
    df,
    column_name: str,
    schema_name: str | None,
    table_name: str,
    run_id: str,
    row_count: int,
) -> list[dict[str, object]]:
    """Collect the most frequent values of a column as persistable rows.

    Args:
        df: DataFrame containing the column.
        column_name: Column to group by.
        schema_name: Schema label stored on each row.
        table_name: Table label stored on each row.
        run_id: Profiling run the rows belong to.
        row_count: Total table row count; the denominator for relative_freq.

    Returns:
        Up to VALUE_DISTRIBUTION_LIMIT dicts keyed like
        PROFILE_COLUMN_VALUES_FIELDS, ranked from 1 by descending frequency.
        An empty list when ``row_count`` is falsy or the Spark query fails
        (the failure is reported with ``print``).
    """
    rows: list[dict[str, object]] = []
    if not row_count:
        return rows
    try:
        freq_rows = (
            df.groupBy(F.col(column_name))
            .agg(F.count(F.lit(1)).alias("frequency"))
            .orderBy(F.desc("frequency"))
            .limit(VALUE_DISTRIBUTION_LIMIT)
            .collect()
        )
    except Exception as exc:  # noqa: BLE001
        print(f"Value distribution query failed for column {column_name}: {exc}")
        return rows

    for idx, freq_row in enumerate(freq_rows, start=1):
        # Row has no .get() method (it previously raised AttributeError here);
        # go through asDict() to keep the "missing key -> None" lookups.
        freq_values = freq_row.asDict()
        frequency_value = freq_values.get("frequency")
        raw_value = freq_values.get(column_name)
        display_value, value_hash = _render_value_display_and_hash(raw_value)
        rows.append(
            {
                "profile_run_id": run_id,
                "schema_name": schema_name,
                "table_name": table_name,
                "column_name": column_name,
                "value": display_value,
                "value_hash": value_hash,
                "frequency": int(frequency_value) if frequency_value is not None else None,
                "relative_freq": (float(frequency_value) / float(row_count)) if frequency_value is not None and row_count else None,
                "rank": idx,
                # Bucketing is not produced here; the columns are left null.
                "bucket_label": None,
                "bucket_lower_bound": None,
                "bucket_upper_bound": None,
                "generated_at": datetime.utcnow(),
            }
        )
    return rows


def _quote_sql_literal(value: str | None) -> str:
    if value is None:
        return "NULL"
    escaped = value.replace("'", "''")
    return f"'{escaped}'"


def _replace_profile_run_rows(table_name: str, run_id: str, frame) -> None:
    """Replace a run's rows in ``table_name`` with the contents of ``frame``.

    Existing rows for ``run_id`` are deleted, then ``frame`` is appended.
    ``insertInto`` matches columns by position, so ``frame`` must already be
    in the target table's column order.
    """
    literal = _quote_sql_literal(run_id)
    delete_statement = f"DELETE FROM {table_name} WHERE profile_run_id = {literal}"
    spark.sql(delete_statement)
    ordered_frame = frame.select(*frame.columns)
    ordered_frame.write.insertInto(table_name, overwrite=False)


def _persist_profile_detail_tables(run_id: str, column_rows: list[dict[str, object]], value_rows: list[dict[str, object]]) -> None:
    """Write column metrics and value distributions to the metadata tables.

    Persistence is best-effort: every failure is reported with ``print`` and
    never raised, and the two tables are written independently so a failure
    in one does not prevent the other.
    """
    if not (column_rows or value_rows):
        print("No column metrics were captured; skipping dq_profile_columns persistence.")
        return

    try:
        columns_table = _metadata_table("dq_profile_columns")
        values_table = _metadata_table("dq_profile_column_values")
    except Exception as exc:
        print(f"Unable to resolve metadata tables for column metrics: {exc}")
        return

    # Per-column metrics.
    if column_rows:
        try:
            metrics_frame = spark.createDataFrame(column_rows, PROFILE_COLUMNS_SCHEMA)
            metrics_frame = metrics_frame.select(*PROFILE_COLUMN_FIELDS)
            _replace_profile_run_rows(columns_table, run_id, metrics_frame)
            print(f"Persisted {len(column_rows)} rows to {columns_table}.")
        except AnalysisException as exc:
            print(f"dq_profile_columns is unavailable; skipping column metric persistence: {exc}")
        except Exception as exc:  # noqa: BLE001
            print(f"Failed to persist dq_profile_columns: {exc}")

    # Most-frequent-value distributions.
    if value_rows:
        try:
            distribution_frame = spark.createDataFrame(value_rows, PROFILE_COLUMN_VALUES_SCHEMA)
            distribution_frame = distribution_frame.select(*PROFILE_COLUMN_VALUES_FIELDS)
            _replace_profile_run_rows(values_table, run_id, distribution_frame)
            print(f"Persisted {len(value_rows)} rows to {values_table}.")
        except AnalysisException as exc:
            print(f"dq_profile_column_values is unavailable; skipping value distribution persistence: {exc}")
        except Exception as exc:  # noqa: BLE001
            print(f"Failed to persist dq_profile_column_values: {exc}")


# Look up the table group and its include/exclude masks in dq_table_groups.
metadata_tables_name = _metadata_table("dq_tables")
group_table_name = _metadata_table("dq_table_groups")
group_rows = (
    spark.table(group_table_name)
    .where(F.col("table_group_id") == table_group_id)
    .select("name", "profiling_include_mask", "profiling_exclude_mask")
    .limit(1)
    .collect()
)
if not group_rows:
    raise ValueError(f"Table group '{table_group_id}' not found in schema '{dq_schema}'.")
group_details = group_rows[0].asDict()
include_patterns = _compile_patterns(group_details.get("profiling_include_mask"))
exclude_patterns = _compile_patterns(group_details.get("profiling_exclude_mask"))

# Fetch every table registered for the group in dq_tables.
table_rows = (
    spark.table(metadata_tables_name)
    .where(F.col("table_group_id") == table_group_id)
    .select("schema_name", "table_name")
    .collect()
)
if not table_rows:
    raise ValueError(f"No dq_tables rows registered for table_group_id '{table_group_id}'.")

# Apply the masks: when an include mask exists a table must match it, and a
# table matching the exclude mask is always dropped.
table_candidates: list[dict[str, str]] = []
for row in table_rows:
    # A blank schema on the row falls back to the connection schema.
    schema_value = (row["schema_name"] or connection_schema_clean or "").strip() or None
    table_value = (row["table_name"] or "").strip()
    if not table_value:
        continue
    if include_patterns and not _matches_pattern(include_patterns, schema_value, table_value):
        continue
    if exclude_patterns and _matches_pattern(exclude_patterns, schema_value, table_value):
        continue
    # label is the "schema.table" (or bare table) name used in results.
    label = ".".join(filter(None, [schema_value, table_value])) or table_value
    table_candidates.append({"schema_name": schema_value, "table_name": table_value, "label": label})

if not table_candidates:
    raise ValueError("All candidate tables were filtered out by include/exclude masks.")

# Run-level timestamp: naive UTC time in ISO-8601 form with a literal "Z"
# appended. It is also used as detected_at on every anomaly.
generated_at = datetime.utcnow().isoformat() + "Z"
# Accumulators filled by the profiling loop below.
anomalies: list[dict[str, str]] = []
table_profiles: list[dict[str, object]] = []
total_rows = 0
profiling_failures = 0
profiling_successes = 0
# Rows destined for dq_profile_columns and dq_profile_column_values.
column_metric_rows: list[dict[str, object]] = []
value_distribution_rows: list[dict[str, object]] = []

# Profile each candidate table: read it, count rows, compute per-column
# metrics, and record anomalies. Results accumulate in the lists and counters
# initialised before this loop.
print(f"Profiling {len(table_candidates)} tables for group {table_group_id}.")
for candidate in table_candidates:
    schema_value = candidate["schema_name"]
    table_value = candidate["table_name"]
    label = candidate["label"]
    qualified_name = _qualify_data_table(schema_value, table_value)
    # The label is repeated under three keys ("table_name", "table", "name").
    # NOTE(review): presumably for consumers expecting different key names - confirm.
    table_result: dict[str, object] = {
        "table_name": label,
        "table": label,
        "name": label,
        "qualified_name": qualified_name,
    }
    print(f"-> Scanning {qualified_name}")
    df = None
    # Step 1: open the table. A failure here is recorded as a high-severity
    # anomaly and the loop moves on to the next table.
    try:
        df = spark.read.table(qualified_name).cache()
    except AnalysisException as exc:
        profiling_failures += 1
        table_result["error"] = str(exc)
        _record_anomaly(anomalies, label, None, "missing_table", "high", f"Spark could not read {qualified_name}: {exc}", generated_at)
        table_profiles.append(table_result)
        continue
    except Exception as exc:  # noqa: BLE001
        profiling_failures += 1
        table_result["error"] = str(exc)
        _record_anomaly(anomalies, label, None, "profiling_error", "high", f"Unexpected error while reading {qualified_name}: {exc}", generated_at)
        table_profiles.append(table_result)
        continue

    # Step 2: profile the table.
    # NOTE(review): this try has no except clause, so any error raised while
    # profiling propagates and aborts the run once the finally block has
    # unpersisted df.
    try:
        row_count = int(df.count())
        table_result["row_count"] = row_count
        total_rows += row_count

        # An empty table is counted as a failure and produces no column metrics.
        if row_count == 0:
            profiling_failures += 1
            _record_anomaly(anomalies, label, None, "empty_table", "high", "Table returned zero rows during profiling.", generated_at)
            table_profiles.append(table_result)
            continue

        # The table counts as successfully profiled from this point on.
        profiling_successes += 1
        profile_columns = _select_profile_columns(df)
        table_result["profiled_columns"] = profile_columns
        full_column_count = len(df.columns)
        # Report how many columns were left out (unsupported type or over the cap).
        if full_column_count > len(profile_columns):
            table_result["profiled_columns_truncated"] = full_column_count - len(profile_columns)

        column_profiles: list[dict[str, object]] = []
        column_null_ratios: dict[str, float] = {}

        if profile_columns:
            # Null counts for all profiled columns are computed in a single aggregation.
            agg_exprs = [F.sum(F.when(F.col(col_name).isNull(), 1).otherwise(0)).alias(col_name) for col_name in profile_columns]
            null_counts = df.agg(*agg_exprs).collect()[0].asDict()
            schema_field_map = {field.name: field for field in df.schema.fields}
            # 1-based position of each column in the table schema.
            schema_field_positions = {field.name: idx + 1 for idx, field in enumerate(df.schema.fields)}
            column_schema_name = schema_value or connection_schema_clean or None

            for column in profile_columns:
                null_count = int(null_counts.get(column, 0) or 0)
                null_ratio = float(null_count / row_count) if row_count else 0.0
                column_null_ratios[column] = null_ratio

                spark_field = schema_field_map.get(column)
                data_type = spark_field.dataType if spark_field else None
                data_type_label = data_type.simpleString() if data_type else None
                general_type = _infer_general_type(data_type)
                ordinal_position = schema_field_positions.get(column)
                non_null_count = int(max(row_count - null_count, 0))
                distinct_count = _approx_distinct_count(df, column)

                # Type-specific metrics default to None and are filled in only
                # by the branch matching the column's general type.
                min_value = max_value = avg_value = stddev_value = median_value = p95_value = None
                true_count = false_count = None
                min_length = max_length = None
                avg_length = None
                non_ascii_ratio = None
                min_date = max_date = None
                date_span_days = None

                # Each branch is skipped when the column holds only nulls.
                if general_type == "N" and non_null_count:
                    (
                        min_value,
                        max_value,
                        avg_value,
                        stddev_value,
                        median_value,
                        p95_value,
                    ) = _numeric_stats(df, column)

                if general_type == "A" and non_null_count:
                    min_length, max_length, avg_length, non_ascii_ratio = _string_length_stats(df, column, row_count)

                if general_type == "B" and non_null_count:
                    true_count, false_count = _boolean_stats(df, column)

                if general_type == "D" and non_null_count:
                    min_date, max_date, date_span_days = _date_stats(df, column)

                # Optionally collect the most frequent values for this column.
                sampled_value_rows: list[dict[str, object]] = []
                if _should_capture_value_distribution(general_type, row_count, distinct_count):
                    sampled_value_rows = _collect_value_distribution_rows(
                        df,
                        column,
                        column_schema_name,
                        label,
                        profile_run_id,
                        row_count,
                    )
                    value_distribution_rows.extend(sampled_value_rows)

                metrics_metadata: dict[str, object] = {}
                if sampled_value_rows:
                    metrics_metadata["value_distribution_sampled"] = len(sampled_value_rows)

                # Row for dq_profile_columns; keys match PROFILE_COLUMN_FIELDS.
                column_metric_rows.append(
                    {
                        "profile_run_id": profile_run_id,
                        "schema_name": column_schema_name,
                        "table_name": label,
                        "column_name": column,
                        # The table part is backtick-quoted; the column name is appended unquoted.
                        "qualified_name": f"{qualified_name}.{column}",
                        "data_type": data_type_label,
                        "general_type": general_type,
                        "ordinal_position": ordinal_position,
                        "row_count": row_count,
                        "null_count": null_count,
                        "non_null_count": non_null_count,
                        "distinct_count": distinct_count,
                        "min_value": min_value,
                        "max_value": max_value,
                        "avg_value": avg_value,
                        "stddev_value": stddev_value,
                        "median_value": median_value,
                        "p95_value": p95_value,
                        "true_count": true_count,
                        "false_count": false_count,
                        "min_length": min_length,
                        "max_length": max_length,
                        "avg_length": avg_length,
                        "non_ascii_ratio": non_ascii_ratio,
                        "min_date": min_date,
                        "max_date": max_date,
                        "date_span_days": date_span_days,
                        "metrics_json": json.dumps(metrics_metadata) if metrics_metadata else None,
                        "generated_at": datetime.utcnow(),
                    }
                )

                # The same metrics for the result payload; dates are rendered
                # as ISO strings here rather than kept as date objects.
                metrics_summary = {
                    "row_count": row_count,
                    "null_count": null_count,
                    "non_null_count": non_null_count,
                    "null_ratio": null_ratio,
                    "distinct_count": distinct_count,
                    "general_type": general_type,
                    "min_value": min_value,
                    "max_value": max_value,
                    "avg_value": avg_value,
                    "stddev_value": stddev_value,
                    "median_value": median_value,
                    "p95_value": p95_value,
                    "true_count": true_count,
                    "false_count": false_count,
                    "min_length": min_length,
                    "max_length": max_length,
                    "avg_length": avg_length,
                    "non_ascii_ratio": non_ascii_ratio,
                    "min_date": _stringify_metric_value(min_date),
                    "max_date": _stringify_metric_value(max_date),
                    "date_span_days": date_span_days,
                }
                if sampled_value_rows:
                    metrics_summary["value_distribution_sampled"] = len(sampled_value_rows)

                # The column name is repeated under three keys
                # ("column_name", "column", "name").
                column_profiles.append(
                    {
                        "column_name": column,
                        "column": column,
                        "name": column,
                        "data_type": data_type_label,
                        "general_type": general_type,
                        "ordinal_position": ordinal_position,
                        "row_count": row_count,
                        "null_count": null_count,
                        "non_null_count": non_null_count,
                        "null_ratio": null_ratio,
                        "metrics": metrics_summary,
                    }
                )

                # Flag heavily-null columns; severity escalates at the high threshold.
                if null_ratio >= NULL_RATIO_ALERT_THRESHOLD:
                    severity = "high" if null_ratio >= HIGH_NULL_RATIO_THRESHOLD else "medium"
                    description = f"Null ratio {null_ratio:.2%} exceeds {NULL_RATIO_ALERT_THRESHOLD:.0%} threshold."
                    _record_anomaly(anomalies, label, column, "null_ratio", severity, description, generated_at)

            table_result["column_null_ratios"] = column_null_ratios

        # The column list is exposed under both "column_profiles" and "columns".
        table_result["column_profiles"] = column_profiles
        table_result["columns"] = column_profiles
        table_profiles.append(table_result)
    finally:
        # Always release the cached DataFrame; errors while unpersisting are ignored.
        with suppress(Exception):
            if df is not None:
                df.unpersist()
# Write the detailed metrics to the metadata tables (best-effort; failures
# inside are printed, not raised).
_persist_profile_detail_tables(profile_run_id, column_metric_rows, value_distribution_rows)

# The run is "completed" when at least one table was profiled, otherwise "failed".
status = "completed" if profiling_successes else "failed"
# Summary payload describing the whole run.
results = {
    "table_group_id": table_group_id,
    "profile_run_id": profile_run_id,
    "table_group_name": group_details.get("name"),
    "status": status,
    "row_count": int(total_rows),
    "anomaly_count": len(anomalies),
    "anomalies": anomalies,
    "generated_at": generated_at,
    "table_profiles": table_profiles,
    "diagnostics": {
        # tables_requested counts every registered table, before mask filtering.
        "tables_requested": len(table_rows),
        "tables_profiled": profiling_successes,
        "tables_failed": profiling_failures,
        "include_mask_applied": bool(include_patterns),
        "exclude_mask_applied": bool(exclude_patterns),
    },
}

print(
    f"Profiling complete: {profiling_successes} succeeded, {profiling_failures} failed, total rows={total_rows}."
)

In [None]:
# Persist payload and call back into the API

from datetime import datetime
import re
import socket
from contextlib import suppress
from functools import lru_cache
from urllib.parse import urlparse, urlunparse

from pyspark.sql import functions as F
from pyspark.sql.utils import AnalysisException

# Default DBFS location for payload artifacts; also the directory probed by _dbfs_root_is_disabled().
DEFAULT_PRIVATE_PAYLOAD_ROOT = "dbfs:/tmp/conversioncentral/profiles"
# Driver-local root that _redirect_dbfs_path() substitutes when the DBFS root is blocked.
DEFAULT_DRIVER_PAYLOAD_ROOT = "file:/databricks/driver/conversioncentral/profiles"
# Returned by _resolve_callback_behavior() when neither the widget nor dq_settings supplies a value.
DEFAULT_CALLBACK_BEHAVIOR = "metadata_only"

# Returned by _resolve_payload_storage_mode() when neither the widget nor dq_settings supplies a value.
DEFAULT_PAYLOAD_STORAGE_MODE = "inline"


# Lowercase fragments searched for in lowercased exception messages by the filesystem probes.
DBFS_DISABLED_MESSAGES = ("public dbfs root is disabled", "access is denied")
DRIVER_DISABLED_MESSAGES = ("local filesystem access is forbidden", "workspacelocalfilesystem")
# Matches a leading "<scheme>:/" (case-insensitive); used by _has_uri_scheme().
URI_SCHEME_PATTERN = re.compile(r"^[a-z][a-z0-9+.\-]*:/", re.IGNORECASE)
# One-shot flags so the redirect / storage-disabled notices are printed at most once.
_DBFS_REDIRECT_NOTICE_EMITTED = False
_STORAGE_DISABLED_NOTICE_EMITTED = False


def _looks_like_dns_failure(error: BaseException) -> bool:
    """Return True when *error* or any exception chained beneath it indicates a DNS lookup failure.

    The chain is followed through ``__cause__`` first and ``__context__`` otherwise. A link counts
    as a DNS failure when it is a ``socket.gaierror``, when its class name contains
    ``nameresolution`` (case-insensitive), or when its message contains
    ``temporary failure in name resolution`` (case-insensitive).
    """
    link = error
    while link:
        matches_dns = (
            isinstance(link, socket.gaierror)
            or "nameresolution" in link.__class__.__name__.lower()
            or "temporary failure in name resolution" in str(link).lower()
        )
        if matches_dns:
            return True
        # Prefer the explicit cause; fall back to the implicit context.
        link = getattr(link, "__cause__", None) or getattr(link, "__context__", None)
    return False


def _rewrite_heroku_app_host(url: str | None) -> str | None:
    """Fallback to canonical Heroku hostname when review-app hosts fail DNS."""
    if not url:
        return None
    parsed = urlparse(url)
    host = parsed.hostname
    if not host:
        return None
    match = re.match(r"^(?P<base>[a-z0-9-]+?)-[0-9a-f]{12}\.herokuapp\.com$", host)
    if not match:
        return None
    canonical_host = f"{match.group('base')}.herokuapp.com"
    netloc = canonical_host
    if parsed.port:
        netloc = f"{canonical_host}:{parsed.port}"
    if parsed.username:
        auth = parsed.username
        if parsed.password:
            auth = f"{auth}:{parsed.password}"
        netloc = f"{auth}@{netloc}"
    scheme = parsed.scheme or "https"
    if scheme.lower() == "http":
        scheme = "https"
    return urlunparse(parsed._replace(netloc=netloc, scheme=scheme))


def _is_dbfs_path(path: str | None) -> bool:
    return bool(path and path.lower().startswith("dbfs:/"))


def _has_uri_scheme(value: str | None) -> bool:
    """Return True when the stripped *value* starts with a ``<scheme>:/`` prefix per URI_SCHEME_PATTERN."""
    if not value:
        return False
    return URI_SCHEME_PATTERN.match(value.strip()) is not None


@lru_cache(maxsize=1)
def _dbfs_root_is_disabled() -> bool:
    """Probe whether writes under DEFAULT_PRIVATE_PAYLOAD_ROOT are blocked.

    A scratch directory is created and removed via ``dbutils.fs``. The function returns True only
    when that fails with a message containing one of DBFS_DISABLED_MESSAGES; any other failure,
    like success, yields False. The result is memoized, so the probe runs once.
    """
    scratch_dir = f"{DEFAULT_PRIVATE_PAYLOAD_ROOT}/_dbfs_access_probe"
    try:
        dbutils.fs.mkdirs(scratch_dir)
        dbutils.fs.rm(scratch_dir, True)
    except Exception as exc:  # noqa: BLE001 - Databricks surfaces JVM errors generically
        failure_text = str(exc).lower()
        for fragment in DBFS_DISABLED_MESSAGES:
            if fragment in failure_text:
                return True
        return False
    return False


@lru_cache(maxsize=1)
def _driver_fs_is_disabled() -> bool:
    """Probe whether writes under DEFAULT_DRIVER_PAYLOAD_ROOT are blocked.

    A scratch directory is created and removed via ``dbutils.fs``. The function returns True only
    when that fails with a message containing one of DRIVER_DISABLED_MESSAGES; any other failure,
    like success, yields False. The result is memoized, so the probe runs once.
    """
    scratch_dir = f"{DEFAULT_DRIVER_PAYLOAD_ROOT}/_driver_access_probe"
    try:
        dbutils.fs.mkdirs(scratch_dir)
        dbutils.fs.rm(scratch_dir, True)
    except Exception as exc:  # noqa: BLE001 - Databricks surfaces JVM errors generically
        failure_text = str(exc).lower()
        for fragment in DRIVER_DISABLED_MESSAGES:
            if fragment in failure_text:
                return True
        return False
    return False


def _warn_storage_disabled(message: str) -> None:
    """Print *message* the first time this is called; later calls are silent.

    The module-level flag ``_STORAGE_DISABLED_NOTICE_EMITTED`` records that a notice was shown.
    """
    global _STORAGE_DISABLED_NOTICE_EMITTED
    if _STORAGE_DISABLED_NOTICE_EMITTED:
        return
    print(message)
    _STORAGE_DISABLED_NOTICE_EMITTED = True


def _redirect_dbfs_path(path: str) -> str | None:
    """Map a ``dbfs:/`` path onto the driver filesystem when the DBFS root is blocked.

    Returns *path* unchanged when it is not a DBFS path or when DBFS is writable. Returns ``None``
    (after a one-time warning) when both DBFS and the driver filesystem are blocked. Otherwise
    returns the path re-rooted under DEFAULT_DRIVER_PAYLOAD_ROOT without a trailing slash,
    printing a one-time redirect notice.
    """
    global _DBFS_REDIRECT_NOTICE_EMITTED
    # Short-circuit keeps the DBFS probe from running for non-DBFS paths.
    if not _is_dbfs_path(path) or not _dbfs_root_is_disabled():
        return path

    if _driver_fs_is_disabled():
        _warn_storage_disabled(
            "DBFS root access and driver filesystem writes are both disabled; payload artifacts will be skipped unless "
            "a cloud storage payload_base_path is provided."
        )
        return None

    if not _DBFS_REDIRECT_NOTICE_EMITTED:
        print(
            "DBFS root access is disabled on this workspace; persisting profiling artifacts to the driver filesystem "
            "instead."
        )
        _DBFS_REDIRECT_NOTICE_EMITTED = True

    prefix_length = len("dbfs:/")
    remainder = path[prefix_length:].lstrip("/")
    if remainder:
        return f"{DEFAULT_DRIVER_PAYLOAD_ROOT}/{remainder}".rstrip("/")
    return DEFAULT_DRIVER_PAYLOAD_ROOT.rstrip("/")


def _mkdirs_if_supported(target_path: str) -> None:
    """Create the parent directory of *target_path* for ``dbfs:/`` and ``file:/`` targets.

    Nothing happens for other schemes, or when the matching filesystem probe reports that the
    filesystem is disabled. The parent is everything before the last ``/`` in *target_path*.
    """
    scheme_probe = target_path.lower()
    if scheme_probe.startswith("dbfs:/"):
        if _dbfs_root_is_disabled():
            return
    elif scheme_probe.startswith("file:/"):
        if _driver_fs_is_disabled():
            return
    else:
        # Other targets (e.g. cloud URIs) are left untouched.
        return
    dbutils.fs.mkdirs(target_path.rsplit("/", 1)[0])


def _ensure_https_base_url(value: str) -> str:
    """Normalize a callback base URL so that it uses HTTPS and has no trailing slash.

    Args:
        value: Raw URL, optionally without a scheme (``example.com/api``, ``host:8443/api``).

    Returns:
        The stripped URL with ``https://`` prepended when no ``<scheme>://`` prefix is present,
        ``http`` upgraded to ``https``, and trailing slashes removed. An empty or whitespace-only
        input yields an empty string. Other explicit schemes are left as they are.
    """
    normalized = (value or "").strip()
    if not normalized:
        return normalized
    # Detect the scheme from an explicit "<scheme>://" prefix instead of urlparse().scheme:
    # urlparse treats "host:8443/api" as scheme="host", which previously skipped the https
    # prefix and produced an unusable URL.
    if not re.match(r"^[a-z][a-z0-9+.\-]*://", normalized, re.IGNORECASE):
        normalized = f"https://{normalized.lstrip('/')}"
    parsed = urlparse(normalized)
    if parsed.scheme.lower() == "http":
        parsed = parsed._replace(scheme="https")
    return urlunparse(parsed).rstrip("/")


def _lookup_metadata_setting(setting_key: str) -> str | None:
    """Read one value from the ``dq_settings`` metadata table by case-insensitive key.

    Args:
        setting_key: Setting name; it is stripped and lowercased before the comparison.

    Returns:
        The stripped string value, or ``None`` when the key is blank, ``_metadata_table`` is not
        defined yet, the table lookup raises ``AnalysisException``, no row matches, or the stored
        value is not a non-blank string.
    """
    normalized_key = (setting_key or "").strip().lower()
    if not normalized_key:
        return None
    try:
        settings_table = _metadata_table("dq_settings")
    except NameError:
        # The helper lives in another cell; treat "not defined yet" as "no setting".
        return None
    try:
        rows = (
            spark.table(settings_table)
            .where(F.lower(F.col("key")) == normalized_key)
            .select("value")
            .limit(1)
            .collect()
        )
    except AnalysisException:
        return None
    if not rows:
        return None
    # pyspark.sql.Row is a tuple subclass without a dict-style .get(); index by field name instead
    # (the previous row.get("value") raised AttributeError whenever a setting row existed).
    value = rows[0]["value"]
    return value.strip() if isinstance(value, str) and value.strip() else None




def _normalize_payload_storage_mode(value: str | None) -> str | None:
    normalized = (value or "").strip().lower()
    if not normalized:
        return None
    if normalized in {"inline", "database", "db"}:
        return "inline"
    if normalized in {"artifact", "artifacts", "file", "files", "path", "paths", "dbfs", "cloud"}:
        return "artifact"
    return None


def _resolve_payload_storage_mode() -> str:
    """Pick the payload storage mode: widget first, then metadata setting, then the default.

    The metadata lookup runs only when the ``payload_storage`` widget does not normalize to a
    known mode.
    """
    return (
        _normalize_payload_storage_mode(dbutils.widgets.get("payload_storage"))
        or _normalize_payload_storage_mode(_lookup_metadata_setting("profile_payload_storage_mode"))
        or DEFAULT_PAYLOAD_STORAGE_MODE
    )


def _payload_storage_is_artifact(mode: str) -> bool:
    """Return True when *mode*, stripped and lowercased, equals ``"artifact"``."""
    cleaned = (mode or "").strip()
    return cleaned.lower() == "artifact"


def _encode_payload_json(payload: dict[str, object]) -> str | None:
    try:
        return json.dumps(payload, separators=(",", ":"))
    except TypeError as exc:
        print(f"Unable to serialize profiling payload: {exc}")
        return None


def _resolve_callback_behavior() -> str:
    """Pick the callback behavior: widget first, then metadata setting, then the default.

    Both sources are stripped and lowercased. The metadata lookup runs only when the
    ``callback_behavior`` widget is blank.
    """
    from_widget = (dbutils.widgets.get("callback_behavior") or "").strip().lower()
    return (
        from_widget
        or (_lookup_metadata_setting("profile_callback_behavior") or "").strip().lower()
        or DEFAULT_CALLBACK_BEHAVIOR
    )


def _callbacks_enabled(behavior: str) -> bool:
    """Return True when *behavior* names a mode that sends the HTTP completion callback.

    Unrecognized values print a notice and are treated as disabled.
    """
    enabling_modes = {"api", "callback", "legacy"}
    disabling_modes = {"metadata_only", "metadata", "skip", "disabled", "off"}
    if behavior in enabling_modes:
        return True
    if behavior not in disabling_modes:
        print(f"Unknown callback behavior '{behavior}'; defaulting to metadata_only.")
    return False





def _sql_string_literal(value: str | None) -> str:
    if value is None:
        return "NULL"
    escaped = str(value).replace("'", "''")
    return f"'{escaped}'"


def _sql_numeric_literal(value: int | float | None) -> str:
    if value is None:
        return "NULL"
    try:
        return str(int(value))
    except (TypeError, ValueError):
        return "NULL"


def _normalize_temp_view_name(suffix: str | None) -> str:
    cleaned = re.sub(r"[^a-zA-Z0-9_]", "_", (suffix or "profile_run"))
    return f"_profile_anomalies_{cleaned}"


def _parse_anomaly_timestamp(value: str | None) -> datetime | None:
    if not value:
        return None
    candidate = value.strip()
    if not candidate:
        return None
    if candidate.endswith("Z"):
        candidate = f"{candidate[:-1]}+00:00"
    with suppress(ValueError):
        return datetime.fromisoformat(candidate)
    return None


def _persist_results_to_metadata(results_payload: dict[str, object], payload_location: str | None) -> None:
    """Write the run summary to ``dq_profiles`` and replace the run's rows in ``dq_profile_anomalies``.

    Steps, in order: UPDATE the ``dq_profiles`` row for the current ``profile_run_id`` (status,
    completion time, row count, anomaly count, payload path), DELETE the run's existing anomaly
    rows, then INSERT the anomalies from *results_payload* through a temporary view. The
    statements are issued separately; a failure part-way leaves earlier steps applied.

    Args:
        results_payload: Profiling results; reads ``status``, ``row_count``, ``anomaly_count``
            and ``anomalies`` (a list of dicts).
        payload_location: Value stored in ``dq_profiles.payload_path`` (``None`` stores NULL).

    Raises:
        ValueError: When the module-level ``profile_run_id`` is empty.
    """
    # Function-scope import: this cell only imports ``datetime`` from the datetime module.
    from datetime import timezone

    if not profile_run_id:
        raise ValueError("profile_run_id widget is required before persisting profiling metadata.")
    profiles_table = _metadata_table("dq_profiles")
    anomalies_table = _metadata_table("dq_profile_anomalies")
    assignments = [
        f"status = {_sql_string_literal(results_payload.get('status') or 'unknown')}",
        "completed_at = current_timestamp()",
        f"row_count = {_sql_numeric_literal(results_payload.get('row_count'))}",
        f"anomaly_count = {_sql_numeric_literal(results_payload.get('anomaly_count'))}",
        f"payload_path = {_sql_string_literal(payload_location)}",
    ]
    update_sql = (
        f"UPDATE {profiles_table} "
        f"SET {', '.join(assignments)} "
        f"WHERE profile_run_id = {_sql_string_literal(profile_run_id)}"
    )
    spark.sql(update_sql)
    print(f"Updated dq_profiles entry for run {profile_run_id}.")

    anomalies = list(results_payload.get("anomalies") or [])
    # Clear earlier anomaly rows for this run so a re-run does not duplicate them.
    delete_sql = f"DELETE FROM {anomalies_table} WHERE profile_run_id = {_sql_string_literal(profile_run_id)}"
    spark.sql(delete_sql)

    if not anomalies:
        print(f"No anomalies to persist for run {profile_run_id}.")
        return

    def _text_or_none(value: object) -> str | None:
        """Coerce a present value to text so it satisfies the explicit string schema."""
        return None if value is None else str(value)

    anomaly_rows = [
        (
            str(profile_run_id),
            _text_or_none(anomaly.get("table_name")),
            _text_or_none(anomaly.get("column_name")),
            _text_or_none(anomaly.get("anomaly_type")),
            _text_or_none(anomaly.get("severity")),
            _text_or_none(anomaly.get("description")),
            # Timezone-aware fallback: Spark interprets naive datetimes in the driver's local
            # timezone, so a naive UTC wall-clock value would be shifted on non-UTC drivers.
            _parse_anomaly_timestamp(anomaly.get("detected_at")) or datetime.now(timezone.utc),
        )
        for anomaly in anomalies
    ]
    # Explicit schema: inference cannot determine a column's type when it is None in every row
    # (for example when no anomaly carries a column_name), which made createDataFrame fail.
    anomaly_schema = (
        "profile_run_id string, table_name string, column_name string, anomaly_type string, "
        "severity string, description string, detected_at timestamp"
    )
    anomalies_df = spark.createDataFrame(anomaly_rows, schema=anomaly_schema)
    view_name = _normalize_temp_view_name(profile_run_id)
    try:
        anomalies_df.createOrReplaceTempView(view_name)
        spark.sql(
            f"INSERT INTO {anomalies_table} "
            "(profile_run_id, table_name, column_name, anomaly_type, severity, description, detected_at) "
            f"SELECT profile_run_id, table_name, column_name, anomaly_type, severity, description, detected_at FROM {view_name}"
        )
    finally:
        # Best-effort cleanup; a failure to drop the view must not mask the insert outcome.
        with suppress(Exception):
            spark.catalog.dropTempView(view_name)

    print(f"Persisted {len(anomalies)} anomalies for run {profile_run_id}.")


# Widget values win; when a widget was left blank, fall back to the matching dq_settings entry
# (the lookup returns None if no such setting exists).
payload_base_path = payload_base_path or _lookup_metadata_setting("profile_payload_base_path")
callback_base_url = callback_base_url or _lookup_metadata_setting("profile_callback_base_url")
callback_token = callback_token or _lookup_metadata_setting("profile_callback_token")


def _normalize_payload_base(base_path: str | None) -> str | None:
    """Resolve the directory under which payload artifacts are written.

    Args:
        base_path: Configured base path; may be blank, a URI, an absolute path, or a relative name.

    Returns:
        * blank input: DEFAULT_PRIVATE_PAYLOAD_ROOT, passed through ``_redirect_dbfs_path``;
        * a non-DBFS URI (anything ``_has_uri_scheme`` accepts, e.g. ``s3://...`` or
          ``file:/...``): the value without trailing slashes;
        * ``dbfs:/...``, ``/abs/path`` (treated as ``dbfs:/abs/path``) or a relative name (placed
          under ``dbfs:/tmp/conversioncentral/``): the normalized DBFS path passed through
          ``_redirect_dbfs_path``; FileStore locations are replaced by the private default root.

        ``None`` is returned, after a one-time warning, when the resolved DBFS location is blocked
        and cannot be redirected.
    """
    raw_value = (base_path or "").strip()
    if not raw_value:
        redirected = _redirect_dbfs_path(DEFAULT_PRIVATE_PAYLOAD_ROOT)
        if not redirected:
            _warn_storage_disabled(
                "No writable default payload location detected; configure profile_payload_base_path to point to cloud "
                "storage accessible from this workspace."
            )
        return redirected

    is_dbfs = raw_value.lower().startswith("dbfs:/")
    # Use the same scheme detection as _normalize_payload_target so single-slash URIs such as
    # "file:/..." are recognized instead of being rewritten under dbfs:/tmp/conversioncentral/.
    if not is_dbfs and _has_uri_scheme(raw_value):
        return raw_value.rstrip("/")

    if is_dbfs:
        normalized = raw_value
    elif raw_value.startswith("/"):
        normalized = f"dbfs:{raw_value}"
    else:
        normalized = f"dbfs:/tmp/conversioncentral/{raw_value.lstrip('/')}"
    normalized = normalized.rstrip("/")
    if normalized.lower().startswith("dbfs:/filestore"):
        print("FileStore paths are disabled on this workspace; switching to private tmp storage.")
        normalized = DEFAULT_PRIVATE_PAYLOAD_ROOT
    redirected = _redirect_dbfs_path(normalized)
    if not redirected:
        _warn_storage_disabled(
            "The configured payload base path resolves to a blocked filesystem; provide a supported cloud URI instead."
        )
    return redirected


def _derive_payload_path(base_path: str | None, group_id: str, run_id: str) -> str | None:
    """Build ``<base>/<group>/<run>.json`` beneath the normalized payload base.

    Colons in the ids are replaced by underscores; blank ids become ``default`` / ``unknown``.
    Returns ``None`` when no usable base path can be resolved.
    """
    root = _normalize_payload_base(base_path)
    if root:
        group_part = (group_id or "default").replace(":", "_")
        run_part = (run_id or "unknown").replace(":", "_")
        return f"{root}/{group_part}/{run_part}.json"
    return None


def _normalize_payload_target(path: str | None, group_id: str, run_id: str, base_path: str | None) -> str | None:
    """Normalize an explicit payload file path into a writable target.

    ``dbfs:/`` paths and other URIs are kept; absolute paths are treated as DBFS paths and relative
    names are placed under DEFAULT_PRIVATE_PAYLOAD_ROOT. FileStore targets are replaced by a path
    derived from *base_path*, *group_id* and *run_id*. The result goes through
    ``_redirect_dbfs_path``; ``None`` is returned for blank input or when no writable location
    remains.
    """
    requested = (path or "").strip()
    if not requested:
        return None

    if requested.lower().startswith("dbfs:/") or _has_uri_scheme(requested):
        resolved = requested
    elif requested.startswith("/"):
        resolved = "dbfs:" + requested
    else:
        resolved = DEFAULT_PRIVATE_PAYLOAD_ROOT + "/" + requested.lstrip("/")
    # Trailing slashes are dropped for every branch.
    resolved = resolved.rstrip("/")

    if resolved.lower().startswith("dbfs:/filestore"):
        print("FileStore paths are disabled on this workspace; switching to private tmp storage.")
        resolved = _derive_payload_path(base_path, group_id, run_id) or ""

    final_path = _redirect_dbfs_path(resolved)
    return final_path.rstrip("/") if final_path else None


def _resolve_callback_target(base_url: str | None, run_id: str) -> str | None:
    """Build the completion-callback URL for *run_id*.

    Args:
        base_url: Either a full callback URL, a base URL, or a template containing the literal
            placeholder ``{profile_run_id}``.
        run_id: Profile run identifier substituted into / appended to the URL.

    Returns:
        ``None`` for a missing or blank *base_url*. Otherwise an HTTPS-normalized URL:
        unchanged when it already ends with ``/complete``; ``<url>/complete`` when the run id
        was supplied through the placeholder; ``<url>/<run_id>/complete`` in all other cases.
    """
    if not base_url:
        return None
    normalized = base_url.strip()
    if not normalized:
        return None
    placeholder = "{profile_run_id}"
    templated = placeholder in normalized
    if templated:
        # Plain substitution instead of str.format(): unrelated braces in the URL can no longer
        # raise (format() raised an uncaught IndexError on "{}") or leave the placeholder behind.
        normalized = normalized.replace(placeholder, str(run_id))
    normalized = _ensure_https_base_url(normalized)
    if normalized.endswith("/complete"):
        return normalized
    if templated:
        # The template already carries the run id; appending it again would duplicate it.
        return f"{normalized}/complete"
    return f"{normalized}/{run_id}/complete"


# Decide where the profiling payload goes. payload_reference ends up holding either the artifact
# path that was written or the JSON text itself, and is stored in dq_profiles.payload_path.
payload_storage_mode = _resolve_payload_storage_mode()
payload_reference: str | None = None
# Compact JSON used for the inline fallback; None when the results cannot be serialized.
payload_json_value = _encode_payload_json(results)

if _payload_storage_is_artifact(payload_storage_mode):
    # Prefer the explicit payload_path widget; otherwise derive a path from the base path and ids.
    artifact_path = payload_path
    payload_was_derived = False
    if not artifact_path:
        artifact_path = _derive_payload_path(payload_base_path, table_group_id, profile_run_id)
        payload_was_derived = bool(artifact_path)

    normalized_payload_path = _normalize_payload_target(artifact_path, table_group_id, profile_run_id, payload_base_path)
    if normalized_payload_path:
        if artifact_path and normalized_payload_path != artifact_path:
            print(f"Normalized payload path: {normalized_payload_path}")
        artifact_path = normalized_payload_path
    elif not artifact_path:
        # Nothing supplied and nothing derivable.
        print("Payload storage mode is 'artifact' but no valid path was supplied; inline fallback will be used.")
    else:
        # A path existed but normalization rejected it (blocked filesystem).
        print(
            "Resolved payload path is blocked by workspace filesystem restrictions; skipping artifact export unless a "
            "cloud path is provided."
        )
        artifact_path = None

    if payload_was_derived and artifact_path:
        print(f"Derived payload artifact path: {artifact_path}")

    if artifact_path:
        try:
            _mkdirs_if_supported(artifact_path)
            # The artifact is written pretty-printed (indent=2), unlike the compact inline JSON.
            dbutils.fs.put(artifact_path, json.dumps(results, indent=2), overwrite=True)
            print(f"Wrote profiling payload to {artifact_path}")
            payload_reference = artifact_path
        except Exception as exc:  # noqa: BLE001 - surface full failure for Databricks
            print(f"Failed to write profiling payload to {artifact_path}: {exc}")
            # Clearing artifact_path lets the inline fallback below take over.
            artifact_path = None
            print("Falling back to inline payload storage.")
    else:
        print("Skipping artifact export; inline payload will be stored instead.")
else:
    print(
        "Payload storage mode set to 'inline'; profiling results will be written directly to dq_profiles.payload_path."
    )
    artifact_path = None

if artifact_path and not payload_reference:
    payload_reference = artifact_path

# Inline fallback: store the JSON text itself when no artifact was written.
if not payload_reference:
    if payload_json_value is None:
        print("Unable to capture profiling payload; payload_path column will remain NULL.")
    else:
        payload_reference = payload_json_value
        print("Stored profiling payload inline with the profile metadata entry.")

_persist_results_to_metadata(results, payload_reference)


# Optional HTTP completion callback. It is only sent when the resolved behavior enables callbacks
# and a target URL can be built.
callback_behavior = _resolve_callback_behavior()
# Source precedence: explicit callback_url, then callback_base_url, then the metadata setting.
callback_source_url = callback_url or callback_base_url or _lookup_metadata_setting("profile_callback_base_url")
callback_target = _resolve_callback_target(callback_source_url, profile_run_id)
if not _callbacks_enabled(callback_behavior):
    print(
        "Skipping completion callback; profiling results were written directly to metadata tables. "
        "Set callback_behavior='api' to re-enable HTTP callbacks."
    )
elif callback_target:
    headers = {"Content-Type": "application/json"}
    token_value = callback_token or _lookup_metadata_setting("profile_callback_token")
    if token_value:
        headers["Authorization"] = f"Bearer {token_value}"
    # Only the summary fields and the anomaly list are posted, not the full results payload.
    callback_body = {
        "status": results["status"],
        "row_count": results["row_count"],
        "anomaly_count": results["anomaly_count"],
        "anomalies": results["anomalies"],
    }
    # Second candidate: the canonical Heroku host, tried only after a DNS failure on the first.
    canonical_fallback = _rewrite_heroku_app_host(callback_target)
    callback_candidates = [callback_target]
    if canonical_fallback and canonical_fallback not in callback_candidates:
        callback_candidates.append(canonical_fallback)
    response = None
    last_error: Exception | None = None
    for idx, candidate in enumerate(callback_candidates):
        try:
            response = requests.post(candidate, headers=headers, json=callback_body, timeout=30)
            response.raise_for_status()
            print(f"Callback succeeded: {candidate} ({response.status_code})")
            break
        except requests.exceptions.RequestException as exc:
            last_error = exc
            # Retry only once, only for the first candidate, and only for DNS-looking failures;
            # every other request error is re-raised to fail the cell.
            should_retry = idx == 0 and canonical_fallback and _looks_like_dns_failure(exc)
            if should_retry:
                print(
                    f"Callback host failed DNS lookup ({exc}); retrying canonical domain {canonical_fallback}."
                )
                continue
            raise
    # Reached with response still None when the loop ran out of candidates without any HTTP response.
    if response is None:
        raise last_error or RuntimeError("Callback failed without an HTTP response.")
else:
    print("Callback URL not provided; skipping completion POST.")

# Final expression of the cell: displays the results payload as the cell output.
results