Nessie Flink Demo with NBA Dataset
============================
This demo shows how to use the Nessie Python API together with Flink and Iceberg.

Initialize Nessie environment
----------------------------------------------
To get started, we first need to run a few setup steps that provide everything required
to work with Nessie. The `nessiedemo` library configures all required dependencies and downloads and
configures Flink.
If you're interested in the detailed Flink setup steps, check out the source code
of the `nessiedemo` library [here](https://github.com/projectnessie/nessie-demos/blob/main/pydemolib/nessiedemo/flink.py).

In [None]:
# install the nessiedemo lib, which configures all required dependencies
!pip install -i https://test.pypi.org/simple/ nessiedemo


In [None]:
# Set up the demo: installs the required Python dependencies, downloads the sample datasets, and
# downloads and starts the Nessie-Quarkus-Runner.
from nessiedemo.demo import setup_demo
demo = setup_demo("in-development/nessie-0.6-iceberg-flink-0.12.yml", ["nba"])

# This is a separate step because NessieDemo.prepare(), invoked via .start(), implicitly installs the required dependencies.
# Downloads Flink and sets up a default table_env that points to `main`
from nessiedemo.flink import flink_for_demo
table_env, demo_flink = flink_for_demo(demo)

Set up Nessie branches (via Nessie CLI)
----------------------------
Once all dependencies are configured, we can start working with Nessie through the
Nessie CLI, which lets us interact with Nessie branches and tables. We'll begin with the following steps:

- Create a new branch named `dev`
- List all branches

Note that we don't have to create a `main` branch explicitly, since it is the default branch.
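Conceptually, a Nessie branch is a named pointer to a commit, which is why creating `dev` copies no table data and why `main` needs no setup. The toy model below illustrates that idea in plain Python. The class `ToyBranches` and its methods are hypothetical and are not part of pynessie or `nessiedemo`; in this sketch, new branches start from `main` by default.

```python
# Toy, in-memory model of Nessie-style branches (illustration only; this is
# NOT the pynessie API). A branch is just a name pointing at a commit hash.
import hashlib


class ToyBranches:
    def __init__(self):
        # `main` exists from the start, mirroring Nessie's default branch.
        self.branches = {"main": self._hash("initial commit")}

    @staticmethod
    def _hash(payload: str) -> str:
        return hashlib.sha256(payload.encode()).hexdigest()[:16]

    def create_branch(self, name: str, from_ref: str = "main") -> str:
        if name in self.branches:
            raise ValueError(f"branch {name!r} already exists")
        # Creating a branch copies no table data, only the source branch's pointer.
        self.branches[name] = self.branches[from_ref]
        return self.branches[name]

    def list_branches(self):
        return sorted(self.branches.items())


repo = ToyBranches()
repo.create_branch("dev")
for name, commit in repo.list_branches():
    print(name, commit)
```

Right after creation, both branches print the same hash, which is what the `nessie --verbose branch` listing below should also show for `main` and `dev`.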

In [None]:
# Create a new `dev` branch
!nessie branch dev

# Table environment for `dev` branch, which will be used later on
dev_table_env = demo_flink.table_env_for_ref("dev")

In [None]:
# List all branches with their revisions. We should see the `main` & `dev` branches.
!nessie --verbose branch

Create tables under dev branch
-------------------------------------
Now that we have created the `dev` branch and verified that it exists, we can use the `dev_table_env` table environment (which points to the `dev` branch)
to create some tables and add some data.

We create two tables under the `dev` branch using the `dev_table_env`:
- `salaries`
- `totals_stats`


In [None]:
# Load the dataset
dataset = demo.fetch_dataset("nba")

from pyflink.table import DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

# Creating `salaries` table
dev_table_env.connect(FileSystem().path(dataset['salaries.csv']))\
  .with_format(OldCsv()
               .field('Season', DataTypes.STRING()).field("Team", DataTypes.STRING())
               .field("Salary", DataTypes.STRING()).field("Player", DataTypes.STRING()))\
  .with_schema(Schema()
               .field('Season', DataTypes.STRING()).field("Team", DataTypes.STRING())
               .field("Salary", DataTypes.STRING()).field("Player", DataTypes.STRING()))\
  .create_temporary_table('nessie.nba.salaries_temp')

dev_table_env.execute_sql(
        "CREATE TABLE IF NOT EXISTS nessie.nba.salaries (Season STRING, Team STRING, Salary STRING, Player STRING)").wait()

tab = dev_table_env.from_path('nessie.nba.salaries_temp')
tab.select(tab.Season, tab.Team, tab.Salary, tab.Player).execute_insert('nessie.nba.salaries').wait()

# Creating `totals_stats` table
dev_table_env.connect(FileSystem().path(dataset['totals_stats.csv']))\
  .with_format(OldCsv()
               .field('Season', DataTypes.STRING()).field("Age", DataTypes.STRING()).field("Team", DataTypes.STRING())
               .field("ORB", DataTypes.STRING()).field("DRB", DataTypes.STRING()).field("TRB", DataTypes.STRING())
               .field("AST", DataTypes.STRING()).field("STL", DataTypes.STRING()).field("BLK", DataTypes.STRING())
               .field("TOV", DataTypes.STRING()).field("PTS", DataTypes.STRING()).field("Player", DataTypes.STRING())
               .field("RSorPO", DataTypes.STRING()))\
  .with_schema(Schema()
               .field('Season', DataTypes.STRING()).field("Age", DataTypes.STRING()).field("Team", DataTypes.STRING())
               .field("ORB", DataTypes.STRING()).field("DRB", DataTypes.STRING()).field("TRB", DataTypes.STRING())
               .field("AST", DataTypes.STRING()).field("STL", DataTypes.STRING()).field("BLK", DataTypes.STRING())
               .field("TOV", DataTypes.STRING()).field("PTS", DataTypes.STRING()).field("Player", DataTypes.STRING())
               .field("RSorPO", DataTypes.STRING()))\
  .create_temporary_table('nessie.nba.totals_stats_temp')

dev_table_env.execute_sql(
        "CREATE TABLE IF NOT EXISTS nessie.nba.totals_stats (Season STRING, Age STRING, Team STRING, \
        ORB STRING, DRB STRING, TRB STRING, AST STRING, STL STRING, BLK STRING, TOV STRING, PTS STRING, \
        Player STRING, RSorPO STRING)").wait()

tab = dev_table_env.from_path('nessie.nba.totals_stats_temp')
tab.select(tab.Season, tab.Age, tab.Team, tab.ORB, tab.DRB, tab.TRB, 
           tab.AST, tab.STL, tab.BLK, tab.TOV, tab.PTS, tab.Player, tab.RSorPO).execute_insert('nessie.nba.totals_stats').wait()
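Each table's columns are spelled out three times above (format, schema, and `CREATE TABLE`). One way to reduce that repetition is to declare the column list once and generate the DDL from it. The helper `create_table_ddl` and the `*_COLUMNS` constants below are hypothetical, not part of PyFlink or `nessiedemo`; the sketch assumes, as in this demo, that every column is a `STRING`.

```python
# Hypothetical helper (not part of nessiedemo or PyFlink): build the
# CREATE TABLE statement from a single column list.
from typing import Sequence

SALARIES_COLUMNS = ["Season", "Team", "Salary", "Player"]
TOTALS_STATS_COLUMNS = ["Season", "Age", "Team", "ORB", "DRB", "TRB", "AST",
                        "STL", "BLK", "TOV", "PTS", "Player", "RSorPO"]


def create_table_ddl(table_path: str, columns: Sequence[str]) -> str:
    # Every column is typed as STRING, matching the CSV schema used in this demo.
    cols = ", ".join(f"{name} STRING" for name in columns)
    return f"CREATE TABLE IF NOT EXISTS {table_path} ({cols})"


print(create_table_ddl("nessie.nba.salaries", SALARIES_COLUMNS))
print(create_table_ddl("nessie.nba.totals_stats", TOTALS_STATS_COLUMNS))
```

The first generated string is identical to the `salaries` statement used above, so it could be passed to `dev_table_env.execute_sql(...)` unchanged.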


In [None]:
# Ideally we would print results with SQL, for example:
# table_env.execute_sql("SELECT * FROM nessie.nba.`salaries@dev`").print()
# Unfortunately, .print() writes its output to STDOUT rather than to the notebook.
# Therefore, we use Flink's Table API to get the table and display its contents via pandas.

# Notice how we view the data of the `salaries` table on the `dev` branch by appending
# `@dev` to the table name, even though `table_env` itself points to the `main` branch.
# We could achieve the same with the following Table API call on `dev_table_env`,
# which points to the `dev` branch:
# dev_table_env.from_path('nessie.nba.salaries').to_pandas()

table_env.from_path('nessie.nba.`salaries@dev`').to_pandas()
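The `table@ref` naming used above can be wrapped in a small helper so that branch names are not hard-coded into path strings. `table_at_ref` is a hypothetical helper written for this sketch; it only builds the string this demo passes to `from_path`, and it assumes the `nessie` catalog and `nba` namespace used throughout. The backticks quote the identifier, since `@` is not valid in an unquoted Flink SQL identifier.

```python
# Hypothetical helper: build the catalog.namespace.`table@ref` path string
# used with table_env.from_path(...) in this demo.
def table_at_ref(table: str, ref: str, catalog: str = "nessie", namespace: str = "nba") -> str:
    # A backtick inside the name would end the quoted identifier early.
    if "`" in table or "`" in ref:
        raise ValueError("backticks are not supported in this sketch")
    return f"{catalog}.{namespace}.`{table}@{ref}`"


print(table_at_ref("salaries", "dev"))
```

With it, the call above could be written as `table_env.from_path(table_at_ref("salaries", "dev")).to_pandas()`.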

Check generated tables
----------------------------
Since we have been working solely on the `dev` branch, where we created 2 tables and added some data,
let's verify that the `main` branch was not altered by our changes.

In [None]:
# There are no tables on the `main` branch
!nessie contents --list

In [None]:
# We should see the `salaries` & `totals_stats` tables on the dev branch
!nessie contents --list --ref dev

We can also verify that the `dev` and `main` branches point to different commits.

In [None]:
# List all branches with their revisions, where the revision of `main` should be different from `dev`
!nessie --verbose branch

Dev promotion into main
-----------------------
Once we are done with our changes on the `dev` branch, we would like to merge those changes into `main`.
We merge `dev` into `main` via the Nessie CLI.
Both branches should be at the same revision after merging/promotion.
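Why do both branches end up at the same revision? The toy model below sketches the simplest case, which is the situation in this demo: `main` has not moved since `dev` was created, so in this model merging can simply advance `main` to `dev`'s latest commit. Everything here (`ToyRepo`, `commit`, `merge`) is a hypothetical, in-memory illustration, not Nessie's implementation, and diverged branches are deliberately out of scope.

```python
# Toy model (illustration only, not Nessie's merge algorithm): commits are
# immutable snapshots of table contents, and branches point at commits.
class ToyRepo:
    def __init__(self):
        self.commits = {0: {"parent": None, "tables": {}}}
        self.branches = {"main": 0}
        self._next_id = 1

    def tables(self, branch):
        return self.commits[self.branches[branch]]["tables"]

    def create_branch(self, name, from_ref):
        self.branches[name] = self.branches[from_ref]

    def commit(self, branch, **changes):
        # A new commit copies the parent's snapshot and applies the changes.
        head = self.branches[branch]
        tables = dict(self.commits[head]["tables"], **changes)
        self.commits[self._next_id] = {"parent": head, "tables": tables}
        self.branches[branch] = self._next_id
        self._next_id += 1

    def _ancestors(self, commit_id):
        while commit_id is not None:
            yield commit_id
            commit_id = self.commits[commit_id]["parent"]

    def merge(self, source, target):
        # Only the simplest case is modelled: the target head is an ancestor of
        # the source head, so the target pointer is moved forward.
        if self.branches[target] not in self._ancestors(self.branches[source]):
            raise NotImplementedError("diverged branches are out of scope for this sketch")
        self.branches[target] = self.branches[source]


repo = ToyRepo()
repo.create_branch("dev", "main")
repo.commit("dev", salaries="v1")
repo.commit("dev", totals_stats="v1")
print(repo.tables("main"))  # prints {} because main is untouched by the commits on dev
repo.merge("dev", "main")
print(repo.branches["main"] == repo.branches["dev"])  # prints True
```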

In [None]:
# Merge `dev` into `main`
!nessie merge dev -b main --force

In [None]:
# List all branches with their revisions; `main` and `dev` should now be at the same revision
!nessie --verbose branch

Create `etl` branch
----------------------
In this section, we simulate the kind of changes a nightly ETL job might make:

- Create a branch `etl` from `main`
- Add data to `salaries`
- Rename the table `totals_stats` to `new_total_stats`
- Create the table `allstar_games_stats`
- Query the tables in `etl`
- Query the tables in `main`
- Promote the `etl` branch to `main`
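The rename step can be pictured as a change to a branch's view of its contents: a mapping from table names to table content. Renaming on `etl` produces a new mapping without touching the one `main` still sees. `rename_table` and the content ids are hypothetical; modelling a rename as removing the old key and adding the new key with the same content is an assumption of this sketch, not a description of Nessie internals.

```python
# Toy sketch: a branch's contents as a mapping from table name to a content id.
def rename_table(contents: dict, old: str, new: str) -> dict:
    if old not in contents:
        raise KeyError(old)
    if new in contents:
        raise ValueError(f"{new!r} already exists")
    updated = dict(contents)  # copy, so the other branch's view is not mutated
    updated[new] = updated.pop(old)
    return updated


main_contents = {"salaries": "content-1", "totals_stats": "content-2"}
etl_contents = rename_table(main_contents, "totals_stats", "new_total_stats")
print(sorted(etl_contents))   # ['new_total_stats', 'salaries']
print(sorted(main_contents))  # ['salaries', 'totals_stats']
```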

In [None]:
# Create the `etl` branch based on `main`
!nessie branch etl main

# Table environment for `etl` branch, which will be used later on
etl_table_env = demo_flink.table_env_for_ref("etl")

In [None]:
# Add some salaries for Kevin Durant
etl_table_env.execute_sql("INSERT INTO nessie.nba.salaries \
                        VALUES ('2017-18', 'Golden State Warriors', '$25000000', 'Kevin Durant')").wait()
etl_table_env.execute_sql("INSERT INTO nessie.nba.salaries \
                        VALUES ('2018-19', 'Golden State Warriors', '$30000000', 'Kevin Durant')").wait()
etl_table_env.execute_sql("INSERT INTO nessie.nba.salaries \
                        VALUES ('2019-20', 'Brooklyn Nets', '$37199000', 'Kevin Durant')").wait()
etl_table_env.execute_sql("INSERT INTO nessie.nba.salaries \
                        VALUES ('2020-21', 'Brooklyn Nets', '$39058950', 'Kevin Durant')").wait()
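Each of the four `INSERT` statements above runs as a separate Flink job. Flink SQL also accepts several rows in one `VALUES` clause, so the same data could be written with a single statement. The helpers `sql_literal` and `multi_row_insert` are hypothetical and only build the SQL string; they escape single quotes by doubling them, which is the standard SQL escape, and handle string values only.

```python
# Hypothetical helpers: build one multi-row INSERT statement instead of
# issuing four separate INSERT INTO ... VALUES statements.
from typing import Iterable, Sequence


def sql_literal(value: str) -> str:
    # Quote a string for SQL, doubling any embedded single quotes.
    return "'" + value.replace("'", "''") + "'"


def multi_row_insert(table_path: str, rows: Iterable[Sequence[str]]) -> str:
    values = ",\n  ".join(
        "(" + ", ".join(sql_literal(v) for v in row) + ")" for row in rows
    )
    if not values:
        raise ValueError("at least one row is required")
    return f"INSERT INTO {table_path} VALUES\n  {values}"


durant = [
    ("2017-18", "Golden State Warriors", "$25000000", "Kevin Durant"),
    ("2018-19", "Golden State Warriors", "$30000000", "Kevin Durant"),
    ("2019-20", "Brooklyn Nets", "$37199000", "Kevin Durant"),
    ("2020-21", "Brooklyn Nets", "$39058950", "Kevin Durant"),
]
print(multi_row_insert("nessie.nba.salaries", durant))
```

Passing the result to `etl_table_env.execute_sql(...).wait()` should insert all four rows with one job.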

In [None]:
# Rename the table `totals_stats` to `new_total_stats`
etl_table_env.execute_sql("ALTER TABLE nessie.nba.totals_stats RENAME TO nessie.nba.new_total_stats")

In [None]:
# Creating `allstar_games_stats` table
etl_table_env.connect(FileSystem().path(dataset['allstar_games_stats.csv']))\
    .with_format(OldCsv()
                 .field('Season', DataTypes.STRING()).field("Age", DataTypes.STRING()).field("Team", DataTypes.STRING())
                 .field("ORB", DataTypes.STRING()).field("TRB", DataTypes.STRING()).field("AST", DataTypes.STRING())
                 .field("STL", DataTypes.STRING()).field("BLK", DataTypes.STRING()).field("TOV", DataTypes.STRING())
                 .field("PF", DataTypes.STRING()).field("PTS", DataTypes.STRING()).field("Player", DataTypes.STRING()))\
    .with_schema(Schema()
                 .field('Season', DataTypes.STRING()).field("Age", DataTypes.STRING()).field("Team", DataTypes.STRING())
                 .field("ORB", DataTypes.STRING()).field("TRB", DataTypes.STRING()).field("AST", DataTypes.STRING())
                 .field("STL", DataTypes.STRING()).field("BLK", DataTypes.STRING()).field("TOV", DataTypes.STRING())
                 .field("PF", DataTypes.STRING()).field("PTS", DataTypes.STRING()).field("Player", DataTypes.STRING()))\
    .create_temporary_table('nessie.nba.allstar_games_stats_temp')

etl_table_env.execute_sql(
        "CREATE TABLE IF NOT EXISTS nessie.nba.allstar_games_stats (Season STRING, Age STRING, \
        Team STRING, ORB STRING, TRB STRING, AST STRING, STL STRING, BLK STRING, TOV STRING, \
        PF STRING, PTS STRING, Player STRING)").wait()

tab = etl_table_env.from_path('nessie.nba.allstar_games_stats_temp')
tab.select(tab.Season, tab.Age, tab.Team, tab.ORB, tab.TRB, tab.AST, 
           tab.STL, tab.BLK, tab.TOV, tab.PF, tab.PTS, tab.Player).execute_insert('nessie.nba.allstar_games_stats').wait()

In [None]:
# Notice how we view the data on the etl branch via @etl
table_env.from_path('nessie.nba.`allstar_games_stats@etl`').to_pandas()

In [None]:
# Since we have been working on the `etl` branch, the `allstar_games_stats` table is not on the `main` branch
!nessie contents --list

In [None]:
# We should see the `allstar_games_stats` and `new_total_stats` tables on the `etl` branch
!nessie contents --list --ref etl

In [None]:
# Now merge the `etl` branch into `main`
!nessie merge etl -b main --force

In [None]:
# The `etl` and `main` branches should have the same revision
!nessie --verbose branch

Create `experiment` branch
--------------------------------
As data analysts, we might want to run experiments on some data without affecting `main` in any way.
As in the previous examples, we can simply create an `experiment` branch off of `main`
and carry out our experiment, which could consist of the following steps:

- Drop the `new_total_stats` table
- Add data to the `salaries` table
- Compare the `experiment` and `main` tables
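Comparing the two branches' table lists (for instance, the names printed by `nessie contents --list` and `nessie contents --list --ref experiment`) is a simple set difference. `diff_tables` is a hypothetical helper, and the table sets below are the ones this walkthrough expects after the drop, hard-coded rather than read from Nessie. Comparing names alone does not reveal the rows added to `salaries`; that needs a row-level comparison.

```python
# Hypothetical helper: compare the table names visible on two branches.
def diff_tables(source: set, target: set) -> dict:
    return {
        "only_in_source": sorted(source - target),
        "only_in_target": sorted(target - source),
        "in_both": sorted(source & target),
    }


main_tables = {"salaries", "allstar_games_stats", "new_total_stats"}
experiment_tables = {"salaries", "allstar_games_stats"}  # after the DROP TABLE
print(diff_tables(experiment_tables, main_tables))
```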

In [None]:
# Create the `experiment` branch from `main`
!nessie branch experiment main

# Table environment for `experiment` branch, which will be used later on
experiment_table_env = demo_flink.table_env_for_ref("experiment")

In [None]:
# Drop the `new_total_stats` table on the `experiment` branch
experiment_table_env.execute_sql("DROP TABLE IF EXISTS nessie.nba.new_total_stats")

In [None]:
# Add some salaries for Dirk Nowitzki
experiment_table_env.execute_sql("INSERT INTO nessie.nba.salaries \
                                VALUES ('2015-16', 'Dallas Mavericks', '$8333333', 'Dirk Nowitzki')").wait()
experiment_table_env.execute_sql("INSERT INTO nessie.nba.salaries \
                                VALUES ('2016-17', 'Dallas Mavericks', '$25000000', 'Dirk Nowitzki')").wait()
experiment_table_env.execute_sql("INSERT INTO nessie.nba.salaries \
                                VALUES ('2017-18', 'Dallas Mavericks', '$5000000', 'Dirk Nowitzki')").wait()
experiment_table_env.execute_sql("INSERT INTO nessie.nba.salaries \
                                VALUES ('2018-19', 'Dallas Mavericks', '$5000000', 'Dirk Nowitzki')").wait()

In [None]:
# We should see the `salaries` and `allstar_games_stats` tables only (since we just dropped `new_total_stats`)
!nessie contents --list --ref experiment

In [None]:
# `main` has not been changed and still has the `new_total_stats` table
!nessie contents --list

Let's count the rows of the `salaries` table on the `experiment` branch.
Notice the use of the `nessie` catalog and of `@experiment` to read data from the `experiment` branch.

In [None]:
from pyflink.table.expressions import lit

table_env.from_path('nessie.nba.`salaries@experiment`').select(lit(1).count).to_pandas()

Now compare this with the row count of the `salaries` table on the `main` branch. Here `@main` is optional: since `table_env`
points to the `main` branch, `nessie.nba.salaries` would resolve to the same table.
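Row counts show that the branches differ, but not which rows differ. A multiset difference over row tuples answers that. `rows_only_in` is a hypothetical helper, and the sample rows below are taken from the inserts earlier in this demo rather than read from the tables; they stand in for a subset of each branch's real contents.

```python
# Sketch: rows present in one branch's copy of a table but not in the other's.
# Counter subtraction is a multiset difference, so duplicate rows are respected;
# the result follows the order in which rows first appear in `left`.
from collections import Counter


def rows_only_in(left, right):
    diff = Counter(map(tuple, left)) - Counter(map(tuple, right))
    return list(diff.elements())


main_rows = [
    ("2019-20", "Brooklyn Nets", "$37199000", "Kevin Durant"),
    ("2020-21", "Brooklyn Nets", "$39058950", "Kevin Durant"),
]
nowitzki_rows = [
    ("2015-16", "Dallas Mavericks", "$8333333", "Dirk Nowitzki"),
    ("2016-17", "Dallas Mavericks", "$25000000", "Dirk Nowitzki"),
    ("2017-18", "Dallas Mavericks", "$5000000", "Dirk Nowitzki"),
    ("2018-19", "Dallas Mavericks", "$5000000", "Dirk Nowitzki"),
]
experiment_rows = main_rows + nowitzki_rows

for row in rows_only_in(experiment_rows, main_rows):
    print(row)
```

With the real tables, the inputs could come from `.to_pandas()`: `df.itertuples(index=False, name=None)` yields plain tuples. Pulling whole tables into pandas is only sensible for small tables like the ones in this demo.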

In [None]:
from pyflink.table.expressions import lit

table_env.from_path('nessie.nba.`salaries@main`').select(lit(1).count).to_pandas()
