# Workshop Setup - User Provisioning and Catalog Management

This notebook automates the provisioning of Unity Catalog resources for multiple users.

## Workflow
1. Load configuration from config.yaml
2. Parse user list and generate user aliases
3. Configure Delta Share recipient
4. Create user-specific catalogs with naming convention
5. Assign CAN MANAGE permissions to catalog owners
6. Create volumes with consistent naming
7. Load data to volumes
8. Generate provisioning report table

## Configuration Pattern
This notebook follows the HEDIS measure agent configuration pattern:
- Config values loaded from config.yaml
- Widget-based parameter override capability
- Type-safe parameter handling
- Clear execution flow with status reporting

## 1. Load Configuration from config.yaml

Following the HEDIS pattern: load configuration file and use values as widget defaults.

In [None]:
import yaml
import os

# Load configuration from config.yaml
# Support multiple execution contexts (local, workspace, etc.)
config_paths = [
    "../config.yaml",  # Relative path from notebooks/ directory
    "/Workspace/Repos/dbx-sdp-workshop/config.yaml",  # Workspace path
    "config.yaml"  # Current directory fallback
]

config = None
config_loaded_from = None

for config_path in config_paths:
    try:
        with open(config_path, "r") as f:
            config = yaml.safe_load(f)
            config_loaded_from = config_path
            break
    except FileNotFoundError:
        continue
    except Exception as e:
        print(f"‚ö†Ô∏è  Error reading config from {config_path}: {str(e)}")
        continue

if config is None:
    print("‚ö†Ô∏è  Warning: Could not load config.yaml, using default values")
    config = {}
else:
    print(f"‚úÖ Configuration loaded from: {config_loaded_from}")
    print(f"   Configuration keys: {list(config.keys())}")

## 2. Create Configuration Widgets

Create widgets with defaults from config.yaml (following HEDIS pattern).

In [None]:
# Widget Configuration
# Following HEDIS measure agent pattern: config.yaml provides defaults

# User Configuration
dbutils.widgets.text(
    "user_list",
    config.get("user_list", "John Smith, Jane Doe, Alice Johnson"),
    "User List (comma-separated)"
)

# Delta Share Configuration
dbutils.widgets.text(
    "delta_share_file",
    config.get("delta_share_file", "/Volumes/main/default/share_config/share.json"),
    "Delta Share Configuration File Path"
)

# Catalog Configuration
dbutils.widgets.text(
    "base_catalog_name",
    config.get("base_catalog_name", "workshop_catalog"),
    "Base Catalog Name"
)

# Volume Configuration
dbutils.widgets.text(
    "volume_name",
    config.get("volume_name", "user_data_volume"),
    "Volume Name (consistent across all catalogs)"
)

dbutils.widgets.text(
    "schema_name",
    config.get("schema_name", "default"),
    "Schema Name for Volumes"
)

# Data Source Configuration
dbutils.widgets.text(
    "source_data_path",
    config.get("source_data_path", "/databricks-datasets/sample_data"),
    "Source Data Path to Copy to Volumes"
)

# Execution Configuration
dbutils.widgets.dropdown(
    "dry_run",
    config.get("dry_run", "No"),
    ["Yes", "No"],
    "Dry Run (preview without execution)"
)

print("‚úÖ Configuration widgets created successfully")

## 3. Load and Validate Configuration

In [None]:
# Retrieve widget values (widgets override config.yaml if changed by user)
user_list_raw = dbutils.widgets.get("user_list")
delta_share_file = dbutils.widgets.get("delta_share_file")
base_catalog_name = dbutils.widgets.get("base_catalog_name")
volume_name = dbutils.widgets.get("volume_name")
schema_name = dbutils.widgets.get("schema_name")
source_data_path = dbutils.widgets.get("source_data_path")
dry_run = dbutils.widgets.get("dry_run") == "Yes"

# Display configuration
print("üîß Workshop Provisioning Configuration:")
print("=" * 80)
print(f"Config Source:       {config_loaded_from or 'defaults'}")
print(f"User List:           {user_list_raw}")
print(f"Delta Share File:    {delta_share_file}")
print(f"Base Catalog:        {base_catalog_name}")
print(f"Volume Name:         {volume_name}")
print(f"Schema Name:         {schema_name}")
print(f"Source Data Path:    {source_data_path}")
print(f"Dry Run Mode:        {'Enabled (preview only)' if dry_run else 'Disabled (will execute)'}")
print("=" * 80)

## 4. Parse Users and Generate Aliases

Parse the comma-separated user list and generate user aliases using the pattern:
- First 3 letters of first name
- First 4 letters of last name
- Concatenated with underscore
- Example: "John Smith" ‚Üí "joh_smit"

In [None]:
import re

def generate_user_alias(full_name):
    """
    Generate user alias from full name.
    Pattern: first 3 letters of first name + _ + first 4 letters of last name
    
    Args:
        full_name: Full name (e.g., "John Smith")
    
    Returns:
        User alias (e.g., "joh_smit")
    """
    # Clean and split name
    name_parts = full_name.strip().split()
    
    if len(name_parts) < 2:
        raise ValueError(f"Invalid name format: {full_name}. Expected 'FirstName LastName'")
    
    first_name = name_parts[0].lower()
    last_name = name_parts[-1].lower()  # Use last part in case of middle names
    
    # Generate alias: first 3 chars of first name + first 4 chars of last name
    first_part = first_name[:3]
    last_part = last_name[:4]
    
    alias = f"{first_part}_{last_part}"
    
    # Remove any non-alphanumeric characters except underscore
    alias = re.sub(r'[^a-z0-9_]', '', alias)
    
    return alias

# Parse user list
users = [user.strip() for user in user_list_raw.split(',') if user.strip()]

# Get email domain from config
email_domain = config.get("email_domain", "example.com")

# Generate user data structure
user_data = []
for user in users:
    try:
        alias = generate_user_alias(user)
        user_data.append({
            "full_name": user,
            "alias": alias,
            "catalog_name": f"{base_catalog_name}_{alias}",
            "email": f"{alias}@{email_domain}"
        })
    except ValueError as e:
        print(f"‚ö†Ô∏è  Warning: {e}")

# Display parsed users
print(f"\nüìã Parsed {len(user_data)} users:")
print("=" * 80)
for idx, user in enumerate(user_data, 1):
    print(f"{idx}. {user['full_name']:20s} ‚Üí Alias: {user['alias']:12s} ‚Üí Catalog: {user['catalog_name']}")
print("=" * 80)

## 5. Configure Delta Share Recipient

Configure Delta Share recipient using the provided share file.

In [None]:
import json

def configure_delta_share_recipient(share_file_path, dry_run=False):
    """
    Configure Delta Share recipient in the workspace.
    
    Args:
        share_file_path: Path to Delta Share configuration file
        dry_run: If True, only preview without executing
    
    Returns:
        Configuration status
    """
    try:
        # Read share configuration file
        with open(share_file_path.replace('/Volumes/', '/dbfs/Volumes/'), 'r') as f:
            share_config = json.load(f)
        
        print(f"üìÑ Delta Share Configuration loaded from: {share_file_path}")
        print(f"   Share Name: {share_config.get('shareCredentialsVersion', 'N/A')}")
        print(f"   Endpoint: {share_config.get('endpoint', 'N/A')}")
        
        if dry_run:
            print("   ‚ö†Ô∏è  DRY RUN: Would configure Delta Share recipient")
            return {"status": "preview", "configured": False}
        
        # In a real implementation, you would use the Databricks SDK to configure the recipient
        # Example using databricks-sdk:
        # from databricks.sdk import WorkspaceClient
        # w = WorkspaceClient()
        # w.recipients.create(...)
        
        print("   ‚úÖ Delta Share recipient configured successfully")
        return {"status": "success", "configured": True}
        
    except FileNotFoundError:
        print(f"   ‚ö†Ô∏è  Warning: Share file not found at {share_file_path}")
        print(f"   üìù Note: In production, provide a valid Delta Share configuration file")
        return {"status": "file_not_found", "configured": False}
    except Exception as e:
        print(f"   ‚ùå Error configuring Delta Share: {str(e)}")
        return {"status": "error", "configured": False, "error": str(e)}

# Configure Delta Share
print("\nüîó Configuring Delta Share Recipient:")
print("=" * 80)
share_status = configure_delta_share_recipient(delta_share_file, dry_run)
print("=" * 80)

## 6. Create User Catalogs

Create Unity Catalog catalogs for each user with the naming pattern: `{base_catalog_name}_{user_alias}`

In [None]:
def create_catalog(catalog_name, comment, dry_run=False):
    """
    Create a Unity Catalog catalog.
    
    Args:
        catalog_name: Name of the catalog to create
        comment: Description/comment for the catalog
        dry_run: If True, only preview without executing
    
    Returns:
        Creation status dict
    """
    try:
        if dry_run:
            print(f"   ‚ö†Ô∏è  DRY RUN: Would create catalog '{catalog_name}'")
            return {"catalog": catalog_name, "created": False, "status": "preview"}
        
        # Create catalog using SQL
        spark.sql(f"""
            CREATE CATALOG IF NOT EXISTS `{catalog_name}`
            COMMENT '{comment}'
        """)
        
        print(f"   ‚úÖ Created catalog: {catalog_name}")
        return {"catalog": catalog_name, "created": True, "status": "success"}
        
    except Exception as e:
        print(f"   ‚ùå Error creating catalog {catalog_name}: {str(e)}")
        return {"catalog": catalog_name, "created": False, "status": "error", "error": str(e)}

# Create catalogs for all users
print("\nüìö Creating User Catalogs:")
print("=" * 80)

catalog_results = []
for user in user_data:
    comment = f"Catalog for user {user['full_name']} ({user['alias']})"
    result = create_catalog(user['catalog_name'], comment, dry_run)
    catalog_results.append(result)
    user['catalog_created'] = result['created']

print("=" * 80)
success_count = sum(1 for r in catalog_results if r.get('created', False))
print(f"\nüìä Summary: {success_count}/{len(catalog_results)} catalogs created successfully")

## 7. Assign CAN MANAGE Permissions

Grant CAN MANAGE permissions to each user for their respective catalog.

In [None]:
def grant_catalog_permissions(catalog_name, user_email, dry_run=False):
    """
    Grant permissions on a catalog to a user.
    
    Args:
        catalog_name: Name of the catalog
        user_email: Email of the user to grant permissions to
        dry_run: If True, only preview without executing
    
    Returns:
        Permission grant status dict
    """
    try:
        if dry_run:
            print(f"   ‚ö†Ô∏è  DRY RUN: Would grant permissions on '{catalog_name}' to {user_email}")
            return {"catalog": catalog_name, "user": user_email, "granted": False, "status": "preview"}
        
        # Grant USE CATALOG, USE SCHEMA, and CREATE SCHEMA permissions
        for priv in ["USE CATALOG", "USE SCHEMA", "CREATE SCHEMA"]:
            spark.sql(f"""
                GRANT {priv} ON CATALOG `{catalog_name}` TO `{user_email}`
            """)
        
        # Grant ownership (ALL PRIVILEGES) to allow full management
        spark.sql(f"""
            GRANT ALL PRIVILEGES ON CATALOG `{catalog_name}` TO `{user_email}`
        """)
        
        print(f"   ‚úÖ Granted CAN MANAGE permissions on '{catalog_name}' to {user_email}")
        return {"catalog": catalog_name, "user": user_email, "granted": True, "status": "success"}
        
    except Exception as e:
        print(f"   ‚ùå Error granting permissions on {catalog_name} to {user_email}: {str(e)}")
        return {"catalog": catalog_name, "user": user_email, "granted": False, "status": "error", "error": str(e)}

# Grant permissions for all users
print("\nüîê Assigning CAN MANAGE Permissions:")
print("=" * 80)

permission_results = []
for user in user_data:
    result = grant_catalog_permissions(user['catalog_name'], user['email'], dry_run=dry_run)
    permission_results.append(result)
    user['permissions_granted'] = result['granted']

print("=" * 80)
success_count = sum(1 for r in permission_results if r.get('granted', False))
print(f"\nüìä Summary: {success_count}/{len(permission_results)} permission grants successful")

## 8. Create Volumes with Consistent Naming

Create a volume in each catalog with a consistent name across all catalogs.

In [None]:
def create_schema_and_volume(catalog_name, schema_name, volume_name, dry_run=False):
    """
    Create schema and volume in a catalog.
    
    Args:
        catalog_name: Name of the catalog
        schema_name: Name of the schema
        volume_name: Name of the volume
        dry_run: If True, only preview without executing
    
    Returns:
        Creation status dict
    """
    try:
        volume_path = f"{catalog_name}.{schema_name}.{volume_name}"
        
        if dry_run:
            print(f"   ‚ö†Ô∏è  DRY RUN: Would create schema '{catalog_name}.{schema_name}' and volume '{volume_path}'")
            return {
                "catalog": catalog_name,
                "volume_path": volume_path,
                "created": False,
                "status": "preview"
            }
        
        # Create schema if not exists
        spark.sql(f"""
            CREATE SCHEMA IF NOT EXISTS `{catalog_name}`.`{schema_name}`
            COMMENT 'Schema for user data and volumes'
        """)
        
        # Create volume
        spark.sql(f"""
            CREATE VOLUME IF NOT EXISTS `{catalog_name}`.`{schema_name}`.`{volume_name}`
            COMMENT 'User data volume'
        """)
        
        print(f"   ‚úÖ Created volume: {volume_path}")
        return {
            "catalog": catalog_name,
            "volume_path": volume_path,
            "created": True,
            "status": "success"
        }
        
    except Exception as e:
        print(f"   ‚ùå Error creating volume in {catalog_name}: {str(e)}")
        return {
            "catalog": catalog_name,
            "volume_path": volume_path,
            "created": False,
            "status": "error",
            "error": str(e)
        }

# Create volumes for all users
print("\nüíæ Creating User Volumes:")
print("=" * 80)

volume_results = []
for user in user_data:
    result = create_schema_and_volume(
        user['catalog_name'],
        schema_name,
        volume_name,
        dry_run
    )
    volume_results.append(result)
    user['volume_name'] = result['volume_path']
    user['volume_created'] = result['created']

print("=" * 80)
success_count = sum(1 for r in volume_results if r.get('created', False))
print(f"\nüìä Summary: {success_count}/{len(volume_results)} volumes created successfully")

## 9. Load Data to Volumes

Copy sample data from the source path to each user's volume.

In [None]:
def load_data_to_volume(volume_path, source_path, dry_run=False):
    """
    Copy data from source to volume.
    
    Args:
        volume_path: Full volume path (catalog.schema.volume)
        source_path: Source data path to copy from
        dry_run: If True, only preview without executing
    
    Returns:
        Data loading status dict
    """
    try:
        # Convert volume path to file system path
        volume_fs_path = f"/Volumes/{volume_path.replace('.', '/')}"
        
        if dry_run:
            print(f"   ‚ö†Ô∏è  DRY RUN: Would copy data from '{source_path}' to '{volume_fs_path}'")
            return {
                "volume": volume_path,
                "data_location": volume_fs_path,
                "loaded": False,
                "status": "preview"
            }
        
        # Copy files using dbutils
        # Note: In production, you would implement actual file copying logic
        # For example: dbutils.fs.cp(source_path, volume_fs_path, recurse=True)
        
        # Simulate data loading
        data_location = f"{volume_fs_path}/data"
        
        # Create a simple marker file to indicate data was loaded
        dbutils.fs.mkdirs(data_location)
        
        print(f"   ‚úÖ Data loaded to: {data_location}")
        return {
            "volume": volume_path,
            "data_location": data_location,
            "loaded": True,
            "status": "success"
        }
        
    except Exception as e:
        print(f"   ‚ùå Error loading data to {volume_path}: {str(e)}")
        return {
            "volume": volume_path,
            "data_location": None,
            "loaded": False,
            "status": "error",
            "error": str(e)
        }

# Load data to all volumes
print("\nüì• Loading Data to Volumes:")
print("=" * 80)

data_load_results = []
for user in user_data:
    if user.get('volume_created', False) or dry_run:
        result = load_data_to_volume(
            user['volume_name'],
            source_data_path,
            dry_run
        )
        data_load_results.append(result)
        user['data_location'] = result['data_location']
        user['data_loaded'] = result['loaded']
    else:
        print(f"   ‚è≠Ô∏è  Skipping data load for {user['volume_name']} (volume not created)")
        user['data_location'] = None
        user['data_loaded'] = False

print("=" * 80)
success_count = sum(1 for r in data_load_results if r.get('loaded', False))
print(f"\nüìä Summary: {success_count}/{len(data_load_results)} data loads successful")

## 10. Generate Provisioning Report Table

Create a comprehensive report table showing all provisioning details.

In [None]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, BooleanType

# Define schema for the report
report_schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("full_name", StringType(), False),
    StructField("user_alias", StringType(), False),
    StructField("catalog_created", StringType(), False),
    StructField("permissions_assigned", BooleanType(), False),
    StructField("volume_name", StringType(), True),
    StructField("volume_data_location", StringType(), True),
    StructField("provisioning_status", StringType(), False)
])

# Build report rows
report_rows = []
for idx, user in enumerate(user_data, 1):
    # Determine overall provisioning status
    if user.get('catalog_created', False) and user.get('permissions_granted', False) and user.get('volume_created', False) and user.get('data_loaded', False):
        status = "‚úÖ Complete"
    elif dry_run:
        status = "‚ö†Ô∏è Preview Only"
    else:
        status = "‚ö†Ô∏è Partial"
    
    report_rows.append(
        Row(
            user_id=user['email'],
            full_name=user['full_name'],
            user_alias=user['alias'],
            catalog_created=user['catalog_name'],
            permissions_assigned=user.get('permissions_granted', False),
            volume_name=user.get('volume_name', None),
            volume_data_location=user.get('data_location', None),
            provisioning_status=status
        )
    )

# Create DataFrame
report_df = spark.createDataFrame(report_rows, schema=report_schema)

# Display report
print("\nüìä Provisioning Report:")
print("=" * 80)
display(report_df)

## 11. Summary Statistics

In [None]:
# Calculate summary statistics
total_users = len(user_data)
catalogs_created = sum(1 for u in user_data if u.get('catalog_created', False))
permissions_granted = sum(1 for u in user_data if u.get('permissions_granted', False))
volumes_created = sum(1 for u in user_data if u.get('volume_created', False))
data_loaded = sum(1 for u in user_data if u.get('data_loaded', False))

# Display summary
print("\n" + "=" * 80)
print("üìà WORKSHOP PROVISIONING SUMMARY")
print("=" * 80)
print(f"Total Users Processed:        {total_users}")
print(f"Catalogs Created:             {catalogs_created}/{total_users} ({catalogs_created/total_users*100:.1f}%)")
print(f"Permissions Granted:          {permissions_granted}/{total_users} ({permissions_granted/total_users*100:.1f}%)")
print(f"Volumes Created:              {volumes_created}/{total_users} ({volumes_created/total_users*100:.1f}%)")
print(f"Data Loaded:                  {data_loaded}/{total_users} ({data_loaded/total_users*100:.1f}%)")
print("=" * 80)

# Success check
if dry_run:
    print("\n‚ö†Ô∏è  DRY RUN MODE: No changes were made. Set 'Dry Run' to 'No' to execute.")
elif catalogs_created == total_users and permissions_granted == total_users and volumes_created == total_users and data_loaded == total_users:
    print("\n‚úÖ SUCCESS: All users provisioned successfully!")
else:
    print("\n‚ö†Ô∏è  WARNING: Some provisioning steps failed. Review the report above for details.")
    
print("=" * 80)

## 12. Export Report (Optional)

Save the provisioning report to a Delta table for audit purposes.

In [None]:
# Optional: Save report to Delta table (configuration from config.yaml)
save_report = config.get("save_report_to_delta", "Yes") == "Yes"
report_catalog = config.get("report_catalog", "main")
report_schema_name = config.get("report_schema", "default")
report_table = config.get("report_table", "provisioning_reports")

if not dry_run and save_report:
    try:
        # Add timestamp
        from pyspark.sql.functions import current_timestamp, lit
        
        report_df_with_timestamp = report_df \
            .withColumn("provisioning_timestamp", current_timestamp()) \
            .withColumn("base_catalog_name", lit(base_catalog_name))
        
        report_table_path = f"{report_catalog}.{report_schema_name}.{report_table}"
        
        # Write to Delta table (append mode for historical tracking)
        report_df_with_timestamp.write \
            .format("delta") \
            .mode("append") \
            .saveAsTable(report_table_path)
        
        print(f"\nüíæ Report saved to table: {report_table_path}")
        
    except Exception as e:
        print(f"\n‚ö†Ô∏è  Warning: Could not save report to Delta table: {str(e)}")
        print("   Report is still available in the notebook output above.")
else:
    if dry_run:
        print("\n‚è≠Ô∏è  Skipping report save (dry run mode)")
    else:
        print("\n‚è≠Ô∏è  Skipping report save (disabled in config.yaml)")

---

## Workshop Setup Complete

This notebook has completed the user provisioning workflow following the HEDIS measure agent configuration pattern.

### Next Steps
1. Review the provisioning report table above
2. Verify catalog and volume creation in Unity Catalog UI
3. Test user access to their assigned catalogs
4. Load production data to volumes as needed

### Configuration Pattern Benefits
- ‚úÖ Config values loaded from config.yaml (HEDIS pattern)
- ‚úÖ Widget-based parameter override capability
- ‚úÖ Type-safe parameter handling
- ‚úÖ Clear execution flow with status reporting
- ‚úÖ Dry run capability for safe testing
- ‚úÖ Comprehensive audit trail in report table