Nessie Demo
===========
This demo shows how to use the Nessie Python API together with Iceberg's Spark 3 integration.

Initialize PySpark + Nessie environment
----------------------------------------------

In [None]:
import os
import findspark

# findspark must run first so that the pyspark imports below can be resolved
findspark.init()

from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from py4j.java_gateway import java_import

conf = SparkConf()
conf.set("spark.jars", "/path/to/iceberg-spark3-runtime-apache-iceberg-0.11.0.jar")
conf.set("spark.sql.execution.pyarrow.enabled", "true")
conf.set("spark.sql.catalog.nessie.warehouse", 'file://' + os.getcwd() + '/spark_warehouse')
conf.set("spark.sql.catalog.nessie.url", "http://localhost:19120/api/v1")
conf.set("spark.sql.catalog.nessie.ref", "main")
conf.set("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
conf.set("spark.sql.catalog.nessie.auth_type", "NONE")
conf.set("spark.sql.catalog.nessie.cache-enabled", "false")
conf.set("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
jvm = sc._gateway.jvm

java_import(jvm, "org.apache.iceberg.nessie.NessieCatalog")
java_import(jvm, "org.apache.iceberg.catalog.TableIdentifier")
java_import(jvm, "org.apache.iceberg.Schema")
java_import(jvm, "org.apache.iceberg.types.Types")
java_import(jvm, "org.apache.iceberg.PartitionSpec")
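
The catalog settings above all share the prefix `spark.sql.catalog.nessie.`. As a minimal sketch, a helper could generate them from a few arguments so that the URL, branch and warehouse are each stated once. The function name `nessie_catalog_settings` and its parameters are hypothetical; the keys and values are the ones used in the cell above.

```python
import os


def nessie_catalog_settings(url, ref, warehouse, catalog="nessie"):
    """Return the Spark settings for a Nessie-backed Iceberg catalog.

    Hypothetical helper: only the keys already used in this demo are emitted.
    """
    prefix = f"spark.sql.catalog.{catalog}"
    return {
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
        f"{prefix}.url": url,
        f"{prefix}.ref": ref,
        f"{prefix}.warehouse": warehouse,
        f"{prefix}.auth_type": "NONE",
        f"{prefix}.cache-enabled": "false",
    }


settings = nessie_catalog_settings(
    url="http://localhost:19120/api/v1",
    ref="main",
    warehouse="file://" + os.getcwd() + "/spark_warehouse",
)
# Print the generated settings in a stable order
for key, value in sorted(settings.items()):
    print(key, "=", value)
```

Each pair could then be applied with `conf.set(key, value)` before the session is built.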

Create Extra Spark Sessions
------------------------------

Because of the way Spark sessions cache catalogs, each Spark session can have only one Nessie catalog. Below we create a few Spark sessions, one for each Nessie branch used in the course of the demo. In a real-life scenario we would typically create one Nessie catalog per Spark instance, and each of the sections below would be run by a different user on a different branch.

In [None]:
# session for dev branch
spark_dev = spark.newSession()
spark_dev.conf.set("spark.sql.catalog.nessie.ref", "dev")

# session for ETL branch
spark_etl = spark.newSession()
spark_etl.conf.set("spark.sql.catalog.nessie.ref", "etl")

# session for experiment branch
spark_experiment = spark.newSession()
spark_experiment.conf.set("spark.sql.catalog.nessie.ref", "experiment")
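
The three sessions follow the same pattern: call `newSession()`, then point the catalog's `ref` setting at a branch. A minimal sketch of a helper that does this for any list of branches is shown here. The name `sessions_for_branches` is hypothetical; it relies only on `newSession()` and `conf.set()`, which the cell above already uses.

```python
def sessions_for_branches(base_session, branches, catalog="nessie"):
    """Create one session per branch, keyed by branch name.

    Hypothetical helper: `base_session` is an existing SparkSession and
    `catalog` is the catalog name configured for Nessie.
    """
    sessions = {}
    for branch in branches:
        # newSession() shares the SparkContext but has its own SQL configuration
        session = base_session.newSession()
        session.conf.set(f"spark.sql.catalog.{catalog}.ref", branch)
        sessions[branch] = session
    return sessions
```

With the session from the first cell, `sessions_for_branches(spark, ["dev", "etl", "experiment"])` would return a dictionary holding the same three sessions.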

Set up Nessie branches
----------------------------

- Branch `main` already exists
- Create branch `dev`
- List all branches

In [None]:
!nessie branch dev

In [None]:
!nessie --verbose branch

Create tables under dev branch
-------------------------------------

Creating two tables under the `dev` branch:
- region
- nation

It is not yet possible to create tables using PySpark and Iceberg, so the
Iceberg Java API is called through py4j instead.

In [None]:
catalog = jvm.NessieCatalog()
catalog.setConf(sc._jsc.hadoopConfiguration())
catalog.initialize("nessie", {'ref': 'dev', 'url': 'http://localhost:19120/api/v1', "warehouse": 'file://' + os.getcwd() + '/spark_warehouse'})

# Creating region table
region_name = jvm.TableIdentifier.parse("testing.region")
region_schema = jvm.Schema([
    jvm.Types.NestedField.optional(1, "R_REGIONKEY", jvm.Types.LongType.get()),
    jvm.Types.NestedField.optional(2, "R_NAME", jvm.Types.StringType.get()),
    jvm.Types.NestedField.optional(3, "R_COMMENT", jvm.Types.StringType.get()),
])
region_spec = jvm.PartitionSpec.unpartitioned()

region_table = catalog.createTable(region_name, region_schema, region_spec)
region_df = spark_dev.read.load("data/region.parquet")
region_df.write.format("iceberg").mode("overwrite").save("nessie.testing.region")

# Creating nation table
nation_name = jvm.TableIdentifier.parse("testing.nation")
nation_schema = jvm.Schema([
    jvm.Types.NestedField.optional(1, "N_NATIONKEY", jvm.Types.LongType.get()),
    jvm.Types.NestedField.optional(2, "N_NAME", jvm.Types.StringType.get()),
    jvm.Types.NestedField.optional(3, "N_REGIONKEY", jvm.Types.LongType.get()),
    jvm.Types.NestedField.optional(4, "N_COMMENT", jvm.Types.StringType.get()),
])
nation_spec = jvm.PartitionSpec.builderFor(nation_schema).truncate("N_NAME", 2).build()
nation_table = catalog.createTable(nation_name, nation_schema, nation_spec)

nation_df = spark_dev.read.load("data/nation.parquet")
nation_df.write.format("iceberg").mode("overwrite").save("nessie.testing.nation")
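
Both schemas above are built the same way: a list of optional fields with ids counting up from 1. As a sketch, that pattern could be factored into a small helper that takes column names and Iceberg type names. The names `optional_fields` and `REGION_COLUMNS` are hypothetical; the calls to `NestedField.optional` and `<Type>.get()` mirror those in the cell above.

```python
def optional_fields(types, columns):
    """Build a list of optional Iceberg fields with ids 1..n.

    `types` is the py4j view of org.apache.iceberg.types.Types (jvm.Types
    in this notebook); `columns` is a list of (column name, type class name)
    pairs, for example ("R_NAME", "StringType").
    """
    fields = []
    for field_id, (name, type_name) in enumerate(columns, start=1):
        # Look up e.g. types.StringType and obtain its singleton instance
        iceberg_type = getattr(types, type_name).get()
        fields.append(types.NestedField.optional(field_id, name, iceberg_type))
    return fields


# Column layout of the region table, as defined in the cell above
REGION_COLUMNS = [
    ("R_REGIONKEY", "LongType"),
    ("R_NAME", "StringType"),
    ("R_COMMENT", "StringType"),
]
```

The region schema would then be `jvm.Schema(optional_fields(jvm.Types, REGION_COLUMNS))`.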


Check generated tables
----------------------------

Check tables generated under the dev branch (and that the main branch does not
have any tables)

In [None]:
!nessie contents --list

In [None]:
!nessie contents --list --ref dev

Note that the `dev` and `main` branches point to different commits now

In [None]:
!nessie --verbose branch

Dev promotion
-------------

Promote the dev branch to main.

* main now has the same tables as dev
* main and dev point to the same commit

In [None]:
!nessie merge dev -b main --force

In [None]:
!nessie contents --list

In [None]:
!nessie --verbose branch
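
The claim that `main` and `dev` point to the same commit can also be checked from Python rather than read off the CLI output. The sketch below assumes a `pynessie` client such as the one this demo creates later with `init()`, and uses only `list_references()`, `.name` and `.hash_`, which appear later in this notebook. The function names are hypothetical.

```python
def branch_heads(client):
    """Map each reference name to the hash it currently points to."""
    return {ref.name: ref.hash_ for ref in client.list_references()}


def same_commit(client, first, second):
    """Return True when both references point to the same commit hash."""
    heads = branch_heads(client)
    return heads[first] == heads[second]
```

After the merge, `same_commit(nessie, "main", "dev")` would be expected to return `True`.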

Create `etl` branch
----------------------

- Create a branch `etl` out of `main`
- add data to nation
- alter the schema of region
- create table city
- query the tables in `etl`
- query the tables in `main`
- promote `etl` branch to `main`

In [None]:
!nessie branch etl main

In [None]:
Nation = Row("N_NATIONKEY", "N_NAME", "N_REGIONKEY", "N_COMMENT")
new_nations = spark_etl.createDataFrame([
    Nation(25, "SYLDAVIA", 3, "King Ottokar's Sceptre"),
    Nation(26, "SAN THEODOROS", 1, "The Picaros")])
new_nations.write.format("iceberg").mode("append").save("nessie.testing.nation")

In [None]:
# catalog bound to the etl branch, used to add a column to the region table
etl_catalog = jvm.NessieCatalog()
etl_catalog.setConf(sc._jsc.hadoopConfiguration())
etl_catalog.initialize("etl", {'ref': 'etl', 'url': 'http://localhost:19120/api/v1', "warehouse": 'file://' + os.getcwd() + '/spark_warehouse'})
etl_catalog.loadTable(region_name).updateSchema().addColumn('R_ABBREV', jvm.Types.StringType.get()).commit()

In [None]:
# Creating city table
spark_etl.sql("CREATE TABLE nessie.testing.city (C_CITYKEY BIGINT, C_NAME STRING, N_NATIONKEY BIGINT, C_COMMENT STRING) USING iceberg PARTITIONED BY (N_NATIONKEY)")

In [None]:
from pynessie import init
nessie = init()
nessie.list_keys('main').entries

In [None]:
[i.name for i in nessie.list_keys('etl').entries]
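
The two listings can also be compared directly to see which tables exist only on `etl`. This is a minimal sketch built on the `list_keys(...).entries` call used above; the function names are hypothetical, and it assumes only that entry names can be compared with `==`.

```python
def table_names(client, ref):
    """Return the names of all entries on the given reference."""
    return [entry.name for entry in client.list_keys(ref).entries]


def tables_only_on(client, ref, base="main"):
    """Return the names present on `ref` but missing from `base`."""
    base_names = table_names(client, base)
    return [name for name in table_names(client, ref) if name not in base_names]
```

Before the merge, `tables_only_on(nessie, "etl")` would be expected to contain only the newly created city table.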

In [None]:
{i.name:i.hash_ for i in nessie.list_references()}

In [None]:
nessie.merge('main', 'etl')

In [None]:
{i.name:i.hash_ for i in nessie.list_references()}
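
To see the effect of the merge at a glance, the two `{name: hash}` dictionaries (one taken before the merge, one after) can be compared. The sketch below is plain Python; the function name `changed_refs` is hypothetical and the hashes are made-up placeholders, not real Nessie output.

```python
def changed_refs(before, after):
    """Return the names whose hash differs between two snapshots.

    Names that were added or removed between the snapshots are included too.
    """
    names = sorted(set(before) | set(after))
    return [name for name in names if before.get(name) != after.get(name)]


# Placeholder hashes for illustration only
before = {"main": "aaa", "dev": "aaa", "etl": "bbb"}
after = {"main": "bbb", "dev": "aaa", "etl": "bbb"}
print(changed_refs(before, after))  # ['main']
```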

Create `experiment` branch
--------------------------------

- create `experiment` branch from `main`
- drop `nation` table
- add data to `region` table
- compare `experiment` and `main` tables

In [None]:
!nessie branch experiment main

In [None]:
# catalog bound to the experiment branch, used to drop the nation table
catalog = jvm.NessieCatalog()
catalog.setConf(sc._jsc.hadoopConfiguration())
catalog.initialize("experiment", {'ref': 'experiment', 'url': 'http://localhost:19120/api/v1', "warehouse": 'file://' + os.getcwd() + '/spark_warehouse'})

catalog.dropTable(jvm.TableIdentifier.parse("testing.nation"), False)
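
This notebook initializes a `NessieCatalog` three times with the same steps, changing only the branch. A sketch of a helper that wraps those steps is given here. The name `nessie_catalog` and its parameters are hypothetical; the calls to `setConf` and `initialize` are the ones used in the cell above.

```python
def nessie_catalog(jvm, hadoop_conf, name, ref, url, warehouse):
    """Create and initialize a NessieCatalog bound to one branch.

    `jvm` is the py4j JVM view with NessieCatalog already imported via
    java_import, and `hadoop_conf` is sc._jsc.hadoopConfiguration().
    """
    catalog = jvm.NessieCatalog()
    catalog.setConf(hadoop_conf)
    catalog.initialize(name, {"ref": ref, "url": url, "warehouse": warehouse})
    return catalog
```

The catalog above could then be created with `nessie_catalog(jvm, sc._jsc.hadoopConfiguration(), "experiment", "experiment", "http://localhost:19120/api/v1", 'file://' + os.getcwd() + '/spark_warehouse')`.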

In [None]:
spark_experiment.sql('INSERT INTO TABLE nessie.testing.region VALUES (5, "AUSTRALIA", "Let\'s hop there", "AUS")')
spark_experiment.sql('INSERT INTO TABLE nessie.testing.region VALUES (6, "ANTARCTICA", "It\'s cold", "ANT")')

In [None]:
!nessie contents --list --ref experiment

Let's take a look at the contents of the region table on the experiment branch.
Notice the use of the `@experiment` suffix to read from that branch through the
`nessie` catalog.

In [None]:
spark.sql("select * from nessie.testing.`region@experiment`").toPandas()

Now compare this with the contents of the region table on the main branch. No
`@` suffix is needed here, because the default reference of the `spark` session
is `main`.

In [None]:
spark.sql("select * from nessie.testing.region").toPandas()
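
The two queries differ only in how the table name is written: a plain name reads from the session's default reference, while a backtick-quoted `table@ref` reads from a specific branch. As a small sketch, a helper can build either form. The name `table_at` is hypothetical; the output format follows the queries above.

```python
def table_at(table, ref=None, namespace="testing", catalog="nessie"):
    """Return a table identifier, optionally pinned to a Nessie reference."""
    if ref is None:
        return f"{catalog}.{namespace}.{table}"
    # The backticks keep `table@ref` together as a single identifier part
    return f"{catalog}.{namespace}.`{table}@{ref}`"


print(table_at("region"))                # nessie.testing.region
print(table_at("region", "experiment"))  # nessie.testing.`region@experiment`
```

A query could then be written as `spark.sql("select * from " + table_at("region", "experiment"))`.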