### Databricks notebook source 
This notebook:
	•	reads approved SQL (status = 'APPROVED') from ai_sql_candidates,
	•	creates/updates the Unity Catalog (UC) view from that approved SQL,
	•	writes _DRAFT artifacts (CSV / XLSX, PDF optional) to DBFS,
	•	writes a JSON manifest per report,
	•	upserts into report_candidates with status='ready_for_business'.

In [0]:
# Config & helpers

# ---------- CONFIG ----------
# Unity Catalog location of the tracking table and of every generated report view.
CATALOG = "finance"
SCHEMA = "kyc_gold"

# Source of AI-generated SQL; only rows with status = 'APPROVED' are processed.
AI_CANDIDATES_VIEW = f"{CATALOG}.{SCHEMA}.ai_sql_candidates"

# DBFS locations: report metadata (a single JSON array) and per-run view DDL snapshots.
META_FILE = "dbfs:/FileStore/kyc/report_metadata/report_definitions.json"
VIEW_DUMP_ROOT = "dbfs:/FileStore/kyc/reports/views"

# ---------- IMPORTS ----------
import hashlib
import io
import json
import os
import re
import uuid
from datetime import datetime, timezone
from xml.sax.saxutils import escape as xml_escape

from pyspark.sql import SparkSession
spark = SparkSession.getActiveSession()
if spark is None:
    # Explicit raise rather than `assert`: asserts are stripped when Python runs with -O.
    raise RuntimeError("Must run on a Databricks cluster")

# PDF is optional: REPORTLAB_AVAILABLE gates the PDF branch of export_report.
try:
    from reportlab.lib.pagesizes import A4, landscape
    from reportlab.lib import colors
    from reportlab.lib.styles import getSampleStyleSheet
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
    REPORTLAB_AVAILABLE = True
except ImportError:
    # Only a missing/unimportable package disables PDF export; other errors surface.
    REPORTLAB_AVAILABLE = False

# ---------- LIMITS (safe for driver -> pandas) ----------
ROW_CAP_XLSX = 1_000_000   # Excel max is 1,048,576 rows per sheet (header included)
ROW_CAP_PDF  = 5_000       # keep PDFs small & readable

# ---------- PATH HELPERS ----------
def dbfs_to_local(dbfs_path: str) -> str:
    """Map a DBFS path to its local '/dbfs' equivalent.

    'dbfs:/FileStore/...' -> '/dbfs/FileStore/...'. Only the leading 'dbfs:'
    scheme is removed (the old str.replace stripped it anywhere in the path).
    A scheme-less path ('/FileStore/...') is simply prefixed, and a path that is
    already local ('/dbfs/...') is returned unchanged instead of becoming
    '/dbfs/dbfs/...'.
    """
    if dbfs_path.startswith("/dbfs/"):
        return dbfs_path
    return "/dbfs" + dbfs_path.removeprefix("dbfs:")

def artifact_dir(report_name: str) -> str:
    """Return today's (UTC) DBFS output folder for a report.

    Layout: dbfs:/FileStore/kyc/reports/<YYYYMMDD>/<report_name>
    """
    day_stamp = f"{datetime.now(timezone.utc):%Y%m%d}"
    return f"dbfs:/FileStore/kyc/reports/{day_stamp}/{report_name}"

def files_url(dbfs_path: str) -> str:
    """Turn a 'dbfs:/FileStore/...' path into the relative '/files/...' link Databricks serves."""
    relative = dbfs_path.replace("dbfs:/FileStore/", "")
    return f"/files/{relative}"

def write_text(dbfs_path: str, text: str):
    """Write *text* as UTF-8 to a DBFS path through its local '/dbfs' path, creating parent dirs."""
    target = dbfs_to_local(dbfs_path)
    parent = os.path.dirname(target)
    os.makedirs(parent, exist_ok=True)
    with open(target, mode="w", encoding="utf-8") as handle:
        handle.write(text)

def write_bytes(dbfs_path: str, data: bytes):
    """Write raw *data* to a DBFS path through its local '/dbfs' path, creating parent dirs."""
    target = dbfs_to_local(dbfs_path)
    parent = os.path.dirname(target)
    os.makedirs(parent, exist_ok=True)
    with open(target, mode="wb") as handle:
        handle.write(data)

def sha256_bytes(data: bytes) -> str:
    """Return the hex-encoded SHA-256 digest of *data*.

    Uses the module-level ``hashlib`` import; the previous function-scope
    re-import was redundant.
    """
    return hashlib.sha256(data).hexdigest()

# Any non-word character (i.e. not a letter, digit or underscore) is unsafe in an
# unquoted SQL identifier or a path segment.
_NON_WORD_CHARS = re.compile(r"\W")


def sanitize_identifier(name: str) -> str:
    """Lower-case *name* and replace every non-word character with '_'.

    Used for view names and snapshot directory names. This replaces a superset
    of the characters handled before ('-', ' ', '/'). Characters such as '.',
    ';', "'" or '&' no longer leak into an unquoted identifier, where they would
    break the SQL statement or, for '.', redirect it to another schema.
    """
    return _NON_WORD_CHARS.sub("_", name.lower())

# --- Pre-escape helpers & SQL literals ---
def esc(s: str | None) -> str | None:
    return s.replace("'", "''") if s is not None else None


# ---------- LOAD METADATA (single JSON array) ----------
def load_metadata_array(meta_file: str, max_bytes: int = 5_000_000) -> dict:
    """Load report definitions from a JSON array file, keyed by 'report_name'.

    Entries without a truthy 'report_name' are ignored; on duplicate names the
    last entry wins.

    Args:
        meta_file: DBFS path of the metadata file.
        max_bytes: read budget passed to ``dbutils.fs.head``.

    Raises:
        ValueError: the file may exceed *max_bytes* (head() would have silently
            truncated it), the top-level JSON value is not an array, or an entry
            is not a JSON object.
    """
    txt = dbutils.fs.head(meta_file, max_bytes)
    # head() stops silently at max_bytes. A result filling the whole budget is
    # treated as possibly truncated, rather than failing later with a confusing
    # JSON decode error. Assumes head() returns UTF-8 text — TODO confirm.
    if len(txt.encode("utf-8")) >= max_bytes:
        raise ValueError(
            f"Metadata file {meta_file} may exceed {max_bytes} bytes; raise max_bytes"
        )
    items = json.loads(txt)
    if not isinstance(items, list):
        raise ValueError("Metadata must be a JSON array")

    by_name = {}
    for idx, item in enumerate(items):
        if not isinstance(item, dict):
            raise ValueError(f"Metadata entry #{idx} is not a JSON object")
        name = item.get("report_name")
        if name:
            by_name[name] = item
    return by_name

META = load_metadata_array(META_FILE)

# ---------- TABLES ----------
# Tracking table: one row per approved AI candidate id (MERGEd on id by the processing cell).
_REPORT_CANDIDATES_DDL = f"""
CREATE TABLE IF NOT EXISTS {CATALOG}.{SCHEMA}.report_candidates (
  id STRING,
  report_name STRING,
  report_owner STRING,
  status STRING,
  view_name STRING,
  export_format STRING,
  widget_title STRING,
  filters STRING,
  report_sql STRING,
  draft_manifest_path STRING,
  report_url STRING,
  dashboard STRING,
  notify ARRAY<STRING>,
  certify BOOLEAN,
  created_at TIMESTAMP,
  reviewed_at TIMESTAMP,
  reviewed_by STRING,
  published_at TIMESTAMP
) USING DELTA
"""

# Executed in order: the unqualified CREATE SCHEMA relies on the catalog selected first.
for _setup_stmt in (
    f"USE CATALOG {CATALOG}",
    f"CREATE SCHEMA IF NOT EXISTS {SCHEMA}",
    _REPORT_CANDIDATES_DDL,
):
    spark.sql(_setup_stmt)
print("Setup complete.")

In [0]:
# Exporter

import pandas as pd

def _cap_rows(frame: pd.DataFrame, cap: int, label: str) -> pd.DataFrame:
    """Return at most *cap* leading rows of *frame*, printing a notice when rows are dropped."""
    if len(frame) <= cap:
        return frame
    print(f"{label} export truncated to first {cap:,} of {len(frame):,} rows.")
    return frame.iloc[:cap]


def _xlsx_bytes(frame: pd.DataFrame) -> bytes:
    """Render *frame* as an .xlsx workbook (single 'Report' sheet) with auto-sized columns."""
    bio = io.BytesIO()
    # Column widths are estimated from the header plus the first 100 rows only;
    # slicing before astype(str) avoids stringifying whole (up to 1M-row) columns.
    sample = frame.head(100)
    with pd.ExcelWriter(bio, engine="xlsxwriter") as xw:
        frame.to_excel(xw, sheet_name="Report", index=False)
        ws = xw.sheets["Report"]
        for i, col in enumerate(frame.columns):
            cell_lens = [len(v) for v in sample.iloc[:, i].astype(str)]
            width = max([len(str(col)), *cell_lens])
            ws.set_column(i, i, min(width + 2, 60))  # cap very wide columns at 60 chars
    # The workbook is only complete once the writer has been closed.
    return bio.getvalue()


def _pdf_bytes(frame: pd.DataFrame, title: str) -> bytes:
    """Render *frame* as a landscape-A4 PDF table under *title* (requires reportlab)."""
    bio = io.BytesIO()
    doc = SimpleDocTemplate(bio, pagesize=landscape(A4), leftMargin=24, rightMargin=24, topMargin=24, bottomMargin=24)
    styles = getSampleStyleSheet()
    # Paragraph parses its text as markup, so '&', '<' and '>' in the title must be escaped.
    elems = [Paragraph(xml_escape(str(title)), styles['Title']), Spacer(1, 8)]
    data = [list(frame.columns)] + frame.astype(str).values.tolist()
    table = Table(data, repeatRows=1)  # repeat the header row on every page
    table.setStyle(TableStyle([
        ('BACKGROUND', (0,0), (-1,0), colors.lightgrey),
        ('TEXTCOLOR', (0,0), (-1,0), colors.black),
        ('FONTSIZE', (0,0), (-1,-1), 8),
        ('GRID', (0,0), (-1,-1), 0.25, colors.grey),
        ('ALIGN', (0,0), (-1,-1), 'LEFT'),
    ]))
    elems.append(table)
    doc.build(elems)
    return bio.getvalue()


def _write_artifact(kind: str, path: str, data: bytes, rows: int) -> dict:
    """Write one artifact to DBFS and return its manifest entry (digest computed once)."""
    write_bytes(path, data)
    return {"type": kind, "path": path, "sha256": sha256_bytes(data), "files_url": files_url(path), "rows": rows}


def export_report(report_name: str, sql_select_all_from_view: str, export_format: str, widget_title: str, filters: str, owner: str | None):
    """Run a Spark SQL query and write its result as _DRAFT artifacts plus a JSON manifest.

    The whole result is collected to the driver as a pandas DataFrame. Artifacts
    go to ``<artifact_dir(report_name)>/<report_name>_<8 hex>_DRAFT.<ext>``:

    * csv  - every row.
    * xlsx - at most ROW_CAP_XLSX rows (xlsxwriter engine).
    * pdf  - at most ROW_CAP_PDF rows. Skipped with a message when reportlab
      is not installed.

    Args:
        report_name: used in the artifact directory and file names.
        sql_select_all_from_view: Spark SQL query whose result is exported.
        export_format: 'csv', 'xlsx', 'pdf' or 'all' (case-insensitive). Any
            other value writes no artifacts, only the manifest, and prints a
            warning.
        widget_title: PDF title; falls back to report_name when empty.
        filters: stored verbatim in the manifest.
        owner: stored verbatim in the manifest.

    Returns:
        (draft_paths, manifest_path, manifest, row_count):
        * draft_paths: artifact paths in csv/xlsx/pdf order.
        * manifest: the dict written to manifest_path. Each artifact entry
          records its path, sha256, files_url and number of rows written.
        * row_count: total rows returned by the query.
    """
    frame = spark.sql(sql_select_all_from_view).toPandas()
    row_count = len(frame)

    base_dir = artifact_dir(report_name)
    os.makedirs(dbfs_to_local(base_dir), exist_ok=True)
    base = f"{base_dir}/{report_name}_{uuid.uuid4().hex[:8]}"

    fmt = (export_format or "").strip().lower()
    targets = ("csv", "xlsx", "pdf") if fmt == "all" else (fmt,)
    if fmt not in ("csv", "xlsx", "pdf", "all"):
        print(f"Unknown export_format {export_format!r}: no artifacts written.")

    artifacts = []

    if "csv" in targets:
        csv_bytes = frame.to_csv(index=False).encode("utf-8")
        artifacts.append(_write_artifact("csv", f"{base}_DRAFT.csv", csv_bytes, row_count))

    if "xlsx" in targets:
        xlsx_frame = _cap_rows(frame, ROW_CAP_XLSX, "XLSX")
        artifacts.append(_write_artifact("xlsx", f"{base}_DRAFT.xlsx", _xlsx_bytes(xlsx_frame), len(xlsx_frame)))

    if "pdf" in targets:
        if REPORTLAB_AVAILABLE:
            pdf_frame = _cap_rows(frame, ROW_CAP_PDF, "PDF")
            pdf_bytes = _pdf_bytes(pdf_frame, widget_title or report_name)
            artifacts.append(_write_artifact("pdf", f"{base}_DRAFT.pdf", pdf_bytes, len(pdf_frame)))
        else:
            print("PDF export skipped: reportlab not installed on this cluster.")

    manifest = {
        "id": str(uuid.uuid4()),
        "report_name": report_name,
        "export_format": export_format,
        "row_count": row_count,
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "owner": owner,
        "filters": filters,
        "sql": sql_select_all_from_view,
        "artifacts": artifacts,
    }
    manifest_path = f"{base}_DRAFT.manifest.json"
    write_text(manifest_path, json.dumps(manifest, indent=2))

    draft_paths = [a["path"] for a in artifacts]
    return draft_paths, manifest_path, manifest, row_count

In [0]:
# Process approved AI SQL → create view → export drafts → upsert tracking

# Pull approved candidates
approved = spark.sql(f"""
  SELECT id, report_name, generated_sql, created_by
  FROM {AI_CANDIDATES_VIEW}
  WHERE status = 'APPROVED' """).collect()

# One-row MERGE source for report_candidates. Tuples passed to
# _upsert_report_candidate must follow exactly this column order.
_UPSERT_SRC_VIEW = "report_candidates_upsert_src"
_UPSERT_SRC_SCHEMA = (
    "id STRING, report_name STRING, report_owner STRING, view_name STRING, "
    "export_format STRING, widget_title STRING, filters STRING, report_sql STRING, "
    "draft_manifest_path STRING, report_url STRING, dashboard STRING, "
    "notify ARRAY<STRING>, certify BOOLEAN"
)


def _notify_list(value) -> list:
    """Normalise the metadata 'notify' entry to a list of strings.

    None/empty -> []; a single string -> [that string] (previously it was split
    into characters); any other iterable -> str() of each item.
    """
    if not value:
        return []
    if isinstance(value, str):
        return [value]
    return [str(v) for v in value]


def _annotate_view(full_view: str, meta: dict, certify: bool) -> None:
    """Set the view comment from description/natural_language and optionally mark it certified."""
    desc = meta.get("description") or ""
    nl = meta.get("natural_language") or ""
    comment = f"{desc} | {nl}".strip(" |")
    if comment:
        spark.sql(f"COMMENT ON VIEW {full_view} IS '{esc(comment)}'")
    if certify:
        spark.sql(f"ALTER VIEW {full_view} SET TBLPROPERTIES ('quality'='certified')")


def _snapshot_view(ai_id, report_name: str, full_view: str, sql_text: str, meta: dict) -> None:
    """Persist the view DDL and the metadata used to build it under a fresh VIEW_DUMP_ROOT folder."""
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    dump_dir = f"{VIEW_DUMP_ROOT}/{sanitize_identifier(report_name)}/{ts}_{uuid.uuid4().hex[:6]}"
    ddl = f"CREATE OR REPLACE VIEW {full_view} AS\n{sql_text}\n"
    write_text(f"{dump_dir}/view.sql", ddl)
    write_text(f"{dump_dir}/metadata.json", json.dumps({"ai_id": ai_id, "report_name": report_name, "view": full_view, "metadata": meta}, indent=2))


def _upsert_report_candidate(values: tuple) -> None:
    """MERGE one row (ordered as _UPSERT_SRC_SCHEMA) into report_candidates as 'ready_for_business'.

    Values travel through a one-row temp view instead of being spliced into the
    SQL text. Quotes or backslashes in the SQL, title, filters or paths therefore
    cannot break or alter the statement.

    NOTE(review): every run re-processes all APPROVED candidates. Matched rows
    get status reset to 'ready_for_business' and created_at overwritten, even if
    they were already reviewed/published — confirm this is intended.
    """
    spark.createDataFrame([values], schema=_UPSERT_SRC_SCHEMA).createOrReplaceTempView(_UPSERT_SRC_VIEW)
    spark.sql(f"""
        MERGE INTO {CATALOG}.{SCHEMA}.report_candidates AS t
        USING {_UPSERT_SRC_VIEW} AS s
        ON t.id = s.id
        WHEN MATCHED THEN UPDATE SET
          report_name         = s.report_name,
          report_owner        = s.report_owner,
          status              = 'ready_for_business',
          view_name           = s.view_name,
          export_format       = s.export_format,
          widget_title        = s.widget_title,
          filters             = s.filters,
          report_sql          = s.report_sql,
          draft_manifest_path = s.draft_manifest_path,
          report_url          = s.report_url,
          dashboard           = s.dashboard,
          notify              = s.notify,
          certify             = s.certify,
          created_at          = current_timestamp()
        WHEN NOT MATCHED THEN INSERT (
          id, report_name, report_owner, status, view_name, export_format, widget_title, filters,
          report_sql, draft_manifest_path, report_url, dashboard, notify, certify, created_at
        ) VALUES (
          s.id, s.report_name, s.report_owner, 'ready_for_business', s.view_name, s.export_format,
          s.widget_title, s.filters, s.report_sql, s.draft_manifest_path, s.report_url, s.dashboard,
          s.notify, s.certify, current_timestamp()
        )
    """)


def _process_candidate(row) -> None:
    """Handle one approved candidate: create view, snapshot it, export drafts, upsert tracking row."""
    ai_id       = row["id"]
    report_name = row["report_name"]
    sql_text    = row["generated_sql"]
    owner       = row["created_by"]

    print(f"Report -{report_name}")

    if not report_name or not sql_text:
        print(f"Skipping id={ai_id}: missing report_name or sql")
        return
    if ai_id is None:
        # Previously stored as the literal string 'None' and re-inserted on every run.
        print(f"Skipping report {report_name}: missing id")
        return

    # metadata adornments (format/title/dashboard/notify/certify/filters)
    meta          = META.get(report_name, {})
    export_format = (meta.get("export_format") or "csv").lower()
    widget_title  = str(meta.get("widget_title") or report_name)
    dashboard     = str(meta.get("dashboard") or "")   # empty string (not NULL) when absent
    filters_meta  = meta.get("filters") or ""
    certify       = bool(meta.get("certify", False))
    notify        = _notify_list(meta.get("notify"))

    # 1) Create/replace the UC view from the APPROVED SQL, then annotate it
    view_name = sanitize_identifier(f"vw_{report_name}")
    full_view = f"{CATALOG}.{SCHEMA}.{view_name}"
    spark.sql(f"CREATE OR REPLACE VIEW {full_view} AS {sql_text}")
    _annotate_view(full_view, meta, certify)
    print(f"Created view: {full_view}")

    # 2) Persist DDL + metadata snapshot
    _snapshot_view(ai_id, report_name, full_view, sql_text, meta)

    # 3) Generate DRAFT artifacts from the view
    draft_paths, manifest_path, manifest, _ = export_report(
        report_name=report_name,
        sql_select_all_from_view=f"SELECT * FROM {full_view}",
        export_format=export_format,
        widget_title=widget_title,
        filters=filters_meta,
        owner=owner
    )
    print("Draft artifacts:")
    for p in draft_paths:
        print("  ", p)

    # 4) Upsert into tracking table (ready_for_business)
    artifacts = manifest.get("artifacts", [])
    # No artifact (e.g. PDF-only without reportlab, unknown format) -> NULL url instead of a crash.
    report_url = files_url(artifacts[0]["path"]) if artifacts else None
    _upsert_report_candidate((
        str(ai_id),
        str(report_name),
        str(owner) if owner else None,
        view_name,
        export_format,
        widget_title,
        # filters column is STRING: non-string metadata (dict/list) is stored as JSON.
        filters_meta if isinstance(filters_meta, str) else json.dumps(filters_meta),
        sql_text,
        manifest_path,
        report_url,
        dashboard,
        notify,
        certify,
    ))


if not approved:
    print("No approved AI SQL candidates found.")
else:
    failures = []
    for row in approved:
        try:
            _process_candidate(row)
        except Exception as exc:  # isolate failures so one bad candidate does not block the rest
            print(f"FAILED id={row['id']}: {type(exc).__name__}: {exc}")
            failures.append((row["id"], exc))
    if failures:
        failed_ids = ", ".join(str(i) for i, _ in failures)
        raise RuntimeError(
            f"{len(failures)} of {len(approved)} approved candidate(s) failed: {failed_ids}"
        ) from failures[0][1]

print("Done: processed approved AI SQL candidates.")