In [1]:
import pyspark
from pyspark.sql import SparkSession
import os

print("Loading hive-site.xml from", os.environ.get("HADOOP_CONF_DIR"))

conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .set('spark.sql.catalog.type', 'hive')
        .set('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.iceberg_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        .set('spark.jars.packages', 'com.amazonaws:aws-java-sdk-bundle:1.11.1026,org.apache.hadoop:hadoop-aws:3.3.2,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,org.apache.iceberg:iceberg-spark3-extensions:0.13.1')
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
)

Loading hive-site.xml from /opt/spark/conf/


In [2]:
# Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()

:: loading settings :: url = jar:file:/home/docker/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/docker/.ivy2/cache
The jars for the packages stored in: /home/docker/.ivy2/jars
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
org.apache.iceberg#iceberg-spark3-extensions added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-149583cf-f28f-4797-a5be-421cf18922eb;1.0
	confs: [default]
	found com.amazonaws#aws-java-sdk-bundle;1.11.1026 in central
	found org.apache.hadoop#hadoop-aws;3.3.2 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.3.1 in central
	found org.apache.iceberg#iceberg-spark3-extensions;0.13.1 in central
	found org.antlr#antlr4;4.7.1 in central
	found org.antlr#antlr4-runtime;4.7.1 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego

23/08/15 13:46:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Create database
spark.sql("CREATE DATABASE IF NOT EXISTS demo_hms").show()

23/08/15 13:46:06 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
++
||
++
++



In [4]:
# Create table
spark.sql("CREATE TABLE IF NOT EXISTS demo_hms.currency (name STRING) USING iceberg;").show()

++
||
++
++



In [5]:
# Insert Some Data
spark.sql("INSERT INTO demo_hms.currency VALUES ('EUR'), ('USD'), ('GBP')").show()

                                                                                

++
||
++
++



In [6]:
# Query the Data
spark.sql("SELECT * FROM demo_hms.currency;").show()

+----+
|name|
+----+
| EUR|
| USD|
| GBP|
+----+



In [7]:
# Remove the Data
spark.sql("DELETE FROM demo_hms.currency;").show()

++
||
++
++



In [8]:
# View snapshots history
spark.sql("SELECT * FROM spark_catalog.demo_hms.currency.snapshots;").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2023-08-15 13:46:...|5474710415031502818|               null|   append|s3a://warehouse-h...|{spark.app.id -> ...|
|2023-08-15 13:46:...|3772559213968713431|5474710415031502818|   delete|s3a://warehouse-h...|{spark.app.id -> ...|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+



In [9]:
# View timestamp of snapshot when data was appended(before deletion)
spark.sql("SELECT committed_at FROM spark_catalog.demo_hms.currency.snapshots WHERE operation='append'").show(10, False)

23/08/15 13:46:15 WARN SparkScanBuilder: Failed to check if IsNotNull(operation) can be pushed down: Cannot find field 'operation' in struct: struct<>
23/08/15 13:46:15 WARN SparkScanBuilder: Failed to check if EqualTo(operation,append) can be pushed down: Cannot find field 'operation' in struct: struct<>
+----------------------+
|committed_at          |
+----------------------+
|2023-08-15 13:46:12.98|
+----------------------+



In [11]:
# Let's query the data before deletion with time travel, replace timestamp with commited_at from above
spark.sql("SELECT * FROM spark_catalog.demo_hms.currency TIMESTAMP AS OF '2023-08-15 13:46:12.98'").show()

+----+
|name|
+----+
| EUR|
| USD|
| GBP|
+----+



In [None]:
# Cleanup
spark.sql("DROP TABLE demo_hms.currency PURGE").show()