# Iceberg Variant Demo

This notebook demonstrates how to connect Spark to Snowflake Horizon Catalog's Iceberg REST API and work with VARIANT data types in Iceberg tables.

## Prerequisites
1. Snowflake account with Iceberg support enabled
2. Personal Access Token (PAT) with appropriate permissions
3. Set the environment variables listed under Configuration to your account details


## Configuration
Configuration is loaded from environment variables, which you can set in `.env/iceberg.env`. The cell below reads them.
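
The notebook reads `os.environ` directly and does not show how `.env/iceberg.env` gets loaded. If your kernel does not load it for you, here is a minimal sketch. It assumes the file uses plain `KEY=VALUE` lines, optionally prefixed with `export` and optionally quoted. The helper names `parse_env_text`, `load_env_file`, `missing_vars` and `REQUIRED_VARS` are hypothetical. A library such as `python-dotenv` is a common alternative.

```python
import os
from pathlib import Path

# Hypothetical list: the variables the configuration cell reads without a default
REQUIRED_VARS = [
    "SPARK_HORIZON_CATALOG_URI",
    "SPARK_CATALOG_NAME",
    "SPARK_SNOWFLAKE_PAT",
    "SPARK_HORIZON_ROLE",
    "AWS_REGION",
]


def parse_env_text(text):
    """Parse simple KEY=VALUE lines (hypothetical helper, assumed file format)."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        if line.startswith("export "):
            line = line[len("export "):].strip()
        key, sep, value = line.partition("=")
        if not sep:
            continue  # not a KEY=VALUE line; ignore it
        key, value = key.strip(), value.strip()
        # Strip one pair of matching surrounding quotes
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        values[key] = value
    return values


def load_env_file(path, override=False):
    """Copy parsed values into os.environ; existing values win unless override=True."""
    values = parse_env_text(Path(path).read_text())
    for key, value in values.items():
        if override or key not in os.environ:
            os.environ[key] = value
    return values


def missing_vars(names=REQUIRED_VARS):
    """Return the names that are unset or empty in os.environ."""
    return [name for name in names if not os.environ.get(name)]
```

Call `load_env_file(".env/iceberg.env")` before the configuration cell, then check `missing_vars()` so that an unset variable is reported before Spark starts instead of surfacing as a `KeyError`.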


In [None]:
import os

# Snowflake Horizon REST Catalog endpoint
# Format: https://<account_identifier>.snowflakecomputing.com/polaris/api/catalog
horizon_catalog_uri = os.environ["SPARK_HORIZON_CATALOG_URI"]

# Your Snowflake database name (this will be your catalog in Iceberg)
catalog_name = os.environ["SPARK_CATALOG_NAME"]

# Your Personal Access Token from Snowflake
# Get this from: Account Settings → Security → Personal Access Tokens
token = os.environ["SPARK_SNOWFLAKE_PAT"]

# Snowflake role to use (typically ACCOUNTADMIN for testing)
horizon_role = os.environ["SPARK_HORIZON_ROLE"]

# External volume region
# For AWS, the region of the S3 bucket that backs your external volume
aws_region = os.environ["AWS_REGION"]

# Iceberg version to use
ICEBERG_VER = os.environ.get("SPARK_ICEBERG_VERSION", "1.10.0")

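If you only know your account identifier, a small helper can build the URI from the format given in the configuration cell's comment. This is a sketch: `build_horizon_catalog_uri` is a hypothetical name and `myorg-myaccount` is a placeholder identifier.

```python
def build_horizon_catalog_uri(account_identifier):
    """Hypothetical helper: build the Horizon REST catalog URI.

    Format (from the notebook's comment):
    https://<account_identifier>.snowflakecomputing.com/polaris/api/catalog
    """
    account = account_identifier.strip()
    # Tolerate a full host being passed in by mistake
    account = account.removeprefix("https://").split(".snowflakecomputing.com")[0]
    if not account:
        raise ValueError("account_identifier must be non-empty")
    return f"https://{account}.snowflakecomputing.com/polaris/api/catalog"
```

For example, `build_horizon_catalog_uri("myorg-myaccount")` gives the value you would put in `SPARK_HORIZON_CATALOG_URI`.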

In [24]:
import findspark
findspark.init()  # locate SPARK_HOME and put PySpark on sys.path before importing it

from pyspark.sql import SparkSession

## Create Spark Session
This sets up the Spark session with all necessary Iceberg and AWS configurations:

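The `spark.jars.packages` value is a single comma-separated list of Maven coordinates. In the session cell, the two f-strings are joined by Python's implicit string concatenation, so the trailing comma inside the first string is what separates the two coordinates. Here is a sketch of a helper that builds the list explicitly, assuming the Spark 4.0 / Scala 2.13 runtime artifact used in this notebook. `iceberg_spark_packages` and its parameters are hypothetical.

```python
def iceberg_spark_packages(iceberg_version, spark_minor="4.0", scala_version="2.13", s3_storage=True):
    """Hypothetical helper: return a comma-separated spark.jars.packages value."""
    coords = [
        f"org.apache.iceberg:iceberg-spark-runtime-{spark_minor}_{scala_version}:{iceberg_version}",
    ]
    if s3_storage:
        # The AWS bundle supplies S3FileIO for external volumes on S3
        coords.append(f"org.apache.iceberg:iceberg-aws-bundle:{iceberg_version}")
    return ",".join(coords)
```

Joining a list avoids both a missing separator and a stray trailing comma when the S3 bundle is left out.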

In [25]:
# Stop any existing Spark session
try:
    spark.stop()
except NameError:
    pass  # no session left over from a previous run in this kernel

print("Creating Spark session with Iceberg support...")

spark = (
    SparkSession.builder
      .master("local[*]")
      .config("spark.ui.port", "0")
      .config("spark.driver.bindAddress", "127.0.0.1")
      .config("spark.driver.host", "127.0.0.1")
      .config("spark.driver.port", "0")
      .config("spark.blockManager.port", "0")
      # 🔑 Pull the needed JARs automatically for Spark 4
      .config(
          "spark.jars.packages",
          f"org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:{ICEBERG_VER},"
          # Needed only when the external volume is on S3
          f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VER}"
      )
      # Hide the console progress bar
      .config("spark.ui.showConsoleProgress", "false")
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.defaultCatalog","horizoncatalog")
      .config("spark.sql.catalog.horizoncatalog","org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.horizoncatalog.type","rest")
      .config("spark.sql.catalog.horizoncatalog.uri",horizon_catalog_uri)
      .config("spark.sql.catalog.horizoncatalog.warehouse",catalog_name)
      .config("spark.sql.catalog.horizoncatalog.header.X-Iceberg-Access-Delegation","vended-credentials")
      .config("spark.sql.catalog.horizoncatalog.io-impl","org.apache.iceberg.aws.s3.S3FileIO")
      .config("spark.sql.catalog.horizoncatalog.file-io-impl","org.apache.iceberg.aws.s3.S3FileIO")
      .config("spark.sql.catalog.horizoncatalog.client.region",aws_region) # Needed only for S3 storage
      .config("spark.sql.catalog.horizoncatalog.credential",token) # PAT: use .credential; External OAuth or key-pair auth: use .token
      .config("spark.sql.catalog.horizoncatalog.scope",horizon_role)
      .config("spark.sql.iceberg.vectorization.enabled","false")
      .getOrCreate()
)

# Set log level to reduce noise
spark.sparkContext.setLogLevel("ERROR")

print("✅ Spark session created successfully!")
print(f"Spark UI available at: {spark.sparkContext.uiWebUrl}")


Creating Spark session with Iceberg support...
✅ Spark session created successfully!
Spark UI available at: http://127.0.0.1:53495
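Every catalog setting in the session cell shares the prefix `spark.sql.catalog.<name>`, where `<name>` is the catalog name (`horizoncatalog` here). Here is a minimal sketch that collects those settings into a dictionary, so the catalog name and the authentication property are chosen in one place. `horizon_catalog_conf` and its `auth` and `s3_region` parameters are hypothetical. The `credential` versus `token` choice follows the comment in the session cell. The sketch sets only `io-impl`, the Iceberg catalog property for the FileIO implementation.

```python
ICEBERG_EXTENSIONS = "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"


def horizon_catalog_conf(catalog, uri, warehouse, secret, role, auth="pat", s3_region=None):
    """Hypothetical helper: Spark config pairs for an Iceberg REST catalog on Horizon."""
    prefix = f"spark.sql.catalog.{catalog}"
    conf = {
        "spark.sql.extensions": ICEBERG_EXTENSIONS,
        "spark.sql.defaultCatalog": catalog,
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.type": "rest",
        f"{prefix}.uri": uri,
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.header.X-Iceberg-Access-Delegation": "vended-credentials",
        f"{prefix}.scope": role,
    }
    # PAT goes in .credential; an External OAuth or key-pair token goes in .token
    if auth == "pat":
        conf[f"{prefix}.credential"] = secret
    elif auth == "token":
        conf[f"{prefix}.token"] = secret
    else:
        raise ValueError(f"unknown auth mode: {auth!r}")
    # Assumption: a region is passed only when the external volume is on S3
    if s3_region is not None:
        conf[f"{prefix}.io-impl"] = "org.apache.iceberg.aws.s3.S3FileIO"
        conf[f"{prefix}.client.region"] = s3_region
    return conf
```

To apply the result, call `SparkSession.builder.config(key, value)` for each pair before `getOrCreate()`.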


In [26]:
spark.sql("SHOW NAMESPACES").show()

+---------+
|namespace|
+---------+
|      RAW|
+---------+



In [27]:
spark.sql("SHOW TABLES IN RAW").show()

+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|      RAW|CUSTOMER_EVENTS|      false|
+---------+---------------+-----------+



In [None]:
# EVENT_DATA should be listed with the variant type in the schema
spark.sql("DESCRIBE TABLE RAW.CUSTOMER_EVENTS").show(truncate=False)

In [None]:
spark.sql("""
SELECT
variant_get(EVENT_DATA, '$.event_id', 'string') AS event_id,
variant_get(EVENT_DATA, '$.event_type', 'string') AS event_type,
variant_get(EVENT_DATA, '$.timestamp', 'timestamp') AS event_timestamp,
variant_get(EVENT_DATA, '$.customer.name', 'string') AS customer_name,
variant_get(EVENT_DATA, '$.customer.email', 'string') AS customer_email,
variant_get(EVENT_DATA, '$.customer.phone', 'string') AS customer_phone
FROM RAW.CUSTOMER_EVENTS
ORDER BY event_timestamp
""").show()
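The two `variant_get` queries in this notebook differ only in the table they read. Here is a sketch of a helper that generates the select list from a mapping of output alias to JSON path and Spark type. `variant_select_sql` and `EVENT_FIELDS` are hypothetical names. Paths, types and aliases are interpolated verbatim, so pass only trusted literals.

```python
def variant_select_sql(table, column, fields, order_by=None):
    """Hypothetical helper: build a SELECT that extracts typed VARIANT fields.

    fields maps an output alias to (json_path, spark_type).
    Values are inserted verbatim (no escaping), so use trusted literals only.
    """
    select_list = ",\n".join(
        f"variant_get({column}, '{path}', '{dtype}') AS {alias}"
        for alias, (path, dtype) in fields.items()
    )
    sql = f"SELECT\n{select_list}\nFROM {table}"
    if order_by:
        sql += f"\nORDER BY {order_by}"
    return sql


# The same fields the notebook extracts from EVENT_DATA
EVENT_FIELDS = {
    "event_id": ("$.event_id", "string"),
    "event_type": ("$.event_type", "string"),
    "event_timestamp": ("$.timestamp", "timestamp"),
    "customer_name": ("$.customer.name", "string"),
    "customer_email": ("$.customer.email", "string"),
    "customer_phone": ("$.customer.phone", "string"),
}
```

For example, `spark.sql(variant_select_sql("RAW.CUSTOMER_EVENTS", "EVENT_DATA", EVENT_FIELDS, order_by="event_timestamp")).show()` should issue the same query as the cell above, and swapping in `REDACTED.CUSTOMER_EVENTS_REDACTED` gives the last query in the notebook.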

In [None]:
spark.sql("SHOW TABLES IN REDACTED").show()

In [None]:
spark.sql("""
SELECT
variant_get(EVENT_DATA, '$.event_id', 'string') AS event_id,
variant_get(EVENT_DATA, '$.event_type', 'string') AS event_type,
variant_get(EVENT_DATA, '$.timestamp', 'timestamp') AS event_timestamp,
variant_get(EVENT_DATA, '$.customer.name', 'string') AS customer_name,
variant_get(EVENT_DATA, '$.customer.email', 'string') AS customer_email,
variant_get(EVENT_DATA, '$.customer.phone', 'string') AS customer_phone
FROM REDACTED.CUSTOMER_EVENTS_REDACTED
ORDER BY event_timestamp
""").show()