#### Databricks Notebook Path Migration and Validation

In [0]:
# Install the PyYAML library (provides the `yaml` module used to read the config file)
%pip install pyyaml

In [0]:
#Import the library
import yaml 

In [0]:
# Load configuration from the YAML file.
# NOTE(review): the path is hard-coded to a single user's workspace folder;
# anyone else running this notebook has to edit it first.
with open("/Workspace/Users/prasaddhumal2145@gmail.com/config.yaml", 'r') as file:
    config = yaml.safe_load(file)

# yaml.safe_load returns None for an empty document (and a list/scalar for
# non-mapping YAML); the cells below call config.get(...), so fail here with a
# clear message instead of an AttributeError later on.
if not isinstance(config, dict):
    raise ValueError(
        "config.yaml must contain a top-level mapping of settings, "
        f"but it parsed as {type(config).__name__}"
    )


In [0]:
# Extract configuration values from the loaded YAML file.
# Every key below is needed further down: the host and token build the REST
# requests, the catalog name is written into notebooks, and the directory is
# the root of the scan. A missing or empty value would otherwise surface as
# "None" inside URLs, auth headers or rewritten table paths, so stop early.
_missing_config_keys = [
    key
    for key in ('DATABRICKS_Host', 'DATABRICKS_TOKEN', 'UC_CATALOG_NAME', 'UPDATED_DIRECTORY')
    if not config.get(key)
]
if _missing_config_keys:
    raise ValueError(
        "Missing required config value(s): " + ", ".join(_missing_config_keys)
    )

DATABRICKS_Host = config.get('DATABRICKS_Host')
# NOTE(review): the token is read from a plain YAML file; consider a secret
# store instead of keeping credentials in the workspace file system.
DATABRICKS_TOKEN = config.get('DATABRICKS_TOKEN')
UC_CATALOG_NAME = config.get('UC_CATALOG_NAME')
UPDATED_DIRECTORY = config.get('UPDATED_DIRECTORY')



### Databricks Notebook Path Migration

In [0]:
import requests
import re
import base64

databricks_host = DATABRICKS_Host   # Base URL of the Databricks workspace, taken from the YAML config

# Catalog name written in place of `hive_metastore` when notebook paths are rewritten
uc_catalog_name = UC_CATALOG_NAME

def list_notebooks_in_directory(directory_path, headers):
    """Return the workspace objects located directly under ``directory_path``.

    A single call is made to the Workspace ``list`` endpoint, so the result
    holds only the immediate children of the directory. This function does not
    descend into sub-directories; callers that need recursion handle it
    themselves.

    Args:
        directory_path: Workspace path of the directory to list.
        headers: HTTP headers (including authorization) sent with the request.

    Returns:
        The list stored under ``objects`` in the JSON response, or an empty
        list when the request does not return 200 or the body cannot be parsed.
    """
    listing = requests.get(
        f"{databricks_host}/api/2.0/workspace/list",
        headers=headers,
        params={"path": directory_path},
    )

    if listing.status_code == 200:
        try:
            payload = listing.json()
            return payload.get('objects', [])
        except Exception as parse_error:
            # Report the problem together with the raw body, then behave as if
            # the directory were empty.
            print(f"Failed to parse JSON response: {parse_error}")
            print(f"Response content: {listing.text}")
            return []

    print(f"Failed to list notebooks: {listing.status_code} - {listing.text}")
    return []

def read_notebook_content(notebook_path, headers):
    """Export a notebook in SOURCE format and return its text.

    Args:
        notebook_path: Workspace path of the notebook to export.
        headers: HTTP headers (including authorization) sent with the request.

    Returns:
        The decoded notebook source as a string, or ``None`` when the export
        request fails or its payload cannot be decoded. A message is printed
        in either failure case.
    """
    url = f"{databricks_host}/api/2.0/workspace/export"
    response = requests.get(url, headers=headers, params={"path": notebook_path, "format": "SOURCE"})

    if response.status_code != 200:
        print(f"Failed to read notebook content for {notebook_path}: {response.text}")
        return None

    try:
        # The export endpoint returns the source base64-encoded under "content".
        encoded = response.json().get('content', '')
        return base64.b64decode(encoded).decode('utf-8')
    except (ValueError, TypeError, AttributeError) as e:
        # ValueError covers invalid JSON, invalid base64 and invalid UTF-8;
        # TypeError/AttributeError cover a payload of an unexpected shape.
        # Returning None keeps one bad notebook from aborting the whole run.
        print(f"Failed to decode notebook content for {notebook_path}: {e}")
        return None

def contains_hive_metastore_path(content):
    """Tell whether ``content`` holds a dotted ``hive_metastore`` reference.

    A match needs ``hive_metastore.`` followed by at least one identifier, so
    both ``hive_metastore.<database>`` and ``hive_metastore.<database>.<table>``
    count, while the bare word ``hive_metastore`` does not.

    Args:
        content: Notebook source text to scan.

    Returns:
        ``True`` if such a reference is present, ``False`` otherwise.
    """
    matcher = re.compile(r"\bhive_metastore\.(\w+)(\.\w+)?\b")
    return matcher.search(content) is not None

def find_notebooks_with_hive_metastore(directory_path):
    """Collect the paths of notebooks under ``directory_path`` that reference hive_metastore.

    The directory is listed once; each notebook found is exported and scanned,
    and each sub-directory is searched by a recursive call.

    Args:
        directory_path: Workspace directory to start from.

    Returns:
        A list of workspace paths of the notebooks whose source matches
        ``contains_hive_metastore_path``.
    """
    auth = {"Authorization": f"Bearer {DATABRICKS_TOKEN}"}
    matches = []

    for entry in list_notebooks_in_directory(directory_path, auth):
        kind = entry['object_type']

        if kind == 'DIRECTORY':
            # Descend and merge whatever the sub-directory yields.
            matches.extend(find_notebooks_with_hive_metastore(entry['path']))
        elif kind == 'NOTEBOOK':
            path = entry['path']
            source = read_notebook_content(path, auth)
            # Unreadable or empty notebooks are skipped.
            if source and contains_hive_metastore_path(source):
                matches.append(path)

    return matches

def update_notebook_paths(content, catalog_name=None):
    """Rewrite ``hive_metastore.<database>.<table>`` references to the UC catalog.

    Only three-part references are rewritten; the database and table parts are
    kept as they are. The bare word ``hive_metastore`` and two-part
    ``hive_metastore.<database>`` references are left untouched. The match is
    purely textual, so references inside comments or strings are rewritten too.

    Args:
        content: Notebook source text.
        catalog_name: Catalog to substitute for ``hive_metastore``. Defaults to
            the module-level ``uc_catalog_name``.

    Returns:
        The source text with every three-part reference pointing at the catalog.

    Raises:
        ValueError: If no catalog name is available. Without this check a
            missing config value would be written into notebooks as
            ``None.<database>.<table>``.
    """
    target_catalog = uc_catalog_name if catalog_name is None else catalog_name
    if not target_catalog:
        raise ValueError("A Unity Catalog name is required to rewrite hive_metastore paths")

    hive_path_pattern = r"\bhive_metastore\.(\w+)(\.\w+)\b"

    # A callable replacement inserts the catalog name literally; a template
    # string would interpret backslash sequences contained in the name.
    return re.sub(
        hive_path_pattern,
        lambda match: f"{target_catalog}.{match.group(1)}{match.group(2)}",
        content,
    )

def update_notebook_in_databricks(notebook_path, updated_content, headers, language=None):
    """Overwrite a workspace notebook with ``updated_content``.

    Args:
        notebook_path: Workspace path of the notebook to overwrite.
        updated_content: New notebook source as text.
        headers: HTTP headers (including authorization) sent with the requests.
        language: Notebook language to declare on import (for example
            ``"PYTHON"`` or ``"SQL"``). When omitted, the language of the
            existing notebook is looked up through the Workspace ``get-status``
            endpoint so that non-Python notebooks are not re-imported as
            Python. ``"PYTHON"`` is used if the lookup fails.

    Returns:
        None. The outcome is printed.
    """
    if language is None:
        language = "PYTHON"  # previous fixed value, kept as the fallback
        status_url = f"{databricks_host}/api/2.0/workspace/get-status"
        status = requests.get(status_url, headers=headers, params={"path": notebook_path})
        detected = None
        if status.status_code == 200:
            try:
                detected = status.json().get("language")
            except (ValueError, AttributeError):
                detected = None
        if detected:
            language = detected
        else:
            print(f"Could not determine language of {notebook_path}; importing as {language}")

    url = f"{databricks_host}/api/2.0/workspace/import"
    # The import endpoint expects the source base64-encoded.
    encoded_content = base64.b64encode(updated_content.encode('utf-8')).decode('utf-8')
    data = {
        "path": notebook_path,
        "format": "SOURCE",
        "language": language,
        "content": encoded_content,
        "overwrite": True  # Overwrite existing notebooks
    }

    response = requests.post(url, headers=headers, json=data)

    if response.status_code == 200:
        print(f"Successfully updated {notebook_path}")
    else:
        print(f"Failed to update notebook {notebook_path}: {response.text}")

def update_notebooks_with_hive_metastore(directory_path):
    """Find and rewrite every notebook under ``directory_path`` that references hive_metastore.

    The notebooks flagged by ``find_notebooks_with_hive_metastore`` are listed,
    then each one is exported again, rewritten with ``update_notebook_paths``
    and imported back. A notebook whose source comes back unchanged from the
    rewrite (for example one holding only two-part ``hive_metastore.<database>``
    references) is reported and left alone rather than overwritten with
    identical content.

    Args:
        directory_path: Workspace directory to scan recursively.

    Returns:
        None. Progress is printed.
    """
    headers = {
        "Authorization": f"Bearer {DATABRICKS_TOKEN}"
    }

    # Find all notebooks that need migration
    notebooks_to_update = find_notebooks_with_hive_metastore(directory_path)

    if not notebooks_to_update:
        print("No notebooks with 'hive_metastore' references found.")
        return

    # Show the list of notebooks needing migration
    print("Notebooks needing migration:")
    for notebook_path in notebooks_to_update:
        print(notebook_path)

    # Update each notebook with hive_metastore paths
    for notebook_path in notebooks_to_update:
        content = read_notebook_content(notebook_path, headers)
        if not content:
            continue

        updated_content = update_notebook_paths(content)
        if updated_content == content:
            # Nothing was rewritten, so do not overwrite the notebook or
            # report it as updated.
            print(f"Skipped {notebook_path}: no hive_metastore.<database>.<table> reference to rewrite")
            continue

        update_notebook_in_databricks(notebook_path, updated_content, headers)

# Scan UPDATED_DIRECTORY (recursively) and overwrite, in place, the notebooks that reference hive_metastore
update_notebooks_with_hive_metastore(UPDATED_DIRECTORY)


#### Testing script that checks if the notebooks have the correct path after the update

In [0]:
import requests
import base64
import re

# Databricks host and request headers shared by the validation helpers below
databricks_host = DATABRICKS_Host
headers = {
    "Authorization": f"Bearer {DATABRICKS_TOKEN}" # Token comes from the YAML config loaded earlier
}

# UC catalog name expected in the migrated notebooks
uc_catalog_name = UC_CATALOG_NAME

# Root workspace directory to validate
# NOTE(review): the call at the end of this cell passes UPDATED_DIRECTORY directly, so this variable is not read there
directory_path = UPDATED_DIRECTORY

def check_notebook_paths_in_directory(directory_path):
    """Validate every notebook under ``directory_path``, descending into sub-directories.

    The directory is listed through the Workspace ``list`` endpoint. Each
    notebook is exported and handed to ``check_path_replacement``; each
    sub-directory is processed by a recursive call. Nothing is returned; all
    findings are printed.

    Args:
        directory_path: Workspace directory to validate.
    """
    listing = requests.get(
        f"{databricks_host}/api/2.0/workspace/list",
        headers=headers,
        params={"path": directory_path},
    )

    if listing.status_code != 200:
        print(f"Failed to list notebooks: {listing.status_code} - {listing.text}")
        return

    try:
        entries = listing.json().get('objects', [])
    except Exception as parse_error:
        print(f"Failed to parse JSON response: {parse_error}")
        print(f"Response content: {listing.text}")
        return

    for entry in entries:
        kind = entry['object_type']

        if kind == 'DIRECTORY':
            # Sub-directory: validate its contents the same way.
            check_notebook_paths_in_directory(entry['path'])
            continue

        if kind != 'NOTEBOOK':
            # Other object types are ignored.
            continue

        target = entry['path']
        print(f"Checking notebook: {target}")

        source = read_notebook_content(target)
        if source:
            # Only notebooks with readable, non-empty source are checked.
            check_path_replacement(source, target)

def read_notebook_content(notebook_path, request_headers=None):
    """Export a notebook in SOURCE format and return its text.

    This definition replaces the earlier two-argument helper of the same name
    once this cell has run. The optional second argument keeps the earlier
    call form ``read_notebook_content(path, headers)`` working, so the
    migration functions do not break after the validation cell is executed.

    Args:
        notebook_path: Workspace path of the notebook to export.
        request_headers: HTTP headers to send. Defaults to the module-level
            ``headers`` defined in this cell.

    Returns:
        The decoded notebook source as a string, or ``None`` when the export
        request fails or its payload cannot be decoded. A message is printed
        in either failure case.
    """
    if request_headers is None:
        request_headers = headers

    url = f"{databricks_host}/api/2.0/workspace/export"
    response = requests.get(url, headers=request_headers, params={"path": notebook_path, "format": "SOURCE"})

    if response.status_code != 200:
        print(f"Failed to read notebook content for {notebook_path}: {response.text}")
        return None

    try:
        # The export endpoint returns the source base64-encoded under "content".
        encoded = response.json().get('content', '')
        return base64.b64decode(encoded).decode('utf-8')
    except (ValueError, TypeError, AttributeError) as e:
        # ValueError covers invalid JSON, invalid base64 and invalid UTF-8;
        # TypeError/AttributeError cover a payload of an unexpected shape.
        print(f"Failed to decode notebook content for {notebook_path}: {e}")
        return None

def check_path_replacement(content, notebook_path, catalog_name=None):
    """Print whether a notebook's paths appear to have been migrated.

    Three outcomes are reported:
      * the text ``hive_metastore`` is still present anywhere in the source;
      * no ``hive_metastore`` text remains and a ``<catalog>.`` reference exists;
      * neither kind of reference is present.

    Args:
        content: Notebook source text.
        notebook_path: Workspace path, used only in the printed message.
        catalog_name: Catalog expected after migration. Defaults to the
            module-level ``uc_catalog_name``.

    Returns:
        None. The verdict is printed.
    """
    # Check for any remaining 'hive_metastore' references
    if "hive_metastore" in content:
        print(f"Path not updated correctly in: {notebook_path}")
        return

    catalog = uc_catalog_name if catalog_name is None else catalog_name

    # Match the catalog name literally (re.escape) and only where it starts an
    # identifier: without the lookbehind a catalog called "main" would also
    # match the tail of "domain." and report a false success.
    uc_pattern = rf"(?<!\w){re.escape(str(catalog))}\."
    if re.search(uc_pattern, content):
        print(f"Notebook paths updated correctly in: {notebook_path}")
    else:
        print(f"No Hive or UC references found in: {notebook_path}")

# Validate the notebooks under UPDATED_DIRECTORY; a status line is printed for each notebook whose source could be read
check_notebook_paths_in_directory(UPDATED_DIRECTORY)
