In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType,
    StructField,
    LongType,
    StringType,
    TimestampType,
    DoubleType,
)

# =============================================================================
# 1. CREATE SPARK SESSION WITH BIGQUERY METASTORE CATALOG
# =============================================================================

# Replace these with your actual values
PROJECT_ID = "rainbow-data-production-483609"
LOCATION = "us-central1"
WAREHOUSE = "gs://rainbow-data-production-iceberg/"
CATALOG_NAME = "rainbow-data-production-iceberg"

spark = (
    SparkSession.builder.appName("BigQuery Metastore Iceberg Example")
    .config("spark.master", "local[*]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.port", "7078")
    .config("spark.blockManager.port", "7079")
    .config("spark.driver.memory", "2G")
    .config("spark.executor.memory", "2G")
    .config("spark.driver.userClassPathFirst", "false")
    .config("spark.executor.userClassPathFirst", "false")
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog"
    )
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}.catalog-impl",
        "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog",
    )
    .config(f"spark.sql.catalog.{CATALOG_NAME}.gcp.bigquery.project-id", PROJECT_ID)
    .config(f"spark.sql.catalog.{CATALOG_NAME}.gcp.bigquery.location", LOCATION)
    .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", WAREHOUSE)
    .config(
        f"spark.sql.catalog.{CATALOG_NAME}.io-impl",
        "org.apache.iceberg.gcp.gcs.GCSFileIO",
    )
    .config(
        "spark.jars.packages",
        ",".join(
            [
                "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.1",
                "org.apache.iceberg:iceberg-gcp-bundle:1.10.1",
                "org.apache.iceberg:iceberg-gcp:1.10.1"
            ]
        ),
    )
    .getOrCreate()
)

:: loading settings :: url = jar:file:/Users/tuan.tran1/Workspaces/Test/TestIceberg/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/tuan.tran1/.ivy2/cache
The jars for the packages stored in: /Users/tuan.tran1/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.iceberg#iceberg-gcp-bundle added as a dependency
org.apache.iceberg#iceberg-gcp added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1bea8fe9-fd0b-4e55-9344-9a71135f8155;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.10.1 in central
	found org.apache.iceberg#iceberg-gcp-bundle;1.10.1 in central
	found org.apache.iceberg#iceberg-gcp;1.10.1 in central
	found org.apache.iceberg#iceberg-api;1.10.1 in central
	found org.slf4j#slf4j-api;2.0.17 in central
	found org.apache.iceberg#iceberg-bundled-guava;1.10.1 in central
	found org.apache.iceberg#iceberg-common;1.10.1 in central
	found org.apache.iceberg#iceberg-core;1.10.1 in central
	found org.apache.avro#avro;1.12.0 in local-m2-cache
	found com.fasterxml.jackson.core#jack

In [2]:
spark.sql(
    "SHOW NAMESPACES IN `rainbow-data-production-iceberg`"
).show()

IllegalArgumentException: Cannot initialize Catalog implementation org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
	Missing org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog]

In [3]:
spark.sql(
    "SHOW CATALOGS"
).show()

+-------------+
|      catalog|
+-------------+
|spark_catalog|
+-------------+



In [4]:
spark.sql(f"USE {CATALOG_NAME}")


ParseException: 
[INVALID_IDENTIFIER] The identifier rainbow-data-production-iceberg is invalid. Please, consider quoting it with back-quotes as `rainbow-data-production-iceberg`.(line 1, pos 11)

== SQL ==
USE rainbow-data-production-iceberg
-----------^^^


In [None]:
# =============================================================================
# 2. CREATE A NAMESPACE (DATASET IN BIGQUERY)
# =============================================================================

# Switch to the catalog
spark.sql(f"USE {CATALOG_NAME}")

# Create a namespace (this creates a BigQuery dataset)
spark.sql("CREATE NAMESPACE IF NOT EXISTS sales_data")

# List all namespaces
print("=== Available Namespaces ===")
spark.sql("SHOW NAMESPACES").show()

# Use the namespace
spark.sql("USE NAMESPACE sales_data")

In [None]:
# =============================================================================
# 3. CREATE AN ICEBERG TABLE
# =============================================================================

# Option A: Create table using SQL
spark.sql("""
    CREATE TABLE IF NOT EXISTS orders (
        order_id BIGINT,
        customer_id BIGINT,
        product_name STRING,
        quantity INT,
        price DOUBLE,
        order_date TIMESTAMP
    )
    USING iceberg
    PARTITIONED BY (days(order_date))
    TBLPROPERTIES (
        'write.format.default' = 'parquet',
        'write.parquet.compression-codec' = 'snappy'
    )
""")

# Option B: Create table from a DataFrame schema
schema = StructType(
    [
        StructField("customer_id", LongType(), False),
        StructField("name", StringType(), True),
        StructField("email", StringType(), True),
        StructField("created_at", TimestampType(), True),
    ]
)

spark.sql("""
    CREATE TABLE IF NOT EXISTS customers (
        customer_id BIGINT,
        name STRING,
        email STRING,
        created_at TIMESTAMP
    )
    USING iceberg
""")

# List tables in the namespace
print("=== Tables in sales_data namespace ===")
spark.sql("SHOW TABLES").show()

# =============================================================================
# 4. INSERT DATA
# =============================================================================

# Option A: Insert using SQL VALUES
spark.sql("""
    INSERT INTO orders VALUES
    (1, 101, 'Laptop', 1, 1299.99, timestamp('2024-01-15 10:30:00')),
    (2, 102, 'Mouse', 2, 29.99, timestamp('2024-01-15 11:45:00')),
    (3, 101, 'Keyboard', 1, 79.99, timestamp('2024-01-16 09:00:00')),
    (4, 103, 'Monitor', 1, 399.99, timestamp('2024-01-16 14:20:00')),
    (5, 102, 'USB Cable', 3, 9.99, timestamp('2024-01-17 16:00:00'))
""")

spark.sql("""
    INSERT INTO customers VALUES
    (101, 'Alice Johnson', 'alice@example.com', timestamp('2024-01-01 08:00:00')),
    (102, 'Bob Smith', 'bob@example.com', timestamp('2024-01-05 12:00:00')),
    (103, 'Carol White', 'carol@example.com', timestamp('2024-01-10 15:00:00'))
""")

# Option B: Insert from a DataFrame
from datetime import datetime

order_data = [
    (6, 104, "Headphones", 1, 149.99, datetime(2024, 1, 18, 10, 0, 0)),
    (7, 105, "Webcam", 1, 89.99, datetime(2024, 1, 18, 11, 30, 0)),
]

order_schema = StructType(
    [
        StructField("order_id", LongType(), False),
        StructField("customer_id", LongType(), False),
        StructField("product_name", StringType(), True),
        StructField("quantity", LongType(), True),
        StructField("price", DoubleType(), True),
        StructField("order_date", TimestampType(), True),
    ]
)

df_new_orders = spark.createDataFrame(order_data, order_schema)
df_new_orders.writeTo(f"{CATALOG_NAME}.sales_data.orders").append()

print("=== Data inserted successfully ===")

# =============================================================================
# 5. QUERY DATA
# =============================================================================

# Simple SELECT
print("=== All Orders ===")
spark.sql("SELECT * FROM orders ORDER BY order_id").show()

# Query with filters
print("=== Orders over $100 ===")
spark.sql("""
    SELECT order_id, product_name, price
    FROM orders
    WHERE price > 100
    ORDER BY price DESC
""").show()

# JOIN tables
print("=== Orders with Customer Names ===")
spark.sql("""
    SELECT 
        o.order_id,
        c.name AS customer_name,
        o.product_name,
        o.quantity,
        o.price,
        o.order_date
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    ORDER BY o.order_date
""").show()

# Aggregation
print("=== Sales Summary by Customer ===")
spark.sql("""
    SELECT 
        c.name AS customer_name,
        COUNT(*) AS total_orders,
        SUM(o.quantity) AS total_items,
        ROUND(SUM(o.price * o.quantity), 2) AS total_spent
    FROM orders o
    JOIN customers c ON o.customer_id = c.customer_id
    GROUP BY c.name
    ORDER BY total_spent DESC
""").show()

# =============================================================================
# 6. UPDATE AND DELETE DATA
# =============================================================================

# Update a record
spark.sql("""
    UPDATE orders 
    SET price = 1199.99 
    WHERE order_id = 1
""")
print("=== After price update for order_id=1 ===")
spark.sql("SELECT * FROM orders WHERE order_id = 1").show()

# Delete a record
spark.sql("DELETE FROM orders WHERE order_id = 5")
print("=== After deleting order_id=5 ===")
spark.sql("SELECT * FROM orders ORDER BY order_id").show()

# =============================================================================
# 7. MERGE (UPSERT) OPERATION
# =============================================================================

# Create a temporary view with updates
updates_data = [
    (
        1,
        101,
        "Laptop Pro",
        1,
        1499.99,
        datetime(2024, 1, 15, 10, 30, 0),
    ),  # Update existing
    (8, 106, "Tablet", 1, 599.99, datetime(2024, 1, 19, 9, 0, 0)),  # New record
]

df_updates = spark.createDataFrame(updates_data, order_schema)
df_updates.createOrReplaceTempView("order_updates")

spark.sql("""
    MERGE INTO orders AS target
    USING order_updates AS source
    ON target.order_id = source.order_id
    WHEN MATCHED THEN 
        UPDATE SET 
            product_name = source.product_name,
            price = source.price
    WHEN NOT MATCHED THEN 
        INSERT *
""")

print("=== After MERGE operation ===")
spark.sql("SELECT * FROM orders ORDER BY order_id").show()

# =============================================================================
# 8. SCHEMA EVOLUTION
# =============================================================================

# Add a new column
spark.sql("ALTER TABLE orders ADD COLUMN discount DOUBLE")

# Update with new column
spark.sql("UPDATE orders SET discount = 0.1 WHERE price > 500")

print("=== After schema evolution (added discount column) ===")
spark.sql(
    "SELECT order_id, product_name, price, discount FROM orders ORDER BY order_id"
).show()

# =============================================================================
# 9. TIME TRAVEL QUERIES
# =============================================================================

# View table history
print("=== Table History (Snapshots) ===")
spark.sql("SELECT * FROM bq_catalog.sales_data.orders.history").show(truncate=False)

# View snapshots
print("=== Snapshots ===")
spark.sql(
    "SELECT snapshot_id, committed_at, operation FROM bq_catalog.sales_data.orders.snapshots"
).show()

# Query a specific snapshot (replace with actual snapshot_id from history)
# spark.sql("SELECT * FROM orders VERSION AS OF <snapshot_id>").show()

# Query as of a timestamp
# spark.sql("SELECT * FROM orders TIMESTAMP AS OF '2024-01-17 12:00:00'").show()

# =============================================================================
# 10. TABLE MAINTENANCE
# =============================================================================

# Expire old snapshots (keep last 5)
spark.sql("""
    CALL bq_catalog.system.expire_snapshots(
        table => 'sales_data.orders',
        older_than => TIMESTAMP '2024-12-31 00:00:00',
        retain_last => 5
    )
""")

# Rewrite data files for optimization
spark.sql("""
    CALL bq_catalog.system.rewrite_data_files(
        table => 'sales_data.orders'
    )
""")

# =============================================================================
# 11. TABLE METADATA & INSPECTION
# =============================================================================

# Describe table
print("=== Table Schema ===")
spark.sql("DESCRIBE TABLE EXTENDED orders").show(truncate=False)

# View table properties
print("=== Table Properties ===")
spark.sql("SHOW TBLPROPERTIES orders").show(truncate=False)

# View partitions
print("=== Partitions ===")
spark.sql("SELECT * FROM bq_catalog.sales_data.orders.partitions").show()

# View files
print("=== Data Files ===")
spark.sql(
    "SELECT file_path, file_format, record_count FROM bq_catalog.sales_data.orders.files"
).show(truncate=False)

# =============================================================================
# 12. CLEANUP (OPTIONAL)
# =============================================================================

# Drop tables
# spark.sql("DROP TABLE IF EXISTS orders")
# spark.sql("DROP TABLE IF EXISTS customers")

# Drop namespace
# spark.sql("DROP NAMESPACE IF EXISTS sales_data")

# Stop Spark session
spark.stop()

In [1]:
from pyspark.sql import SparkSession

catalog_name = "rainbow-data-production-iceberg"
project_id = "rainbow-data-production-483609"
spark = (
    SparkSession.builder.appName("Spark-Iceberg")
    .config("spark.master", "local[*]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.driver.port", "7078")
    .config("spark.blockManager.port", "7079")
    .config("spark.driver.memory", "2G")
    .config("spark.executor.memory", "2G")
    .config("spark.driver.userClassPathFirst", "false")
    .config("spark.executor.userClassPathFirst", "false")
    .config(
        f"spark.sql.catalog.{catalog_name}", "org.apache.iceberg.spark.SparkCatalog"
    )
    .config(f"spark.sql.catalog.{catalog_name}.type", "rest")
    .config(
        f"spark.sql.catalog.{catalog_name}.uri",
        "https://biglake.googleapis.com/iceberg/v1/restcatalog",
    )
    .config(
        f"spark.sql.catalog.{catalog_name}.warehouse",
        f"bq://projects/{project_id}",
    )
    .config(
        f"spark.sql.catalog.{catalog_name}.header.x-goog-user-project",
        project_id,
    )
    .config(
        f"spark.sql.catalog.{catalog_name}.rest.auth.type",
        "org.apache.iceberg.gcp.auth.GoogleAuthManager",
    )
    .config(
        f"spark.sql.catalog.{catalog_name}.io-impl",
        "org.apache.iceberg.gcp.gcs.GCSFileIO",
    )
    .config(f"spark.sql.catalog.{catalog_name}.rest-metrics-reporting-enabled", "false")
    .config(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    )
    .config("spark.sql.defaultCatalog", catalog_name)
    .config(
        "spark.jars.packages",
        ",".join(
            [
                "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.1",
                "org.apache.iceberg:iceberg-gcp-bundle:1.10.1",
                # "com.google.auth:google-auth-library-oauth2-http:1.41.0",
                # "com.google.auth:google-auth-library-credentials:1.41.0",
                # "com.google.guava:guava:32.1.2-jre",
                # "com.google.cloud:google-cloud-storage:2.61.0",
                # "com.google.cloud:libraries-bom:26.73.0",
            ]
        ),
    )
    .getOrCreate()
)


:: loading settings :: url = jar:file:/Users/tuan.tran1/Workspaces/Test/TestIceberg/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/tuan.tran1/.ivy2/cache
The jars for the packages stored in: /Users/tuan.tran1/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.apache.iceberg#iceberg-gcp-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f1482565-cc86-4b60-b400-9fac21e9da57;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.10.1 in central
	found org.apache.iceberg#iceberg-gcp-bundle;1.10.1 in central
:: resolution report :: resolve 115ms :: artifacts dl 17ms
	:: modules in use:
	org.apache.iceberg#iceberg-gcp-bundle;1.10.1 from central in [default]
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.10.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	----------------------------------

In [16]:
spark.sql("SHOW CATALOGS").show()

+--------------------+
|             catalog|
+--------------------+
|rainbow-data-prod...|
|       spark_catalog|
+--------------------+



In [17]:
spark.sql("SELECT current_catalog();").show()

+--------------------+
|   current_catalog()|
+--------------------+
|rainbow-data-prod...|
+--------------------+



In [None]:
# spark.sql("CREATE NAMESPACE IF NOT EXISTS test_namespace_cv ;")
spark.sql(
    "CREATE NAMESPACE IF NOT EXISTS test_namespace_cv LOCATION 'gs://rainbow-data-production-iceberg/test_namespace_cv' WITH DBPROPERTIES ('gcp-region' = 'us-central1');"
)

DataFrame[]

In [19]:
spark.sql("SHOW NAMESPACES").show()

+-----------------+
|        namespace|
+-----------------+
|             test|
|   test_namespace|
|  test_namespace1|
|test_namespace_cv|
+-----------------+



In [20]:
spark.sql("USE test_namespace_cv;")

DataFrame[]

In [21]:
spark.sql("USE test_namespace_cv;")

DataFrame[]

In [None]:
spark.sql(
    "CREATE TABLE IF NOT EXISTS sample_table (id BIGINT, data STRING) USING ICEBERG;"
)

DataFrame[]

In [23]:
spark.sql("SHOW TABLES").show()

+-----------------+------------+-----------+
|        namespace|   tableName|isTemporary|
+-----------------+------------+-----------+
|test_namespace_cv|sample_table|      false|
+-----------------+------------+-----------+



In [24]:
spark.sql("""
INSERT INTO sample_table VALUES
  (1, 'first'), (2, 'second'), (3, 'third')
""")

                                                                                

DataFrame[]

In [25]:
spark.sql("SELECT * FROM sample_table").show()

[Stage 8:>                                                          (0 + 1) / 1]

+---+------+
| id|  data|
+---+------+
|  1| first|
|  2|second|
|  3| third|
+---+------+



                                                                                

In [12]:
spark.sql("""
INSERT INTO sample_table VALUES
  (4, 'fourth'), (5, 'fifth'), (6, 'sixth');
""")

                                                                                

DataFrame[]

In [13]:
spark.sql("SELECT * FROM sample_table;").show()

[Stage 5:>                                                          (0 + 1) / 1]

+---+------+
| id|  data|
+---+------+
|  1| first|
|  2|second|
|  3| third|
|  4|fourth|
|  5| fifth|
|  6| sixth|
+---+------+



                                                                                