In [None]:
# Define the AWS env variables if you are using AWS Auth:
%env AWS_REGION=REGION
%env AWS_ACCESS_KEY_ID=KEY
%env AWS_SECRET_ACCESS_KEY=SECRET

In [2]:
import os
import findspark
from pyspark.sql import *
from pyspark import SparkConf

findspark.init()

conf = SparkConf()
# we need iceberg libraries and the nessie sql extensions
conf.set(
    "spark.jars.packages",
    f"org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,org.projectnessie:nessie-spark-extensions-3.3_2.12:0.44.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178",
)
# ensure python <-> java interactions are w/ pyarrow
conf.set("spark.sql.execution.pyarrow.enabled", "true")

# Config to change the IO implementation of target catalog; write to object store
conf.set("spark.sql.catalog.arctic.io-impl","org.apache.iceberg.aws.s3.S3FileIO")

# create catalog named arctic as an iceberg catalog
conf.set("spark.sql.catalog.arctic", "org.apache.iceberg.spark.SparkCatalog")

# tell the catalog that its a Nessie catalog
conf.set("spark.sql.catalog.arctic.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")

# set the location for the catalog to store data. Spark writes to this directory
conf.set("spark.sql.catalog.arctic.warehouse", "s3://bucket/")

# set the location of the Arctic/Nessie server.
conf.set("spark.sql.catalog.arctic.uri", "http://localhost:19120/api/v1")

# default branch for Arctic catalog to work on
conf.set("spark.sql.catalog.arctic.ref", "main")

# Authentication mechanism. Here, we use AWS with BEARER
conf.set("spark.sql.catalog.arctic.authentication.type", "BEARER")
conf.set("spark.sql.catalog.arctic.authentication.token", "ACCESS TOKEN")

# enable the extensions for both Nessie and Iceberg
conf.set(
    "spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
)

# finally, start up the Spark server
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

:: loading settings :: url = jar:file:/home/docker/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/docker/.ivy2/cache
The jars for the packages stored in: /home/docker/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
org.projectnessie#nessie-spark-extensions-3.3_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1f92b084-5338-43a3-b8af-b4b4fa42f79b;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.0.0 in central
	found org.projectnessie#nessie-spark-extensions-3.3_2.12;0.44.0 in central
	found org.scala-lang#scala-reflect;2.12.17 in central
	found org.projectnessie#nessie-spark-extensions-grammar;0.44.0 in central
	found org.projectnessie#nessie-spark-antlr-runtime;0.44.0 in central
	found org.antlr#antlr4-runtime;4.11.1 in central
	found org.projectnessie#nessie-spark-extensions-base_2.12;0.44.0 in central
	found org.projectness

22/11/25 18:49:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/25 18:50:03 ERROR Inbox: Ignoring error
java.lang.NullPointerException
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$register(BlockManagerMasterEndpoint.scala:579)
	at org.apache.spark.storage.BlockManagerMasterEndpoint$$anonfun$receiveAndReply$1.applyOrElse(BlockManagerMasterEndpoint.scala:121)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:103)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
	at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
	at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.

# Create a new branch called work in the catalog for our project

In [4]:
spark.sql("CREATE BRANCH work IN arctic").toPandas()

Unnamed: 0,refType,name,hash
0,Branch,work,d1d5da6c513e6bc2b615a1de89c7a30b587302a84bb60b...


# Use the newly created branch

In [4]:
spark.sql("USE REFERENCE work IN arctic")

DataFrame[refType: string, name: string, hash: string]

# Create a new Iceberg table 'sales'

In [5]:
spark.sql(
    """CREATE TABLE IF NOT EXISTS arctic.salesdip.sales
            (id STRING, name STRING, product STRING, price STRING, date STRING) USING iceberg"""
)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


DataFrame[]

# INSERT new data from a CSV file

In [6]:
spark.sql(
    """CREATE OR REPLACE TEMPORARY VIEW salesview USING csv
            OPTIONS (path "salesdata.csv", header true)"""
)
spark.sql("INSERT INTO arctic.salesdip.sales SELECT * FROM salesview")

                                                                                

DataFrame[]

# Read the values

In [11]:
spark.sql("SELECT * FROM arctic.salesdip.sales LIMIT 5").toPandas()

                                                                                

Unnamed: 0,id,name,product,price,date
0,1,Selinda Rheaume,Wine - Prosecco Valdobiaddene,10.31,10/20/2022
1,2,Wynnie Gozzard,"Wine - Red, Wolf Blass, Yellow",20.05,10/20/2022
2,3,Patten Whitter,Crackers - Soda / Saltins,39.75,10/20/2022
3,4,Hulda Eslie,Roe - White Fish,100.0,10/20/2022
4,5,Chrystal Haggie,Wine - Delicato Merlot,35.82,10/20/2022


# DML - Update the table with new data

In [10]:
spark.sql("UPDATE arctic.salesdip.sales SET price = 100 WHERE id = 4")

                                                                                

DataFrame[]

# Iceberg default Metadata Tables: we query the 'files', 'history', 'snapshots' table

In [13]:
spark.sql("SELECT * FROM arctic.salesdip.sales.files").toPandas()

                                                                                

Unnamed: 0,content,file_path,file_format,spec_id,record_count,file_size_in_bytes,column_sizes,value_counts,null_value_counts,nan_value_counts,lower_bounds,upper_bounds,key_metadata,split_offsets,equality_ids,sort_order_id
0,0,s3://dremio-58d63711-a193-4ad2-a226-52dfcb1291...,PARQUET,0,500,14775,"{1: 949, 2: 4912, 3: 5979, 4: 1584, 5: 101}","{1: 500, 2: 500, 3: 500, 4: 500, 5: 500}","{1: 0, 2: 0, 3: 0, 4: 0, 5: 0}",{},"{1: [49], 2: [65, 98, 98, 111, 116, 32, 72, 10...","{1: [57, 57], 2: [90, 101, 98, 117, 108, 111, ...",,[4],,0


In [14]:
spark.sql("SELECT * FROM arctic.salesdip.sales.history").toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,made_current_at,snapshot_id,parent_id,is_current_ancestor
0,2022-11-25 19:00:31.662,6202746828304796124,,True
1,2022-11-25 20:23:15.635,2878892377637294950,6.202747e+18,True
2,2022-11-25 20:24:46.203,1747027818651198456,2.878892e+18,True


In [15]:
spark.sql("SELECT * FROM arctic.salesdip.sales.snapshots").toPandas()

  series = series.astype(t, copy=False)


Unnamed: 0,committed_at,snapshot_id,parent_id,operation,manifest_list,summary
0,2022-11-25 19:00:31.662,6202746828304796124,,append,s3://dremio-58d63711-a193-4ad2-a226-52dfcb1291...,"{'spark.app.id': 'local-1669402184833', 'chang..."
1,2022-11-25 20:23:15.635,2878892377637294950,6.202747e+18,overwrite,s3://dremio-58d63711-a193-4ad2-a226-52dfcb1291...,"{'spark.app.id': 'local-1669402184833', 'chang..."
2,2022-11-25 20:24:46.203,1747027818651198456,2.878892e+18,overwrite,s3://dremio-58d63711-a193-4ad2-a226-52dfcb1291...,"{'added-data-files': '1', 'total-equality-dele..."


# Iceberg Time Travel demo

In [16]:
spark.sql("SELECT * FROM arctic.salesdip.sales TIMESTAMP AS OF '2022-11-25 19:00:31.662' ").toPandas()

                                                                                

Unnamed: 0,id,name,product,price,date
0,1,Selinda Rheaume,Wine - Prosecco Valdobiaddene,10.31,10/20/2022
1,2,Wynnie Gozzard,"Wine - Red, Wolf Blass, Yellow",20.05,10/20/2022
2,3,Patten Whitter,Crackers - Soda / Saltins,39.75,10/20/2022
3,4,Hulda Eslie,Roe - White Fish,37.10,10/20/2022
4,5,Chrystal Haggie,Wine - Delicato Merlot,35.82,10/20/2022
...,...,...,...,...,...
495,496,Winny McGlone,Pasta - Fusili Tri - Coloured,67.78,10/20/2022
496,497,Hadleigh Ellinor,Oats Large Flake,58.19,10/20/2022
497,498,Kimberlee Hancill,"Soup - Knorr, Country Bean",81.64,10/20/2022
498,499,Anet Scaife,Soup - French Onion,98.51,10/20/2022


In [18]:
spark.sql("SELECT * FROM arctic.salesdip.sales TIMESTAMP AS OF '2022-11-25 20:24:46.203' ").toPandas()

                                                                                

Unnamed: 0,id,name,product,price,date
0,1,Selinda Rheaume,Wine - Prosecco Valdobiaddene,10.31,10/20/2022
1,2,Wynnie Gozzard,"Wine - Red, Wolf Blass, Yellow",20.05,10/20/2022
2,3,Patten Whitter,Crackers - Soda / Saltins,39.75,10/20/2022
3,4,Hulda Eslie,Roe - White Fish,100,10/20/2022
4,5,Chrystal Haggie,Wine - Delicato Merlot,35.82,10/20/2022
...,...,...,...,...,...
495,496,Winny McGlone,Pasta - Fusili Tri - Coloured,67.78,10/20/2022
496,497,Hadleigh Ellinor,Oats Large Flake,58.19,10/20/2022
497,498,Kimberlee Hancill,"Soup - Knorr, Country Bean",81.64,10/20/2022
498,499,Anet Scaife,Soup - French Onion,98.51,10/20/2022
