In [None]:
from delta.tables import *
from pyspark.sql.functions import *
from pyspark.errors import PySparkException

In [None]:
## Parameters ##
# Declare every notebook widget up front; each one defaults to an empty string.
dbutils.widgets.text('catalog_source', '')
dbutils.widgets.text('catalog_target', '')
dbutils.widgets.text('full_catalog', '')
dbutils.widgets.text('schemas_clone', '')
dbutils.widgets.text('tables_clone', '')
################
# Read the current widget values into module-level names used by later cells.
catalog_source = dbutils.widgets.get('catalog_source')  # catalog to read from
catalog_target = dbutils.widgets.get('catalog_target')  # catalog to clone into
full_catalog = dbutils.widgets.get('full_catalog')      # 'Y' selects the full-catalog copy
schemas_clone = dbutils.widgets.get('schemas_clone')    # comma-separated schema names
tables_clone = dbutils.widgets.get('tables_clone')      # comma-separated schema.table names

In [None]:
def _split_csv(raw):
    """Turn a comma-separated widget value into a list of non-empty, trimmed items.

    Accepts either the raw string read from the widget or an already-parsed
    list, so re-running this cell (which rebinds the same names) does not fail.
    """
    items = raw.split(',') if isinstance(raw, str) else raw
    # Trim each entry so "a.b, c.d" yields 'c.d' rather than ' c.d', and drop blanks.
    return [item.strip() for item in items if item.strip()]


schemas_clone = _split_csv(schemas_clone)
tables_clone = _split_csv(tables_clone)
# Clone type handed to clone_table(); that function accepts 'shallow' or 'deep'.
CLONE_TYPE = 'deep'

In [None]:
def get_schemas_from_catalog(catalog_source):
    """List the schema names of ``catalog_source``.

    Runs ``SHOW SCHEMAS IN <catalog_source>`` and returns the values of the
    ``databaseName`` column.

    Args:
        catalog_source: Name of the catalog to inspect.

    Returns:
        A list of schema names. If the query fails for any reason, a message
        is printed and an empty list is returned (best-effort behaviour).
    """
    try:
        df_schemas = spark.sql(f'SHOW SCHEMAS IN {catalog_source}')
        lst_schemas = [s[0] for s in df_schemas.select('databaseName').collect()]
    except Exception as ex:
        # Catch Exception rather than using a bare `except:` so that
        # KeyboardInterrupt / SystemExit still stop the run.
        print(f'Catalog {catalog_source} does not exist.')
        # Surface the real cause: the failure is not necessarily a missing catalog.
        print(f'Underlying error: {ex}')
        lst_schemas = []
    return lst_schemas

def get_tables_from_schema(catalog_source, schema_name):
    """List the table names of ``catalog_source.schema_name``.

    Runs ``SHOW TABLES IN <catalog_source>.<schema_name>`` and returns the
    values of the ``tableName`` column.

    Args:
        catalog_source: Name of the catalog that holds the schema.
        schema_name: Name of the schema to inspect.

    Returns:
        A list of table names. If the query fails for any reason, a message
        is printed and an empty list is returned (best-effort behaviour).
    """
    try:
        df_tables = spark.sql(f'SHOW TABLES IN {catalog_source}.{schema_name}')
        lst_tables = [t[0] for t in df_tables.select('tableName').collect()]
    except Exception as ex:
        # Catch Exception rather than using a bare `except:` so that
        # KeyboardInterrupt / SystemExit still stop the run.
        print(f'Schema {schema_name} does not exist in {catalog_source}.')
        # Surface the real cause: the failure is not necessarily a missing schema.
        print(f'Underlying error: {ex}')
        lst_tables = []
    return lst_tables

def get_container_from_catalog(catalog_target):
    """Return the storage root of ``catalog_target`` without its ``abfss://`` scheme.

    Runs ``DESCRIBE CATALOG EXTENDED <catalog_target>`` and reads the value of
    the row whose ``info_name`` is ``Storage Root``.

    Args:
        catalog_target: Name of the catalog to describe.

    Returns:
        On success, the storage root string with every ``abfss://`` removed.
        On failure, a human-readable message ending in ``' does not exist.'``;
        ``clone_table`` recognises failures by that suffix. This happens when
        the catalog is missing, or when the description has no usable
        ``Storage Root`` value.

    Raises:
        PySparkException: Re-raised for any Spark error whose error class is
            not ``NO_SUCH_CATALOG_EXCEPTION``.
    """
    try:
        df = spark.sql(f'DESCRIBE CATALOG EXTENDED {catalog_target}')
        storage_rows = df.filter(col('info_name') == 'Storage Root').collect()
    except PySparkException as ex:
        if ex.getErrorClass() == "NO_SUCH_CATALOG_EXCEPTION":
            return f'Catalog {catalog_target} does not exist.'
        raise

    # Without this guard a missing or null Storage Root row surfaced as an
    # opaque IndexError / AttributeError. Report it through the same
    # "... does not exist." channel the caller already handles.
    if not storage_rows or not storage_rows[0][1]:
        return f'Storage Root for catalog {catalog_target} does not exist.'

    return storage_rows[0][1].replace('abfss://', '')

def clone_table(catalog_source, catalog_target, schema_name, table_name, clone_type):
    """Clone one table from ``catalog_source`` into ``catalog_target``.

    The target table is created (or replaced) at
    ``abfss://<target storage root>/<schema_name>/<table_name>``, where the
    storage root comes from ``get_container_from_catalog(catalog_target)``.

    Args:
        catalog_source: Catalog holding the table to clone.
        catalog_target: Catalog receiving the clone.
        schema_name: Schema of the table; the same name is used on both sides.
        table_name: Table to clone; the same name is used on both sides.
        clone_type: ``'shallow'`` or ``'deep'`` (case-insensitive).

    Returns:
        None. Progress and recoverable problems are printed: an unresolved
        target storage root, an invalid clone type, or a missing source table.

    Raises:
        PySparkException: Re-raised for any Spark error whose error class is
            not ``TABLE_OR_VIEW_NOT_FOUND``.
    """
    container_target = get_container_from_catalog(catalog_target)
    # get_container_from_catalog signals failure with a message ending in this suffix.
    if container_target.endswith(' does not exist.'):
        print(container_target)
        return

    # Normalise the root so exactly one '/' separates it from the schema
    # folder. Plain concatenation produced '<root><schema>/...' whenever the
    # storage root was reported without a trailing slash.
    storage_root = container_target.rstrip('/')
    location = f'abfss://{storage_root}/{schema_name}/{table_name}'
    print(f'Cloning table {catalog_source}.{schema_name}.{table_name} to {catalog_target}...')

    if clone_type.upper() not in ('SHALLOW', 'DEEP'):
        print('Invalid Clone Type. Valid clone types are "shallow" or "deep".')
        return

    try:
        spark.sql(f"""
            CREATE OR REPLACE TABLE {catalog_target}.{schema_name}.{table_name}
            {clone_type} CLONE {catalog_source}.{schema_name}.{table_name}
            LOCATION '{location}'
        """)
    except PySparkException as ex:
        if ex.getErrorClass() != "TABLE_OR_VIEW_NOT_FOUND":
            raise
        print(f'Table {catalog_source}.{schema_name}.{table_name} does not exist.')

def create_schema(catalog_source, catalog_target, schema_name):
    """Create ``schema_name`` in ``catalog_target`` if it exists in ``catalog_source``.

    Runs ``SHOW SCHEMAS IN <catalog_source>`` and, when ``schema_name`` is
    listed, issues ``CREATE SCHEMA IF NOT EXISTS <catalog_target>.<schema_name>``.
    Otherwise only a message is printed.

    Args:
        catalog_source: Catalog that must already contain the schema.
        catalog_target: Catalog in which the schema is created.
        schema_name: Name of the schema; matched case-insensitively.

    Returns:
        None.
    """
    df_schemas = spark.sql(f'SHOW SCHEMAS IN {catalog_source}')
    # SQL identifiers are case-insensitive, so compare in lower case. An exact
    # string match reported e.g. 'Sales' as missing when the catalog lists 'sales'.
    wanted = schema_name.lower()
    exists_in_source = any(
        row[0].lower() == wanted
        for row in df_schemas.select('databaseName').collect()
    )
    if exists_in_source:
        print(f'Creating schema {schema_name} on catalog {catalog_target}')
        spark.sql(f'CREATE SCHEMA IF NOT EXISTS {catalog_target}.{schema_name}')
    else:
        print(f'Schema {schema_name} does not exist in {catalog_source}.')

In [None]:
# Driver: pick one of three copy modes from the notebook parameters.
#   * full_catalog is 'Y'        -> every schema and table of the source catalog
#   * only tables_clone is set   -> the listed schema.table entries
#   * only schemas_clone is set  -> every table of the listed schemas
# Any other combination prints a usage hint and does nothing.

# Accept 'y', ' Y ' etc.; the widget value is free text.
if full_catalog.strip().upper() == 'Y':
    # Copy Full Catalog
    for schema_name in get_schemas_from_catalog(catalog_source):
        # information_schema holds system-provided views, not clonable tables.
        # Attempting to clone them raises an error that clone_table re-raises,
        # which would abort the whole run.
        if schema_name.lower() == 'information_schema':
            print(f'Skipping system schema {schema_name}.')
            continue
        create_schema(catalog_source, catalog_target, schema_name)
        for table_name in get_tables_from_schema(catalog_source, schema_name):
            clone_table(catalog_source, catalog_target, schema_name, table_name, CLONE_TYPE)
# Copy Only Specified Tables
elif len(schemas_clone) == 0 and len(tables_clone) > 0:
    for schema_table in tables_clone:
        name_parts = [part.strip() for part in schema_table.split('.')]
        # Require exactly "schema.table". A bare name used to raise IndexError,
        # and a three-part name was silently mis-read as (catalog, schema).
        if len(name_parts) != 2 or not all(name_parts):
            print(f'Skipping "{schema_table}": expected the form schema.table.')
            continue
        schema_name, table_name = name_parts
        create_schema(catalog_source, catalog_target, schema_name)
        clone_table(catalog_source, catalog_target, schema_name, table_name, CLONE_TYPE)
# Copy Only Specified Schemas
elif len(schemas_clone) > 0 and len(tables_clone) == 0:
    for schema_name in schemas_clone:
        create_schema(catalog_source, catalog_target, schema_name)
        for table_name in get_tables_from_schema(catalog_source, schema_name):
            clone_table(catalog_source, catalog_target, schema_name, table_name, CLONE_TYPE)
else:
    print('Please provide correct parameters to a full copy of the catalog, a list of schemas, or a list of tables!')