##### Prerequisites
Please read the disaster recovery guidance found in <a href="https://learn.microsoft.com/en-us/fabric/security/experience-specific-guidance">the documentation</a> to obtain a general understanding of how to recover lakehouse and warehouse data.
When the OneLake DR setting has been enabled at capacity level, lakehouse and warehouse data will be geo-replicated to the secondary (paired) region - which may not be accessible through the normal Fabric UI experience. Therefore, in a DR scenario, this data will need to be recovered into a corresponding (new) workspace in the DR region using the storage endpoints (abfs paths) as depicted in the image below. 
<!--<div style="margin: 0 auto; text-align: center; overflow: hidden;">
<div style="float: left;"> -->
<img src="https://github.com/hurtn/images/blob/main/reco_from_docs.png?raw=true" width="800"/>
<!--<small><b>Figure 1</b></small></div></div><br> -->

To use this recovery notebook please ensure:<p>&nbsp;</p>
1. You have imported and run the "01 - Run in Primary" notebook in the primary region.
<!-- <div style="margin: 0 auto; text-align: center; overflow: hidden;">
<div style="float: left;"> -->
<img src="https://github.com/hurtn/images/blob/main/before_recovery.png?raw=true" width="800"/>
<!-- <small><b>Figure 2</b></small></div></div><br> --><p>&nbsp;</p>
2. There is at least one capacity (C2) created in the DR region. 
<img src="https://github.com/hurtn/images/blob/main/starting_reco_stage1v2.png?raw=true" width="800"/>
<!--<small><b>Figure 3</b></small> </div></div><br> --><p>&nbsp;</p>
3. A recovery workspace (Reco2) has been created in the secondary region (C2.Reco2) which contains this notebook "02 - Run in DR" and attached to a default lakehouse (C2.Reco2.LH_BCDR).
<img src="https://github.com/hurtn/images/blob/main/starting_reco_stage2v2.png?raw=true" width="800"/>
<!-- <small><b>Figure 4</b></small> </div></div> -->
<p>&nbsp;</p>
4. Set the values for the notebook parameters below according to your item names. Sample names are shown below if using the naming scheme in the image above. Note: If passing these parameters in from a pipeline these defaults will be overridden:<p>&nbsp;</p>
<left>
p_bcdr_workspace_src -> Reco1 <br>
p_bcdr_workspace_tgt -> Reco2  <br>
p_bcdr_lakehouse_src -> LH_BCDR <br>
p_bcdr_lakehouse_tgt -> LH_BCDR <br>
p_secondary_ws_suffix eg '_SECONDARY' - specify a suffix which will be appended to each workspace created in the DR region. This is to ensure uniqueness of workspace names and is useful for identifying these worksspaces when testing in a non-DR scenario also.  <br>
p_recovered_object_suffix eg '_recovered'- specify a suffix that will be appended to each table name created in the recovery lakehouse C2.Reco2.LH_BCDR <br>
list_of_workspaces_to_recover eg ['WS1','WS2'] - specify a list of workspaces to specifically recover. This is useful for testing in a non-DR scenario also. Leave empty [''] for all workspaces.  <br>

In [ ]:
# Specify the source and target BCDR workspaces and lakehouses
p_bcdr_workspace_src = ''
p_bcdr_workspace_tgt = ''
p_bcdr_lakehouse_src = ''
p_bcdr_lakehouse_tgt = ''

# Specify the DR capacity ID. To get the list of IDs simply run fabric.list_capacities() in a new cell below the loading the utility functions
p_dr_capacity_id = ''

# This variable adds a suffix to the name of the new workspaces created to ensure there are no naming conflicts with the original workspace name. 
# Ensure that you use a string that will gaurantee uniqueness rather than common terms which may be used by others in day to day activities.  
p_secondary_ws_suffix = '_SECONDARY'
p_recovered_object_suffix = '_recovered'

# Determines whether to add role assignments to the new workspaces. If you prefer to apply these at a later stage set the value to False. 
p_add_ws_role_assignments = True

# List parameters below need to be in the format of ['string1','string2',...'stringn']. Empty lists must be declared as []
# Specify the list of workspaces to recover, leave empty [] to recover all. For specific workspaces e.g. p_list_of_workspaces_to_recover = ['Workspace1','Workspace2'] 
p_list_of_workspaces_to_recover = []
# Specify an exact list of workspaces to ignore e.g. p_ws_ignore_list = ['Microsoft Fabric Capacity Metrics 26/02/2024 16:15:42','AdminInsights']
p_ws_ignore_list = [] # add workspaces to this list to ignore them from the metadata extract process including git details
# Specify a list with wildcards using % e.g. to ignore anything with _DEV and _TEST as a suffix p_ws_ignore_like_list = ['%_DEV%','%_TEST%']  
p_ws_ignore_like_list = []

# Boolean parameter to specify verbose informational messages. 
# Only set to True if additional logging information required, otherwise notebook may generate significant (print) messages.
p_logging_verbose = False

##### Imports and Utility Functions
Ensure the cell below is runs successfully to include all the helper utilities

In [ ]:
%run workspaceutils

###### Check default lakehouse

In [ ]:
if (notebookutils.runtime.context['defaultLakehouseId']==None):
    displayHTML('<div style="display: flex; align-items: flex-end;"><img style="float: left; margin-right: 10px;" src="https://github.com/hurtn/images/blob/main/stop.png?raw=true" width="50"><span><h4>Please set a default lakehouse before proceeding</span><img style="float: right; margin-left: 10px;" src="https://github.com/hurtn/images/blob/main/stop.png?raw=true" width="50"></div>')
    print('\n')
    raise noDefaultLakehouseException('No default lakehouse found. Please add a lakehouse to this notebook.')
else: 
    print('Default lakehouse is set to '+ notebookutils.runtime.context['defaultLakehouseName'] + '('+ notebookutils.runtime.context['defaultLakehouseId'] + ')')

##### Stage 4: Recover metadata tables from the BCDR workspace

In order for the recovery process to begin, it needs the metadata of the primary environment (created by running the Store DR Metadata notebook) such as workspaces. This data will be persisted as tables in the default lakehouse of this notebook.
<div>
<img src="https://github.com/hurtn/images/blob/main/starting_reco_stage2v3.png?raw=true" width="800"/>
</div>


In [ ]:
# Gathers the list of recovers tables and source paths to be copied into the lakehouse associated with this notebook 

src_path = f'abfss://{p_bcdr_workspace_src}@onelake.dfs.fabric.microsoft.com/{p_bcdr_lakehouse_src}.Lakehouse'

table_list = get_lh_object_list(src_path)
print('The following tables will attempt to be recovered and persisted as tables in the default lakehouse of this notebook...')
display(table_list)

In [ ]:
print('Restoring recovery tables...')
res = copy_lh_objects(table_list[table_list['type']=='table'],p_bcdr_workspace_src,p_bcdr_workspace_tgt,
                      p_bcdr_lakehouse_src,p_bcdr_lakehouse_tgt,p_recovered_object_suffix,False)
print('Done.')
display(res)


##### Stage 5: Recreate workspaces
Recover the workspaces that used to exist in the primary. The suffix parameter will be appended to the workspace name


In [ ]:
recovered_ws_sql = "SELECT distinct ID,Type,Name,Capacity_Id FROM workspaces" + p_recovered_object_suffix + " where 1=1 "

if len(p_list_of_workspaces_to_recover)>0:
  recovered_ws_sql = recovered_ws_sql+" and Name in ('" +  "', '".join(p_list_of_workspaces_to_recover)+ "') "

if len(p_ws_ignore_list)>0:
   recovered_ws_sql = recovered_ws_sql+ " and Name not in ('" + "', '".join(p_ws_ignore_list)+ "') "

if len(p_ws_ignore_like_list)>0:
    for notlike in p_ws_ignore_like_list:
        recovered_ws_sql  = recovered_ws_sql + " and name not like '" + notlike + "'"

print('Recreating workspaces with suffix of '+ p_secondary_ws_suffix + '...')
#print(recovered_ws_sql)
df = spark.sql(recovered_ws_sql).collect()
for i in df:
    #print(i['ID'])
    if i['Type'] == 'Workspace':
      try:
        if p_logging_verbose:
          print("Creating workspace: " + i['Name']+p_secondary_ws_suffix + " in capacity "+ i['Capacity_Id']+"...")
        response = fabric.create_workspace(i['Name']+p_secondary_ws_suffix,p_dr_capacity_id)
        if p_logging_verbose:
          print("Created workspace with ID: " + response)
      except Exception as error:
          errmsg =  "Failed to recreate workspace " + i['Name'] +p_secondary_ws_suffix + " with capacity ID ("+ i['Capacity_Id'] + ") due to: "+str(error)
          print(errmsg)
print('Now reloading workspace metadata table...')
# Now popupate the workspace metadata table
saveWorkspaceMeta()
print('Done.')


#### Stage 6: Connect Git and Initialise

In [ ]:
gitsql = "select gt.gitConnectionState,gt.gitProviderDetails, wks.name Workspace_Name, wks.id Workspace_ID from gitconnections_recovered gt " \
         "inner join workspaces wks on gt.Workspace_Name = replace(wks.name,'" + p_secondary_ws_suffix+ "','') " \
         "where gt.gitConnectionState = 'ConnectedAndInitialized' and wks.name like '%"+p_secondary_ws_suffix+"' and wks.id != '"+thisWsId+"'" 

if len(p_list_of_workspaces_to_recover)>0:
    gitsql = gitsql+" and gt.Workspace_Name in ('""" +  "', '".join(p_list_of_workspaces_to_recover)+ "') "

if len(p_ws_ignore_list)>0:
    gitsql = gitsql+" and gt.Workspace_Name not in ('" + "', '".join(p_ws_ignore_list)+ "') "

if len(p_ws_ignore_like_list)>0:
    for notlike in p_ws_ignore_like_list:
        gitsql  = gitsql + " and name not like '" + notlike + "'"

print('Reconnecting workspaces to Git...')
df = spark.sql(gitsql).collect()

for idx,i in enumerate(df):
    if i['gitConnectionState'] == 'ConnectedAndInitialized':
        
        url = "/v1/workspaces/" + i['Workspace_ID'] + "/git/connect"
        payload = '{"gitProviderDetails": ' + i['gitProviderDetails'] + '}'
        #print(str(payload))

        try:
            if p_logging_verbose:
                print('Attempting to connect workspace '+ i['Workspace_Name'])
            response = client.post(url,json= json.loads(payload))
            if p_logging_verbose:
                print(str(response.status_code) + response.text) 
            success = True
            
        except Exception as error:
            error_string = str(error)
            error_index = error_string.find("Error:")

            # Extract the substring after "Error:"
            error_message = error_string[error_index + len("Error:"):].strip()
            headers_index = error_message.find("Headers:")

            # Extract the substring before "Headers:"
            error_message = error_message[:headers_index].strip()
            error_data = json.loads(error_message)
            # Extract the error message
            error_message = error_data.get("message", "")

            errmsg =  "Couldn't connect git to workspace " + i['Workspace_Name'] + "("+ i['Workspace_ID'] + "). Error: "+str(error_message)
            print(str(errmsg))
            success = False
        # If connection successful then try to initialise    
        if (success):
            url = "/v1/workspaces/" + i['Workspace_ID'] + "/git/initializeConnection"
            payload = {"initializationStrategy":"PreferRemote"}
            try:
                if p_logging_verbose:
                    print('Attempting to initialize git connection for workspace '+ i['Workspace_Name'])
                response = client.post(url,json= payload)
                #print(str(response.status_code) + response.text) 
                commithash = response.json()['remoteCommitHash']
                if p_logging_verbose:
                    print('Successfully initialized. Updating with commithash '+commithash)
                if commithash!='':
                    url = "/v1/workspaces/" + i['Workspace_ID'] + "/git/updateFromGit"
                    payload = '{"remoteCommitHash": "' + commithash + '","conflictResolution": {"conflictResolutionType": "Workspace","conflictResolutionPolicy": "PreferWorkspace"},"options": {"allowOverrideItems": true}}'
                    response = client.post(url,json= json.loads(payload))
                    if p_logging_verbose and response.status_code==200:
                        print('Successfully started sync')
            except Exception as error:
                success = False
                error_string = str(error)
                error_index = error_string.find("Error:")

                # Extract the substring after "Error:"
                error_message = error_string[error_index + len("Error:"):].strip()
                headers_index = error_message.find("Headers:")

                # Extract the substring before "Headers:"
                error_message = error_message[:headers_index].strip()
                error_data = json.loads(error_message)
                # Extract the error message
                error_message = error_data.get("message", "")
                errmsg =  "Couldn't initialize git for workspace " + i['Workspace_Name'] + "("+ i['Workspace_ID'] + "). Error: "+str(error_message)
                print(str(errmsg))

                        
if success:        
    print('Done')
else:
    print('Completed with errors.')

<div style="display: flex; align-items: flex-end;"><img style="float: left; margin-right: 10px;" src="https://github.com/hurtn/images/blob/main/stop.png?raw=true" width="50"><span><h5>Only run the cell below if you wish to disconnect the workspaces from git and re-run the cell above</span></div>
Note: Using Run all will not run this cell as it should be frozen


In [ ]:
# disconnect

for idx,i in enumerate(df):
    if i['gitConnectionState'] == 'ConnectedAndInitialized':

            url = "/v1/workspaces/" + i['Workspace_ID'] + "/git/disconnect"
            response = client.post(url)



##### After git sync completes then capture environment metadata (items and reports)  
#TODO: Need to write a method to check the sync is complete before attempting to extract the items. For now you will need to wait until the sync has complete and all workspaces have been updated from git. For now adding a sleep into the process to make sure it waits a minute for the sync to complete.

In [ ]:
import time
# temoparary wait while items sync from git. In future this code should be replaced to check for complete status using LRO before proceeding
time.sleep(60)
print('Gathering metadata about reports and items... ')

saveItemMeta(verbose_logging=p_logging_verbose, ws_ignore_list=p_ws_ignore_list,ws_ignore_like_list=p_ws_ignore_like_list)
saveReportMeta(verbose_logging=p_logging_verbose,only_secondary=True,ws_ignore_list=p_ws_ignore_list,ws_ignore_like_list=p_ws_ignore_like_list)

##### Stage 7: Recover lakehouse data (files and tables) to corresponding recovered workspaces

<div>
<img src="https://github.com/hurtn/images/blob/main/lh_reocvery.png?raw=true" width="800"/>
</div>

In [ ]:
thisWsId = notebookutils.runtime.context['currentWorkspaceId'] #failsafe: obtaining this workspace id so we don't accidently overwrite objects in this workspace!

data_recovery_sql = """select wr.Name primary_ws_name, ir.type primary_type, ir.DisplayName primary_name,  
wr.id primary_ws_id, ir.id primary_id,wks.id secondary_ws_id, wks.Name secondary_ws_name, it.id secondary_id, 
it.DisplayName secondary_name, cr.display_name capacity_name
from items_recovered ir 
    inner join workspaces_recovered wr on wr.Id = ir.WorkspaceId 
    inner join capacities_recovered cr on wr.capacity_id = cr.id
    inner join workspaces wks on wr.Name = replace(wks.name,'""" + p_secondary_ws_suffix+ """','')
    inner join items it on ir.DisplayName = it.DisplayName and it.WorkspaceId = wks.Id 
where ir.type in ('Lakehouse','Warehouse') and it.type = ir.type and wr.Name in ('""" +  "', '".join(p_list_of_workspaces_to_recover)+ """') 
     and wks.name like '%"""+p_secondary_ws_suffix+"""'  and wks.id != '"""+thisWsId+"""' order by ir.type,primary_ws_name"""
#print(data_recovery_sql)
print("The subsequent notebook cells attempt to recover lakehouse and warehouse data for the following items...")
df_recovery_items=spark.sql(data_recovery_sql)

# populate dataframes for lakehouse metadata and warehouse metadata respectively 
lakehousedf = df_recovery_items.filter("primary_type='Lakehouse'").collect()
warehousedf = df_recovery_items.filter("primary_type='Warehouse'").collect()

display(df_recovery_items.select("primary_ws_name","primary_type","primary_name","secondary_ws_name","secondary_name", "secondary_ws_id", "capacity_name"))



In [ ]:
print('The following lakehouse(s) will attempt to be recovered... ')
display(lakehousedf)

for idx,i in enumerate(lakehousedf):
    # Define the full abfss source path of the primary BCDR workspace and lakehouse 
    src_path = f'abfss://'+i['primary_ws_id']+'@onelake.dfs.fabric.microsoft.com/'+i['primary_id']
    if p_logging_verbose:
        print('Attempting to recover items for workspace: ' + i['primary_ws_name'] + ', lakehouse: ' + i['primary_name'] + ' into target workspace ' + i['secondary_ws_name'] + ' lakehouse ' + i['secondary_name'])
    table_list = get_lh_object_list(src_path)

    #recover files
    copy_lh_objects(table_list[table_list['type']=='file'],i['primary_ws_id'], i['secondary_ws_id'] ,i['primary_id'],i['secondary_id'],p_recovered_object_suffix,False,True)
    #recover tables
    copy_lh_objects(table_list[table_list['type']=='table'],i['primary_ws_id'], i['secondary_ws_id'] ,i['primary_id'],i['secondary_id'],p_recovered_object_suffix,False,True)

print('Done')

##### Stage 8: Prepare warehouse recovery
###### Important: 
This process creates a staging lakehouse to store shortcuts which point back to the DR copy of the tables (actually delta folders) - in testing this will point back to folders in the primary region but in a true DR scenario where Microsoft has failed over the OneLake endpoints, they will point back to folders in the secondary (paired) storage region.
A parameterised pipeline is injected into each target workspace which will load target tables from these shortcuts.
This staging lakehouse and pipeline can be deleted manually after the datawarehouse tables have been successfully recovered.
<div>
<img src="https://github.com/hurtn/images/blob/main/wh_recovery.png?raw=true" width="1000"/>
</div>

In [ ]:
print('The following warehouse(s) will attempt to be recovered... ')
display(warehousedf)
print('\nPreparing for recovery...\n')
# interate through all the data warehouses to recover
for idx,i in enumerate(warehousedf):
    if p_logging_verbose:
        print('Setting up for recovery of warehouse '+i['primary_ws_name'] + '.'+i['primary_name'] + ' into ' + i['secondary_ws_name'] + '.'+i['secondary_name'] )

    src_path = f'abfss://'+i['primary_ws_id']+'@onelake.dfs.fabric.microsoft.com/'+i['primary_id']
    tgt_path = f'abfss://'+i['secondary_ws_id']+'@onelake.dfs.fabric.microsoft.com/'+i['secondary_id']

    # extract the list of schemas per data 
    schema_list = get_lh_object_list(src_path,['Tables'])
    #display(schema_list)
    # extract a list of warehouse objects per schema and store in a list
    table_list = get_wh_object_list(schema_list['name'].to_list(),src_path)
    #display(table_list)
  
    # create a temporary staging lakehouse per warehouse to create shortcuts into, 
    # which point back to original warehouse data currently in the DR storage account
    lhname = 'temp_rlh_' + i['primary_ws_name']+'_'+i['primary_name']
    # check if it exists before attempting create
    if p_logging_verbose:
        print('Checking whether the temporary lakehouse "'+ lhname +'" exists in workspace '+i['secondary_ws_name']+'...')
    temp_lh_id = getItemId(i['secondary_ws_id'],lhname,'Lakehouse')
    if temp_lh_id == 'NotExists':
        lhname = 'temp_rlh_' + i['primary_ws_name']+'_'+i['primary_name'][:256] # lakehouse name should not exceed 256 characters
        payload = payload = '{"displayName": "' + lhname + '",' \
        + '"description":  "Interim staging lakehouse for primary warehouse recovery: ' \
        + i['primary_ws_name']+'_'+i['primary_name'] + 'into workspace '+ i['secondary_ws_name'] + '(' + i['secondary_ws_id'] +')"}'
        try:
            lhurl = "v1/workspaces/" + i['secondary_ws_id'] + "/lakehouses"
            lhresponse = client.post(lhurl,json= json.loads(payload))
            temp_lh_id = lhresponse.json()['id']
            if p_logging_verbose:
                print('Temporary lakehouse "'+ lhname +'" created with Id ' + temp_lh_id + ': ' + str(lhresponse.status_code) + ' ' + str(lhresponse.text))
        except Exception as error:
            print(error.errorCode)
    else:
        if p_logging_verbose:
            print('Temporary lakehouse '+lhname+' (' + temp_lh_id + ') already exists.')
        

    # Create shortcuts for every table in the format of schema_table under the tables folder
    for index,itable in table_list.iterrows():
        #print(itable)
        #url = "/v1/workspaces/" + i['secondary_ws_id'] + "/items/"+ i['secondary_id'] +"/shortcuts"
        #df94e950-e4e3-4b96-86c3-154b884c6a31
        shortcutExists=False
        # Check if shortcut exists
        try:
            url = "v1/workspaces/" + i['secondary_ws_id'] + "/items/" + temp_lh_id + "/shortcuts/Tables/"+itable['schema']+'_'+itable['name']
            tlhresponse = client.get(url)
            shortcutExists = True
            if p_logging_verbose:
                print('Shortcut '+itable['schema']+'_'+itable['name'] +' already exists')
        except Exception as error:
            shortcutExists = False    

        if not shortcutExists: 
            # Create shortcuts - one per table per schema
            url = "v1/workspaces/" + i['secondary_ws_id'] + "/items/" + temp_lh_id + "/shortcuts"
            scpayload = '{' \
            '"path": "Tables/",' \
            '"name": "'+itable['schema']+'_'+itable['name']+'",' \
            '"target": {' \
            '"oneLake": {' \
                '"workspaceId": "' + i['primary_ws_id'] + '",' \
                '"itemId": "'+ i['primary_id'] +'",' \
                '"path": "/Tables/' + itable['schema']+'/'+itable['name'] + '"' \
                '}}}' 
            try:
                #print(scpayload)                
                shctresponse = client.post(url,json= json.loads(scpayload))
                if p_logging_verbose:
                    print('Shortcut '+itable['schema']+'_'+itable['name'] + ' created.' )

            except Exception as error:
                print('Error creating shortcut '+itable['schema']+'_'+itable['name']+' due to '+str(error) + ':' + shctresponse.text)
    
    recovery_pipeline_prefix= 'plRecover_Warehouse'       
    # recovery pipeline name should not exceed 256 characters
    recovery_pipeline = recovery_pipeline_prefix+'_'+i['primary_ws_name'] + '_'+i['primary_name'][:256]
    if p_logging_verbose:
        print('Attempting to deploy a copy pipeline in the target workspace to load the target warehouse tables from the shortcuts created above... ')
    # Create the pipeline in the target workspace that loads the target warehouse from shortcuts created above 
    plid = getItemId( i['secondary_ws_id'],recovery_pipeline,'DataPipeline')
    #print(plid)
    if plid == 'NotExists':
      plid = createDWrecoverypl(i['secondary_ws_id'],recovery_pipeline_prefix+'_'+i['primary_ws_name'] + '_'+i['primary_name'])
      if p_logging_verbose:
          print('Recovery pipeline ' + recovery_pipeline + ' created with Id '+plid)
    else:
      if p_logging_verbose:
          print('Datawarehouse recovery pipeline "' + recovery_pipeline + '" ('+plid+') already exist in workspace "'+i['secondary_ws_name'] + '" ('+i['secondary_ws_id']+')')  
          print('\n')
print('Done')     



##### Stage 9: Recover warehouse data by running copy pipelines

In [ ]:
bearerToken = notebookutils.credentials.getToken('pbi')
headers = {
"Authorization": f"Bearer {bearerToken}", 
"Content-Type": "application/json"  # Set the content type based on your request
}
print('Starting warehouse recovery pipelines...')
# interate through all the data warehouses to recover
for idx,i in enumerate(warehousedf):
    if p_logging_verbose:
        print('Invoking pipeline to copy warehouse data from  '+i['primary_ws_name'] + '.'+i['primary_name'] + ' into ' + i['secondary_ws_name'] + '.'+i['secondary_name'] )

    src_path = f'abfss://'+i['primary_ws_id']+'@onelake.dfs.fabric.microsoft.com/'+i['primary_id']
    #tgt_path = f'abfss://'+i['secondary_ws_id']+'@onelake.dfs.fabric.microsoft.com/'+i['secondary_id']

    # extract the list of schemas per data 
    schema_list = get_lh_object_list(src_path,['Tables'])
    #display(schema_list)
    # extract a list of warehouse objects per schema and store in a list
    table_list = get_wh_object_list(schema_list['name'].to_list(),src_path)

    tablesToCopyParam = table_list[['schema','name']].to_json( orient='records')
    # ensure the temporary lakehouse exists
    lhname = 'temp_rlh_' + i['primary_ws_name']+'_'+i['primary_name']
    temp_lh_id = getItemId(i['secondary_ws_id'],lhname,'Lakehouse')
    
    # obtain the connection string for the warehouse to pass to the copy pipeline
    whurl  = "v1/workspaces/" + i['secondary_ws_id'] + "/warehouses/" + i['secondary_id']
    whresponse = client.get(whurl)
    connStr = whresponse.json()['properties']['connectionInfo']

    recovery_pipeline = recovery_pipeline_prefix+'_'+i['primary_ws_name'] + '_'+i['primary_name'][:256]
    # obtain the pipeline id created to recover this warehouse
    plid = getItemId( i['secondary_ws_id'],recovery_pipeline,'DataPipeline')
    if plid == 'NotExists':
        print('Error: Could not execute pipeline '+recovery_pipeline+ ' as the ID could not be obtained ')
    else:
        # pipeline url including pipeline Id unique to each warehouse
        plurl = 'v1/workspaces/'+i['secondary_ws_id'] +'/items/'+plid+'/jobs/instances?jobType=Pipeline'
        #print(plurl)

        payload_data = '{' \
            '"executionData": {' \
                '"parameters": {' \
                    '"lakehouseId": "' + temp_lh_id + '",' \
                    '"tablesToCopy": ' + tablesToCopyParam + ',' \
                    '"workspaceId": "' + i['secondary_ws_id'] +'",' \
                    '"warehouseId": "' + i['secondary_id'] + '",' \
                    '"connStr": "' + connStr + '"' \
                    '}}}'
        #print(payload_data)
        plresponse = client.post(plurl, json=json.loads(payload_data), headers=headers)
        if p_logging_verbose:
            print(str(plresponse.status_code))      
print('Done')

##### Stage 10: Add workspace roles assignments to the new workspaces

In [ ]:
if p_add_ws_role_assignments:
    ws_role_sql = "SELECT wks.ID new_workspace_id, wks.name new_workspace, rar.* FROM wsroleassignments_recovered rar inner join workspaces wks on rar.workspaceName = replace(wks.Name,'" + p_secondary_ws_suffix+ "','')" \
                "where wks.name like '%"+p_secondary_ws_suffix+"' and wks.id != '"+thisWsId+"'" 

    # Only apply roles to the (new) workspaces based the list of workspaces defined in the parameter section at the top of this notebook. 
    # Note that the list is based on the workspace name defined in the primary but will be translated to the associated (new) workspace recently created in the secondary.
    if len(p_list_of_workspaces_to_recover)>0:
        ws_role_sql = ws_role_sql+" and rar.workspaceName in ('" +  "', '".join(p_list_of_workspaces_to_recover)+ "') "

    # Ingore workspaces based on the ignore list defined in the parameter section at the top of this notebook
    if len(p_ws_ignore_list)>0:
        ws_role_sql = ws_role_sql+ " and rar.workspaceName not in ('" + "', '".join(p_ws_ignore_list)+ "') "

    if len(p_ws_ignore_like_list)>0:
        for notlike in p_ws_ignore_like_list:
            ws_role_sql  = ws_role_sql + " and name not like '" + notlike + "'"
    
    print('Adding workspace role assignments...')

    #print(ws_role_sql)
    dfwsrole = spark.sql(ws_role_sql).collect()
    for idx,i in enumerate(dfwsrole):
        wsroleurl = "/v1/workspaces/" + i['new_workspace_id'] + "/roleAssignments"
        wsrolepayload = '{"principal": {"id": "'+i['principalId']+'", "type": "'+i['principalType']+'"},"role": "'+i['role']+'"}'
        #print(str(wsrolepayload))
        
        try:
            if p_logging_verbose:
                print("Attempting to add role assignments " + i['role'] + " for " +  i['principalType'] + " princpal " +i['displayName'] + " (" +i['principalId'] + ") to workspace " + i['new_workspace'] + "("+ i['new_workspace_id'] + ")...")

            response = client.post(wsroleurl,json= json.loads(wsrolepayload))

            success = True
        except Exception as error:
            error_string = str(error)
            error_index = error_string.find("Error:")

            # Extract the substring after "Error:"
            error_message = error_string[error_index + len("Error:"):].strip()
            headers_index = error_message.find("Headers:")

            # Extract the substring before "Headers:"
            error_message = error_message[:headers_index].strip()
            error_data = json.loads(error_message)
            # Extract the error message
            error_message = error_data.get("message", "")
            if error_message is not None:
                errmsg =  "Couldn't add role assignment " + i['role'] + " for princpal " +i['displayName'] + " to workspace " + i['workspaceName'] + "("+ i['workspaceId'] + "). Error: "+error_message
            else:
                errmsg =  "Couldn't add role assignment " + i['role'] + " for princpal " +i['displayName'] + " to workspace " + i['workspaceName'] + "("+ i['workspaceId'] + ")."
            print(str(errmsg))
            success = False
print('Done')


##### Clean up - deletes recovered workspaces!!
<div style="display: flex; align-items: flex-end;"><img style="float: left; margin-right: 10px;" src="https://github.com/hurtn/images/blob/main/stop.png?raw=true" width="50"><span><h6>Only run the cell below if you are re-testing this process  or do not wish to keep the recovered workspaces in the secondary.<br>Keeping the cell frozen ensures it is not run when the Run all button is used.</span></div>

In [ ]:
# Please ensure you have run the workspaceutils command at the top of this notebook before running this cell to ensure all necessary imports and variables are loaded.

print('Refreshing the workspaces metadata table...')
# Refresh the list of current workspaces
saveWorkspaceMeta()

thisWsId = notebookutils.runtime.context['currentWorkspaceId'] #obtaining this so we don't accidently delete this workspace!

delete_ws_sql = "SELECT distinct ID,Type,Name FROM workspaces where Name like '%"+p_secondary_ws_suffix+"' and id != '"+thisWsId+"'" 
print('Deleting workspaces...')
# Get all workspaces created with the prefix set in the parameters at the top so that they can be deleted, except for this workspace of course!
df = spark.sql(delete_ws_sql).collect()
for i in df:
    #print(i['ID'])
    if i['Type'] == 'Workspace':
      workspaceId = i['ID']
      if p_logging_verbose:
        print("Deleting workspace "+i['Name'] + '(' + i['ID'] + ')')
      response = client.delete("/v1/workspaces/"+workspaceId)
      if p_logging_verbose and response.status_code ==200:
        print('Successfully deleted')

print('Refreshing the workspaces metadata table after deleting recovered workspaces...')
# now refresh workspaces
saveWorkspaceMeta()
print('Done')
