In [0]:
import requests
import base64
import re
import json

# ==========================
# 🔧 CONFIGURATION (EDIT THESE)
# ==========================

# Base URL of the Databricks workspace. It is prefixed to every REST endpoint
# below, so it must not end with a slash.
DATABRICKS_INSTANCE = "https://<your-databricks-instance>.azuredatabricks.net"

# Root workspace folder of the migration; subfolders are visited as well.
WORKSPACE_PATH = "/Workspace/Users/<your-username>@<your-domain>.com"

# Unity Catalog catalog that the Hive schema references are rewritten to.
UC_CATALOG = "<target_uc_catalog>"

# API token, read from a Databricks secret scope so that it never appears in
# the notebook source.
TOKEN = dbutils.secrets.get(
    scope="<your-scope-name>",
    key="<your-key-name>",
)

# Bearer-token header sent with every REST API request.
headers = dict(Authorization=f"Bearer {TOKEN}")


# ==========================
# 🧠 HELPER FUNCTIONS
# ==========================

def get_hive_schemas():
    """Return the names of all schemas (databases) in ``hive_metastore``.

    Runs ``SHOW DATABASES IN hive_metastore`` through the notebook-provided
    ``spark`` session and collects the result on the driver.

    Returns:
        list[str]: The value of the first result column of every row.
    """
    rows = spark.sql("SHOW DATABASES IN hive_metastore").collect()
    # Read the single result column by position rather than by name: the
    # column is called ``databaseName`` on some Spark runtimes and
    # ``namespace`` on others, so attribute access by name is not portable.
    return [row[0] for row in rows]


def list_workspace_objects(path, timeout=60):
    """List the objects directly under a workspace path (one level only).

    Calls ``GET /api/2.0/workspace/list``. The listing itself is not
    recursive; a caller that wants subfolders has to call this function again
    for every directory entry it receives.

    Args:
        path: Workspace path to list.
        timeout: Seconds ``requests`` waits for the server before giving up.
            Without a timeout a stalled connection would block forever.

    Returns:
        list[dict]: The ``"objects"`` entries of the response, or an empty
        list when the path does not exist (HTTP 404) or the response has no
        ``"objects"`` key.

    Raises:
        requests.HTTPError: For any error status other than 404.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    url = f"{DATABRICKS_INSTANCE}/api/2.0/workspace/list"
    resp = requests.get(
        url,
        headers=headers,
        params={"path": path},
        timeout=timeout,
    )
    if resp.status_code == 404:
        return []  # a missing folder is skipped rather than treated as an error
    resp.raise_for_status()
    return resp.json().get("objects", [])


def export_notebook(path, timeout=60):
    """Export a Databricks notebook as source code.

    Calls ``GET /api/2.0/workspace/export`` with ``format=SOURCE`` and decodes
    the base64 ``content`` field of the response as UTF-8.

    Args:
        path: Workspace path of the notebook.
        timeout: Seconds ``requests`` waits for the server before giving up.

    Returns:
        tuple[str, str]: ``(content, language)``. ``language`` is the
        response's ``language`` field when present; otherwise it is inferred
        from ``file_type`` and finally defaults to ``"PYTHON"``.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
        KeyError: If the response carries no ``content`` field.
    """
    url = f"{DATABRICKS_INSTANCE}/api/2.0/workspace/export"
    resp = requests.get(
        url,
        headers=headers,
        params={"path": path, "format": "SOURCE"},
        timeout=timeout,
    )
    resp.raise_for_status()
    data = resp.json()
    content = base64.b64decode(data["content"]).decode("utf-8")

    language = data.get("language")
    if not language:
        # NOTE(review): the export response may not include a "language"
        # field at all, in which case every notebook would be reported as
        # PYTHON. Fall back to "file_type", assumed here to be an
        # extension-like value such as "py" or "sql" - confirm against the
        # Workspace API reference. Unknown values keep the PYTHON default.
        file_type = str(data.get("file_type") or "").lower().lstrip(".")
        language = {
            "py": "PYTHON",
            "sql": "SQL",
            "scala": "SCALA",
            "r": "R",
        }.get(file_type, "PYTHON")
    return content, language


def import_notebook(path, content, language, timeout=60):
    """Write notebook source back to the workspace, replacing what is there.

    Calls ``POST /api/2.0/workspace/import`` with ``format=SOURCE`` and
    ``overwrite=True``, so an existing notebook at ``path`` is overwritten.

    Args:
        path: Workspace path of the notebook to (re)create.
        content: Notebook source text; it is sent base64-encoded as UTF-8.
        language: Notebook language value passed through to the API.
        timeout: Seconds ``requests`` waits for the server before giving up.
            Without a timeout a stalled connection would block forever.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    url = f"{DATABRICKS_INSTANCE}/api/2.0/workspace/import"
    encoded = base64.b64encode(content.encode("utf-8")).decode("utf-8")
    payload = {
        "path": path,
        "format": "SOURCE",
        "language": language,
        "overwrite": True,
        "content": encoded,
    }
    resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
    resp.raise_for_status()


# ==========================
# 🔄 BUILD SCHEMA MAPPINGS
# ==========================

# Each Hive schema keeps its own name and is placed under the target catalog:
#   <schema>  ->  <UC_CATALOG>.<schema>
HIVE_SCHEMAS = get_hive_schemas()
SCHEMA_MAPPING = dict(
    (hive_schema, f"{UC_CATALOG}.{hive_schema}") for hive_schema in HIVE_SCHEMAS
)

# Print the resulting mapping before any notebook is touched.
print("Detected schema mappings:")
print(json.dumps(SCHEMA_MAPPING, indent=2))


# ==========================
# 🧩 REPLACEMENT LOGIC
# ==========================

def update_table_references(content, schema_mapping=None):
    """Rewrite Hive schema references in notebook source to Unity Catalog names.

    Three kinds of reference are rewritten, inside or outside quotes:

    * ``hive_metastore.<schema>.<table>`` and ``<schema>.<table>``
      become ``<catalog>.<schema>.<table>``;
    * a bare ``hive_metastore.<schema>`` becomes ``<catalog>.<schema>``;
    * ``<name>_database = "<schema>"`` assignments (optionally written as
      ``"hive_metastore.<schema>"``) get the mapped name, keeping the
      original quote character.

    A schema name that directly follows a ``.`` or a word character is left
    alone, so names already qualified with a catalog (``cat.schema.table``)
    are not touched. This also makes the rewrite safe to run twice, provided
    the target catalog name is not itself one of the mapped schema names.

    The match is purely textual and case-sensitive: any identifier that
    equals a schema name and is followed by ``.name`` is rewritten, including
    an unrelated variable that happens to share the schema's name.

    Args:
        content: Notebook source text.
        schema_mapping: Optional ``{hive_schema: "<catalog>.<hive_schema>"}``
            mapping. Defaults to the module-level ``SCHEMA_MAPPING``.

    Returns:
        str: The rewritten source; ``content`` unchanged when nothing matches
        or the mapping is empty.
    """
    mapping = SCHEMA_MAPPING if schema_mapping is None else schema_mapping
    schema_names = [name for name in mapping if name]
    if not schema_names:
        # An empty alternation would match at every position, so bail out.
        return content

    # One alternation over all schemas lets the whole text be rewritten in a
    # single pass; text produced for one schema is never re-scanned for
    # another one.
    schema_alt = "|".join(re.escape(name) for name in schema_names)

    reference = re.compile(
        # Not preceded by a word character or a dot: skips partial
        # identifiers and names that already carry a catalog prefix.
        rf"(?<![\w.])"
        rf"(?:"
        # Explicit hive_metastore prefix; the table part is optional.
        rf"hive_metastore\s*\.\s*(?P<qualified>{schema_alt})\b"
        rf"(?:\s*\.\s*(?P<qualified_table>\w+))?"
        rf"|"
        # Two-part <schema>.<table> without a catalog.
        rf"(?P<bare>{schema_alt})\s*\.\s*(?P<table>\w+)"
        rf")"
    )

    def _qualify(match):
        schema = match.group("qualified") or match.group("bare")
        table = match.group("qualified_table") or match.group("table")
        target = mapping[schema]
        return f"{target}.{table}" if table else target

    updated = reference.sub(_qualify, content)

    # Assignments such as src_database = "schema". The closing quote must be
    # the same character as the opening one, and that character is kept
    # (a forced single quote is not a string literal in every notebook
    # language, e.g. Scala).
    assignment = re.compile(
        rf"(\w*_database\s*=\s*)([\"'])(?:hive_metastore\.)?({schema_alt})\2"
    )

    def _requalify_assignment(match):
        prefix, quote, schema = match.group(1), match.group(2), match.group(3)
        return f"{prefix}{quote}{mapping[schema]}{quote}"

    return assignment.sub(_requalify_assignment, updated)


# ==========================
# 🚀 WALK WORKSPACE & APPLY CHANGES
# ==========================

def update_notebooks(path):
    """Recursively rewrite Hive references in every notebook under ``path``.

    Directories are descended into. Each notebook is exported, passed through
    ``update_table_references`` and re-imported (overwriting the original)
    only when its text actually changed. Other object types are ignored.

    Args:
        path: Workspace folder to start from.

    Raises:
        requests.HTTPError: Propagated from the list/export/import calls; the
            walk stops at the first failing request.
    """
    for obj in list_workspace_objects(path):
        object_type = obj["object_type"]

        if object_type == "DIRECTORY":
            update_notebooks(obj["path"])
            continue
        if object_type != "NOTEBOOK":
            continue

        notebook_path = obj["path"]
        print(f"📝 Processing notebook: {notebook_path}")
        content, exported_language = export_notebook(notebook_path)
        # Prefer the language reported by the workspace listing for this
        # notebook. export_notebook() falls back to a default when the export
        # response does not name a language, and re-importing with the wrong
        # language would change the notebook's type.
        language = obj.get("language") or exported_language

        updated = update_table_references(content)
        if updated == content:
            print(f" - No Hive references found in {notebook_path}")
            continue

        import_notebook(notebook_path, updated, language)
        print(f"✅ Updated: {notebook_path} ({language})")


# ==========================
# 🏁 RUN MIGRATION
# ==========================

# Script entry point: announce the root folder, then walk it and rewrite every
# notebook found beneath it.
print(f"\n🚀 Starting Hive → UC reference migration in: {WORKSPACE_PATH}")
update_notebooks(WORKSPACE_PATH)
# Reached only when update_notebooks() returned without raising.
print("\n✅ Notebook reference migration completed successfully!")