In [None]:
import subprocess
import os
import sys

def run_shell_command(command):
    """Run ``command`` through the shell, echoing its stdout as it is produced.

    Args:
        command: Shell command line to execute (passed to ``subprocess.Popen``
            with ``shell=True``).

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            return code. The captured stderr text is printed to ``sys.stderr``
            first and is also attached to the exception.
        Exception: Any other failure while launching or reading the process is
            reported on ``sys.stderr`` and re-raised unchanged.
    """
    import threading  # local import keeps this helper self-contained

    try:
        # The context manager closes both pipes and reaps the child on exit.
        with subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        ) as process:
            # Drain stderr concurrently. If it were only read after the stdout
            # loop, a child that fills the stderr pipe buffer would block
            # forever while this function waits for more stdout.
            stderr_chunks = []
            stderr_reader = threading.Thread(
                target=lambda: stderr_chunks.append(process.stderr.read()),
                daemon=True,
            )
            stderr_reader.start()

            # Print output in real-time; iteration ends at EOF on stdout.
            for line in process.stdout:
                print(line.strip())

            return_code = process.wait()
            stderr_reader.join()

        error_output = "".join(stderr_chunks)

        # Print any errors if the command failed
        if return_code != 0:
            print("Error output:", file=sys.stderr)
            print(error_output, file=sys.stderr)
            raise subprocess.CalledProcessError(return_code, command, stderr=error_output)

    except Exception as e:
        print(f"Error executing command: {e}", file=sys.stderr)
        raise

# Export the Spark version into this process's environment so the child shell
# started below inherits it.
os.environ.update(SPARK_VERSION='3.5.5')
print("SPARK_VERSION set to: {}".format(os.environ['SPARK_VERSION']))

# Run the setup script, passing the `clean_metastore` argument.
run_shell_command("./setup_hms.sh clean_metastore")


In [None]:
import os
from pyspark.sql import SparkSession

# Version pins used to assemble the Maven coordinates of the Iceberg Spark runtime.
iceberg_version = "1.9.0"
spark_version = "3.5"
scala_version = "2.12"
iceberg_package = "org.apache.iceberg:iceberg-spark-runtime-{}_{}:{}".format(
    spark_version, scala_version, iceberg_version
)

# --- Spark Session Builder ---
# Registers Iceberg's SparkCatalog under the name `spark_catalog`, typed as a
# Hive catalog that talks to the metastore at thrift://localhost:9083.
spark = (
    SparkSession.builder
    .appName("Hive Federation Demo")
    .config("spark.jars.packages", iceberg_package)
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.spark_catalog.type", "hive")
    .config("spark.sql.catalog.spark_catalog.uri", "thrift://localhost:9083")
    .config("spark.sql.catalog.spark_catalog.warehouse", "/tmp/data/spark-warehouse")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)

# --- Verification ---
# Echo the running Spark version and the full effective configuration.
print("Spark Session created successfully with Spark version: {}".format(spark.version))
conf = spark.sparkContext.getConf()
print("Spark Configuration: {}".format(conf.getAll()))


# --- Example Usage: Create Namespaces and Tables ---
# Tables are declared with "USING iceberg", the provider clause documented for
# Iceberg tables in Spark SQL. The previous "STORED AS ICEBERG" form is
# Hive file-format syntax and contradicted the intent stated here.
commands = [
    "CREATE NAMESPACE IF NOT EXISTS ns1",
    "CREATE NAMESPACE IF NOT EXISTS ns2",
    # Drop first so the cell can be re-run against an existing metastore.
    "DROP TABLE IF EXISTS ns1.table1",
    "DROP TABLE IF EXISTS ns2.table2",
    "CREATE TABLE ns1.table1 (key STRING, value STRING, version INT) USING iceberg",
    "CREATE TABLE ns2.table2 (key STRING, value STRING, version INT) USING iceberg"
]

# Execute each command and print the result. Failures are reported but do not
# stop the loop, so every statement is attempted.
for cmd in commands:
    print(f"\nExecuting: {cmd}")
    try:
        result = spark.sql(cmd)
        # Only query-style statements return rows worth displaying.
        if cmd.strip().upper().startswith(("SHOW", "SELECT", "DESCRIBE")):
            result.show()
        else:
            print("Success!")
    except Exception as e:
        print(f"Error: {str(e)}")

# Seed both tables with a few rows through the Hive-backed catalog.
print("\n--- Inserting Data ---")
for insert_sql in (
    "INSERT INTO spark_catalog.ns1.table1 VALUES ('Engine', 'Spark', 1), ('Engine', 'Apache Spark', 2), ('Catalog', 'Hive', 1)",
    "INSERT INTO spark_catalog.ns2.table2 VALUES ('Engine', 'Snowflake', 1), ('Alt Engine', 'Apache Spark', 1), ('Catalog', 'Open Catalog', 1)",
):
    spark.sql(insert_sql)
print("Data inserted successfully.")

# Confirm each namespace lists its table.
print("\n--- Verifying Tables ---")
for heading, namespace in (
    ("Listing all tables in ns1:", "ns1"),
    ("\nListing all tables in ns2:", "ns2"),
):
    print(heading)
    spark.sql("SHOW TABLES IN spark_catalog.{}".format(namespace)).show()

# Confirm the inserted rows can be read back.
print("\n--- Reading Data ---")
for heading, table_name in (
    ("Data from ns1.table1:", "spark_catalog.ns1.table1"),
    ("\nData from ns2.table2:", "spark_catalog.ns2.table2"),
):
    print(heading)
    spark.table(table_name).show()


# Shut the session down when done.
spark.stop()
print("\nSpark session stopped.")

In [None]:
# Set up POLARIS auth token. 
import os
import requests
import json

# Host of the Polaris service; overridable through the POLARIS_HOST env var.
POLARIS_HOST = os.getenv('POLARIS_HOST', 'localhost')
BASE_URL = f"http://{POLARIS_HOST}:8181"

def get_polaris_token():
    """Request an OAuth access token from Polaris using client credentials.

    The client id and secret are read from the ``POLARIS_CLIENT_ID`` and
    ``POLARIS_CLIENT_SECRET`` environment variables, falling back to the
    demo credentials ``root`` / ``s3cr3t`` when they are unset.

    Returns:
        The ``access_token`` string from the token endpoint's JSON response.

    Raises:
        requests.HTTPError: If the token endpoint answers with an error status.
        requests.RequestException: On connection failure or timeout.
    """
    response = requests.post(
        f"{BASE_URL}/api/catalog/v1/oauth/tokens",
        data={'grant_type': 'client_credentials', 'scope': 'PRINCIPAL_ROLE:ALL'},
        auth=(
            os.getenv('POLARIS_CLIENT_ID', 'root'),
            os.getenv('POLARIS_CLIENT_SECRET', 's3cr3t'),
        ),
        # Without a timeout an unreachable server blocks the cell indefinitely.
        timeout=30,
    )
    # Surface auth/server errors directly instead of as a KeyError on the
    # missing 'access_token' field below.
    response.raise_for_status()
    return response.json()['access_token']

POLARIS_TOKEN = get_polaris_token()

# Exported so the shell commands in later cells can expand $POLARIS_TOKEN.
os.environ['POLARIS_TOKEN'] = POLARIS_TOKEN

In [None]:
import subprocess

def curl_command(cmd):
    """Run a shell command line and print its exit code and output.

    The value printed after "Status Code:" is the process return code of the
    shell command, not an HTTP status. Captured stdout is printed when it is
    non-empty; otherwise captured stderr is printed instead.

    Args:
        cmd: Command line string executed with ``shell=True``.
    """
    completed = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    print("Status Code: {}".format(completed.returncode))
    # Fall back to stderr only when nothing was written to stdout.
    shown = completed.stdout or completed.stderr
    print(shown)

In [None]:
# JSON payload describing an EXTERNAL catalog named "federated_hive" whose
# connection points at the Hive Metastore on thrift://127.0.0.1:9083 and whose
# storage is restricted to the local Spark warehouse directory.
json_data = '''{
  "type": "EXTERNAL",
  "name": "federated_hive",
  "connectionConfigInfo": {
    "connectionType": "HIVE",
    "uri": "thrift://127.0.0.1:9083",
    "warehouse": "hms",
    "authenticationParameters": {
      "authenticationType": "IMPLICIT"
    }
  },
  "properties": {
    "default-base-location": "file:///tmp/data/spark-warehouse"
  },
  "storageConfigInfo": {
    "storageType": "FILE",
    "allowedLocations": [
      "file:///tmp/data/spark-warehouse"
    ]
  }
}'''

# Construct and execute the curl command.
# - `--resolve` maps the hostname `polaris` to 127.0.0.1 for this request.
# - The header is `Accept` (was misspelled `Accepts`), matching the other cells.
# - $POLARIS_TOKEN is expanded by the shell from the environment.
cmd = f'''curl -X POST http://polaris:8181/api/management/v1/catalogs \
    --resolve polaris:8181:127.0.0.1 \
    -H "Accept: application/json" \
    -H "Content-Type: application/json" \
    -H "Authorization: Bearer $POLARIS_TOKEN" \
    -d '{json_data}' '''

curl_command(cmd)

In [None]:
# Fetch the definition of the `federated_hive` catalog from the management API.
# $POLARIS_TOKEN is expanded by the shell, not by Python.
cmd = (
    'curl -i -X GET http://localhost:8181/api/management/v1/catalogs/federated_hive/ '
    '  -H "Authorization: Bearer $POLARIS_TOKEN"'
    ' -H "Accept: application/json"'
    ' -H "Content-Type: application/json" '
)

curl_command(cmd)

In [None]:
# List all tables in the ns1 namespace of the federated catalog.
# $POLARIS_TOKEN is expanded by the shell, not by Python.
cmd = (
    'curl -i -X GET http://localhost:8181/api/catalog/v1/federated_hive/namespaces/ns1/tables/ '
    '  -H "Authorization: Bearer $POLARIS_TOKEN"'
    ' -H "Accept: application/json"'
    ' -H "Content-Type: application/json" '
)

curl_command(cmd)

In [None]:
# List all namespaces of the federated catalog.
# $POLARIS_TOKEN is expanded by the shell, not by Python.
cmd = (
    'curl -i -X GET http://localhost:8181/api/catalog/v1/federated_hive/namespaces/ '
    '  -H "Authorization: Bearer $POLARIS_TOKEN"'
    ' -H "Accept: application/json"'
    ' -H "Content-Type: application/json" '
)

curl_command(cmd)


In [None]:
# List all tables in the ns1 namespace of the federated catalog.
# $POLARIS_TOKEN is expanded by the shell, not by Python.
cmd = (
    'curl -i -X GET http://localhost:8181/api/catalog/v1/federated_hive/namespaces/ns1/tables/ '
    '  -H "Authorization: Bearer $POLARIS_TOKEN"'
    ' -H "Accept: application/json"'
    ' -H "Content-Type: application/json" '
)

curl_command(cmd)

In [None]:
# Fetch the table metadata for ns1.table1 through the federated catalog.
# $POLARIS_TOKEN is expanded by the shell, not by Python.
cmd = (
    'curl -i -X GET http://localhost:8181/api/catalog/v1/federated_hive/namespaces/ns1/tables/table1 '
    '  -H "Authorization: Bearer $POLARIS_TOKEN"'
    ' -H "Accept: application/json"'
    ' -H "Content-Type: application/json" '
)

curl_command(cmd)

In [None]:
# Grant the TABLE_WRITE_DATA privilege to the catalog_admin catalog role of
# the federated_hive catalog. $POLARIS_TOKEN is expanded by the shell.
json_data = '{"type": "catalog", "privilege": "TABLE_WRITE_DATA"}'
cmd = (
    'curl -i -X PUT -H "Authorization: Bearer $POLARIS_TOKEN"'
    ' -H "Accept: application/json" -H "Content-Type: application/json" '
    '    http://localhost:8181/api/management/v1/catalogs/federated_hive/catalog-roles/catalog_admin/grants '
    "  -d '" + json_data + "' "
)

curl_command(cmd)

In [None]:
# Attach the catalog_admin catalog role (of federated_hive) to the
# service_admin principal role. $POLARIS_TOKEN is expanded by the shell.
json_data = '{"name": "catalog_admin"}'
cmd = (
    'curl -i -X PUT -H "Authorization: Bearer $POLARIS_TOKEN"'
    ' -H "Accept: application/json" -H "Content-Type: application/json" '
    '    http://localhost:8181/api/management/v1/principal-roles/service_admin/catalog-roles/federated_hive '
    "  -d '" + json_data + "' "
)

curl_command(cmd)

In [None]:
import os
from pyspark.sql import SparkSession


# Version pins shared by every Iceberg artifact requested below, so the runtime
# and the AWS bundle cannot drift apart (the package list previously
# hard-coded 1.7.1 and left `iceberg_package` unused).
iceberg_version = "1.9.0"
spark_version = "3.5"
scala_version = "2.12"
iceberg_package = f"org.apache.iceberg:iceberg-spark-runtime-{spark_version}_{scala_version}:{iceberg_version}"

# Comma-separated Maven coordinates handed to spark.jars.packages.
# NOTE(review): if this cell runs in the same kernel as an earlier Spark
# session, the JVM may already be running and these packages may not be
# re-resolved — confirm, or restart the kernel before this cell.
spark_packages = ",".join([
    iceberg_package,
    f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version}",
    "software.amazon.awssdk:bundle:2.30.25",
    "net.snowflake:snowflake-jdbc:3.13.32",
    "com.azure:azure-storage-file-datalake:12.17.0",
    "software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:0.1.5",
])

# Fail early with a clear message instead of passing None into the Spark config.
polaris_token = os.environ.get('POLARIS_TOKEN')
if not polaris_token:
    raise RuntimeError("POLARIS_TOKEN is not set; run the Polaris token cell first.")

# Spark session whose default catalog `polaris` is an Iceberg REST catalog
# served at http://localhost:8181/api/catalog, using the `federated_hive`
# warehouse.
spark = (
    SparkSession.builder
    .appName("Polaris Hive Federation")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.jars.packages", spark_packages)
    .config("spark.sql.catalog.polaris", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.polaris.token", polaris_token)
    .config("spark.sql.catalog.polaris.type", "rest")
    .config("spark.sql.catalog.polaris.uri", "http://localhost:8181/api/catalog")
    .config("spark.sql.catalog.polaris.warehouse", "federated_hive")
    .config("spark.sql.defaultCatalog", "polaris")
    .config("spark.sql.catalog.polaris.client.region", "us-east-1")
    .config("spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation", "vended-credentials")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate()
)

print("Spark version:", spark.version)

# Verify configuration
print("Active Spark Configuration:")
print(spark.sparkContext.getConf().getAll())

In [None]:
# Enumerate the namespaces visible through the session's default catalog.
print("Listing all namespaces:")
spark.sql("SHOW NAMESPACES").show()

# Enumerate the tables of ns1 and ns2.
for namespace in ("ns1", "ns2"):
    print("\nListing tables in {}:".format(namespace))
    spark.sql("SHOW TABLES IN {}".format(namespace)).show()

# Sample rows to write into each table.
insert_commands = [
    """INSERT INTO ns1.table1 (key, value, version) 
       VALUES 
       ('key1', 'value1', 1),
       ('key2', 'value2', 2),
       ('key3', 'value3', 3)""",

    """INSERT INTO ns2.table2 (key, value, version)
       VALUES
       ('keyA', 'valueA', 10),
       ('keyB', 'valueB', 20),
       ('keyC', 'valueC', 30)""",
]

# Run the inserts in order, echoing each statement before it executes.
for statement in insert_commands:
    print("\nExecuting: {}".format(statement))
    spark.sql(statement)
    print("Success!")

# Read both tables back to confirm the rows landed.
for table_name in ("ns1.table1", "ns2.table2"):
    print("\nData in {}:".format(table_name))
    spark.sql("SELECT * FROM {}".format(table_name)).show()
