In [2]:
# define Spark client

from pyspark.sql import Row
from pyspark.sql import SparkSession


# Create (or reuse) the SparkSession. It registers an Iceberg catalog named
# `iceberg` whose tables are tracked by the Hive metastore at
# thrift://hivemetastore:9083; local tables live under /usr/local/hadoop/warehouse.
spark = (
    SparkSession.builder
    .config("spark.sql.warehouse.dir", "/usr/local/hadoop/warehouse")
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "hive")
    .config("spark.sql.catalog.iceberg.uri", "thrift://hivemetastore:9083")
    # Spark config values are strings. Pass "false" explicitly rather than a
    # Python bool, whose string form depends on the PySpark version
    # ("False" vs "false"). This turns off the Iceberg catalog's table cache.
    .config("spark.sql.catalog.iceberg.cache-enabled", "false")
    .getOrCreate()
)

# Only show ERROR-level (and worse) Spark log messages in the notebook output.
spark.sparkContext.setLogLevel('ERROR')

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/04 14:02:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Start from a clean slate: drop the table if a previous run created it.
# PURGE is requested so the table's files go too, not just the catalog entry.
drop_table_sql = """
DROP TABLE IF EXISTS iceberg.default.bank_transfers PURGE
"""
spark.sql(drop_table_sql)

In [None]:
# Create the table used throughout the following exercises.
# Rows are partitioned by the day of their `timestamp` column (Iceberg `days` transform).
create_table_sql = """
CREATE TABLE IF NOT EXISTS iceberg.default.bank_transfers (
    id bigint COMMENT 'transfer id',
    amount int COMMENT 'transferred amount, expressed in cents',
    transferred_from string COMMENT 'initiator of the transfer',
    transferred_to string COMMENT 'receiver of the transfer',
    timestamp timestamp COMMENT 'time of the transfer'
) PARTITIONED BY (days(timestamp))
"""
spark.sql(create_table_sql)

In [None]:
# Print the table's columns plus the extended table information,
# showing up to 50 rows and never truncating long cell values.
describe_sql = """
DESCRIBE TABLE EXTENDED iceberg.default.bank_transfers
"""
table_description = spark.sql(describe_sql)
table_description.show(n=50, truncate=False)

In [None]:
# Insert a first batch of transfers with plain SQL (amounts are in cents).
# String and timestamp literals use single quotes, the SQL-standard form:
# under ANSI mode with spark.sql.ansi.doubleQuotedIdentifiers enabled,
# double-quoted text is parsed as an identifier and the statement would fail.
# The timestamps carry no zone offset, so presumably they are interpreted in the
# Spark session time zone -- confirm if that matters for the exercise.
spark.sql("""
INSERT INTO iceberg.default.bank_transfers VALUES
    (1, 12000, 'ACME INC',   'ASTROCORP',  TIMESTAMP'2022-11-14T00:55:00'),
    (2, 24000, 'John Doe',   'Jane Doe',   TIMESTAMP'2022-11-15T02:11:00'),
    (3,   500, 'Deborah S.', 'Michael C.', TIMESTAMP'2022-11-17T16:25:07')
""")

In [None]:
# insert more data to the table, using the DataFrame API

from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.functions import to_timestamp


# Build the new row(s) with an explicit schema that mirrors the table DDL.
# Without it, Python ints are inferred as bigint, so `amount` would not match the
# table's int column. The append would then depend on an implicit bigint -> int
# cast at write time, which Spark rejects under the `strict` store-assignment policy.
df = spark.createDataFrame(
    [
        Row(
            id=4,
            amount=200,
            transferred_from="CTX Inc.",
            transferred_to="XYZ GmbH",
            timestamp=datetime.fromisoformat("2022-12-01T07:32:18"),
        ),
    ],
    schema=(
        "id bigint, amount int, transferred_from string, "
        "transferred_to string, timestamp timestamp"
    ),
)

# Append the rows to the existing table through the DataFrameWriterV2 API.
df.writeTo("iceberg.default.bank_transfers").append()

In [None]:
# Read every row back, ordered by id, to eyeball what has been inserted so far.
select_all_sql = """
SELECT * FROM iceberg.default.bank_transfers
ORDER BY id
"""
all_transfers = spark.sql(select_all_sql)
all_transfers.show()

In [None]:
# Delete all inserted data (no WHERE clause), then verify the table is empty.
spark.sql("""
DELETE FROM iceberg.default.bank_transfers
""")

remaining_transfers = spark.sql("""
SELECT * FROM iceberg.default.bank_transfers
""")
remaining_transfers.show()

# Make the verification explicit instead of relying on a reader inspecting the
# printed table: a non-empty result now stops the notebook here.
assert remaining_transfers.count() == 0, "bank_transfers should be empty after DELETE"

In [5]:
# Inspect the commit history via the `history` metadata table. Each row shows
# when a snapshot became current, its id, and its parent snapshot id.
history_sql = """
SELECT * FROM iceberg.default.bank_transfers.history
"""
commit_history = spark.sql(history_sql)
commit_history.show(truncate=False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2023-03-04 11:24:35.442|2746907607546786975|null               |true               |
|2023-03-04 11:32:18.54 |5993786463064479871|2746907607546786975|true               |
|2023-03-04 12:14:04.219|6461066575791651155|5993786463064479871|true               |
|2023-03-04 12:52:45.43 |3405190307090044624|6461066575791651155|true               |
|2023-03-04 12:53:00.982|7378494672306874711|3405190307090044624|true               |
+-----------------------+-------------------+-------------------+-------------------+



In [7]:
# List every snapshot id next to the manifest-list file recorded for it.
snapshots_sql = """
SELECT snapshot_id, manifest_list FROM iceberg.default.bank_transfers.snapshots
"""
table_snapshots = spark.sql(snapshots_sql)
table_snapshots.show(truncate=False)

+-------------------+-----------------------------------------------------------------------------------------------------------------------------+
|snapshot_id        |manifest_list                                                                                                                |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------+
|2746907607546786975|file:/usr/local/hadoop/warehouse/bank_transfers/metadata/snap-2746907607546786975-1-b56309c7-e352-4a2d-ba40-8fc3e3e82e68.avro|
|5993786463064479871|file:/usr/local/hadoop/warehouse/bank_transfers/metadata/snap-5993786463064479871-1-b6a71d61-fd2a-4093-a5d3-821db514303c.avro|
|6461066575791651155|file:/usr/local/hadoop/warehouse/bank_transfers/metadata/snap-6461066575791651155-1-76496de8-a61a-4773-af93-a08c847c54fc.avro|
|3405190307090044624|file:/usr/local/hadoop/warehouse/bank_transfers/metadata/snap-3405190307090044624-1-87c94d5

In [9]:
# List the metadata files recorded in the table's metadata log, oldest first
# as displayed in the output below.
metadata_log_sql = """
SELECT file FROM iceberg.default.bank_transfers.metadata_log_entries
"""
metadata_files = spark.sql(metadata_log_sql)
metadata_files.show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------+
|file                                                                                                             |
+-----------------------------------------------------------------------------------------------------------------+
|file:/usr/local/hadoop/warehouse/bank_transfers/metadata/00000-f916bee8-78e6-4dd9-aedf-675833d6f57e.metadata.json|
|file:/usr/local/hadoop/warehouse/bank_transfers/metadata/00001-b504896f-ae68-4da2-a6b3-652535a64a3f.metadata.json|
|file:/usr/local/hadoop/warehouse/bank_transfers/metadata/00002-0cff973b-9951-4b41-8c4f-2e2049160521.metadata.json|
|file:/usr/local/hadoop/warehouse/bank_transfers/metadata/00003-7e6516e8-2cb5-4365-805b-75b603c9309a.metadata.json|
|file:/usr/local/hadoop/warehouse/bank_transfers/metadata/00004-f876d760-0aae-410a-a9c3-a2b9aadd49ff.metadata.json|
|file:/usr/local/hadoop/warehouse/bank_transfers/metadata/00005-a950b278