#### Define Critical Words List
- Creates a list of sensitive column names that may contain personally identifiable information (PII) or require special access controls
- Includes privilege-related fields, catalog metadata, and security-sensitive column names


In [0]:
# Column names treated as critical when screening the column inventory.
# Each name is listed exactly once, in first-seen order, so that the exported
# JSON (and any prompt built from it) carries no redundant entries.
critical_words = [
    "grantor",
    "grantee",
    "catalog_name",
    "privilege_type",
    "is_grantable",
    "inherited_from",
    "tag_name",
    "tag_value",
    "catalog_owner",
    "comment",
    "created",
    "created_by",
    "last_altered",
    "last_altered_by",
    "constraint_catalog",
    "constraint_schema",
    "constraint_name",
    "check_clause",
    "sql_path",
    "table_catalog",
    "table_schema",
    "table_name",
    "column_name",
    "mask_name",
    "using_",
    "schema_name",
]

#### Export Critical Words to JSON
- Saves the critical words list to a JSON file in the volume for use in subsequent AI-based analysis queries


In [0]:
import json

# Destination file inside the volume for the exported word list.
critical_words_path = '/Volumes/mc/teva/files/critical_words.json'

# Serialise the list as compact JSON (no indentation) directly into the file.
with open(critical_words_path, 'w') as out_file:
    json.dump(critical_words, out_file)

#### Scan All Catalogs, Databases, Tables and Columns
- Iterates through the first 10 catalogs and extracts the complete hierarchy of all databases, tables, and columns
- Excludes system objects starting with underscore
- Creates a DataFrame with the full lineage: catalog > database > table > column


In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()


def _quote_identifier(name):
    """Wrap ``name`` in backticks (doubling embedded ones) so it is safe in SQL text."""
    return "`" + name.replace("`", "``") + "`"


def _brief(error):
    """Return the exception type plus the first line of its message for logging."""
    text = str(error).strip()
    first_line = text.splitlines()[0] if text else ""
    return f"{type(error).__name__}: {first_line}"


def _list_column_names(table_name):
    """Return the column names of ``table_name`` (resolved in the current schema).

    Names starting with an underscore are left out. DESCRIBE TABLE may append
    metadata rows after the real columns (a blank separator and '#'-prefixed
    headers such as '# Partition Information', followed by the partition
    columns again), so reading stops at the first blank or '#'-prefixed row.
    """
    described = spark.sql(f"DESCRIBE TABLE {_quote_identifier(table_name)}")
    names = []
    for col_row in described.collect():
        col_name = col_row.col_name
        if not col_name or col_name.startswith('#'):
            break
        if not col_name.startswith('_'):
            names.append(col_name)
    return names


# Take the first 10 catalogs returned by SHOW CATALOGS, then drop any whose
# name starts with an underscore (so fewer than 10 may remain).
catalogs_df = spark.sql("SHOW CATALOGS")
catalogs = [row.catalog for row in catalogs_df.limit(10).collect() if not row.catalog.startswith('_')]

# One (catalog, database, table, column) tuple per discovered column.
rows = []

for catalog in catalogs:
    try:
        # Switch catalog
        spark.sql(f"USE CATALOG {_quote_identifier(catalog)}")

        # Get all databases in this catalog
        databases_df = spark.sql("SHOW DATABASES")
        for db_row in databases_df.collect():
            db = db_row.databaseName

            if db.startswith('_'):
                continue

            try:
                # Switch database
                spark.sql(f"USE {_quote_identifier(db)}")

                # Get all tables
                tables_df = spark.sql("SHOW TABLES")
                for tbl_row in tables_df.collect():
                    tbl = tbl_row.tableName

                    if tbl.startswith('_'):
                        continue

                    # A single unreadable table must not abort the rest of
                    # the database, so failures are isolated per table.
                    try:
                        column_names = _list_column_names(tbl)
                    except Exception as e:
                        print(f"Skipping table {catalog}.{db}.{tbl}: {_brief(e)}")
                        continue

                    rows.extend((catalog, db, tbl, name) for name in column_names)

            except Exception as e:
                print(f"Skipping database {catalog}.{db}: {_brief(e)}")
                continue

    except Exception as e:
        print(f"Skipping catalog {catalog}: {_brief(e)}")
        continue

# Create DataFrame. The schema is spelled out (all string columns) because
# schema inference fails when `rows` is empty.
columns = ["catalog", "database", "table", "column"]
inventory_schema = ", ".join(f"`{name}` string" for name in columns)
df = spark.createDataFrame(rows, inventory_schema)
df.display()


#### Save Column Inventory to Table
- Writes the complete column inventory DataFrame to a Delta table for persistent storage and further analysis


In [0]:
# Persist the column inventory as a managed table, replacing any previous contents.
(
    df.write
    .mode("overwrite")
    .saveAsTable("mc.db.columns")
)

#### AI-Based Critical Score Assessment (Decimal Score)
- Uses AI to analyze the first 100 column names and assign a decimal score (0-1) indicating how closely each column name relates to the critical words list
- Higher scores indicate stronger matches to sensitive data patterns


In [0]:
%sql
-- Ask the model for a 0-1 score of how closely each inventoried column name
-- relates to the exported critical-words list.
-- The prompt pieces are joined with explicit separators so the column name and
-- the word list do not run into the surrounding instructions, and the
-- "when in doubt" fallback is spelled out as 0.
WITH critical_words AS (
  SELECT value AS critical_words
  FROM read_files('/Volumes/mc/teva/files/critical_words.json', format => 'text')
)
SELECT
  column,
  ai_query(
    'databricks-claude-sonnet-4',
    'column name: ' || column || '. '
      || 'Return a DECIMAL SCORE between 0 and 1 indicating how closely the column name relates to any of the following words: '
      || critical_words || '. '
      || 'Only output allowed is a DECIMAL, nothing else. Do not comment. When in doubt output 0'
  ) AS critical_score
FROM mc.db.columns, critical_words
LIMIT 100;

#### AI-Based Critical Word Matching (Boolean + Match)
- Uses AI to analyze the first 10 column names and return TRUE/FALSE plus the specific critical word that matched
- Requested output format: `{true;critical_word}` on a match, or `FALSE` otherwise (note: the prompt calls this JSON, but it is not strictly valid JSON)


In [0]:
%sql
-- Ask the model whether each inventoried column name relates to one of the
-- critical words and, if so, which word matched.
-- Every prompt fragment is joined with an explicit || and a separator so the
-- column name, the word list and the instructions stay distinct.
-- NOTE(review): the example format {true;critical_word} is kept unchanged but
-- is not valid JSON; confirm what downstream consumers expect before changing it.
WITH critical_words AS (
  SELECT value AS critical_words
  FROM read_files('/Volumes/mc/teva/files/critical_words.json', format => 'text')
)
SELECT
  column,
  ai_query(
    'databricks-claude-sonnet-4',
    'column name: ' || column || '. '
      || 'Return TRUE if the column name relates to any of the following words: '
      || critical_words || '. '
      || 'Only output allowed is TRUE or FALSE + THE Critical Word that was an actual match, nothing else. Do not comment. When in doubt output FALSE. '
      || 'In json format, example {true;critical_word} ONLY'
  ) AS critical
FROM mc.db.columns, critical_words
LIMIT 10;

#### AI-Based PII Detection
- Uses AI to analyze the first 100 column names and determine if they might contain Personally Identifiable Information (PII)
- Returns TRUE/FALSE for each column based on AI assessment


In [0]:
%sql
-- Ask the model whether each inventoried column name might hold PII.
-- The prompt does not use the critical-words list, so the file is not read or
-- joined here; prompt fragments are joined explicitly with separators.
SELECT
  column,
  ai_query(
    'databricks-claude-sonnet-4',
    'column name: ' || column || '. '
      || 'Return TRUE if the column name might contain PII. '
      || 'Only output allowed is TRUE or FALSE, nothing else. Do not comment. When in doubt output FALSE'
  ) AS PII
FROM mc.db.columns
LIMIT 100;