Self-check that writes a small Iceberg table and then reads it back from a new Spark session.

In [None]:
# Outcome record for this self-check. It starts out as "not passed" with an
# empty message; the check cell further down fills in the real verdict.
result = dict(
    area="spark",
    description="Check that spark can use iceberg tables.",
    passed=False,
    message="",
    plugin="spark",
)

In [4]:
import json

from freeds.spark import get_spark_session, show_spark_info

# Self-check: write a small Iceberg table with one Spark session, then read it
# back from a second session obtained afterwards. The verdict is stored in
# `result` ("passed" / "message"); any exception marks the check as failed.
spark = None  # the session that still needs stopping; None when there is none
try:
    spark = get_spark_session("self-check1")

    # Print the effective Spark configuration (sorted) to ease troubleshooting.
    cfg = spark.sparkContext.getConf().getAll()
    cfg.sort()
    for item in cfg:
        print(item)
    db_name = "freeds_cat.selfcheck_db"
    table_name = f"{db_name}.selfcheck_tbl"

    # create some data in iceberg
    # Dropping the database first also removes leftovers from an earlier failed run.
    print(f"Dropping and recreating database {db_name}")
    spark.sql(f"DROP DATABASE IF EXISTS {db_name} CASCADE")
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {db_name}")
    print(f"Writing table {table_name}")
    data = spark.range(100)
    # No explicit save mode: the default ('error') is fine because the database
    # was just recreated, so the table cannot exist yet.
    (
        data.write.option("mergeSchema", "true")
        .format("iceberg")  # Options: 'parquet', 'csv', 'json', 'orc', 'iceberg', 'delta' etc.
        .saveAsTable(table_name)
    )
    spark.stop()
    spark = None

    # read the data back from iceberg using a second session
    spark = get_spark_session("self-check2")
    data = spark.table(table_name)
    show_spark_info(spark)
    data.show(5)

    # clean up
    spark.sql(f"DROP DATABASE IF EXISTS {db_name} CASCADE")
    spark.stop()
    spark = None

    result["message"] = "Executed spark cell ok."
    result["passed"] = True

except Exception as ex:
    result["message"] = str(ex)
    print(str(ex))
    result["passed"] = False

finally:
    # Only reached with a live session when something above failed: make sure
    # the session is not leaked. Best effort, so a failing stop() does not
    # hide the verdict already recorded in `result`.
    if spark is not None:
        try:
            spark.stop()
        except Exception as stop_ex:
            print(f"Failed to stop spark session: {stop_ex}")

25/09/24 18:37:48 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


('spark.app.id', 'app-20250924183423-0001')
('spark.app.name', 'self-check1')
('spark.app.startTime', '1758738862975')
('spark.app.submitTime', '1758738862833')
('spark.driver.extraClassPath', '/opt/freeds/spark/jars/*')
('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/jdk.internal.ref=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL

In [None]:
import os

from pyspark.sql import SparkSession


def _required_env(name):
    """Return the value of environment variable `name`.

    Raises:
        RuntimeError: if the variable is unset or empty, so the cell fails
            with a clear message instead of connecting with blank credentials.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Environment variable {name} must be set before running this cell.")
    return value


# --- Secrets/configs ---
# Credentials are read from the environment instead of being hardcoded in the
# notebook. NOTE(review): the keys/password that used to be committed here
# should be considered leaked and rotated.
s3_cfg = {
    "endpoint": "http://s3-minio:9900",
    "access_key": _required_env("FREEDS_S3_ACCESS_KEY"),
    "secret_key": _required_env("FREEDS_S3_SECRET_KEY"),
    "warehouse": "s3a://dwh/warehouse/",
}

jdbc_cfg = {
    "uri": "jdbc:postgresql://postgres:5432/iceberg",
    "user": "jdbc_cat",
    "password": _required_env("FREEDS_JDBC_PASSWORD"),
}

# --- Spark session setup ---
spark = (
    SparkSession.builder
    .appName("IcebergJDBCTest")
    .master("spark://spark-master:7077")

    # Extra jars (Iceberg + Hadoop + Postgres)
    .config("spark.driver.extraClassPath", "/opt/freeds/spark/jars/*")
    .config("spark.executor.extraClassPath", "/opt/freeds/spark/jars/*")

    # Iceberg extensions
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

    # JDBC catalog
    # The catalog is selected with `type=jdbc`. The alternative spelling,
    # `catalog-impl=org.apache.iceberg.jdbc.JdbcCatalog`, must not be set as
    # well: Iceberg rejects a catalog that defines both `type` and `catalog-impl`.
    .config("spark.sql.catalog.freeds_cat", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.freeds_cat.type", "jdbc")
    .config("spark.sql.catalog.freeds_cat.uri", jdbc_cfg["uri"])
    .config("spark.sql.catalog.freeds_cat.warehouse", s3_cfg["warehouse"])
    .config("spark.sql.catalog.freeds_cat.jdbc.user", jdbc_cfg["user"])
    .config("spark.sql.catalog.freeds_cat.jdbc.password", jdbc_cfg["password"])

    # S3 (MinIO) access
    .config("spark.hadoop.fs.s3a.endpoint", s3_cfg["endpoint"])
    .config("spark.hadoop.fs.s3a.access.key", s3_cfg["access_key"])
    .config("spark.hadoop.fs.s3a.secret.key", s3_cfg["secret_key"])
    .config("spark.hadoop.fs.s3a.path.style.access", "true")

    .getOrCreate()
)

# --- Test Spark session ---
print("Available catalogs:")
for cat in spark.catalog.listCatalogs():
    print(cat)

# Example: create a database in the JDBC catalog
spark.sql("CREATE SCHEMA IF NOT EXISTS freeds_cat.iceberg_db")

# Example: create a table using Iceberg
spark.sql("""
CREATE TABLE IF NOT EXISTS freeds_cat.iceberg_db.test_table (
    id INT,
    name STRING
) USING ICEBERG
""")


In [None]:
# Emit the self-check verdict as pretty-printed JSON (4-space indent).
result_json = json.dumps(result, indent=4)
print(result_json)