In [0]:
import os
import sys
import importlib
import yaml
from pyspark.sql import SparkSession

# ---------------------------------------------------------------------
# Resolve repo_root in both normal Python scripts and Databricks notebooks
# ---------------------------------------------------------------------
# `__file__` is only defined when this code runs as a regular script; a
# notebook (e.g. Databricks) has no `__file__`, so the current working
# directory is used as the repo root instead.
try:
    _this_file = __file__
except NameError:
    repo_root = os.getcwd()
else:
    repo_root = os.path.dirname(os.path.abspath(_this_file))

# Register the repo root on sys.path once so project packages can be imported.
if not any(entry == repo_root for entry in sys.path):
    sys.path.append(repo_root)

print(f"[DQBricks] Repo root set to: {repo_root}")
print(f"[DQBricks] CWD: {os.getcwd()}")

# ---------------------------------------------------------------------
# Arg handling: strip IPython/Databricks noise like "-f <json>"
# We'll keep only the first non-flag positional argument, if any.
# ---------------------------------------------------------------------
def extract_user_cfg_arg(argv: list[str]) -> str | None:
    """
    Returns the first user-supplied positional argument that doesn't look like a flag.
    Skips known IPython patterns like '-f <file>'.
    """
    cleaned = []
    i = 1  # skip argv[0]
    while i < len(argv):
        a = argv[i]
        # Handle the common "-f <connection.json>" pair injected by IPython kernels
        if a == "-f" and (i + 1) < len(argv):
            i += 2
            continue
        # Skip other flags (e.g., --ip, --something)
        if a.startswith("-"):
            i += 1
            continue
        # Take positional args
        cleaned.append(a)
        i += 1
    return cleaned[0] if cleaned else None

# ---------------------------------------------------------------------
# Config path resolution
# Defaults to <repo_root>/config/config.yaml
# Supports: argv override, relative paths (resolved from repo_root),
# absolute workspace paths, and dbfs:/ paths (auto-mapped to /dbfs/...).
# ---------------------------------------------------------------------
def resolve_cfg_path(argv1: str | None, default_dir: str) -> tuple[str, list[str]]:
    tried = []
    default_cfg = os.path.join(default_dir, "config", "config.yaml")

    if argv1:
        candidates = []
        # Map dbfs:/... -> /dbfs/...
        if argv1.startswith("dbfs:/"):
            candidates.append("/dbfs/" + argv1[len("dbfs:/"):].lstrip("/"))

        # Absolute path as-is
        if os.path.isabs(argv1):
            candidates.append(argv1)
        else:
            # Relative to repo_root so "config/config.yaml" works
            candidates.append(os.path.join(default_dir, argv1))

        # Also try relative to CWD for completeness
        candidates.append(os.path.abspath(argv1))
    else:
        candidates = [default_cfg]

    for p in candidates:
        ap = os.path.abspath(p)
        tried.append(ap)
        if os.path.exists(ap):
            return ap, tried

    return "", tried

# Pull the optional config argument out of argv and resolve it to a file on disk.
user_arg = extract_user_cfg_arg(sys.argv)
cfg_path, tried_paths = resolve_cfg_path(user_arg, repo_root)

if not cfg_path:
    # Nothing matched: report every location that was checked, plus usage tips.
    tried_lines = [f"  - {p}" for p in tried_paths]
    usage_tips = [
        "Tips:",
        "• Keep your config at: <repo_root>/config/config.yaml  (default)",
        "• Or pass a relative path from repo_root, e.g.:",
        "    %run /Workspace/.../dqbricks/run_notebook -- config/config.yaml",
        "• Or pass an absolute Workspace path:",
        "    %run /Workspace/.../dqbricks/run_notebook -- /Workspace/.../dqbricks/config/config.yaml",
        "• Or pass a DBFS path (auto-mapped):",
        "    %run /Workspace/.../dqbricks/run_notebook -- dbfs:/FileStore/dqbricks/config.yaml",
    ]
    raise FileNotFoundError(
        "\n".join(
            ["[DQBricks] ERROR: Could not find config.yaml.", "Tried:"]
            + tried_lines
            + [""]
            + usage_tips
        )
    )

print(f"[DQBricks] Using config: {cfg_path}")

# ---------------------------------------------------------------------
# Spark session
# ---------------------------------------------------------------------
# Reuse the session that is already active (as in a notebook); otherwise build one.
spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()

# ---------------------------------------------------------------------
# Load config
# ---------------------------------------------------------------------
# Explicit UTF-8 so decoding does not depend on the platform's default locale.
with open(cfg_path, "r", encoding="utf-8") as f:
    # An empty YAML document loads as None; treat it as an empty config.
    cfg = yaml.safe_load(f) or {}

# Everything below reads settings with cfg.get(...), so the YAML root must be a
# mapping. Fail here with a clear message instead of an AttributeError later.
if not isinstance(cfg, dict):
    raise ValueError(
        f"[DQBricks] Config root must be a mapping, got {type(cfg).__name__}: {cfg_path}"
    )

# ---------------------------------------------------------------------
# Imports that rely on repo path being on sys.path
# ---------------------------------------------------------------------
from dqcore.bootstrap import ensure_objects
from dqcore.engine import DQEngine
from dqcore.targets import list_tables
import rules  # registers built-in rules

# Optional custom rules: import the module named in config, when one is given.
custom_rules_module = cfg.get("custom_rules_module")
if custom_rules_module:
    importlib.import_module(custom_rules_module)

# ---------------------------------------------------------------------
# Ensure quarantine/metrics tables + dashboard views exist
# ---------------------------------------------------------------------
quarantine_tbl, metrics_tbl = (
    cfg.get(key) for key in ("quarantine_table", "metrics_table")
)
views_cfg = cfg.get("views", {})

# Both table names are mandatory; stop before touching Spark if either is unset.
if not (quarantine_tbl and metrics_tbl):
    raise ValueError(
        "[DQBricks] 'quarantine_table' and 'metrics_table' must be set in config."
    )

ensure_objects(spark, quarantine_tbl, metrics_tbl, views_cfg)

# The engine receives the same table names and views config as ensure_objects.
engine = DQEngine(spark, quarantine_tbl, metrics_tbl, views_cfg)

# ---------------------------------------------------------------------
# Resolve target tables
# scope: 'table' | 'schema' | (anything else => by include/exclude)
# ---------------------------------------------------------------------
# Target-selection settings; include/exclude default to "everything"/"nothing".
scope = cfg.get("scope", "table")
include, exclude = cfg.get("include", ["*"]), cfg.get("exclude", [])

# Location of the target(s); each is None when the key is absent from config.
catalog, schema, table = (cfg.get(key) for key in ("catalog", "schema", "table"))

def fq_tuple(cat: str | None, sch: str, tbl: str) -> tuple[str | None, str, str]:
    return (cat, sch, tbl)

if scope == "table":
    # A single explicitly named table: both schema and table are required.
    if not (schema and table):
        raise ValueError("[DQBricks] For scope='table', 'schema' and 'table' must be set.")
    targets = [fq_tuple(catalog, schema, table)]
else:
    # Every other scope is resolved through list_tables with include/exclude.
    if scope == "schema":
        if not schema:
            raise ValueError("[DQBricks] For scope='schema', 'schema' must be set.")
        listing_kwargs = {
            "catalog": catalog,
            "schema": schema,
            "include": include,
            "exclude": exclude,
        }
    else:
        # catalog or global scope (by include/exclude); no schema is passed.
        listing_kwargs = {"catalog": catalog, "include": include, "exclude": exclude}
    targets = list_tables(spark, **listing_kwargs)

# ---------------------------------------------------------------------
# Execution mode
# ---------------------------------------------------------------------
def _stream_trigger_kwargs(trigger) -> dict:
    """Translate the configured streaming trigger into keyword arguments.

    PySpark's ``DataStreamWriter.trigger`` accepts keyword arguments only
    (``processingTime``, ``once``, ``continuous``, ``availableNow``), so the raw
    config value cannot be passed positionally.

    Args:
        trigger: Either a mapping of trigger keyword(s), e.g.
            ``{"availableNow": True}``, or a string. The strings ``"once"`` and
            ``"availableNow"`` enable that trigger; any other string is used as
            a ``processingTime`` interval.

    Returns:
        Keyword arguments suitable for ``.trigger(**kwargs)``.

    Raises:
        ValueError: If ``trigger`` is neither a mapping nor a string.
    """
    if isinstance(trigger, dict):
        return dict(trigger)
    if isinstance(trigger, str):
        if trigger in ("once", "availableNow"):
            return {trigger: True}
        return {"processingTime": trigger}
    raise ValueError(
        f"[DQBricks] Unsupported streaming trigger: {trigger!r} "
        "(expected a mapping such as {'availableNow': True} or an interval string)."
    )


# A key that is present but empty in YAML loads as None, so fall back explicitly.
mode = str(cfg.get("mode") or "batch").lower()

if mode == "streaming":
    # Auto Loader streaming source
    streaming_cfg = cfg.get("streaming") or {}
    required = ["input_format", "schema_location", "input_path", "output_table", "checkpoint", "trigger"]
    missing = [k for k in required if k not in streaming_cfg]
    if missing:
        raise ValueError(f"[DQBricks] Missing streaming config keys: {missing}")
    # Same validation the batch branch performs; avoids a bare KeyError below.
    if "rules" not in cfg:
        raise ValueError("[DQBricks] 'rules' must be defined in config for streaming mode.")

    src = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", streaming_cfg["input_format"])
        .option("cloudFiles.schemaLocation", streaming_cfg["schema_location"])
        .option("rescuedDataColumn", "_rescued_data")
        .load(streaming_cfg["input_path"])
    )

    # NOTE(review): assumes enforce_stream returns a pyspark DataStreamWriter
    # (it is chained with .trigger(...).start()) — confirm against dqcore.engine.
    writer = engine.enforce_stream(
        src,
        fq_out_table=streaming_cfg["output_table"],
        rule_cfgs=cfg["rules"],
        checkpoint=streaming_cfg["checkpoint"],
    )
    q = writer.trigger(**_stream_trigger_kwargs(streaming_cfg["trigger"])).start()
    # Blocks until the streaming query stops.
    q.awaitTermination()

else:
    # -----------------------------
    # Batch mode
    # -----------------------------
    if "rules" not in cfg:
        raise ValueError("[DQBricks] 'rules' must be defined in config for batch mode.")

    inc = cfg.get("incremental") or {}
    table_overrides = cfg.get("tables") or {}
    starting_version = inc.get("starting_version", 0)
    # Tables that could not be read or enforced; reported once the loop ends.
    failed_tables = []

    for (cat, sch, tbl) in targets:
        fq_table = f"{cat}.{sch}.{tbl}" if cat else f"{sch}.{tbl}"
        print(f"[DQBricks] Processing table: {fq_table}")

        # Choose rules: per-table override or defaults
        rules_cfg = (table_overrides.get(fq_table) or {}).get("rules", cfg["rules"])
        if not rules_cfg:
            print(f"[DQBricks] WARNING: No rules configured for {fq_table}; skipping.")
            continue

        # Read base DataFrame
        try:
            df = spark.table(fq_table)
        except Exception as e:
            print(f"[DQBricks] ERROR: Could not read table {fq_table}: {e}")
            failed_tables.append(fq_table)
            continue

        # Optional incremental via Delta Change Data Feed
        if inc.get("use_cdf", False):
            try:
                spark.sql(
                    f"ALTER TABLE {fq_table} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
                )
                df = (
                    spark.read.format("delta")
                    .option("readChangeFeed", "true")
                    .option("startingVersion", starting_version)
                    .table(fq_table)
                    .filter("_change_type IN ('insert','update_postimage')")
                )
                print(f"[DQBricks] CDF enabled for {fq_table} (starting_version={starting_version}).")
            except Exception as e:
                # Best effort: `df` still holds the full-table read from above.
                print(f"[DQBricks] WARNING: Failed to use CDF for {fq_table}: {e}. Falling back to full read.")

        try:
            _ = engine.enforce_batch(df, fq_table, rules_cfg)
            print(f"[DQBricks] Processed {fq_table} with {len(rules_cfg)} rules.")
        except Exception as e:
            print(f"[DQBricks] ERROR: Enforcement failed for {fq_table}: {e}")
            failed_tables.append(fq_table)

    # Per-table errors do not abort the run, so surface them in one place.
    if failed_tables:
        print(f"[DQBricks] WARNING: {len(failed_tables)} table(s) failed: {failed_tables}")

print("[DQBricks] run complete.")


In [0]:
from dqcore.registry import RuleRegistry

# Show every rule name currently known to the registry, in sorted order.
registered_rule_names = sorted(RuleRegistry.names())
print("Registered rules:", registered_rule_names)
# Expect something like:
# ['check','freshness','in_set','not_null','range','uniqueness', ... plus your custom 'max_length' if any]


In [0]:
%sql
-- Create the schema that holds the sample DQ output table, if it is missing.
CREATE SCHEMA IF NOT EXISTS explore.dq_schema;

-- Create the Delta table that a later cell in this notebook appends to.
-- NOTE(review): that later cell renames nn_id / amt_non_negative / email_ok to
-- dq_-prefixed names before appending here — confirm the column names agree.
CREATE TABLE IF NOT EXISTS explore.dq_schema.sample_data_dq
(
  id DOUBLE,
  email STRING,
  amount DOUBLE,
  -- Same name as the rescuedDataColumn option set on the streaming reader in the first cell.
  _rescued_data STRING,
  -- Presumably per-rule pass/fail flags; verify against the rule config.
  nn_id BOOLEAN,
  amt_non_negative BOOLEAN,
  email_ok BOOLEAN,
  -- Presumably bookkeeping columns written by the DQ engine; TODO confirm.
  _dq_checked_at TIMESTAMP,
  _dq_is_valid BOOLEAN,
  table_name STRING
) USING DELTA;


In [0]:
from pyspark.sql import functions as F

# NOTE(review): `df` is not defined in this cell; it is whatever the batch loop
# in the first cell last assigned, so this cell depends on earlier kernel state.
# NOTE(review): the DDL cell above declares nn_id / amt_non_negative / email_ok
# without the dq_ prefix — confirm the renamed columns match the target table.
dq_column_renames = (
    ("nn_id", "dq_nn_id"),
    ("amt_non_negative", "dq_amt_non_negative"),
    ("email_ok", "dq_email_ok"),
)

# Apply the renames one after another, in the listed order.
df_out = df
for current_name, prefixed_name in dq_column_renames:
    df_out = df_out.withColumnRenamed(current_name, prefixed_name)

# Append the renamed rows to the sample DQ table.
df_out.write.mode("append").saveAsTable("explore.dq_schema.sample_data_dq")



In [0]:
# Fully-qualified table name kept from the original scratch cell. Written as a
# bare dotted name it is evaluated as attribute access on an undefined `explore`
# variable and raises NameError, so it is stored as a string instead.
RIDE_BOOKINGS_TABLE = "explore.dq_schema.ncr_ride_bookings"

In [0]:
import sys, importlib

# 0) Make sure your project root is first on sys.path
# NOTE(review): hard-coded, user-specific workspace path — consider deriving it
# from configuration so the notebook also runs for other users.
project_root = "/Workspace/Users/jayarampogakula@gmail.com/dqbricks"
# Remove any existing entry first, so re-running this cell neither duplicates
# the path nor leaves it behind other entries.
while project_root in sys.path:
    sys.path.remove(project_root)
sys.path.insert(0, project_root)

# 1) Nuke any stale modules from memory
# NOTE(review): modules imported earlier (e.g. dqcore.engine) may still hold
# references to the previous dqcore.registry module object — confirm that the
# engine sees the rules registered below.
for m in ["rules", "rules.builtin", "custom_rules", "dqcore.registry"]:
    sys.modules.pop(m, None)

# 2) Import the SINGLE registry module we'll all use
import dqcore.registry as registry
print("registry @", registry.__file__)

# 3) Import custom rules FIRST (if present). If they try to overwrite, we'll override next.
try:
    cr = importlib.import_module("custom_rules")
    print("custom_rules @", cr.__file__)
except ModuleNotFoundError as exc:
    # Only a missing `custom_rules` module itself is acceptable. A missing
    # dependency imported *inside* custom_rules is a real error and must not
    # be reported as "not found".
    if exc.name != "custom_rules":
        raise
    cr = None
    print("custom_rules not found (that's fine)")

# 4) Import (or reload) built-ins LAST so they are definitely registered
rb = importlib.import_module("rules.builtin")
# Reload re-executes the module even if step 3 already imported it indirectly.
importlib.reload(rb)
print("rules.builtin @", rb.__file__)

# 5) Check what's registered NOW
from dqcore.registry import RuleRegistry
print("Registered rules:", sorted(RuleRegistry.names()))
