In [0]:
# This notebook is designed to automate the process of replicating all data volumes from **source catalogs** to **target catalogs** in Databricks Unity Catalog, while ignoring the `information_schema`. The replication is incremental, copying only new or updated files.

# ## Parameters Overview

# 1.  **catalog_names**:
#     - **Type**: String
#     - **Description**: A comma-separated list of catalog pairs in the format `source_catalog:target_catalog`. The script will discover and replicate all volumes from the source catalog to the target catalog for each pair.
#     - **Example Value**: `'source_catalog_A:target_catalog_A, source_catalog_B:target_catalog_B'`

# 2.  **max_workers**:
#     - **Type**: Integer
#     - **Description**: Maximum number of worker threads for parallel processing of volume replication tasks.

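# 3.  **log_table_name**:
#     - **Type**: String
#     - **Description**: The fully qualified name (`catalog.schema.table`) of the log table. The table is created if it does not exist. Its successful file-copy entries record each file's modification time, which is what later runs use to copy only new or updated files.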


In [0]:
# dbutils.widgets.removeAll()
# Notebook parameters (all free-text widgets) and their default values.
# `catalog_names` holds comma-separated `source_catalog:target_catalog` pairs.
for _widget_name, _widget_default in (
    ("catalog_names", ""),
    ("target_schema_owner", ""),
    ("max_workers", "5"),
    ("log_table_name", "users.satyendranath_sure.dr_log_table_name"),
):
    dbutils.widgets.text(_widget_name, _widget_default)

In [0]:
def _split_csv_widget(raw_value):
    """Split a comma-separated widget value into stripped, non-empty items.

    An empty (or whitespace-only) widget value yields an empty list instead of
    a list holding one empty string, so downstream loops do not process a
    bogus blank entry.
    """
    return [item.strip() for item in raw_value.split(",") if item.strip()]


# Read the notebook parameters from the widgets.
catalog_pair_names_list = _split_csv_widget(dbutils.widgets.get("catalog_names"))
target_schema_owner_list = _split_csv_widget(dbutils.widgets.get("target_schema_owner"))
max_workers = int(dbutils.widgets.get("max_workers"))
log_table_name = dbutils.widgets.get("log_table_name").strip()
print(catalog_pair_names_list, target_schema_owner_list, max_workers, log_table_name)

['_eo_amperity:dr_eo_amperity', 'accuweather_daily_and_hourly_forecasts_u_s_postal_codes_sample:dr_accuweather', 'test_not:dr_test_not', 'satya_share:satya_dr_catalog'] ['dr_eo_amperity.bronze:grp_test_clt_usa', 'dr_eo_amperity.silver:grp_test_clt_usa', 'dr_eo_amperity.default:grp_test_clt_usa'] 5 users.satyendranath_sure.dr_log_table_name


In [0]:
# for catalog_pair in catalog_pair_names_list:
#     second_value = catalog_pair.split(":")[1]
#     print(second_value)
#     spark.sql(f"DROP CATALOG IF EXISTS `{second_value}` CASCADE")
# spark.sql(f"DROP table if exists `{log_table_name}`")

In [0]:

import traceback
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, ArrayType
from datetime import datetime
import json
from pyspark.sql import SparkSession
import os

# The 'spark' and 'dbutils' objects are pre-defined in a Databricks environment.

# Derive the notebook's short name (last path segment) from the Databricks context.
notebook_path = (
    dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
)
notebook_name = notebook_path.rsplit('/', 1)[-1]

# Spark schema used when building log rows; it mirrors the table DDL below.
log_table_schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in (
            "notebook_name",
            "entity_type",
            "entity_name",
            "action",
            "status",
            "message",
        )
    ]
    + [
        StructField("timestamp", TimestampType(), True),
        StructField(
            "results_data",
            ArrayType(
                StructType([
                    StructField("key", StringType(), True),
                    StructField("value", StringType(), True),
                ])
            ),
            True,
        ),
    ]
)

# Make sure the log table exists before anything tries to append to it.
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {log_table_name}
    (
        notebook_name STRING,
        entity_type STRING,
        entity_name STRING,
        action STRING,
        status STRING,
        message STRING,
        timestamp TIMESTAMP,
        results_data ARRAY<STRUCT<key: STRING, value: STRING>>
    )
""")

def insert_log_entry(log_data):
    """
    Appends one log entry to the designated log table.

    The caller's dictionary is left untouched: a shallow copy is stamped with
    the current notebook name and given a list-valued ``results_data`` before
    it is written. Any failure is printed instead of raised, so a logging
    problem does not propagate to the caller.

    Args:
        log_data (dict): Log fields keyed by the column names of ``log_table_schema``.
    """
    # Fall back to the raw input in the error message if the copy itself fails.
    entry = log_data
    try:
        entry = dict(log_data)
        entry["notebook_name"] = notebook_name
        # results_data must be a list (of key/value dicts) to fit the schema.
        if not isinstance(entry.get("results_data"), list):
            entry["results_data"] = []

        df = spark.createDataFrame([entry], schema=log_table_schema)
        df.write.mode("append").saveAsTable(log_table_name)
    except Exception as e:
        print(f"Failed to insert log entry: {entry}. Error: {e}")

def get_previous_run_info(source_path):
    """
    Retrieves the modification times recorded by earlier successful file copies
    under a given source path.

    Only log rows with entity_type 'file', action 'copy' and status 'success'
    whose entity_name contains ``source_path`` followed by '/' are read, so a
    volume's lookup does not collect the copy history of every other volume.
    If ``source_path`` is empty, no path filter is applied.

    Args:
        source_path (str): The source path (directory) to check for previously copied files.

    Returns:
        dict: A dictionary mapping absolute file paths (the logged entity_name) to the
        modification timestamp recorded by the most recent successful copy. An empty
        dict is returned (and an error entry is logged) if the log cannot be read.
    """
    # NOTE(review): the lookup is keyed by source file path only; the logged
    # target_path is not considered, so a file already copied to one target is
    # treated as copied for any target. Confirm this is intended.
    from pyspark.sql import functions as F

    spark_session = SparkSession.builder.getOrCreate()
    try:
        successful_copies = spark_session.table(log_table_name).where(
            (F.col("entity_type") == "file")
            & (F.col("action") == "copy")
            & (F.col("status") == "success")
        )
        if source_path:
            # Logged paths carry a scheme prefix (e.g. 'dbfs:/Volumes/...'), so match
            # on containment rather than on a leading prefix.
            path_fragment = source_path.rstrip("/") + "/"
            successful_copies = successful_copies.where(
                F.col("entity_name").contains(path_fragment)
            )

        rows = (
            successful_copies
            .orderBy(F.col("timestamp").desc())
            .select("entity_name", "results_data")
            .collect()
        )

        previous_runs = {}
        for row in rows:
            try:
                file_path = row['entity_name']
                mod_time = next(
                    (
                        int(item.value)
                        for item in row['results_data']
                        if item.key == 'modificationTime'
                    ),
                    None,
                )
                # Rows arrive newest-first, so the first hit per file is the latest copy.
                if mod_time and file_path not in previous_runs:
                    previous_runs[file_path] = mod_time
            except Exception as e:
                print(f"Error parsing log entry: {e}")
                continue
        return previous_runs
    except Exception as e:
        log_entry = {
            "entity_type": "info",
            "entity_name": source_path,
            "action": "get_previous_run_info",
            "status": "error",
            "message": f"Error fetching previous run info. Error: {e}",
            "timestamp": datetime.now(),
            "results_data": []
        }
        insert_log_entry(log_entry)
        return {}


def sync_volume_recursively(source_path, target_path, prev_run_info):
    """
    Recursively copies files from a source path to a target path,
    copying only new or modified files.

    A file is copied when its path is absent from ``prev_run_info`` or its
    modification time is greater than the recorded one; otherwise a 'skip'
    entry is logged. Each file is copied to an explicit destination file path
    (``target_path`` + file name) rather than to the directory, so a missing
    target directory can never cause a file to be written at the directory
    path itself. Every outcome is logged through ``insert_log_entry``;
    failures are logged and do not raise.

    Args:
        source_path (str): The full source path to sync from.
        target_path (str): The full target path (directory) to sync to.
        prev_run_info (dict): A dictionary of previously copied files and their modification times.
    """
    def _log(entity_type, entity_name, action, status, message, results_data=None):
        # Single place that shapes a log row for this function.
        insert_log_entry({
            "entity_type": entity_type,
            "entity_name": entity_name,
            "action": action,
            "status": status,
            "message": message,
            "timestamp": datetime.now(),
            "results_data": results_data if results_data is not None else []
        })

    # Ensure the target directory exists; a failure is logged but the sync is
    # still attempted.
    try:
        dbutils.fs.mkdirs(target_path)
    except Exception as e:
        _log("directory", target_path, "create", "error",
             f"Failed to create target directory. Error: {e}")

    try:
        files = dbutils.fs.ls(source_path)
    except Exception as e:
        _log("directory", source_path, "list", "error",
             f"Error listing contents of directory. Error: {e}")
        return

    for file_info in files:
        # Directories are recognised by a trailing slash in the listed name.
        if file_info.name.endswith('/'):
            _log("directory", file_info.path, "list", "success",
                 "Found directory to traverse.")
            new_target_path = os.path.join(target_path, file_info.name)
            sync_volume_recursively(file_info.path, new_target_path, prev_run_info)
            continue

        already_copied = (
            file_info.path in prev_run_info
            and file_info.modificationTime <= prev_run_info[file_info.path]
        )
        if already_copied:
            _log("file", file_info.path, "skip", "success",
                 "File already exists with same or newer modification date. Skipping.")
            continue

        # Copy to an explicit file path inside the target directory.
        destination_file = os.path.join(target_path, file_info.name)
        try:
            dbutils.fs.cp(file_info.path, destination_file, recurse=True)
            # The logged modificationTime is what later runs compare against.
            _log(
                "file", file_info.path, "copy", "success",
                "File copied successfully.",
                [
                    {"key": "result", "value": "True"},
                    {"key": "modificationTime", "value": str(file_info.modificationTime)},
                    {"key": "fileSize", "value": str(file_info.size)},
                    {"key": "source_path", "value": file_info.path},
                    {"key": "target_path", "value": target_path}
                ],
            )
        except Exception as e:
            _log("file", file_info.path, "copy", "error",
                 f"Error during file copy. Error: {e}")

def execute_and_log_sql(sql_command, entity_type, entity_name, action, success_msg, error_msg):
    """
    Runs one SQL statement and records the outcome in the log table.

    A success entry is written when the statement runs; if it raises, an error
    entry carrying ``error_msg`` and the exception text is written instead.
    The exception is not re-raised.

    Args:
        sql_command (str): The SQL command to execute.
        entity_type (str): The type of entity being acted upon (e.g., 'catalog', 'schema').
        entity_name (str): The name of the entity.
        action (str): The action being performed (e.g., 'create').
        success_msg (str): Message to log on success.
        error_msg (str): Message to log on error.
    """
    def _build_entry(status, message):
        # Shared shape of both the success and the error log row.
        return {
            "entity_type": entity_type,
            "entity_name": entity_name,
            "action": action,
            "status": status,
            "message": message,
            "timestamp": datetime.now(),
            "results_data": []
        }

    session = SparkSession.builder.getOrCreate()
    try:
        session.sql(sql_command)
        insert_log_entry(_build_entry("success", success_msg))
    except Exception as failure:
        insert_log_entry(_build_entry("error", f"{error_msg} Error: {failure}"))


def process_volume_sync(source_catalog, source_schema, target_catalog, target_schema, volume_name):
    """
    Replicates a single volume from the source catalog/schema to the target.

    The target volume is created if needed; if that fails the error is logged
    and the function returns without copying anything. Otherwise the previous
    copy history for the source path is loaded and the recursive file sync is
    run. Any unexpected exception is logged rather than raised.

    Args:
        source_catalog (str): The source catalog name.
        source_schema (str): The source schema name.
        target_catalog (str): The target catalog name.
        target_schema (str): The target schema name.
        volume_name (str): The name of the volume to sync.
    """
    def _entry(entity_type, entity_name, action, status, message):
        # Log row with an empty results_data payload.
        return {
            "entity_type": entity_type,
            "entity_name": entity_name,
            "action": action,
            "status": status,
            "message": message,
            "timestamp": datetime.now(),
            "results_data": []
        }

    print(f"Starting sync for volume: {source_catalog}.{source_schema}.{volume_name}")
    session = SparkSession.builder.getOrCreate()
    try:
        # Step 1: make sure the target volume exists.
        create_volume_sql = (
            f"CREATE VOLUME IF NOT EXISTS `{target_catalog}`.`{target_schema}`.`{volume_name}`"
        )
        try:
            session.sql(create_volume_sql)
            insert_log_entry(_entry(
                "volume",
                f"{target_catalog}.{target_schema}.{volume_name}",
                "create",
                "success",
                "Volume created or already exists.",
            ))
        except Exception as create_error:
            insert_log_entry(_entry(
                "volume",
                f"{target_catalog}.{target_schema}.{volume_name}",
                "create",
                "error",
                f"Failed to create volume. Error: {create_error}",
            ))
            return

        # Step 2: file-system paths of the two volumes.
        source_path = os.path.join("/Volumes", source_catalog, source_schema, volume_name)
        target_path = os.path.join("/Volumes", target_catalog, target_schema, volume_name)

        # Steps 3 and 4: load the copy history, then sync only what changed.
        sync_volume_recursively(source_path, target_path, get_previous_run_info(source_path))

    except Exception as sync_error:
        insert_log_entry(_entry(
            "volume_sync",
            f"{source_catalog}.{source_schema}.{volume_name}",
            "sync",
            "error",
            f"An unexpected error occurred during volume sync. Error: {sync_error}",
        ))

# ==============================================================================
# MAIN EXECUTION LOGIC
# ==============================================================================
if __name__ == "__main__":
    def _log_event(entity_type, entity_name, action, status, message):
        """Write one log row with an empty results_data payload."""
        insert_log_entry({
            "entity_type": entity_type,
            "entity_name": entity_name,
            "action": action,
            "status": status,
            "message": message,
            "timestamp": datetime.now(),
            "results_data": []
        })

    spark = SparkSession.builder.getOrCreate()
    spark.sql("SET spark.sql.catalog.spark_catalog = 'spark_catalog'")

    print("Starting Databricks Volume DR Sync.")
    _log_event("info", "N/A", "start_sync", "success", "Starting Databricks Volume DR Sync.")

    # Catalogs and schemas are prepared sequentially on the driver; only the
    # per-volume file syncs are handed to the worker threads.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []

        for catalog_pair in catalog_pair_names_list:
            # A valid pair is exactly `source_catalog:target_catalog`.
            pair_parts = catalog_pair.split(':')
            if len(pair_parts) != 2:
                _log_event(
                    "info",
                    catalog_pair,
                    "parse_catalog_pair",
                    "error",
                    f"Invalid catalog pair format: {catalog_pair}. Skipping.",
                )
                continue
            source_catalog, target_catalog = pair_parts

            try:
                # Target catalog first, so schemas and volumes have a home.
                execute_and_log_sql(
                    f"CREATE CATALOG IF NOT EXISTS `{target_catalog}`",
                    "catalog",
                    target_catalog,
                    "create",
                    f"Catalog `{target_catalog}` created or already exists.",
                    f"Failed to create catalog `{target_catalog}`."
                )

                # Enumerate the schemas of the source catalog.
                schema_rows = spark.sql(f"SHOW SCHEMAS IN `{source_catalog}`").collect()
                schemas_to_process = [schema_row['databaseName'] for schema_row in schema_rows]

                for source_schema in schemas_to_process:
                    # 'information_schema' is never replicated.
                    if source_schema.lower() == 'information_schema':
                        _log_event(
                            "schema",
                            f"{source_catalog}.{source_schema}",
                            "skip",
                            "success",
                            "Skipping 'information_schema'.",
                        )
                        continue

                    try:
                        # The target schema reuses the source schema's name.
                        execute_and_log_sql(
                            f"CREATE SCHEMA IF NOT EXISTS `{target_catalog}`.`{source_schema}`",
                            "schema",
                            f"{target_catalog}.{source_schema}",
                            "create",
                            f"Schema `{source_schema}` in catalog `{target_catalog}` created or already exists.",
                            f"Failed to create schema `{source_schema}`."
                        )

                        # Enumerate the volumes of the source schema.
                        volume_rows = spark.sql(
                            f"SHOW VOLUMES IN `{source_catalog}`.`{source_schema}`"
                        ).collect()
                        volumes_to_process = [volume_row['volume_name'] for volume_row in volume_rows]

                        # One thread-pool task per volume.
                        for volume_name in volumes_to_process:
                            futures.append(
                                executor.submit(
                                    process_volume_sync,
                                    source_catalog,
                                    source_schema,
                                    target_catalog,
                                    source_schema,  # target schema name == source schema name
                                    volume_name,
                                )
                            )

                    except Exception as schema_error:
                        _log_event(
                            "schema",
                            f"{source_catalog}.{source_schema}",
                            "process",
                            "error",
                            f"An error occurred while processing schema. Error: {schema_error}",
                        )

            except Exception as catalog_error:
                _log_event(
                    "catalog",
                    source_catalog,
                    "process",
                    "error",
                    f"An error occurred while processing catalog pair. Error: {catalog_error}",
                )

        # Drain the tasks as they finish; surface any exception a worker raised.
        for finished in concurrent.futures.as_completed(futures):
            try:
                finished.result()
            except Exception as worker_error:
                _log_event(
                    "info",
                    "N/A",
                    "future_result",
                    "error",
                    f"An unexpected error occurred in a thread. Error: {worker_error}",
                )

    print("Databricks Volume DR Sync completed.")
    _log_event("info", "N/A", "end_sync", "success", "Databricks Volume DR Sync completed.")





Starting Databricks Volume DR Sync.
Starting sync for volume: _eo_amperity.bronze.chuck
Starting sync for volume: satya_share.satyendranath_sure.satya_external_volume
Starting sync for volume: satya_share.satyendranath_sure.satya_volume
Databricks Volume DR Sync completed.


In [0]:
# spark.sql(f"""
#     DELETE
#     FROM {log_table_name}
#     WHERE notebook_name = '{notebook_name}'
# """)

DataFrame[num_affected_rows: bigint]

In [0]:
# Display this notebook's log entries from the last 24 hours, newest first.
# The time window is applied in SQL so the output matches the message printed below.
print(f"\n--- Displaying logs from {log_table_name} for the last 24 hours ---")
log_query_df = spark.sql(f"""
    SELECT *
    FROM {log_table_name}
    WHERE notebook_name = '{notebook_name}'
      AND timestamp >= current_timestamp() - INTERVAL 24 HOURS
    ORDER BY timestamp DESC
""")

display(log_query_df)


--- Displaying logs from users.satyendranath_sure.dr_log_table_name for the last 24 hours ---


notebook_name,entity_type,entity_name,action,status,message,timestamp,results_data,watermark_col
DR_Volume_data,info,,end_sync,success,Databricks Volume DR Sync completed.,2025-09-22T18:20:55.824764Z,List(),
DR_Volume_data,file,dbfs:/Volumes/_eo_amperity/bronze/chuck/tiny.json,skip,success,File already exists with same or newer modification date. Skipping.,2025-09-22T18:20:55.077975Z,List(),
DR_Volume_data,file,dbfs:/Volumes/_eo_amperity/bronze/chuck/stitch.txt,skip,success,File already exists with same or newer modification date. Skipping.,2025-09-22T18:20:54.320609Z,List(),
DR_Volume_data,file,dbfs:/Volumes/_eo_amperity/bronze/chuck/stitch-2025-06-11_10-46.json,skip,success,File already exists with same or newer modification date. Skipping.,2025-09-22T18:20:53.5675Z,List(),
DR_Volume_data,file,dbfs:/Volumes/_eo_amperity/bronze/chuck/stitch-2025-06-10_11-26.json,skip,success,File already exists with same or newer modification date. Skipping.,2025-09-22T18:20:52.792117Z,List(),
DR_Volume_data,file,dbfs:/Volumes/_eo_amperity/bronze/chuck/stitch-2025-06-08_18-02.json,skip,success,File already exists with same or newer modification date. Skipping.,2025-09-22T18:20:52.050119Z,List(),
DR_Volume_data,file,dbfs:/Volumes/_eo_amperity/bronze/chuck/stitch-2025-06-08_15-57.json,skip,success,File already exists with same or newer modification date. Skipping.,2025-09-22T18:20:50.84234Z,List(),
DR_Volume_data,file,dbfs:/Volumes/_eo_amperity/bronze/chuck/stitch-2025-06-05_13-39.json,skip,success,File already exists with same or newer modification date. Skipping.,2025-09-22T18:20:49.893648Z,List(),
DR_Volume_data,file,dbfs:/Volumes/_eo_amperity/bronze/chuck/stitch-2025-06-04_18-14.json,skip,success,File already exists with same or newer modification date. Skipping.,2025-09-22T18:20:49.143843Z,List(),
DR_Volume_data,file,dbfs:/Volumes/_eo_amperity/bronze/chuck/stitch-2025-06-04_17-57.json,skip,success,File already exists with same or newer modification date. Skipping.,2025-09-22T18:20:48.386301Z,List(),


In [0]:
# %sql
# drop table 
# -- select * from 
# -- users.satyendranath_sure.dr_log_table_name

