Migrate Databricks Unity Catalog data and schemas from one catalog to another.  
Created by Meagan Longoria  
Last updated 25 Sep 2024  
  
Pre-reqs:   
    Must have target external location and target catalog created before running the notebook  
    Must have owner user/group created before running the notebook  
Assumptions:   
    For views and volumes that reference external storage, assumes there is only one source storage account and one target storage account
Notes:  
    Notebook is for running interactively. It prints or displays results to help you understand progress  
    Creates managed tables (copying their data), views, external volumes, and managed volumes (managed volume contents are not copied)
    The last step changes the owner of each object type (volume, table/view, schema) to the assigned owner
    Filters out objects in information schema, but includes objects in default schema  
General pattern:  
    1. Create a dataframe with the objects (tables/views/volumes) to be copied.  
    2. Create a function that dynamically generates the SQL DDL and executes it.  
    3. Loop through the dataframe, check if the object already exists, and call the function to create as needed. 


In [0]:
# create input widgets (only run once)
# Each widget is a free-text parameter with an empty default; they are read
# back into notebook variables in the next cell.
for widget_name in (
    "source_catalog",
    "target_catalog",
    "source_storage",
    "target_storage",
    "assigned_owner",
):
    dbutils.widgets.text(widget_name, "")


In [0]:
# assign widget values to variables
source_catalog = dbutils.widgets.get("source_catalog")
target_catalog = dbutils.widgets.get("target_catalog")
# Storage prefixes swapped inside view definitions and external volume paths;
# leave source_storage empty to skip the rewrite.
source_storage = dbutils.widgets.get("source_storage")
target_storage = dbutils.widgets.get("target_storage")
# Principal that the ownership cells at the end of the notebook assign to
# every migrated volume, table/view and schema. It must be its own variable
# so it does not overwrite target_storage.
assigned_owner = dbutils.widgets.get("assigned_owner")

Copy Tables

In [0]:
# get dataframe of managed tables
# Only MANAGED tables are listed; views and volumes are handled in later cells.
managed_tables_query = (
    f"Select * from {source_catalog}.information_schema.tables "
    "where table_type = 'MANAGED'"
)
df_tables = spark.sql(managed_tables_query)
display(df_tables)

In [0]:
# function for copying tables
def copy_table(source_catalog, target_catalog, schema, table):
    """Copy one managed table from the source catalog into the target catalog.

    Creates ``target_catalog.schema`` if needed. If the target table already
    exists, a skip message is printed and nothing is written. Otherwise the
    source table is read, spaces in its column names are replaced with
    underscores, and the result is saved with ``saveAsTable`` in overwrite mode.
    """
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_catalog}.{schema};")

    source_fqn = f"{source_catalog}.{schema}.{table}"
    target_fqn = f"{target_catalog}.{schema}.{table}"

    if spark.catalog.tableExists(target_fqn):
        print(f"Skip table {target_fqn}, because the table already exists.")
        return

    print(f"Copy from {source_fqn} into {target_fqn}.")
    source_df = spark.table(source_fqn)
    # Column names containing spaces are renamed (space -> underscore) before writing.
    cleaned_names = [name.replace(" ", "_") for name in source_df.columns]
    source_df.toDF(*cleaned_names).write.mode("overwrite").saveAsTable(target_fqn)
    
    

In [0]:
# loop to call the function for each table
# Columns are read by name rather than by position: positional indexing on a
# labelled pandas Series (row[1]) is deprecated and silently depends on the
# information_schema column order. Selecting only the needed columns also
# avoids converting the whole DataFrame to pandas.
for row in df_tables.select("table_schema", "table_name").collect():
    copy_table(source_catalog, target_catalog, row["table_schema"], row["table_name"])


In [0]:
# check to see if there are any tables that did not get created in target
# The same query is run against both catalogs; rows present only in the
# source are the managed tables that still need to be migrated.
managed_tables_sql = (
    "Select table_schema, table_name from {}.information_schema.tables "
    "where table_type = 'MANAGED'"
)
df_tables_s = spark.sql(managed_tables_sql.format(source_catalog))
df_tables_t = spark.sql(managed_tables_sql.format(target_catalog))
df_tables_left = df_tables_s.exceptAll(df_tables_t)
display(df_tables_left)


Copy Views

In [0]:
# function to copy views
def migrate_view(source_catalog, target_catalog, schema, viewnm):
    """Recreate one source view in the target catalog.

    Reads the view DDL with ``SHOW CREATE TABLE``, rewrites references to the
    source catalog (and, when the notebook-level ``source_storage`` is set,
    the source storage prefix), prints the resulting statement and executes
    it with ``target_catalog.schema`` as the current catalog and schema.

    Args:
        source_catalog: catalog that currently holds the view.
        target_catalog: catalog the view is recreated in.
        schema: schema name, shared by source and target.
        viewnm: view name.
    """
    import re

    vwdef = spark.sql(f"SHOW CREATE TABLE {source_catalog}.{schema}.{viewnm};")
    defstr = vwdef.collect()[0]["createtab_stmt"]

    # Replace the source catalog only where it appears as a whole identifier,
    # so names that merely contain it (catalog "dev" inside table "device_log")
    # are not corrupted. A function replacement keeps any backslashes in
    # target_catalog literal.
    if source_catalog:
        catalog_pattern = rf"(?<!\w){re.escape(source_catalog)}(?!\w)"
        defstrdb = re.sub(catalog_pattern, lambda _m: target_catalog, defstr)
    else:
        defstrdb = defstr

    # Swap the storage account reference (the external location must already
    # be configured in the target). str.replace("", x) would insert x between
    # every character, so skip this when no source storage was provided.
    if source_storage:
        defstrstr = defstrdb.replace(source_storage, target_storage)
    else:
        defstrstr = defstrdb

    print(defstrstr)
    # A schema that holds only views was not created by the table cells,
    # so make sure it exists before switching to it.
    spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_catalog}.{schema};")
    spark.sql(f"Use catalog {target_catalog};")
    spark.sql(f"Use schema {schema};")
    spark.sql(defstrstr)


In [0]:
# get df of views
df_views = spark.sql(f"Select * from {source_catalog}.information_schema.views where table_schema <> 'information_schema'")

# Read the views that already exist in the target once, instead of issuing a
# COUNT(*) query per view. Comparing (schema, name) pairs in Python also avoids
# splicing object names into SQL string literals.
existing_views = {
    (r["table_schema"], r["table_name"])
    for r in spark.sql(
        f"Select table_schema, table_name from {target_catalog}.information_schema.views"
    ).collect()
}

# loop through views; columns are read by name, not by position
for row in df_views.select("table_schema", "table_name").collect():
    schema = row["table_schema"]
    viewnm = row["table_name"]
    if (schema, viewnm) in existing_views:
        print(f"Skip view {target_catalog}.{schema}.{viewnm}, because the view already exists.")
    else:
        migrate_view(source_catalog, target_catalog, schema, viewnm)

In [0]:
# check if all views were copied
# Rows that exist only in the source catalog are views still to be migrated.
view_list_sql = "Select table_schema, table_name from {}.information_schema.views"
df_views_s = spark.sql(view_list_sql.format(source_catalog))
df_views_t = spark.sql(view_list_sql.format(target_catalog))
df_views_left = df_views_s.exceptAll(df_views_t)
display(df_views_left)

Copy volumes

In [0]:
# function to copy EXTERNAL volumes
def migrate_vol(source_catalog, target_catalog, schema, volnm):
    """Create an external volume in the target catalog over the source volume's location.

    The location is read with ``DESCRIBE VOLUME``. When the notebook-level
    ``source_storage`` is set, that prefix is replaced with ``target_storage``.
    The generated CREATE statement is printed and then executed.

    Args:
        source_catalog: catalog that currently holds the volume.
        target_catalog: catalog the volume is created in.
        schema: schema name, shared by source and target.
        volnm: volume name.
    """
    voldef = spark.sql(f"DESCRIBE VOLUME {source_catalog}.{schema}.{volnm};")
    location = voldef.collect()[0]["storage_location"]
    # str.replace("", x) would insert x between every character of the path,
    # so only rewrite the location when a source storage value was supplied.
    if source_storage:
        location = location.replace(source_storage, target_storage)
    finalstr = f"CREATE EXTERNAL VOLUME {target_catalog}.{schema}.{volnm} LOCATION '{location}';"
    print(finalstr)
    spark.sql(finalstr)


In [0]:
# get df of external volumes
df_vols = spark.sql(f"Select * from {source_catalog}.information_schema.volumes WHERE volume_type = 'EXTERNAL'")

# Read the volumes that already exist in the target once, instead of issuing a
# COUNT(*) query per volume with names spliced into SQL literals.
existing_vols = {
    (r["volume_schema"], r["volume_name"])
    for r in spark.sql(
        f"Select volume_schema, volume_name from {target_catalog}.information_schema.volumes"
    ).collect()
}

# loop through volumes; columns are read by name, not by position
for row in df_vols.select("volume_schema", "volume_name").collect():
    schema = row["volume_schema"]
    volnm = row["volume_name"]
    if (schema, volnm) in existing_vols:
        print(f"Skip volume {target_catalog}.{schema}.{volnm}, because the volume already exists.")
    else:
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_catalog}.{schema};")
        migrate_vol(source_catalog, target_catalog, schema, volnm)

In [0]:
# function to migrate MANAGED volumes
def migrate_mvol(source_catalog, target_catalog, schema, volnm):
    """Create an empty managed volume with the same name in the target catalog.

    Only the volume object is created; files stored in the source volume are
    not copied. ``source_catalog`` is not used and is kept so existing callers
    keep working.

    Args:
        source_catalog: catalog that currently holds the volume (unused).
        target_catalog: catalog the volume is created in.
        schema: schema name, shared by source and target.
        volnm: volume name.
    """
    crstr = f"CREATE VOLUME {target_catalog}.{schema}.{volnm} "
    print(crstr)
    # Execute the statement built above (`finalstr` is only a local of migrate_vol).
    spark.sql(crstr)


In [0]:
# get df of managed volumes
df_vols = spark.sql(f"Select * from {source_catalog}.information_schema.volumes WHERE volume_type = 'MANAGED'")

# Read the volumes that already exist in the target once, instead of issuing a
# COUNT(*) query per volume with names spliced into SQL literals.
existing_vols = {
    (r["volume_schema"], r["volume_name"])
    for r in spark.sql(
        f"Select volume_schema, volume_name from {target_catalog}.information_schema.volumes"
    ).collect()
}

# loop through volumes in df; columns are read by name, not by position
for row in df_vols.select("volume_schema", "volume_name").collect():
    schema = row["volume_schema"]
    volnm = row["volume_name"]
    if (schema, volnm) in existing_vols:
        print(f"Skip volume {target_catalog}.{schema}.{volnm}, because the volume already exists.")
    else:
        spark.sql(f"CREATE SCHEMA IF NOT EXISTS {target_catalog}.{schema};")
        migrate_mvol(source_catalog, target_catalog, schema, volnm)

Change object owner

In [0]:
# function to change volume owner
def migrate_volowner(schema, volnm):
    """Print and run ``ALTER VOLUME ... OWNER TO`` for one target-catalog volume.

    Uses the notebook-level ``target_catalog`` and ``assigned_owner`` values.
    """
    volume_fqn = f"{target_catalog}.{schema}.{volnm}"
    stmt = f"Alter VOLUME {volume_fqn} OWNER TO `{assigned_owner}`;"
    print(stmt)
    spark.sql(stmt)


In [0]:

# get df of volumes that need owner changed
if not assigned_owner:
    # An empty principal would make every ALTER ... OWNER TO `` statement fail.
    print("Skip volume owner changes, because assigned_owner is empty.")
else:
    df_vols = spark.sql(f"Select * from {target_catalog}.information_schema.volumes WHERE volume_owner <> '{assigned_owner}'")

    # The WHERE clause already restricts df_vols to volumes whose owner differs
    # from assigned_owner, so each row can be updated without a second lookup.
    for row in df_vols.select("volume_schema", "volume_name").collect():
        migrate_volowner(row["volume_schema"], row["volume_name"])


In [0]:
# function to update table owner
def migrate_tableowner(schema, tablnm):
    """Print and run ``ALTER TABLE ... OWNER TO`` for one target-catalog table or view.

    Uses the notebook-level ``target_catalog`` and ``assigned_owner`` values.
    """
    table_fqn = f"{target_catalog}.{schema}.{tablnm}"
    stmt = f"Alter Table {table_fqn} OWNER TO `{assigned_owner}`;"
    print(stmt)
    spark.sql(stmt)


In [0]:

# get df of tables that need owner change (information_schema.tables also lists views)
if not assigned_owner:
    # An empty principal would make every ALTER ... OWNER TO `` statement fail.
    print("Skip table/view owner changes, because assigned_owner is empty.")
else:
    df_tbl = spark.sql(f"Select * from {target_catalog}.information_schema.tables WHERE table_owner <> '{assigned_owner}' and table_schema not in ('information_schema')")

    # The WHERE clause already restricts df_tbl to objects whose owner differs
    # from assigned_owner, so each row can be updated without a second lookup.
    for row in df_tbl.select("table_schema", "table_name").collect():
        migrate_tableowner(row["table_schema"], row["table_name"])

In [0]:
# function to change schema owner
def migrate_schemaowner(schema):
    """Print and run ``ALTER SCHEMA ... OWNER TO`` for one target-catalog schema.

    Uses the notebook-level ``target_catalog`` and ``assigned_owner`` values.
    """
    stmt = "Alter schema {}.{} OWNER TO `{}`;".format(target_catalog, schema, assigned_owner)
    print(stmt)
    spark.sql(stmt)

In [0]:

# get list of schemas that need owner changed
if not assigned_owner:
    # An empty principal would make every ALTER ... OWNER TO `` statement fail.
    print("Skip schema owner changes, because assigned_owner is empty.")
else:
    df_schema = spark.sql(f"Select * from {target_catalog}.information_schema.schemata where schema_name not in ('information_schema') and schema_owner <> '{assigned_owner}' ")

    # The WHERE clause already restricts df_schema to schemas whose owner
    # differs from assigned_owner, so each row can be updated directly.
    for row in df_schema.select("schema_name").collect():
        migrate_schemaowner(row["schema_name"])
