In [None]:
import os

# Developer-specific JDK 11 location. Only apply the override when that
# directory actually exists, so that on any other machine an already valid
# JAVA_HOME is not replaced with a path that does not exist.
JAVA_HOME_OVERRIDE = "/home/snazy/devel/openjdk/images/graalvm/jdk-11"
if os.path.isdir(JAVA_HOME_OVERRIDE):
    os.environ["JAVA_HOME"] = JAVA_HOME_OVERRIDE

In [None]:
import glob
import os
import subprocess
import sys

# TODO replace this block with the following, once nessiedemo is stable and released on pypi or at least pypi-test
#   subprocess.run([sys.executable, "-m", "pip", "install", "nessiedemo"])

# Install the locally built nessiedemo wheel from ../setup/dist (relative to
# the current working directory) into the interpreter running this kernel.
setup_path = "{}/../setup".format(os.getcwd())
wheel_pattern = "{}/dist/nessiedemo-*.whl".format(setup_path)
wheel_files = glob.glob(wheel_pattern)
if not wheel_files:
    # Fail with an actionable message instead of a bare IndexError.
    raise FileNotFoundError(
        "no nessiedemo wheel found matching {}; build the wheel first".format(wheel_pattern)
    )
# glob order is unspecified; when several wheels exist, pick the most recently built one.
pkg_file = max(wheel_files, key=os.path.getmtime)

# Capture pip's output so that it can actually be reported on failure
# (without pipes, result.stdout and result.stderr are always None).
result = subprocess.run(
    [sys.executable, "-m", "pip", "install", "--force-reinstall", pkg_file],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
    universal_newlines=True,
)
if result.returncode != 0:
    raise RuntimeError(
        "pip install failed: exit-code={}, stdout={}, stderr={}".format(
            result.returncode, result.stdout, result.stderr
        )
    )

In [None]:
# Create the demo helper from the named nessiedemo configuration file.
from nessiedemo.demo import NessieDemo
demo = NessieDemo("nessie-0.5-iceberg-0.11.yml")
# Fetch the "region-nation" dataset. The result is indexed by file name further
# down (e.g. dataset["region.parquet"]) and the value is handed to spark.read.load(),
# so it presumably maps file names to local paths -- TODO confirm.
dataset = demo.fetch_dataset("region-nation")

# Must run before the nessiedemo.spark import below (see the comment there).
demo.start()

# This is separate, because NessieDemo.prepare() via .start() implicitly installs the required dependencies
from nessiedemo.spark import NessieDemoSpark
demo_spark = NessieDemoSpark(demo)

# The three handles used by the rest of the notebook: `spark` spawns the
# per-branch sessions, `sc` provides the Hadoop configuration, and `jvm`
# gives access to the Iceberg Java classes.
spark, sc, jvm = demo_spark.get_or_create_spark_context()

In [None]:
def _session_on_ref(ref_name):
    """Return a new session derived from `spark` whose nessie catalog ref is `ref_name`."""
    session = spark.newSession()
    session.conf.set("spark.sql.catalog.nessie.ref", ref_name)
    return session


# One session per branch: dev, ETL and experiment.
spark_dev = _session_on_ref("dev")
spark_etl = _session_on_ref("etl")
spark_experiment = _session_on_ref("experiment")

In [None]:
# Shell out to the nessie CLI for the `dev` branch (presumably this creates it --
# TODO confirm against the CLI docs). "dev" is the ref that the `spark_dev`
# session and the Iceberg catalog in this notebook are configured with.
# NOTE(review): re-running this cell may fail if the branch already exists -- confirm.
!nessie branch dev

In [None]:
def _optional_fields(columns):
    """Build optional Iceberg NestedFields from (field_id, column_name, type_class) triples."""
    return [
        jvm.Types.NestedField.optional(field_id, column_name, type_class.get())
        for field_id, column_name, type_class in columns
    ]


# Nessie-backed Iceberg catalog on the `dev` ref, with the warehouse located
# in ./spark_warehouse under the current working directory.
catalog_options = {
    "ref": "dev",
    "url": "http://localhost:19120/api/v1",
    "warehouse": "file://" + os.getcwd() + "/spark_warehouse",
}
catalog = jvm.CatalogUtil.loadCatalog(
    "org.apache.iceberg.nessie.NessieCatalog",
    "nessie",
    catalog_options,
    sc._jsc.hadoopConfiguration(),
)

# --- testing.region: unpartitioned table --------------------------------------
region_name = jvm.TableIdentifier.parse("testing.region")
region_schema = jvm.Schema(_optional_fields([
    (1, "R_REGIONKEY", jvm.Types.LongType),
    (2, "R_NAME", jvm.Types.StringType),
    (3, "R_COMMENT", jvm.Types.StringType),
]))
region_spec = jvm.PartitionSpec.unpartitioned()
region_table = catalog.createTable(region_name, region_schema, region_spec)

# Load the region data through the dev session and overwrite the new table with it.
region_df = spark_dev.read.load(dataset["region.parquet"])
region_writer = region_df.write.format("iceberg").mode("overwrite")
region_writer.save("nessie.testing.region")

# --- testing.nation: partitioned by truncate(N_NAME, 2) -----------------------
nation_name = jvm.TableIdentifier.parse("testing.nation")
nation_schema = jvm.Schema(_optional_fields([
    (1, "N_NATIONKEY", jvm.Types.LongType),
    (2, "N_NAME", jvm.Types.StringType),
    (3, "N_REGIONKEY", jvm.Types.LongType),
    (4, "N_COMMENT", jvm.Types.StringType),
]))
nation_spec_builder = jvm.PartitionSpec.builderFor(nation_schema)
nation_spec = nation_spec_builder.truncate("N_NAME", 2).build()
nation_table = catalog.createTable(nation_name, nation_schema, nation_spec)

# Load the nation data through the dev session and overwrite the new table with it.
nation_df = spark_dev.read.load(dataset["nation.parquet"])
nation_writer = nation_df.write.format("iceberg").mode("overwrite")
nation_writer.save("nessie.testing.nation")