In [0]:
# ============================================
# CELL 0: ENVIRONMENT VALIDATION
# ============================================
# Prints the JVM, Spark and Python versions of the attached cluster and runs a
# tiny Spark job to prove the session can execute work. `spark` is the
# SparkSession injected into the notebook by the runtime. The closing banner
# reflects whether the Spark job actually ran instead of always claiming success.

import sys

print("=" * 80)
print("🔧 STEP 1: VALIDATING DATABRICKS ENVIRONMENT")
print("=" * 80)

env_checks_ok = True

# Check Java version. This goes through the py4j gateway (`sparkContext._jvm`),
# a private PySpark API that may be unavailable in some sessions (e.g. Spark
# Connect), so treat it as informational rather than aborting the cell.
try:
    java_version = spark.sparkContext._jvm.java.lang.System.getProperty("java.version")
    print(f"\n✅ Java Version: {java_version}")
except Exception as e:
    print(f"\n⚠️  Java Version: unavailable ({e})")

# Check Spark version
spark_version = spark.version
print(f"✅ Spark Version: {spark_version}")

# Check Python version ("3.x.y" - the first token of sys.version)
python_version = sys.version.split()[0]
print(f"✅ Python Version: {python_version}")

# Verify Spark can actually run a job: range(1, 3) yields 2 rows
try:
    test_df = spark.range(1, 3)
    test_count = test_df.count()
    print(f"✅ Spark Context: Active (test count = {test_count})")
except Exception as e:
    env_checks_ok = False
    print(f"❌ Spark Context Error: {e}")

print("\n" + "=" * 80)
if env_checks_ok:
    print("✅ ENVIRONMENT VALIDATION COMPLETE")
else:
    print("❌ ENVIRONMENT VALIDATION FAILED - fix the errors above before continuing")
print("=" * 80)

In [0]:
# ============================================
# CELL 1: SNOWFLAKE POLARIS CONFIGURATION
# ============================================
# Registers a Spark catalog named `hz` that uses Iceberg's REST catalog client
# against the Snowflake Polaris endpoint of SNOWFLAKE_ACCOUNT, authenticating
# with a Snowflake programmatic access token (PAT). Later cells address tables
# as hz.<SCHEMA>.<TABLE>.

import os

print("=" * 80)
print("🔧 STEP 2: CONFIGURING SNOWFLAKE POLARIS CATALOG CONNECTION")
print("=" * 80)

# Snowflake Polaris credentials - UPDATE WITH YOUR VALUES
# The PAT is taken from the SNOWFLAKE_PAT environment variable when it is set,
# so the secret does not have to be pasted into (and saved with) the notebook.
# On Databricks a secret scope is preferable, e.g.
#   ACCOUNTADMIN_PAT = dbutils.secrets.get(scope="<scope>", key="<key>")
PAT_PLACEHOLDER = "YOUR_SNOWFLAKE_PAT_TOKEN_HERE"
ACCOUNTADMIN_PAT = os.environ.get("SNOWFLAKE_PAT", PAT_PLACEHOLDER)  # Get from: Snowflake → Profile → Security → Tokens
SNOWFLAKE_ACCOUNT = "YOUR-SNOWFLAKE-ACCOUNT"  # Format: ORGNAME-ACCOUNTNAME (no underscores!)
DATABASE_NAME = "DEMO_TESTDB"
horizon_role = "session:role:ACCOUNTADMIN"  # sent as the catalog `scope`; role name is the last ':' part

# Construct Polaris API endpoint
account_uri = f"https://{SNOWFLAKE_ACCOUNT}.snowflakecomputing.com/polaris/api/catalog"

print("\n📍 Configuration Details:")
print(f"   Account: {SNOWFLAKE_ACCOUNT}")
print(f"   Database: {DATABASE_NAME}")
print(f"   Polaris URI: {account_uri}")
# Derive the displayed role from the scope actually sent, so the two never disagree.
print(f"   Role: {horizon_role.rsplit(':', 1)[-1]}")
if ACCOUNTADMIN_PAT == PAT_PLACEHOLDER:
    print("⚠️  ACCOUNTADMIN_PAT is still the placeholder - set SNOWFLAKE_PAT or edit this cell")

# Configure Spark to use Snowflake Polaris as Iceberg catalog
print("\n⚙️  Configuring Spark catalog 'hz' for Polaris...")

# NOTE: Spark instantiates a catalog plugin once per session and then reuses it,
# so if `hz` was already queried in this session, changes made here may not take
# effect until the notebook is detached and reattached.
try:
    # Clear any earlier configuration first. Iceberg refuses to build a catalog
    # that has both `type` and `catalog-impl` set, so a stale `type` must go.
    for stale_key in ("spark.sql.catalog.hz", "spark.sql.catalog.hz.type"):
        try:
            spark.conf.unset(stale_key)
        except Exception:
            pass  # best-effort: nothing to clear if the key was never set

    # Core Polaris/Iceberg configuration (catalog-impl approach, not `type`)
    spark.conf.set("spark.sql.catalog.hz", "org.apache.iceberg.spark.SparkCatalog")
    spark.conf.set("spark.sql.catalog.hz.catalog-impl", "org.apache.iceberg.rest.RESTCatalog")
    spark.conf.set("spark.sql.catalog.hz.uri", account_uri)
    spark.conf.set("spark.sql.catalog.hz.warehouse", DATABASE_NAME)
    spark.conf.set("spark.sql.catalog.hz.credential", ACCOUNTADMIN_PAT)
    spark.conf.set("spark.sql.catalog.hz.scope", horizon_role)
    # Extra HTTP header sent by the REST client, declaring the credential kind to Snowflake
    spark.conf.set("spark.sql.catalog.hz.header.X-Snowflake-Authorization-Token-Type", "PROGRAMMATIC_ACCESS_TOKEN")
    # Iceberg vectorized reads are switched off for this session
    spark.conf.set("spark.sql.iceberg.vectorization.enabled", "false")

    print("✅ Spark catalog configuration set successfully")

except Exception as e:
    print(f"❌ Configuration Error: {e}")
    raise

# Verify configuration was applied
print("\n🔍 Verifying configuration...")
catalog_uri = spark.conf.get("spark.sql.catalog.hz.uri")
catalog_warehouse = spark.conf.get("spark.sql.catalog.hz.warehouse")
print(f"   ✅ Catalog URI: {catalog_uri}")
print(f"   ✅ Warehouse: {catalog_warehouse}")

print("\n" + "=" * 80)
print("✅ POLARIS CONFIGURATION COMPLETE")
print("=" * 80)



In [0]:
# ============================================
# CELL 2: TEST CONNECTION & DISCOVERY
# ============================================
# Lists the namespaces (Snowflake schemas) and the tables in PUBLIC through the
# `hz` catalog configured in Cell 1, and reports whether the expected PUBLIC
# schema and USER_INFO table are visible. Failures are re-raised so that
# "Run All" stops here instead of continuing with a broken catalog.

print("=" * 80)
print("🔧 STEP 3: TESTING POLARIS CONNECTION & DISCOVERING SCHEMAS")
print("=" * 80)

# Test 1: List namespaces (schemas)
print("\n📋 Test 1: Listing namespaces in Polaris catalog...")
try:
    namespaces_df = spark.sql("SHOW NAMESPACES IN hz")
    namespace_rows = namespaces_df.collect()  # fetch once, reuse below
    namespace_count = len(namespace_rows)

    print("✅ Successfully connected to Polaris catalog!")
    print(f"✅ Found {namespace_count} namespace(s):\n")
    namespaces_df.show(truncate=False)

    # SHOW NAMESPACES has a single output column, named `namespace` in Spark 3.x
    # and `databaseName` under the legacy output schema - so read it by position.
    namespaces = [row[0] for row in namespace_rows]

    # Verify PUBLIC schema exists (case-insensitive, like the table check below)
    namespaces_upper = [ns.upper() if ns else "" for ns in namespaces]
    if "PUBLIC" in namespaces_upper:
        print("✅ PUBLIC schema found (expected)")
    else:
        print("⚠️  PUBLIC schema not found - check Snowflake setup")

except Exception as e:
    print(f"❌ CONNECTION FAILED: {e}")
    print("\n💡 Troubleshooting:")
    print("   1. Verify Polaris catalog is enabled in Snowflake")
    print("   2. Check your PAT token is valid")
    print("   3. Ensure database DEMO_TESTDB exists")
    print("   4. Verify Iceberg libraries are installed on cluster")
    raise

# Test 2: List tables in PUBLIC schema
print("\n" + "=" * 80)
print("📋 Test 2: Listing tables in PUBLIC schema...")
try:
    tables_df = spark.sql("SHOW TABLES IN hz.PUBLIC")
    table_rows = tables_df.collect()  # fetch once, reuse below
    table_count = len(table_rows)

    print(f"✅ Found {table_count} table(s) in PUBLIC schema:\n")
    tables_df.show(truncate=False)

    if table_count > 0:
        # SHOW TABLES returns (namespace, tableName, isTemporary) in Spark 3.x.
        # Pick the table-name column by name: column 0 is the namespace, so a
        # positional read would return the schema name instead of the table.
        name_col = next(
            (col for col in ("tableName", "name") if col in tables_df.columns),
            tables_df.columns[0],
        )
        tables = [row[name_col] for row in table_rows]

        # Verify USER_INFO table exists (Snowflake identifiers may differ in case)
        tables_upper = [t.upper() if t else "" for t in tables]
        if "USER_INFO" in tables_upper:
            print("✅ USER_INFO table found (target table confirmed)")
        else:
            print("⚠️  USER_INFO table not found - check Snowflake table creation")
            print(f"   Available tables: {tables}")
    else:
        print("⚠️  No tables found in PUBLIC schema")

except Exception as e:
    print(f"❌ DISCOVERY FAILED: {e}")
    raise

print("\n" + "=" * 80)
print("✅ CONNECTION TEST & DISCOVERY COMPLETE")
print("=" * 80)

In [0]:
# ============================================
# CELL 3: QUERY USER_INFO TABLE
# ============================================
# Reads hz.PUBLIC.USER_INFO through the Polaris catalog, displays it, and prints
# its row count. NOTE: show() writes real row values (up to 20 rows by default)
# into the saved notebook output - clear or redact it before sharing if the
# table holds personal data.

print("=" * 80)
print("🔧 STEP 4: READING USER_INFO TABLE FROM SNOWFLAKE")
print("=" * 80)

# Query the USER_INFO table (lazy: nothing is read until show()/count())
print("\n👥 Querying USER_INFO table via Polaris catalog...")

user_info_df = spark.sql("SELECT * FROM hz.PUBLIC.USER_INFO")

print("\n📊 USER_INFO Table:")
user_info_df.show(truncate=False)

# Show row count (runs as a separate Spark job from show())
row_count = user_info_df.count()
print(f"\n✅ Total rows: {row_count}")

print("\n" + "=" * 80)
print("✅ QUERY COMPLETE")
print("=" * 80)

In [None]:
# ============================================
# CELL 4: FINAL VALIDATION & DEMO SUMMARY
# ============================================
# Re-queries hz.PUBLIC.USER_INFO, prints a summary of the demo, and records the
# outcome in the session conf key `demo.polaris.connection.validated`
# ("true" only when the live COUNT(*) query succeeded, otherwise "false").

print("=" * 80)
print("🎯 FINAL VALIDATION & DEMO SUMMARY")
print("=" * 80)

# Validate live BEFORE announcing success: the banners and the saved flag below
# all depend on this query actually working.
final_count = None
stats_error = None
try:
    final_count = spark.sql("SELECT COUNT(*) as count FROM hz.PUBLIC.USER_INFO").collect()[0]["count"]
except Exception as e:
    stats_error = e
demo_ok = stats_error is None

# Summary of what was accomplished
if demo_ok:
    print("\n✅ DEMO COMPLETED SUCCESSFULLY!\n")
else:
    print("\n❌ DEMO VALIDATION FAILED - see Table Statistics below\n")

print("📋 What We Accomplished:")
print("   ✅ Step 1: Validated Databricks environment")
print("   ✅ Step 2: Configured Snowflake Polaris catalog connection")
print("   ✅ Step 3: Tested connection and discovered schemas/tables")
print("   ✅ Step 4: Read and validated USER_INFO table")

# Architecture diagram; boxes are drawn with a fixed inner width so every
# row lines up regardless of text length.
print("\n🏗️  Architecture Demonstrated:")
box_inner_width = 42
print("   ┌" + "─" * box_inner_width + "┐")
for box_text in (
    "         SNOWFLAKE",
    "  Database: DEMO_TESTDB",
    "  Table: USER_INFO (Iceberg format)",
    "  Polaris Catalog (REST API)",
):
    print("   │" + box_text.ljust(box_inner_width) + "│")
print("   └" + "─" * 14 + "┬" + "─" * (box_inner_width - 15) + "┘")
print("                  │")
print("                  │ REST API over HTTPS")
print("                  │ (Iceberg protocol)")
print("                  │")
print("   ┌" + "─" * 14 + "▼" + "─" * (box_inner_width - 15) + "┐")
for box_text in (
    "         DATABRICKS",
    "  Spark + Iceberg libraries",
    "  Querying remote Iceberg tables",
    "  No data duplication!",
):
    print("   │" + box_text.ljust(box_inner_width) + "│")
print("   └" + "─" * box_inner_width + "┘")

print("\n🔑 Key Capabilities Shown:")
print("   • Data Federation: Query Snowflake data from Databricks")
print("   • Open Standards: Using Apache Iceberg format")
print("   • No ETL: Direct access without copying data")
print("   • Polaris Catalog: Snowflake's REST-based catalog service")

print("\n📊 Table Statistics:")
if demo_ok:
    print(f"   Total rows in USER_INFO: {final_count}")
    try:
        final_sample = spark.sql("SELECT * FROM hz.PUBLIC.USER_INFO LIMIT 3")
        print("\n   Sample data:")
        final_sample.show(truncate=False)
    except Exception as e:
        print(f"   Could not fetch final stats: {e}")
else:
    print(f"   Could not fetch final stats: {stats_error}")

print("\n🚀 Next Steps - Phase 2:")
print("   1. Add more tables (CUSTOMERS, ORDERS, PRODUCTS)")
print("   2. Set up Snowflake Cortex Analyst (semantic model)")
print("   3. Create Cortex Agent for natural language queries")
print("   4. Build demo presentation and talking points")

print("\n" + "=" * 80)
if demo_ok:
    print("✅ DEMO READY FOR PRESENTATION!")
else:
    print("❌ DEMO NOT READY - fix the connection issues above")
print("=" * 80)

# Record the validation outcome in the session conf (only "true" on success)
print("\n💾 Saving connection state for future use...")
spark.conf.set("demo.polaris.connection.validated", "true" if demo_ok else "false")


In [None]:
# ============================================
# CELL 5: TROUBLESHOOTING & DIAGNOSTICS (Optional)
# ============================================
# Run this cell only if you encounter errors in previous cells.
# Each check is independent and best-effort: failures are reported, not raised.
# Reads SNOWFLAKE_ACCOUNT and ACCOUNTADMIN_PAT from Cell 1 when available.

import ssl
import urllib.error
import urllib.request

print("=" * 80)
print("🔧 TROUBLESHOOTING & DIAGNOSTICS")
print("=" * 80)

print("\n🔍 Diagnostic Information:\n")

# Check 1: Verify Iceberg library is installed
print("1️⃣ Checking for Iceberg libraries...")
try:
    iceberg_class = spark.sparkContext._jvm.org.apache.iceberg.spark.SparkCatalog
    # py4j resolves an unknown fully-qualified name to a JavaPackage instead of
    # raising, so merely accessing the name proves nothing - check the type.
    if type(iceberg_class).__name__ != "JavaClass":
        raise ImportError("org.apache.iceberg.spark.SparkCatalog is not on the driver classpath")
    print("   ✅ Iceberg libraries are installed")
except Exception as e:
    print("   ❌ Iceberg libraries NOT found!")
    # The runtime artifact's Spark/Scala suffix must match the cluster's versions.
    print("   💡 Install: org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.3")
    print(f"   Error: {e}")

# Check 2: Verify catalog configuration
print("\n2️⃣ Checking catalog configuration...")
try:
    catalog_impl = spark.conf.get("spark.sql.catalog.hz.catalog-impl")
    catalog_uri = spark.conf.get("spark.sql.catalog.hz.uri")
    print(f"   ✅ Catalog implementation: {catalog_impl}")
    print(f"   ✅ Catalog URI: {catalog_uri}")
except Exception as e:
    print(f"   ❌ Catalog not configured: {e}")

# Check 3: Network connectivity test
print("\n3️⃣ Testing network connectivity to Snowflake...")
try:
    # Test basic HTTPS connectivity to Snowflake
    context = ssl.create_default_context()
    url = f"https://{SNOWFLAKE_ACCOUNT}.snowflakecomputing.com"

    try:
        req = urllib.request.Request(url, method='HEAD')
        # `with` closes the connection once the status has been read
        with urllib.request.urlopen(req, context=context, timeout=10) as response:
            print(f"   ✅ Can reach Snowflake at {url}")
            print(f"   HTTP Status: {response.status}")
    except urllib.error.HTTPError as http_err:
        # An HTTP error status (e.g. 403) still means the host answered over TLS,
        # so connectivity itself is fine.
        print(f"   ✅ Can reach Snowflake at {url}")
        print(f"   HTTP Status: {http_err.code}")
    except Exception as net_err:
        print(f"   ⚠️  Network issue: {net_err}")

except Exception as e:
    print(f"   ⚠️  Could not test network: {e}")

# Check 4: PAT token format validation
print("\n4️⃣ Validating PAT token format...")
try:
    if ACCOUNTADMIN_PAT and len(ACCOUNTADMIN_PAT) > 50:
        print("   ✅ PAT token appears to be set (length > 50 chars)")
        # Check if it looks like a JWT (header.payload.signature).
        # NOTE(review): advisory only - confirm Snowflake PATs are JWT-shaped.
        if ACCOUNTADMIN_PAT.count('.') == 2:
            print("   ✅ PAT token format looks like a JWT (3 parts)")
        else:
            print("   ⚠️  PAT token doesn't look like a JWT")
    else:
        print("   ❌ PAT token not set or too short")
except Exception as e:
    print(f"   ❌ PAT token validation failed: {e}")

# Check 5: List all Spark catalog configurations
print("\n5️⃣ All Spark catalog configurations:")
try:
    # spark.conf.set() (used in Cell 1) writes session-level SQL confs, which
    # SparkContext.getConf() does not reflect; `SET` lists the session's
    # effective properties, including those inherited from the cluster config.
    all_configs = [(row.key, row.value) for row in spark.sql("SET").collect()]
    catalog_configs = sorted(conf for conf in all_configs if 'catalog.hz' in conf[0])

    if catalog_configs:
        for key, value in catalog_configs:
            # Mask secrets completely - never echo any part of the token itself
            if 'credential' in key or key.endswith('.token'):
                value = f"*** (set, {len(value)} chars)" if value else "*** (empty)"
            print(f"   {key} = {value}")
    else:
        print("   ⚠️  No catalog configurations found")

except Exception as e:
    print(f"   ❌ Could not list configurations: {e}")

# Check 6: Common error patterns
print("\n6️⃣ Common Issues & Solutions:")
print("   ❌ 'Cannot find catalog plugin' → Install Iceberg libraries")
print("   ❌ 'RESTException' → Check Polaris is enabled in Snowflake")
print("   ❌ '401 Unauthorized' → Verify PAT token is valid")
print("   ❌ '404 Not Found' → Check database name and Snowflake account")
print("   ❌ 'Connection refused' → Network/firewall issues")

print("\n" + "=" * 80)
print("✅ DIAGNOSTICS COMPLETE")
print("=" * 80)
