# Unity Catalog Absolute Path Test

**Goal**: Test whether a Delta table with absolute S3 paths pointing to multiple buckets works when registered in Unity Catalog.

## Test Setup

We've created:
- Parquet files in TWO different S3 buckets
- A Delta log at `s3://your-bucket-east-1a/uc_test_delta/` that references:
  - `s3://your-bucket-east-1a/uc_test/file1.parquet` (rows 1-3)
  - `s3://your-bucket-east-1b/uc_test/file2.parquet` (rows 4-6)
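
For reference, the commit file behind this setup might contain `add` actions along the following lines. This is a minimal sketch, not the script that produced the test data: the helper name `make_add_action`, the file sizes, and the timestamp are hypothetical placeholders, while the field names follow the `add` action described in the Delta transaction protocol.

```python
import json


def make_add_action(path, size, modification_time_ms):
    """Build one Delta-log `add` action for an already-written Parquet file.

    `path` may be relative to the table root or, as in this test, an absolute URI.
    """
    return {
        "add": {
            "path": path,
            "partitionValues": {},  # unpartitioned table
            "size": size,
            "modificationTime": modification_time_ms,
            "dataChange": True,
        }
    }


# Placeholder sizes and timestamp (assumptions); real values come from the uploaded objects.
actions = [
    make_add_action("s3://your-bucket-east-1a/uc_test/file1.parquet", 1024, 1700000000000),
    make_add_action("s3://your-bucket-east-1b/uc_test/file2.parquet", 1024, 1700000000000),
]

# A commit file is newline-delimited JSON, one action per line.
commit_body = "\n".join(json.dumps(action) for action in actions)
print(commit_body)
```

A complete first commit would also need `protocol` and `metaData` actions, which are omitted here.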

## Key Question

When a Delta table's `add` actions contain **absolute paths** to files in buckets other than the table's LOCATION bucket, which of these does Unity Catalog do?

1. **Read all 6 rows** (both buckets accessible via UC storage credentials)
2. **Read only 3 rows** (only the LOCATION bucket is scoped)
3. **Fail entirely** (UC rejects cross-bucket references)
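
The three outcomes can be told apart from the row count (or the error) alone. The helper below is a hypothetical sketch of that mapping; the name `classify_outcome` and its labels are illustrative, and the defaults of 6 and 3 rows come from the test setup above.

```python
def classify_outcome(row_count=None, error=None, total_rows=6, location_rows=3):
    """Map a read attempt onto one of the three outcomes in the key question."""
    if error is not None:
        return "failed"
    if row_count == total_rows:
        return "all_rows"
    if row_count == location_rows:
        return "location_bucket_only"
    # Any other count matches none of the three hypotheses.
    return "unexpected"


print(classify_outcome(row_count=6))
print(classify_outcome(row_count=3))
print(classify_outcome(error=RuntimeError("access denied")))
```

Keeping the mapping in one function means the read test and the final summary can share the same interpretation of a count.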

## Step 1: Register in Unity Catalog

Register the pre-created Delta table. The table's LOCATION is in the `east-1a` bucket, but its Delta log references files in both the `east-1a` and `east-1b` buckets.

In [None]:
import os

# Get configuration from environment or use placeholders
BUCKET_EAST_1A = os.getenv("HIVE_EVAL_BUCKET_EAST_1A", "your-bucket-east-1a")
UC_CATALOG = os.getenv("HIVE_EVAL_UC_CATALOG", "your_catalog")
UC_SCHEMA = os.getenv("HIVE_EVAL_UC_SCHEMA", "your_uc_schema")
TABLE_NAME = f"{UC_CATALOG}.{UC_SCHEMA}.uc_absolute_path_test"

print("=== REGISTERING IN UNITY CATALOG ===")
print(f"Table: {TABLE_NAME}")

# Drop if exists
spark.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")

# Register the Delta table
spark.sql(f"""
    CREATE TABLE {TABLE_NAME}
    USING DELTA
    LOCATION 's3://{BUCKET_EAST_1A}/uc_test_delta/'
""")

print("Table registered successfully")

In [None]:
# Verify the table exists and show schema
spark.sql(f"DESCRIBE TABLE EXTENDED {TABLE_NAME}").show(100, truncate=False)

## Step 2: Read via UC Table Name

**This is the critical test.** When reading via the UC table name, UC provides credentials scoped to the table's LOCATION. The question is whether those credentials can access files stored in a DIFFERENT bucket that are referenced via absolute paths in the Delta log.
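
To make the cross-bucket condition concrete, the sketch below lists which referenced files sit outside the LOCATION bucket. `foreign_bucket_paths` is a hypothetical helper, not part of Delta or Unity Catalog; it only parses URIs and assumes relative paths resolve under the table root.

```python
from urllib.parse import urlparse


def foreign_bucket_paths(location, add_paths):
    """Return the add-action paths whose bucket differs from the LOCATION bucket."""
    location_bucket = urlparse(location).netloc
    foreign = []
    for path in add_paths:
        parsed = urlparse(path)
        # Relative paths have no scheme and are treated as living under the table root.
        if parsed.scheme and parsed.netloc != location_bucket:
            foreign.append(path)
    return foreign


paths = [
    "s3://your-bucket-east-1a/uc_test/file1.parquet",
    "s3://your-bucket-east-1b/uc_test/file2.parquet",
]
print(foreign_bucket_paths("s3://your-bucket-east-1a/uc_test_delta/", paths))
# → ['s3://your-bucket-east-1b/uc_test/file2.parquet']
```

Only `file2.parquet` is in a different bucket, so it is the file whose readability decides the outcome of this step.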

In [None]:
print("=== READING VIA UC TABLE NAME ===")
print()

try:
    result = spark.sql(f"SELECT * FROM {TABLE_NAME}")
    count = result.count()
    print(f"SUCCESS! Row count: {count}")
    print()
    result.show()
    
    # Assumes the test data carries a `bucket` column naming each row's source bucket
    print("\nRows per bucket (CRITICAL - should be 3 and 3):")
    result.groupBy("bucket").count().show()
    
    if count == 6:
        print("\n*** FINDING: UC CAN read cross-bucket absolute paths in Delta tables! ***")
    elif count == 3:
        print("\n*** FINDING: UC is scoped to LOCATION bucket - only same-bucket files work ***")
    else:
        print(f"\n*** UNEXPECTED: Got {count} rows - investigate further ***")
        
except Exception as e:
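    print("FAILED!")
    print(f"Error type: {type(e).__name__}")
    print(f"Error message: {str(e)[:2000]}")
    # Any exception lands here, so check the message before drawing the conclusion
    print("\n*** FINDING: read failed - confirm the error is a cross-bucket rejection, not a setup issue ***")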

## Step 3: Test Write Operations (if read works)
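
The heading makes the write tests conditional on the read succeeding. One way to enforce that gate, and to collect results without a separate `try`/`except` per statement, is sketched below. The helpers `run_step` and `run_writes_if_readable` are hypothetical; in the notebook each action would wrap a `spark.sql(...)` call, while the lambdas here are stand-ins so the sketch runs on its own.

```python
def run_step(name, action):
    """Run one test step and return (name, succeeded, detail) instead of raising."""
    try:
        return (name, True, action())
    except Exception as exc:
        # Truncate long error messages, as the cells in this notebook do.
        return (name, False, f"{type(exc).__name__}: {str(exc)[:500]}")


def run_writes_if_readable(read_ok, steps):
    """Skip every write step when the read test did not succeed."""
    if not read_ok:
        return []
    return [run_step(name, action) for name, action in steps]


def failing_update():
    raise PermissionError("access denied")  # stand-in for a rejected write


steps = [
    ("INSERT", lambda: "ok"),  # stand-in for a spark.sql INSERT call
    ("UPDATE", failing_update),
]
for name, succeeded, detail in run_writes_if_readable(True, steps):
    print(name, "succeeded" if succeeded else "FAILED", detail)
```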

In [None]:
print("=== TESTING INSERT ===")

try:
    spark.sql(f"""
        INSERT INTO {TABLE_NAME}
        VALUES (7, 'g', 'inserted')
    """)
    print("INSERT succeeded")
    
    # Check new count
    new_count = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").collect()[0][0]
    print(f"New row count: {new_count}")
    
    # Show all data
    spark.sql(f"SELECT * FROM {TABLE_NAME} ORDER BY id").show()
    
except Exception as e:
    print(f"INSERT FAILED: {str(e)[:500]}")

In [None]:
print("=== TESTING UPDATE ===")

try:
    spark.sql(f"""
        UPDATE {TABLE_NAME}
        SET value = 'updated'
        WHERE id = 1
    """)
    print("UPDATE succeeded")
    
    # Show the updated row
    spark.sql(f"SELECT * FROM {TABLE_NAME} WHERE id = 1").show()
    
except Exception as e:
    print(f"UPDATE FAILED: {str(e)[:500]}")

In [None]:
print("=== TESTING DELETE ===")

try:
    spark.sql(f"""
        DELETE FROM {TABLE_NAME}
        WHERE id = 7
    """)
    print("DELETE succeeded")
    
    # Show remaining rows
    final_count = spark.sql(f"SELECT COUNT(*) FROM {TABLE_NAME}").collect()[0][0]
    print(f"Final row count: {final_count}")
    
except Exception as e:
    print(f"DELETE FAILED: {str(e)[:500]}")

## Step 4: Summary

In [None]:
print("=== SUMMARY ===")
print()
print("| Test | Works? | Row Count | Notes |")
print("|------|--------|-----------|-------|")

# Test UC table SELECT
try:
    uc_df = spark.sql(f"SELECT * FROM {TABLE_NAME}")
    uc_count = uc_df.count()
    if uc_count == 6:
        print(f"| UC table SELECT | YES | {uc_count} | Cross-bucket absolute paths WORK |")
    elif uc_count == 3:
        print(f"| UC table SELECT | PARTIAL | {uc_count} | Only LOCATION bucket accessible |")
    else:
        print(f"| UC table SELECT | YES | {uc_count} | Unexpected count - investigate |")
except Exception as e:
    # Strip characters that would break the markdown table row
    err = str(e)[:50].replace('|', '/').replace('\n', ' ')
    print(f"| UC table SELECT | NO | - | {err} |")

print()
print("Interpretation:")
print("- If count = 6: UC can read files from multiple buckets via absolute paths in Delta log")
print("- If count = 3: UC scopes credentials to LOCATION bucket only")
print("- If failed: UC rejects cross-bucket absolute paths entirely")

## Step 5: Cleanup

In [None]:
import os

# Get bucket names from environment or use placeholders
BUCKET_EAST_1A = os.getenv("HIVE_EVAL_BUCKET_EAST_1A", "your-bucket-east-1a")
BUCKET_EAST_1B = os.getenv("HIVE_EVAL_BUCKET_EAST_1B", "your-bucket-east-1b")
UC_CATALOG = os.getenv("HIVE_EVAL_UC_CATALOG", "your_catalog")
UC_SCHEMA = os.getenv("HIVE_EVAL_UC_SCHEMA", "your_uc_schema")
TABLE_NAME = f"{UC_CATALOG}.{UC_SCHEMA}.uc_absolute_path_test"

# Drop the UC table
spark.sql(f"DROP TABLE IF EXISTS {TABLE_NAME}")
print("UC table dropped")

# Note: S3 cleanup should be done separately if needed
print("\nS3 data remains for re-running. To clean up:")
print(f"  aws s3 rm s3://{BUCKET_EAST_1A}/uc_test/ --recursive")
print(f"  aws s3 rm s3://{BUCKET_EAST_1B}/uc_test/ --recursive")
print(f"  aws s3 rm s3://{BUCKET_EAST_1A}/uc_test_delta/ --recursive")