In [0]:
# Restart the Python interpreter so the rest of the notebook runs from a clean state.
# NOTE(review): this is typically needed after `%pip install` or to pick up edited local
# modules (entra_sp_manager, workspace_sp_manager, job_manager); no `%pip` is visible
# in this notebook — confirm it is still required.
dbutils.library.restartPython() 

In [0]:
import requests
import json
import time
from typing import Dict, Any, List
from datetime import datetime
import pandas as pd

# Configure notebook widgets for parameters.
# NOTE: text widgets display their values in plain text in the notebook UI. For anything
# beyond a demo, read the PAT and the client secret from a secret scope (dbutils.secrets.get).
dbutils.widgets.removeAll()
dbutils.widgets.text("pat_token", "", "Personal Access Token")
dbutils.widgets.text("catalog_name", "security_demo", "Catalog Name")
dbutils.widgets.text("schema_name", "orders_data", "Schema Name")

dbutils.widgets.text("app_sp_client_id", "", "Entra ID App Client ID")
dbutils.widgets.text("app_sp_client_secret", "", "Entra ID App Client Secret")
dbutils.widgets.text("app_sp_tenant_id", "", "Azure Tenant ID")
dbutils.widgets.text("app_sp_display_name", "marcin-sp-app-orchestrator", "App SP Display Name")

# Get widget values.
# Credentials are stripped because pasted tokens/IDs often carry a trailing space or newline.
# Catalog/schema are read verbatim so they stay identical to the ${...} substitution the
# %sql cells perform on the same widgets.
pat_token = dbutils.widgets.get("pat_token").strip()
catalog_name = dbutils.widgets.get("catalog_name")
schema_name = dbutils.widgets.get("schema_name")

# Get Entra ID SP credentials
app_sp_client_id = dbutils.widgets.get("app_sp_client_id").strip()
app_sp_client_secret = dbutils.widgets.get("app_sp_client_secret").strip()
app_sp_tenant_id = dbutils.widgets.get("app_sp_tenant_id").strip()
app_sp_display_name = dbutils.widgets.get("app_sp_display_name")

# Get workspace URL from notebook context (returned without a scheme, e.g. "adb-....azuredatabricks.net")
workspace_url = spark.conf.get("spark.databricks.workspaceUrl")
if not workspace_url.startswith("https://"):
    workspace_url = f"https://{workspace_url}"

# Validate inputs: the PAT and the target catalog/schema are mandatory.
if not pat_token:
    raise ValueError("Please provide pat_token through widget")
if not catalog_name or not schema_name:
    raise ValueError("Please provide catalog_name and schema_name through widgets")

# The Entra ID SP settings are optional (section 1 continues with current-user
# authentication when SP setup fails), but flag gaps now instead of surfacing them
# later as an opaque API error.
missing_entra_settings = [
    widget_name
    for widget_name, value in (
        ("app_sp_client_id", app_sp_client_id),
        ("app_sp_client_secret", app_sp_client_secret),
        ("app_sp_tenant_id", app_sp_tenant_id),
    )
    if not value
]
if missing_entra_settings:
    print(f"⚠️  Entra ID SP settings missing: {', '.join(missing_entra_settings)}")

print("✅ Configuration loaded")
print(f"   Workspace: {workspace_url}")
print(f"   Catalog: {catalog_name}")
print(f"   Schema: {schema_name}")


## 1. Register Entra ID SP in Databricks

In [0]:
from entra_sp_manager import EntraIDServicePrincipalManager

# Pre-initialize both names so later cells can always check them. Without this,
# `app_sp_token` stays undefined when registration returns a falsy result without
# raising, and any later reference to it fails with NameError.
app_sp = None
app_sp_token = None

try:
    # Initialize the Entra ID manager with the workspace URL and PAT
    entra_manager = EntraIDServicePrincipalManager(workspace_url, pat_token)
    
    # Register the SP in the workspace
    print(f"\n📝 Registering Entra ID SP in Databricks workspace...")
    app_sp = entra_manager.register_sp_in_workspace(
        client_id=app_sp_client_id,
        display_name=app_sp_display_name
    )
    
    if app_sp:
        print(f"\n✅ Entra ID SP successfully registered:")
        print(f"   - Display Name: {app_sp.get('displayName')}")
        print(f"   - Application ID: {app_sp.get('applicationId')}")
        print(f"   - Databricks ID: {app_sp.get('id')}")
        
        # Get OAuth token for the SP to use for API calls
        print(f"\n🔑 Obtaining OAuth token for SP authentication...")
        app_sp_token = entra_manager.get_oauth_token(
            tenant_id=app_sp_tenant_id,
            client_id=app_sp_client_id,
            client_secret=app_sp_client_secret
        )
        
        if app_sp_token:
            print(f"✅ OAuth token obtained - jobs will be created BY the Entra ID SP")
            print(f"   Jobs will show '{app_sp.get('displayName')}' as the creator")
        else:
            print(f"⚠️  Could not get OAuth token - jobs will be created by current user")
            print(f"   Check your Entra ID credentials and permissions")
    else:
        print(f"❌ Failed to register Entra ID SP in workspace")
        print(f"   Continuing with current user authentication...")
    
except Exception as e:
    print(f"❌ Error during Entra ID SP setup: {e}")
    print(f"   Continuing with current user authentication...")
    # Clear any partially-populated state (e.g. SP registered but token call raised).
    app_sp = None
    app_sp_token = None

## 2. Unity Catalog Setup - Create Tables Using Sample Data

In [0]:
%sql
-- Use existing catalog and schema
-- Sets the session's default catalog/schema from the `catalog_name` / `schema_name`
-- widgets (substituted via ${...}). Both must already exist; this cell does not create them.
USE CATALOG ${catalog_name};
USE SCHEMA ${schema_name};

-- Show current context (confirms the USE statements took effect)
SELECT current_catalog(), current_schema();

In [0]:

%sql
-- Create customers table using sample retail data
-- We'll use the samples.tpch.customer table as a base and add regional data.
-- `region` is a synthetic bucket of c_nationkey (not derived from the real country);
-- nation keys outside 1-20 land in 'OTHER'.
CREATE OR REPLACE TABLE ${catalog_name}.${schema_name}.customers AS
SELECT 
    c_custkey as customer_id,
    c_name as customer_name,
    CONCAT(LOWER(REPLACE(c_name, ' ', '.')), '@example.com') as email,
    c_phone as phone,
    CASE 
        WHEN c_nationkey IN (1, 2, 3, 4, 5) THEN 'US-EAST'
        WHEN c_nationkey IN (6, 7, 8, 9, 10) THEN 'US-WEST'
        WHEN c_nationkey IN (11, 12, 13, 14, 15) THEN 'EU-WEST'
        WHEN c_nationkey IN (16, 17, 18, 19, 20) THEN 'EU-EAST'
        ELSE 'OTHER'
    END as region,
    n.n_name as country,
    CASE c_mktsegment
        WHEN 'BUILDING' THEN 'ENTERPRISE'
        WHEN 'AUTOMOBILE' THEN 'PREMIUM'
        WHEN 'MACHINERY' THEN 'STANDARD'
        WHEN 'HOUSEHOLD' THEN 'BASIC'
        ELSE 'STANDARD'
    END as customer_segment,
    -- NOTE: RAND() makes created_date differ on every run of this cell.
    DATE_SUB(current_date(), CAST(RAND() * 365 AS INT)) as created_date
FROM samples.tpch.customer c
JOIN samples.tpch.nation n ON c.c_nationkey = n.n_nationkey
-- ORDER BY makes "first 500" well-defined; LIMIT alone returns an arbitrary subset
-- that can change between runs.
ORDER BY c.c_custkey
LIMIT 500;  -- Take first 500 customers for demo

-- Show sample of customers table
SELECT * FROM ${catalog_name}.${schema_name}.customers LIMIT 5;

In [0]:
%sql
-- Create orders table using sample retail orders data
-- We'll use samples.tpch.orders and lineitem to create a denormalized orders table.
-- Only orders of the demo customers are kept, and each order inherits its
-- customer's region/country via the final join.
CREATE OR REPLACE TABLE ${catalog_name}.${schema_name}.orders AS
WITH order_details AS (
    SELECT 
        o.o_orderkey as order_id,
        o.o_custkey as customer_id,
        o.o_orderdate as order_date,
        SUM(l.l_extendedprice * (1 - l.l_discount)) as order_amount,
        CASE MOD(o.o_orderkey, 3)
            WHEN 0 THEN 'Software'
            WHEN 1 THEN 'Hardware'
            ELSE 'Services'
        END as product_category,
        CONCAT('rep', MOD(o.o_orderkey, 10), '@company.com') as sales_rep,
        CASE o.o_orderstatus
            WHEN 'F' THEN 'COMPLETED'
            WHEN 'O' THEN 'PENDING'
            ELSE 'PROCESSING'
        END as order_status
    FROM samples.tpch.orders o
    JOIN samples.tpch.lineitem l ON o.o_orderkey = l.l_orderkey
    WHERE o.o_custkey IN (SELECT customer_id FROM ${catalog_name}.${schema_name}.customers)
    GROUP BY o.o_orderkey, o.o_custkey, o.o_orderdate, o.o_orderstatus
    -- ORDER BY makes "first 1000" deterministic; without it LIMIT picks arbitrary groups.
    ORDER BY o.o_orderkey
    LIMIT 1000  -- Take first 1000 orders for demo
)
SELECT 
    od.order_id,
    od.customer_id,
    od.order_date,
    od.order_amount,
    od.product_category,
    od.sales_rep,
    c.region,
    c.country,
    od.order_status
FROM order_details od
JOIN ${catalog_name}.${schema_name}.customers c ON od.customer_id = c.customer_id;

-- Show sample of orders table
SELECT * FROM ${catalog_name}.${schema_name}.orders LIMIT 5;

In [0]:
# Sanity check: row counts per region for both demo tables, in a single result set.
region_distribution_sql = f"""
    SELECT 
        'customers' as table_name,
        region,
        COUNT(*) as record_count
    FROM {catalog_name}.{schema_name}.customers
    GROUP BY region
    UNION ALL
    SELECT 
        'orders' as table_name,
        region,
        COUNT(*) as record_count
    FROM {catalog_name}.{schema_name}.orders
    GROUP BY region
    ORDER BY table_name, region
"""
region_distribution = spark.sql(region_distribution_sql)
display(region_distribution)

creation_summary = (
    "✅ Tables created successfully using Databricks sample datasets",
    "   - Customers table: Based on samples.tpch.customer",
    "   - Orders table: Based on samples.tpch.orders and lineitem",
    "   - Data distributed across US-EAST, US-WEST, EU-EAST, EU-WEST regions",
)
for summary_line in creation_summary:
    print(summary_line)

## 3. Service Principal and Group Management (Workspace-Scoped)

In [0]:
from workspace_sp_manager import WorkspaceServicePrincipalManager

# Workspace-scoped helper for service principals, constructed with the workspace URL and PAT.
# NOTE(review): `spark` is passed in, presumably for SQL-side operations — confirm in the module.
sp_manager = WorkspaceServicePrincipalManager(
    workspace_url,
    pat_token,
    spark,
)
init_message = "✅ Workspace Service Principal Manager initialized"
print(init_message)

In [0]:
# Create (or reuse) the two run-as service principals, one per region.
# The application/orchestrator SP is the Entra ID SP registered in section 1 (`app_sp`),
# so it is not created here.
us_sp_name = "marcin_sp_user_us_region"
eu_sp_name = "marcin_sp_user_eu_region"

# Account groups whose membership drives the row filter and column masks below.
us_group_name = "marcin_us_region_access"
eu_group_name = "marcin_eu_region_access"
app_group_name = "marcin_job_orchestrators"

print("Creating or retrieving service principals...")

us_sp = sp_manager.get_or_create_service_principal(
    us_sp_name,
    "US region data access only"
)

eu_sp = sp_manager.get_or_create_service_principal(
    eu_sp_name,
    "EU region data access only"
)

# Fail fast: later cells index us_sp/eu_sp['applicationId'] and would otherwise stop
# with an opaque TypeError on None.
missing_sps = [
    sp_name
    for sp_name, sp in ((us_sp_name, us_sp), (eu_sp_name, eu_sp))
    if not sp
]
if missing_sps:
    raise RuntimeError(
        f"Could not create or retrieve service principal(s): {', '.join(missing_sps)}"
    )

In [0]:
## Manually assign the groups via the UI
## @TODO: fix the below.
#
# Group membership currently has to be set MANUALLY in the Databricks UI:
#   - us_sp  (us_sp_name)            -> us_group_name
#   - eu_sp  (eu_sp_name)            -> eu_group_name
#   - app_sp (Entra ID app SP)       -> app_group_name
# This matters because the row filter and column masks created below decide visibility
# with IS_ACCOUNT_GROUP_MEMBER(...) on these groups, while the GRANTs in section 4 go to
# the service principals directly.
#
# NOTE(review): the draft below cannot run as written — `us_group`, `eu_group`,
# `app_group` and `app_sp_name` are not defined in this notebook (only the *_group_name
# strings exist, and the app_sp_name assignment is commented out).

# if us_sp and us_group:
#     result = sp_manager.add_member_to_uc_group(us_group_name, us_sp)
#     if result:
#         print(f"✅ Added {us_sp_name} to {us_group_name}")
#     else:
#         print(f"ℹ️  {us_sp_name} may already be in {us_group_name}")

# if eu_sp and eu_group:
#     result = sp_manager.add_member_to_uc_group(eu_group_name, eu_sp)
#     if result:
#         print(f"✅ Added {eu_sp_name} to {eu_group_name}")
#     else:
#         print(f"ℹ️  {eu_sp_name} may already be in {eu_group_name}")

# if app_sp and app_group:
#     result = sp_manager.add_member_to_uc_group(app_group_name, app_sp)
#     if result:
#         print(f"✅ Added {app_sp_name} to {app_group_name}")
#     else:
#         print(f"ℹ️  {app_sp_name} may already be in {app_group_name}")

## 4. Grant Least-Privilege Permissions to Service Principals

In [0]:
# The GRANTs in the next cell target service principals (by application ID), not groups.
print("Granting least-privilege Unity Catalog permissions to service principals...")

In [0]:
# Grant least-privilege Unity Catalog access directly to each service principal
# (identified by its application ID). Which rows/values they then see is decided by
# the row filter and column masks, which key off group membership.


def _grant_uc_access(principal_id: str, tables: List[str]) -> None:
    """Grant catalog and schema usage, plus SELECT on each listed table, to one principal.

    Args:
        principal_id: Unity Catalog principal name (here, a service principal's
            application ID).
        tables: table names inside the configured catalog/schema to grant SELECT on.
            An empty list grants only catalog/schema usage.
    """
    spark.sql(f"GRANT USE CATALOG ON CATALOG {catalog_name} TO `{principal_id}`")
    spark.sql(f"GRANT USE SCHEMA ON SCHEMA {catalog_name}.{schema_name} TO `{principal_id}`")
    for table in tables:
        spark.sql(f"GRANT SELECT ON TABLE {catalog_name}.{schema_name}.{table} TO `{principal_id}`")


# US region SP - only SELECT on the tables it needs
_grant_uc_access(us_sp['applicationId'], ["customers", "orders"])

# EU region SP - only SELECT on customers (demonstrating different access levels).
# Note: EU does NOT get access to the orders table.
_grant_uc_access(eu_sp['applicationId'], ["customers"])

# Application orchestrator SP - catalog/schema usage for job management but NO direct
# data access (principle of least privilege). `app_sp` is None when the Entra ID SP
# setup in section 1 failed, so guard instead of crashing on None['applicationId'].
if app_sp:
    _grant_uc_access(app_sp['applicationId'], [])
else:
    print("⚠️  Entra ID app SP not registered - skipping orchestrator grants")

print("✅ Least-privilege permissions granted to service principals:")
print(f"   - {us_sp_name}: SELECT on customers and orders (rows filtered via {us_group_name})")
print(f"   - {eu_sp_name}: SELECT on customers only (rows filtered via {eu_group_name})")
if app_sp:
    print(f"   - {app_sp.get('displayName')}: Catalog/schema access only (for job creation, no data access)")

## 5. Implement Row-Level Security (RLS)

In [0]:
# Row filter UDF: members of a regional group only see rows for their own regions.
# NOTE(review): `account users` matches every principal in the account, so anyone not in
# the US/EU groups (including a run-as SP whose group membership has not been assigned
# yet) sees ALL rows. This is fail-open even though the SQL comment calls it
# "Admin access". Confirm this is intended.
row_filter_ddl = f"""
    CREATE OR REPLACE FUNCTION {catalog_name}.{schema_name}.region_row_filter(region_value STRING)
    RETURNS BOOLEAN
    RETURN 
      CASE
        WHEN IS_ACCOUNT_GROUP_MEMBER('{us_group_name}') THEN region_value IN ('US-EAST', 'US-WEST')
        WHEN IS_ACCOUNT_GROUP_MEMBER('{eu_group_name}') THEN region_value IN ('EU-EAST', 'EU-WEST')
        WHEN IS_ACCOUNT_GROUP_MEMBER('account users') THEN TRUE  -- Admin access
        ELSE FALSE
      END
"""
spark.sql(row_filter_ddl)

filter_groups = ", ".join([us_group_name, eu_group_name])
print(f"✅ Created row filter function with groups: {filter_groups}")

In [0]:
%sql
-- Apply row filter to customers table
-- region_row_filter is evaluated per row with that row's `region` value; rows for
-- which it returns FALSE are hidden from the querying principal.
ALTER TABLE ${catalog_name}.${schema_name}.customers 
SET ROW FILTER ${catalog_name}.${schema_name}.region_row_filter ON (region);

-- Apply row filter to orders table (same function; each order carries its customer's region)
ALTER TABLE ${catalog_name}.${schema_name}.orders 
SET ROW FILTER ${catalog_name}.${schema_name}.region_row_filter ON (region);

## 6. Implement Column-Level Security (CLS)

In [0]:
# Create column masking functions using Python variables for group names
spark.sql(f"""
    CREATE OR REPLACE FUNCTION {catalog_name}.{schema_name}.mask_email(email STRING)
    RETURNS STRING
    RETURN 
      CASE 
        WHEN IS_ACCOUNT_GROUP_MEMBER('account users') THEN email
        WHEN IS_ACCOUNT_GROUP_MEMBER('{us_group_name}') OR IS_ACCOUNT_GROUP_MEMBER('{eu_group_name}') 
          THEN CONCAT(SUBSTRING(email, 1, 3), '***@***', SUBSTRING(email, INSTR(email, '.'), LENGTH(email)))
        ELSE '***@***.***'
      END
""")

spark.sql(f"""
    CREATE OR REPLACE FUNCTION {catalog_name}.{schema_name}.mask_phone(phone STRING)
    RETURNS STRING
    RETURN 
      CASE
        WHEN IS_ACCOUNT_GROUP_MEMBER('account users') THEN phone
        ELSE CONCAT('***-', SUBSTRING(phone, LENGTH(phone) - 3, 4))
      END
""")

print(f"✅ Created column masking functions with groups: {us_group_name}, {eu_group_name}")

In [0]:
%sql
-- Apply column masks
-- Each mask function receives the raw column value and returns what the querying
-- principal is allowed to see (mask_email / mask_phone are defined in the previous cell).
ALTER TABLE ${catalog_name}.${schema_name}.customers 
ALTER COLUMN email SET MASK ${catalog_name}.${schema_name}.mask_email;

ALTER TABLE ${catalog_name}.${schema_name}.customers 
ALTER COLUMN phone SET MASK ${catalog_name}.${schema_name}.mask_phone;

## 7. Create Jobs with Application SP Managing User SP Execution

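Before running this section, grant the Entra ID application SP the **Service Principal User** role on each run-as service principal (US and EU). Without it, the application SP cannot create jobs that run as those principals.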

In [0]:
from job_manager import ServerlessJobManager

# Prefer the Entra ID SP's OAuth token so jobs are created BY that SP. Fall back to the
# PAT (current user) when no token was obtained; the Entra ID setup cell prints that it
# continues with current-user authentication in that case.
# `globals().get` keeps this cell safe even if `app_sp_token` was never assigned.
sp_oauth_token = globals().get("app_sp_token")
job_manager = ServerlessJobManager(workspace_url, sp_oauth_token or pat_token)

auth_description = "Entra ID SP OAuth token" if sp_oauth_token else "PAT (current user)"
print(f"✅ Serverless Job Manager initialized ({auth_description})")

In [0]:
# Notebook executed by each regional job.
# NOTE(review): absolute path inside one user's workspace folder — adjust for your environment.
notebook_path = "/Workspace/Users/marcin.jimenez@databricks.com/dbx-multi-user-extract/notebooks/process_regional_data"

print(f"📝 Creating serverless jobs with notebook: {notebook_path}")
print("   Note: Ensure this notebook exists in your workspace\n")

jobs_created = []

# Report the Entra ID SP as creator only when it was registered AND its OAuth token was
# obtained. `app_sp` is None after a failed setup, so indexing it unconditionally would
# raise TypeError. `app_sp_token` is only evaluated when `app_sp` is truthy, which means
# the setup cell assigned it.
if app_sp and app_sp_token:
    creation_context = f"Entra ID SP ({app_sp['displayName']})"
else:
    creation_context = "current user"

In [0]:
print("\n🔧 Processing US Region job...")
print(f"   Creator: {creation_context}")
print(f"   Run as: {us_sp['displayName']}")

us_job_name = "Marcin_US_Processing"

try:
    # Create the serverless job (or reuse an existing one) that runs as the US region SP.
    us_job = job_manager.create_or_get_serverless_job(
        job_name=us_job_name,
        notebook_path=notebook_path,
        catalog=catalog_name,
        schema=schema_name,
        run_as_sp_id=us_sp['applicationId'],
        description=f"US region processing - Created by {creation_context}, runs as {us_sp_name}"
    )

    if us_job:
        # Grant App SP management permission. This is best-effort: the job already exists,
        # so a failure here is reported but does not abort the cell.
        if app_sp:
            try:
                job_manager.grant_permission(
                    us_job['job_id'],
                    app_sp['applicationId'],
                    "CAN_MANAGE"
                )
                print(f"   ✅ Granted CAN_MANAGE to {app_sp.get('displayName')}")
            except Exception as grant_error:
                print(f"   ⚠️  Could not grant CAN_MANAGE to {app_sp.get('displayName')}: {grant_error}")

        jobs_created.append({
            'job_id': us_job['job_id'],
            'name': us_job_name,
            'created_by': creation_context,
            'run_as': us_sp['displayName']
        })

        print(f"✅ US job ready: ID {us_job['job_id']}")
    else:
        print(f"❌ Job manager returned no job for {us_job_name}")
except Exception as e:
    print(f"❌ Failed to create job: {e}")