Nessie Demo
===========
This demo shows how to use the Nessie Python API together with Spark and Delta Lake.

Initialize PySpark + Nessie environment
----------------------------------------------

In [None]:
import os

import findspark
findspark.init()  # locate the Spark installation before importing pyspark

from pyspark.sql import Row, SparkSession
from py4j.java_gateway import java_import

spark = SparkSession.builder \
                    .config("spark.jars", "../../clients/deltalake/spark3/target/nessie-deltalake-spark3-0.4.1-SNAPSHOT.jar") \
                    .config("spark.sql.execution.pyarrow.enabled", "true") \
                    .config("spark.hadoop.fs.defaultFS", 'file://' + os.getcwd() + '/spark_warehouse') \
                    .config("spark.hadoop.nessie.url", "http://localhost:19120/api/v1") \
                    .config("spark.hadoop.nessie.ref", "main") \
                    .config("spark.hadoop.nessie.auth_type", "NONE") \
                    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
                    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
                    .config("spark.delta.logFileHandler.class", "org.projectnessie.deltalake.NessieLogFileMetaParser") \
                    .config("spark.delta.logStore.class", "org.projectnessie.deltalake.NessieLogStore") \
                    .getOrCreate()
sc = spark.sparkContext
jvm = sc._gateway.jvm

java_import(jvm, "org.apache.spark.sql.delta.DeltaLog")
java_import(jvm, "io.delta.tables.DeltaTable")
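The same settings can also be kept in one place. The sketch below collects them into a plain dictionary; `nessie_delta_conf` is a hypothetical helper, not part of Nessie or PySpark, and the jar path and server URL are whatever your build and Nessie server actually use.

```python
import os


def nessie_delta_conf(jar_path, nessie_url, ref="main", warehouse=None):
    """Hypothetical helper: the Spark settings used above, as a dict."""
    if warehouse is None:
        warehouse = os.path.join(os.getcwd(), "spark_warehouse")
    return {
        # Nessie Delta Lake client jar (version-specific build output)
        "spark.jars": jar_path,
        "spark.sql.execution.pyarrow.enabled": "true",
        "spark.hadoop.fs.defaultFS": "file://" + warehouse,
        # Nessie server endpoint, starting reference and authentication
        "spark.hadoop.nessie.url": nessie_url,
        "spark.hadoop.nessie.ref": ref,
        "spark.hadoop.nessie.auth_type": "NONE",
        # Delta Lake catalog and SQL extensions
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        # Nessie-backed Delta log handling
        "spark.delta.logFileHandler.class": "org.projectnessie.deltalake.NessieLogFileMetaParser",
        "spark.delta.logStore.class": "org.projectnessie.deltalake.NessieLogStore",
    }

# Usage (requires PySpark):
# builder = SparkSession.builder
# for key, value in nessie_delta_conf(jar, "http://localhost:19120/api/v1").items():
#     builder = builder.config(key, value)
# spark = builder.getOrCreate()
```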

Set up Nessie branches
----------------------------

- Branch `main` already exists
- Create branch `dev`
- List all branches (with verbose output)
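Conceptually, a Nessie branch is a named pointer to a commit, so creating `dev` from `main` copies a pointer rather than any table data. The toy model below illustrates only that idea; `ToyRepo` and its methods are hypothetical and say nothing about how the Nessie server actually stores commits.

```python
import hashlib


class ToyRepo:
    """Toy model: each branch is just a name pointing at a commit hash."""

    def __init__(self):
        root = self._hash("root", None)
        self.parents = {root: None}      # commit hash -> parent commit hash
        self.branches = {"main": root}   # branch name -> commit hash

    @staticmethod
    def _hash(message, parent):
        # Derive a short, deterministic id from the parent and the message
        return hashlib.sha256(f"{parent}:{message}".encode()).hexdigest()[:12]

    def create_branch(self, name, source="main"):
        # Creating a branch copies the source pointer; no data is copied
        self.branches[name] = self.branches[source]

    def commit(self, branch, message):
        # A new commit on `branch` advances only that branch's pointer
        parent = self.branches[branch]
        new = self._hash(message, parent)
        self.parents[new] = parent
        self.branches[branch] = new
        return new
```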

In [None]:
!nessie branch dev

In [None]:
!nessie --verbose branch

Create tables on the `dev` branch
-------------------------------------

Creating two tables under the `dev` branch:
- region
- nation

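The Nessie reference used by the Delta Lake integration comes from the Hadoop
configuration, so the cell below sets `nessie.ref` to `dev` before writing both
tables with the regular DataFrame API.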

In [None]:
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("nessie.ref", "dev")

region_df = spark.read.load("data/region.parquet")
region_df.write.format("delta").save("spark_warehouse/testing/region")

nation_df = spark.read.load("data/nation.parquet")
nation_df.write.format("delta").save("spark_warehouse/testing/nation")


Check generated tables
----------------------------

Check that the tables were created on the `dev` branch and that the `main`
branch does not have any tables yet.
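Comparing two branches comes down to comparing their key lists. A small sketch, assuming key names are collected the way the pynessie cells later in this demo do it (`[i.name for i in nessie.list_keys(ref).entries]`); `key_diff` is a hypothetical helper, and it converts names with `str()` so that whatever key objects the client returns can be compared.

```python
def key_diff(keys_a, keys_b):
    """Hypothetical helper: keys only on branch A, and keys only on branch B."""
    a = {str(k) for k in keys_a}
    b = {str(k) for k in keys_b}
    return sorted(a - b), sorted(b - a)

# Usage with pynessie (assumes a running Nessie server):
# from pynessie import init
# nessie = init()
# only_dev, only_main = key_diff(
#     [i.name for i in nessie.list_keys('dev').entries],
#     [i.name for i in nessie.list_keys('main').entries])
```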

In [None]:
!nessie contents --list

In [None]:
!nessie contents --list --ref dev

In [None]:
!nessie --verbose branch

Dev promotion
-------------

Promote the `dev` branch to `main` by merging it.
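Because `main` has not moved since `dev` was branched from it, this merge can be pictured as a fast-forward: `main` simply moves to the latest commit of `dev`. The toy sketch below models only that case with plain dictionaries; the function names are hypothetical and this is not the algorithm the Nessie server runs.

```python
def is_ancestor(parents, ancestor, commit):
    """Follow parent links from `commit` to the root, looking for `ancestor`."""
    while commit is not None:
        if commit == ancestor:
            return True
        commit = parents[commit]
    return False


def fast_forward(branches, parents, source, target="main"):
    """Toy merge: move `target` to the head of `source` if `target` has not diverged."""
    if not is_ancestor(parents, branches[target], branches[source]):
        raise ValueError(f"{target!r} has diverged from {source!r}")
    branches[target] = branches[source]
```

Merges between diverged branches are handled by the Nessie server; this toy simply refuses that case.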

In [None]:
!nessie merge dev --force

In [None]:
!nessie contents --list

In [None]:
!nessie --verbose branch

Create `etl` branch
----------------------

- Create a branch `etl` from `main`
- Add data to `nation`
- Alter `region`
- Create table `city`
- List the tables in `main`
- List the tables in `etl`
- Promote the `etl` branch to `main`

In [None]:
!nessie branch etl main

In [None]:
# Point the Nessie Delta integration at the `etl` branch
hadoop_conf.set("nessie.ref", "etl")
Nation = Row("N_NATIONKEY", "N_NAME", "N_REGIONKEY", "N_COMMENT")
new_nations = spark.createDataFrame([
    Nation(25, "SYLDAVIA", 3, "King Ottokar's Sceptre"),
    Nation(26, "SAN THEODOROS", 1, "The Picaros")])
# Append to the nation table written earlier (save() takes a path, not a table name)
new_nations.write.option('hadoop.nessie.ref', 'etl').format("delta").mode("append").save("spark_warehouse/testing/nation")

In [None]:
# Switch the Nessie reference used by Delta to `etl`
hadoop_conf.set('nessie.ref', 'etl')
# Also set it at the session level (sc.getConf() only returns a copy)
spark.conf.set("spark.hadoop.nessie.ref", "etl")
base_table = os.getcwd() + "/spark_warehouse/testing/"
spark.sql("ALTER TABLE delta.`" + base_table + "region` ADD COLUMNS (R_ABBREV STRING)")

In [None]:
# Create the city table on the `etl` branch
spark.conf.set("spark.hadoop.nessie.ref", "etl")
spark.sql("CREATE TABLE city (C_CITYKEY BIGINT, C_NAME STRING, N_NATIONKEY BIGINT, C_COMMENT STRING) USING delta PARTITIONED BY (N_NATIONKEY) LOCATION 'spark_warehouse/testing/city'")

In [None]:
from pynessie import init
nessie = init()
nessie.list_keys('main').entries

In [None]:
[i.name for i in nessie.list_keys('etl').entries]

In [None]:
{i.name:i.hash_ for i in nessie.list_references()}

In [None]:
nessie.merge('main', 'etl')

In [None]:
{i.name:i.hash_ for i in nessie.list_references()}

Create `experiment` branch
--------------------------------

- Create an `experiment` branch from `main`
- Delete all rows from the `nation` table
- Add data to the `region` table
- Compare the tables on `experiment` and `main`
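Once each branch has been queried, the comparison itself is plain Python. The sketch below, with a hypothetical `row_diff` helper, treats each result as a multiset of row tuples; PySpark `Row` objects are tuples, so the output of `.collect()` can be passed in directly. Switching branches between the two queries still has to be done as in the last cell of this demo.

```python
from collections import Counter


def row_diff(rows_a, rows_b):
    """Hypothetical helper: rows only in A and rows only in B (duplicates counted)."""
    count_a = Counter(tuple(row) for row in rows_a)
    count_b = Counter(tuple(row) for row in rows_b)
    # Counter subtraction keeps only the positive counts
    return list((count_a - count_b).elements()), list((count_b - count_a).elements())

# Usage in this notebook:
# experiment_rows = spark.sql("select * from delta.`" + base_table + "region`").collect()
# ... switch the Nessie reference to main, then ...
# main_rows = spark.sql("select * from delta.`" + base_table + "region`").collect()
# only_experiment, only_main = row_diff(experiment_rows, main_rows)
```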

In [None]:
!nessie branch experiment main

In [None]:
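# Switch the Nessie reference used by Delta to `experiment`
hadoop_conf.set('nessie.ref', 'experiment')

# Drop cached Delta logs so table state is reloaded from the new branch
jvm.DeltaLog.clearCache()
deltaTable = jvm.DeltaTable.forPath("spark_warehouse/testing/nation")
# delete() without a condition removes every row but keeps the table
deltaTable.delete()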

In [None]:
spark.sql("set spark.hadoop.nessie.ref=experiment")
spark.sql('INSERT INTO TABLE delta.`' + base_table + 'region` VALUES (5, "AUSTRALIA", "Let\'s hop there", "AUS")')
spark.sql('INSERT INTO TABLE delta.`' + base_table + 'region` VALUES (6, "ANTARCTICA", "It\'s cold", "ANT")')

In [None]:
!nessie contents --list --ref experiment

In [None]:
spark.sql("select * from delta.`" + base_table + "region`").toPandas()

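To query a different branch, switch the reference manually: set `nessie.ref`
in the Hadoop configuration, clear the cached `DeltaLog` state, and set
`spark.hadoop.nessie.ref` for the session.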
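These steps repeat every time the branch changes, so they can be wrapped in one function. A minimal sketch, assuming all three steps are needed, as in the cell below; `use_nessie_ref` is hypothetical, and it takes the setters and the cache-clearing function as arguments so it does not depend on a live Spark session.

```python
def use_nessie_ref(ref, set_hadoop_conf, set_session_conf, clear_delta_cache):
    """Hypothetical helper: point subsequent Delta reads and writes at `ref`."""
    # Hadoop configuration key used by the Nessie Delta integration
    set_hadoop_conf("nessie.ref", ref)
    # Session-level equivalent of `SET spark.hadoop.nessie.ref=<ref>`
    set_session_conf("spark.hadoop.nessie.ref", ref)
    # Forget cached Delta logs so the next query reloads table state
    clear_delta_cache()

# Usage inside this notebook:
# use_nessie_ref("main", hadoop_conf.set, spark.conf.set, jvm.DeltaLog.clearCache)
```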

In [None]:
hadoop_conf.set('nessie.ref', 'main')
jvm.DeltaLog.clearCache()
spark.sql("set spark.hadoop.nessie.ref=main")
spark.sql("select * from delta.`" + base_table + "region`").toPandas()