Nessie Demo
===========
This demo shows how to use the Nessie Python API together with Iceberg's Spark 3 integration.

Initialize PySpark + Nessie environment
----------------------------------------------

In [1]:
import os
from pyspark.sql import Row, SparkSession
from py4j.java_gateway import java_import

spark = SparkSession.builder \
                    .config("spark.sql.execution.pyarrow.enabled", "true") \
                    .config("spark.hadoop.fs.defaultFS", 'file://' + os.getcwd() + '/spark_warehouse') \
                    .config("spark.hadoop.nessie.url", "http://nessie:19120/api/v1") \
                    .config("spark.hadoop.nessie.ref", "main") \
                    .config("spark.sql.catalog.nessie", "com.dremio.nessie.iceberg.spark.NessieIcebergSparkCatalog") \
                    .getOrCreate()
sc = spark.sparkContext
jvm = sc._gateway.jvm

# Expose the Nessie and Iceberg Java classes as attributes of `jvm`
java_import(jvm, "com.dremio.nessie.iceberg.NessieCatalog")
java_import(jvm, "org.apache.iceberg.catalog.TableIdentifier")
java_import(jvm, "org.apache.iceberg.Schema")
java_import(jvm, "org.apache.iceberg.types.Types")
java_import(jvm, "org.apache.iceberg.PartitionSpec")
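The session settings above can also be kept in a plain dictionary, which makes it easy to rerun the notebook against a different Nessie server or default branch. This is a minimal sketch: `nessie_spark_conf` is a hypothetical helper, and its defaults are simply the values used in the cell above.

```python
import os


def nessie_spark_conf(warehouse_dir, nessie_url="http://nessie:19120/api/v1", ref="main"):
    """Return the Spark settings used in this demo as a dict (hypothetical helper)."""
    return {
        "spark.sql.execution.pyarrow.enabled": "true",
        # Local directory that holds the Iceberg data and metadata files
        "spark.hadoop.fs.defaultFS": "file://" + os.path.abspath(warehouse_dir),
        # Nessie server endpoint and the branch used when no ref is given
        "spark.hadoop.nessie.url": nessie_url,
        "spark.hadoop.nessie.ref": ref,
        # Register the Nessie-backed Iceberg catalog under the name `nessie`
        "spark.sql.catalog.nessie": "com.dremio.nessie.iceberg.spark.NessieIcebergSparkCatalog",
    }
```

Each entry could then be passed to `SparkSession.builder.config(key, value)` before calling `getOrCreate()`.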

Set up Nessie branches
----------------------------

- Branch `main` already exists
- Create branch `dev`
- List all branches together with the commit hash each one points to

In [2]:
!nessie branch dev




In [3]:
!nessie --verbose branch

* main  e0b41c30f0710277532f51242994e10acfdc46bf comment
  dev   e0b41c30f0710277532f51242994e10acfdc46bf comment
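The raw CLI output marks the current branch with `*` and may colour lines with ANSI escape codes such as `ESC[33m`. A small parser makes it easy to compare branch heads programmatically. This sketch assumes every line has the shape `[*] NAME HASH MESSAGE`, inferred from the sample output only; `parse_branch_listing` is a hypothetical helper, not part of the Nessie CLI.

```python
import re

# Matches ANSI colour codes such as "\x1b[33m"; the ESC byte is optional so the
# pattern also strips codes whose ESC character was lost when the output was copied.
ANSI_CODE = re.compile(r"\x1b?\[[0-9;]*m")


def parse_branch_listing(text):
    """Parse `nessie --verbose branch` output into ({name: hash}, current_branch)."""
    branches = {}
    current = None
    for raw_line in text.splitlines():
        tokens = ANSI_CODE.sub("", raw_line).split()
        if not tokens:
            continue  # skip blank lines
        if tokens[0] == "*":  # the asterisk marks the current default branch
            tokens = tokens[1:]
            current = tokens[0]
        name, commit_hash = tokens[0], tokens[1]
        branches[name] = commit_hash
    return branches, current
```

Comparing `branches["main"]` with `branches["dev"]` is then a quick check of whether the two branches point to the same commit.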



Create tables on the dev branch
-------------------------------------

Two tables are created on the `dev` branch:
- region
- nation

It is not yet possible to create a table using PySpark and Iceberg, so the
Iceberg Java API is called through the Py4J gateway instead.

In [4]:
hadoop_conf = sc._jsc.hadoopConfiguration()
# Point the Nessie Java catalog at the dev branch
hadoop_conf.set("nessie.ref", "dev")
catalog = jvm.NessieCatalog(hadoop_conf)

# Creating region table
region_name = jvm.TableIdentifier.parse("testing.region")
region_schema = jvm.Schema([
    jvm.Types.NestedField.optional(1, "R_REGIONKEY", jvm.Types.LongType.get()),
    jvm.Types.NestedField.optional(2, "R_NAME", jvm.Types.StringType.get()),
    jvm.Types.NestedField.optional(3, "R_COMMENT", jvm.Types.StringType.get()),
])
region_spec = jvm.PartitionSpec.unpartitioned()

region_table = catalog.createTable(region_name, region_schema, region_spec)
region_df = spark.read.load("data/region.parquet")
region_df.write.option('hadoop.nessie.ref', 'dev').format("iceberg").mode("overwrite").save("testing.region")

# Creating nation table
nation_name = jvm.TableIdentifier.parse("testing.nation")
nation_schema = jvm.Schema([
    jvm.Types.NestedField.optional(1, "N_NATIONKEY", jvm.Types.LongType.get()),
    jvm.Types.NestedField.optional(2, "N_NAME", jvm.Types.StringType.get()),
    jvm.Types.NestedField.optional(3, "N_REGIONKEY", jvm.Types.LongType.get()),
    jvm.Types.NestedField.optional(4, "N_COMMENT", jvm.Types.StringType.get()),
])
nation_spec = jvm.PartitionSpec.builderFor(nation_schema).truncate("N_NAME", 2).build()
nation_table = catalog.createTable(nation_name, nation_schema, nation_spec)

nation_df = spark.read.load("data/nation.parquet")
nation_df.write.option('hadoop.nessie.ref', 'dev').format("iceberg").mode("overwrite").save("testing.nation")
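The `nation` table is partitioned with Iceberg's `truncate` transform on `N_NAME` with width 2. For strings, Iceberg's truncate keeps the first `width` characters (code points), so nations whose names share a two-letter prefix land in the same partition. The sketch below models that grouping in plain Python; the helper names are hypothetical and the nation names in the example are illustrative values, not read from `data/nation.parquet`.

```python
from collections import defaultdict


def truncate_string(value, width):
    """Model of Iceberg's truncate transform for strings: keep the first `width` characters."""
    return value[:width]


def group_by_truncated_name(names, width=2):
    """Group names by their partition value, as truncate("N_NAME", 2) would."""
    partitions = defaultdict(list)
    for name in names:
        partitions[truncate_string(name, width)].append(name)
    return dict(partitions)
```

For example, `INDIA` and `INDONESIA` both map to the partition value `IN`, while `IRAN` and `IRAQ` share `IR`.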


Check generated tables
----------------------------

Check the tables created on the dev branch, and confirm that the main branch
does not have any tables yet.

In [None]:
!nessie contents --list

In [None]:
!nessie contents --list --ref dev

Note that the `dev` and `main` branches now point to different commits.

In [None]:
!nessie --verbose branch

Dev promotion
-------------

Promote the `dev` branch to `main`. After the merge:

* main now has the same tables as dev
* main and dev point to the same commit
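In the simplest case, where `main` has gained no commits since `dev` was created, promotion amounts to a fast-forward: `main` is moved to the commit that `dev` points to, which is why both branches end up on the same hash. The sketch below models only that case with a toy parent map; the commit names are made up, and this is not how Nessie implements merges internally.

```python
def is_ancestor(parents, ancestor, commit):
    """Return True if `ancestor` is reachable from `commit` by following parent links."""
    while commit is not None:
        if commit == ancestor:
            return True
        commit = parents.get(commit)  # None once the root commit is passed
    return False


def fast_forward(branches, parents, target, source):
    """Move branch `target` to the head of `source`, if that is a fast-forward."""
    if not is_ancestor(parents, branches[target], branches[source]):
        raise ValueError(f"{target} has commits that are not on {source}")
    branches[target] = branches[source]
```

If `main` had commits of its own, the head of `main` would not be an ancestor of `dev`, and a real merge rather than a pointer move would be needed.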

In [None]:
!nessie merge dev --force

In [None]:
!nessie contents --list

In [None]:
!nessie --verbose branch

Create `etl` branch
----------------------

- Create a branch `etl` from `main`
- Add data to `nation`
- Alter the schema of `region`
- Create the table `city`
- Query the tables in `etl`
- Query the tables in `main`
- Promote the `etl` branch to `main`

In [None]:
!nessie branch etl main

In [None]:
Nation = Row("N_NATIONKEY", "N_NAME", "N_REGIONKEY", "N_COMMENT")
new_nations = spark.createDataFrame([
    Nation(25, "SYLDAVIA", 3, "King Ottokar's Sceptre"),
    Nation(26, "SAN THEODOROS", 1, "The Picaros")])
new_nations.write.option('hadoop.nessie.ref', 'etl').format("iceberg").mode("append").save("testing.nation")

In [None]:
# changing the default branch
hadoop_conf.set('nessie.ref', 'etl')

etl_catalog = jvm.NessieCatalog(hadoop_conf)
etl_catalog.loadTable(region_name).updateSchema().addColumn('R_ABBREV', jvm.Types.StringType.get()).commit()

In [None]:
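# Creating city table on the etl branch.
# sc.getConf() returns a copy of the configuration, so setting a value on it
# has no effect; set the value on the session configuration instead.
spark.conf.set("spark.hadoop.nessie.ref", "etl")
spark.sql("CREATE TABLE nessie.testing.city (C_CITYKEY BIGINT, C_NAME STRING, N_NATIONKEY BIGINT, C_COMMENT STRING) USING iceberg PARTITIONED BY (N_NATIONKEY)")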

In [None]:
from pynessie import init
nessie = init()
nessie.list_keys('main').entries

In [None]:
[i.name for i in nessie.list_keys('etl').entries]
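Since both listings reduce to plain table names, comparing two branches is a set difference. This is a minimal sketch; `table_diff` is a hypothetical helper, and the names in the example are illustrative. With a live server, its inputs would be the `name` of each entry returned by `nessie.list_keys('etl')` and `nessie.list_keys('main')`.

```python
def table_diff(source_tables, target_tables):
    """Return (tables only on source, tables only on target) as sorted lists."""
    source, target = set(source_tables), set(target_tables)
    return sorted(source - target), sorted(target - source)
```

An empty second list means the target branch has nothing that the source branch lacks.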

In [None]:
{i.name:i.hash_ for i in nessie.list_references()}

In [None]:
nessie.merge('main', 'etl')

In [None]:
{i.name:i.hash_ for i in nessie.list_references()}
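The dictionaries printed before and after the merge map each reference to its commit hash. Inverting that mapping shows which branches share a commit, which makes the effect of the merge easy to spot. A small sketch; `refs_by_commit` is a hypothetical helper and the hashes in the example are placeholders.

```python
from collections import defaultdict


def refs_by_commit(refs):
    """Invert a {reference_name: commit_hash} mapping into {commit_hash: [names]}."""
    grouped = defaultdict(list)
    for name, commit_hash in refs.items():
        grouped[commit_hash].append(name)
    # Sort names so the result does not depend on dictionary order
    return {commit_hash: sorted(names) for commit_hash, names in grouped.items()}
```

Applied to the dictionary from the cell above, it could be called as `refs_by_commit({i.name: i.hash_ for i in nessie.list_references()})`.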

Create `experiment` branch
--------------------------------

- Create the `experiment` branch from `main`
- Drop the `nation` table
- Add data to the `region` table
- Compare the `experiment` and `main` tables

In [None]:
!nessie branch experiment main

In [None]:
# changing the default branch
hadoop_conf.set('nessie.ref', 'experiment')

catalog = jvm.NessieCatalog(hadoop_conf)
catalog.dropTable(jvm.TableIdentifier.parse("testing.nation"), False)

In [None]:
spark.sql("set spark.hadoop.nessie.ref=experiment")
spark.sql('INSERT INTO TABLE nessie.testing.region VALUES (5, "AUSTRALIA", "Let\'s hop there", "AUS")')
spark.sql('INSERT INTO TABLE nessie.testing.region VALUES (6, "ANTARCTICA", "It\'s cold", "ANT")')
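Inside these cells, `\'` is Python escaping: it places a plain apostrophe inside a double-quoted Spark SQL string literal. When row values come from variables, building the literals in one place avoids quoting mistakes. This sketch assumes Spark SQL's default string-literal rules, where a backslash escapes a following quote or backslash; `sql_string` and `insert_values` are hypothetical helpers that handle only integers and strings.

```python
def sql_string(value):
    """Quote `value` as a single-quoted Spark SQL literal, escaping backslashes and quotes."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def insert_values(table, row):
    """Build an INSERT statement for one row of integers and strings."""
    rendered = []
    for value in row:
        if isinstance(value, str):
            rendered.append(sql_string(value))
        elif isinstance(value, int):
            rendered.append(str(value))
        else:
            raise TypeError(f"unsupported value: {value!r}")
    return f"INSERT INTO TABLE {table} VALUES ({', '.join(rendered)})"
```

The result can be passed straight to `spark.sql(...)`. For anything beyond a few rows, appending a DataFrame, as done for `nation` on the `etl` branch, avoids building SQL text altogether.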

In [None]:
!nessie contents --list --ref experiment

Let's take a look at the contents of the region table on the experiment branch.
Notice the use of the `nessie` catalog.

In [None]:
spark.sql("select * from nessie.testing.region").toPandas()

Then compare them with the contents of the region table on the main branch.
Notice the `@main` suffix, which reads the table as it is on the main branch.

In [None]:
spark.sql("select * from nessie.testing.`region@main`").toPandas()
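The `@main` suffix is part of a backtick-quoted table name, so the two queries differ only in how the identifier is spelled. A small hypothetical helper can build either form; its default catalog and namespace are the ones used throughout this notebook, and the backticks are needed because `@` is not allowed in an unquoted Spark SQL identifier.

```python
def table_at_ref(table, ref=None, catalog="nessie", namespace="testing"):
    """Build a Spark table identifier, optionally pinned to a Nessie reference."""
    # Quote `table@ref` with backticks so Spark parses it as one identifier
    name = f"`{table}@{ref}`" if ref else table
    return f"{catalog}.{namespace}.{name}"
```

For example, `spark.sql("select * from " + table_at_ref("region", "main")).toPandas()` issues the same query as the cell above.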