In [1]:
import pyspark
from pyspark.sql import SparkSession
import os

In [2]:
conf = (
pyspark.SparkConf()
.setAppName('spark-iceberg-lab1')
.set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.83.1,org.apache.hadoop:hadoop-aws:3.3.4')
.set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
.set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
.set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
.set('spark.sql.catalog.nessie.uri', 'http://nessie:19120/api/v1')
.set('spark.sql.catalog.nessie.ref', 'main')
.set('spark.sql.catalog.nessie.authentication.type', 'NONE')
.set('spark.sql.catalog.nessie.warehouse', 's3a://warehouse/')
.set('spark.hadoop.fs.s3a.endpoint', 'http://minio:9000')
.set('spark.hadoop.fs.s3a.access.key', 'admin')
.set('spark.hadoop.fs.s3a.secret.key', 'password')
.set('spark.hadoop.fs.s3a.path.style.access', 'true')
.set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
)

In [3]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()

:: loading settings :: url = jar:file:/home/docker/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/docker/.ivy2/cache
The jars for the packages stored in: /home/docker/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fa5ee98e-14f4-44c8-b56f-697a4e0d78c6;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.2 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.83.1 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 462ms :: artifacts dl 20ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central

In [5]:
spark.sql("show databases in nessie").show()

+---------+
|namespace|
+---------+
|     demo|
+---------+



In [6]:
spark.sql("show tables in nessie.demo").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|     demo|      tbl|      false|
+---------+---------+-----------+



In [7]:
spark.sql("""
CREATE TABLE nessie.demo.employee (
id INT,
role STRING,
department STRING,
salary FLOAT,
region STRING)
USING iceberg
""")

25/12/19 23:35:24 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


DataFrame[]

In [8]:
spark.sql("show databases in nessie").show()

+---------+
|namespace|
+---------+
|     demo|
+---------+



In [9]:
spark.sql("show tables in nessie.demo").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|     demo| employee|      false|
|     demo|      tbl|      false|
+---------+---------+-----------+



In [13]:
spark.sql("""
CREATE TABLE nessie.demo.emp_partitioned (
id INT,
role STRING,
department STRING)
USING iceberg
PARTITIONED BY (department)
""")

DataFrame[]

In [14]:
spark.sql("show tables in nessie.demo").show()

+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|     demo|emp_partitioned|      false|
|     demo|       employee|      false|
|     demo|            tbl|      false|
+---------+---------------+-----------+



In [16]:
spark.sql("""
CREATE TABLE nessie.demo.emp_partitioned_month (
id INT,
role STRING,
department STRING,
join_date DATE
)
USING iceberg
PARTITIONED BY (months(join_date))
""")

DataFrame[]

In [17]:
spark.sql("show tables in nessie.demo").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|     demo|     emp_partitioned|      false|
|     demo|emp_partitioned_m...|      false|
|     demo|            employee|      false|
|     demo|                 tbl|      false|
+---------+--------------------+-----------+



In [18]:
spark.sql("""
ALTER TABLE nessie.demo.tbl RENAME TO tbl_renamed
""")

DataFrame[]

In [19]:
spark.sql("show tables in nessie.demo").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|     demo|     emp_partitioned|      false|
|     demo|emp_partitioned_m...|      false|
|     demo|            employee|      false|
|     demo|         tbl_renamed|      false|
+---------+--------------------+-----------+



In [20]:
spark.sql("""
ALTER TABLE nessie.demo.employee SET TBLPROPERTIES ('write.wap.enabled'='true')
""")

DataFrame[]

In [22]:
spark.sql("""
ALTER TABLE nessie.demo.employee ADD COLUMN manager STRING
""")

DataFrame[]

In [23]:
spark.sql("""
ALTER TABLE nessie.demo.employee RENAME COLUMN role TO title
""")

DataFrame[]

In [24]:
spark.sql("""
DESC TABLE nessie.demo.employee
""")

DataFrame[col_name: string, data_type: string, comment: string]

In [25]:
spark.sql("""
DESC TABLE nessie.demo.employee
""").show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|        id|      int|   NULL|
|     title|   string|   NULL|
|department|   string|   NULL|
|    salary|    float|   NULL|
|    region|   string|   NULL|
|   manager|   string|   NULL|
+----------+---------+-------+



In [26]:
spark.sql("""
DESC TABLE EXTENDED nessie.demo.employee
""").show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|                  id|                 int|   NULL|
|               title|              string|   NULL|
|          department|              string|   NULL|
|              salary|               float|   NULL|
|              region|              string|   NULL|
|             manager|              string|   NULL|
|                    |                    |       |
|  # Metadata Columns|                    |       |
|            _spec_id|                 int|       |
|          _partition|            struct<>|       |
|               _file|              string|       |
|                _pos|              bigint|       |
|            _deleted|             boolean|       |
|                    |                    |       |
|# Detailed Table ...|                    |       |
|                Name|nessie.demo.employee|       |
|           

In [30]:
spark.sql("""
DESC TABLE EXTENDED nessie.demo.employee
""").show(200, truncate=False)

+----------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                                                                                                                                                                         |comment|
+----------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|id                          |int                         

In [31]:
spark.sql("""
DESC TABLE EXTENDED nessie.demo.emp_partitioned
""").show(200, False)

+----------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                                                                                                                                                                                  |comment|
+----------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|id                          |int                                                                                              

In [32]:
spark.sql("""
ALTER TABLE nessie.demo.employee ADD PARTITION FIELD region
""")

DataFrame[]

In [33]:
spark.sql("""
ALTER TABLE nessie.demo.employee WRITE ORDERED BY id ASC
""")

DataFrame[]

In [34]:
spark.sql("""
ALTER TABLE nessie.demo.employee SET IDENTIFIER FIELDS id
""")

IllegalArgumentException: Cannot add field id as an identifier field: not a required field

In [36]:
spark.sql("""
ALTER TABLE nessie.demo.employee ALTER COLUMN id SET NOT NULL;
""")

AnalysisException: Cannot change nullable column to non-nullable: id.; line 2 pos 0;
AlterColumn resolvedfieldname(StructField(id,IntegerType,true)), false
+- ResolvedTable org.apache.iceberg.spark.SparkCatalog@5d4beb37, demo.employee, nessie.demo.employee, [id#442, title#443, department#444, salary#445, region#446, manager#447]


In [37]:
spark.sql("DROP TABLE nessie.demo.tbl_renamed")

DataFrame[]

In [40]:
spark.sql("show tables in nessie.demo").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|     demo|     emp_partitioned|      false|
|     demo|emp_partitioned_m...|      false|
|     demo|            employee|      false|
+---------+--------------------+-----------+



In [41]:
spark.sql("SELECT * FROM nessie.demo.employee").show()

+---+-----+----------+------+------+-------+
| id|title|department|salary|region|manager|
+---+-----+----------+------+------+-------+
+---+-----+----------+------+------+-------+



In [42]:
df_emp = spark.table("nessie.demo.employee")

In [43]:
df_emp.show()

+---+-----+----------+------+------+-------+
| id|title|department|salary|region|manager|
+---+-----+----------+------+------+-------+
+---+-----+----------+------+------+-------+



In [44]:
spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [45]:
spark.sql("show tables in nessie").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|     demo|     emp_partitioned|      false|
|     demo|emp_partitioned_m...|      false|
|     demo|            employee|      false|
+---------+--------------------+-----------+



In [50]:
spark.sql("""
INSERT INTO nessie.demo.employee 
VALUES 
(1, 'Software Engineer', 'Engineering', 25000, 'NA', 'Jack'), 
(2, 'Director', 'Sales', 22000, 'EMEA', 'Bo W')
""")

                                                                                

DataFrame[]

In [52]:
spark.sql("""
select * from nessie.demo.employee
""").show()

[Stage 4:>                                                          (0 + 1) / 1]

+---+-----------------+-----------+-------+------+-------+
| id|            title| department| salary|region|manager|
+---+-----------------+-----------+-------+------+-------+
|  2|         Director|      Sales|22000.0|  EMEA|   Bo W|
|  1|Software Engineer|Engineering|25000.0|    NA|   Jack|
+---+-----------------+-----------+-------+------+-------+



                                                                                

In [53]:
# Read the Avro manifest list
df = spark.read.format("avro").load("s3a://warehouse/demo/employee_f1ae59e9-eab2-4bba-bf4a-1b2fff895d26/metadata/snap-4776718034125825857-1-a461d5ed-9ebd-4a76-88ef-83515d85d8a9.avro")

# Show it as a table (easy reading)
df.show(truncate=False)

AnalysisException: Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of Apache Avro Data Source Guide.

In [60]:
spark.sql("""
SELECT * FROM nessie.demo.employee.snapshots
""").show(truncate=False)

+-----------------------+-------------------+---------+---------+------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id|operation|manifest_list                                                                                                                                   |summary                                                                                                                                                                                                                                                                                         |


In [64]:
spark.sql("""
SELECT * FROM nessie.demo.employee.manifests
""").show(truncate=False)

+-------+------------------------------------------------------------------------------------------------------------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+--------------------------+
|content|path                                                                                                                    |length|partition_spec_id|added_snapshot_id  |added_data_files_count|existing_data_files_count|deleted_data_files_count|added_delete_files_count|existing_delete_files_count|deleted_delete_files_count|partition_summaries       |
+-------+------------------------------------------------------------------------------------------------------------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+------------------------+------------

In [65]:
spark.sql("""
SELECT * FROM nessie.demo.employee.metadata_log_entries
""").show(truncate=False)

+-----------------------+------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------------+----------------------+
|timestamp              |file                                                                                                                                |latest_snapshot_id |latest_schema_id|latest_sequence_number|
+-----------------------+------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------------+----------------------+
|2025-12-20 19:06:28.317|s3a://warehouse/demo/employee_f1ae59e9-eab2-4bba-bf4a-1b2fff895d26/metadata/00000-d636e9e1-f73b-4c55-9712-132f8c4e7d8d.metadata.json|NULL               |NULL            |NULL                  |
|2025-12-20 19:26:43.429|s3a://warehouse/demo/employee_f1ae59e9-eab2-4bba-bf4a-1b2fff895d26/metadata/00001-3efbdb2d-34de-4d2

In [66]:
spark.sql("""
SELECT * FROM nessie.demo.employee.files
""").show(truncate=False)

+-------+------------------------------------------------------------------------------------------------------------------------------------------------+-----------+-------+---------+------------+------------------+------------------------------------------------------+------------------------------------------------+------------------------------------------------+----------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-------------+------------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|co

In [70]:
# Load the metadata file from your MinIO/S3 path
df = spark.read.option("multiLine", True).json("s3a://warehouse/demo/employee_f1ae59e9-eab2-4bba-bf4a-1b2fff895d26/metadata/00006-6f7f0bbf-6f64-46ec-8460-b8608b1ec724.metadata.json")

# View the structure
df.printSchema()
df.show()

root
 |-- current-schema-id: long (nullable = true)
 |-- current-snapshot-id: long (nullable = true)
 |-- default-sort-order-id: long (nullable = true)
 |-- default-spec-id: long (nullable = true)
 |-- format-version: long (nullable = true)
 |-- last-column-id: long (nullable = true)
 |-- last-partition-id: long (nullable = true)
 |-- last-sequence-number: long (nullable = true)
 |-- last-updated-ms: long (nullable = true)
 |-- location: string (nullable = true)
 |-- metadata-log: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- metadata-file: string (nullable = true)
 |    |    |-- timestamp-ms: long (nullable = true)
 |-- partition-specs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- fields: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- field-id: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |