In [3]:
from pyspark.sql.types import (
    DoubleType,
    FloatType,
    LongType,
    StringType,
    StructField,
    StructType,
)

# Column layout of the demo taxi table; every column is declared nullable.
schema = StructType([
    StructField("vendor_id", LongType(), True),
    StructField("trip_id", LongType(), True),
    StructField("trip_distance", FloatType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
])

# Write an empty DataFrame so only the schema is materialised as a table.
# createOrReplace() is used instead of create() so this cell can be re-run:
# create() raises once demo.nyc.taxis already exists (e.g. on Restart & Run All).
# NOTE: re-running this cell replaces the table, discarding previously appended rows.
df = spark.createDataFrame([], schema)
df.writeTo("demo.nyc.taxis").createOrReplace()

                                                                                

In [4]:
# Borrow the column types from the table itself so the new rows match its layout.
schema = spark.table("demo.nyc.taxis").schema

# One tuple per trip, in table column order:
# (vendor_id, trip_id, trip_distance, fare_amount, store_and_fwd_flag)
data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y"),
]

# Build the DataFrame against that schema, then append it to the table.
df = spark.createDataFrame(data=data, schema=schema)
df.writeTo("demo.nyc.taxis").append()

                                                                                

In [5]:
# Keep the DataFrame and its display separate: show() prints the rows and
# returns None, so assigning its result would leave `df` bound to None.
df = spark.table("demo.nyc.taxis")
df.show()

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
|        1|1000371|          1.8|      15.32|                 N|
|        2|1000372|          2.5|      22.15|                 N|
|        2|1000373|          0.9|       9.01|                 N|
|        1|1000374|          8.4|      42.13|                 Y|
+---------+-------+-------------+-----------+------------------+



In [24]:
# Print every Spark conf entry whose key mentions a catalog or SQL extensions,
# with keys left-aligned in a 34-character column.
conf_entries = spark.sparkContext.getConf().getAll()
for key, value in conf_entries:
    lowered_key = key.lower()
    if any(token in lowered_key for token in ("catalog", "extensions")):
        print(f"{key:<34} = {value}")

spark.sql.catalog.demo.s3.endpoint = http://minio:9000
spark.sql.catalogImplementation    = in-memory
spark.sql.catalog.demo.warehouse   = s3://warehouse/wh/
spark.sql.catalog.demo.io-impl     = org.apache.iceberg.aws.s3.S3FileIO
spark.sql.extensions               = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.demo.uri         = http://rest:8181
spark.sql.catalog.demo.type        = rest
spark.sql.catalog.demo             = org.apache.iceberg.spark.SparkCatalog
spark.sql.defaultCatalog           = demo


Iceberg Catalog Configuration Breakdown

- `spark.sql.catalogImplementation = in-memory`: 這是 Spark 的傳統設定，用來指定 Spark SQL 內建 session catalog（`spark_catalog`）的 metadata 儲存方式（`in-memory` 或 `hive`）。它與這裡使用的 Iceberg catalog（如 REST / Hive / Hadoop）無關，只控制 Spark SQL 內建 catalog 的預設行為。
- `spark.sql.defaultCatalog = demo`: Makes `demo` the default catalog. You can then run SQL queries without prefixing the catalog name (e.g., `SELECT * FROM db.table`).
- `spark.sql.catalog.demo = org.apache.iceberg.spark.SparkCatalog`: 註冊一個名為 `demo` 的 catalog，並指定它使用哪一種 Iceberg catalog 實作。這裡用的是 `SparkCatalog`，即 Iceberg 提供的 Spark 整合介面，supports a Hive Metastore, a Hadoop warehouse, or (as in this setup) a REST service as the catalog backend；實際的後端由 `type` 設定決定。另一個選擇是 `org.apache.iceberg.spark.SparkSessionCatalog`: adds support for Iceberg tables to Spark's built-in catalog, and delegates to the built-in catalog for non-Iceberg tables（覆寫 Spark SQL 內建的 catalog，會直接接管 Spark 預設的命名空間）。
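- `spark.sql.catalog.demo.type = rest`: Selects the REST catalog backend, so Spark talks to a REST catalog service for table metadata.
- `spark.sql.catalog.demo.uri = http://rest:8181`: URL of the REST catalog service that the `demo` catalog connects to.
- `spark.sql.catalog.demo.io-impl = org.apache.iceberg.aws.s3.S3FileIO`: The `FileIO` implementation used to read and write table files; `S3FileIO` targets S3-compatible object storage.
- `spark.sql.catalog.demo.s3.endpoint = http://minio:9000`: Overrides the default AWS S3 endpoint so that `S3FileIO` sends requests to the host `minio` (presumably a local MinIO server) instead.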
- `spark.sql.catalog.demo.warehouse = s3://warehouse/wh/`: Base path for the warehouse directory