# Working with branches and Write-Audit-Publish

Branches in Iceberg let you change your data or table configuration without affecting other users who are working with the table's `main` branch.
Write-Audit-Publish (WAP) is a pattern that works well with branches: you write changes, audit them, and publish them for other users to see only once they are validated.

Start your Spark session, configured to use the Polaris Catalog.

In [None]:
## Update with your principal user credentials (from Polaris Catalog)

clientId="90c6a4746a5434b4"
clientSecret="a612b67eec79bd69b76bf9804b66bc96"
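Hard-coding a client secret in a notebook makes it easy to leak by accident. As an alternative, here is a minimal sketch that reads the credentials from environment variables. The variable names `POLARIS_CLIENT_ID` and `POLARIS_CLIENT_SECRET` and the helper `polaris_credential` are hypothetical; use whatever your environment actually defines.

```python
import os


def polaris_credential(id_var="POLARIS_CLIENT_ID", secret_var="POLARIS_CLIENT_SECRET"):
    """Build the 'clientId:clientSecret' string used by the catalog's `credential` setting.

    The environment variable names are hypothetical defaults. A missing
    variable raises KeyError, so the notebook fails fast instead of
    connecting with empty credentials.
    """
    client_id = os.environ[id_var]
    client_secret = os.environ[secret_var]
    return f"{client_id}:{client_secret}"
```

You would then pass `polaris_credential()` as the value of `spark.sql.catalog.polaris.credential` instead of `clientId+':'+clientSecret`.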

In [None]:
## Start the Spark application and connect to our Polaris Catalog

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName('iceberg_lab')
    # Iceberg SQL extensions enable the branch DDL and CALL procedures used below;
    # skip this line if your environment already configures spark.sql.extensions
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    .config('spark.sql.defaultCatalog', 'polaris')
    .config('spark.sql.catalog.polaris', 'org.apache.iceberg.spark.SparkCatalog')
    .config('spark.sql.catalog.polaris.type', 'rest')
    .config('spark.sql.catalog.polaris.header.X-Iceberg-Access-Delegation', 'true')
    .config('spark.sql.catalog.polaris.client.region', 'us-east-1')
    .config('spark.sql.catalog.polaris.uri', 'http://polaris-catalog:8181/api/catalog')
    .config('spark.sql.catalog.polaris.credential', clientId + ':' + clientSecret)
    .config('spark.sql.catalog.polaris.warehouse', 'polariscatalog')
    .config('spark.sql.catalog.polaris.scope', 'PRINCIPAL_ROLE:ALL')
    .config('spark.sql.catalog.polaris.token-refresh-enabled', 'true')
    .getOrCreate()
)

### Getting some data

We'll use the NYC For-Hire Vehicles (FHV) Active dataset for this exercise. It's available on the NYC Open Data website, but we took a recent snapshot and made it available in the Upsolver S3 bucket.

We'll use Spark DataFrames to read the CSV file and write it to an Iceberg table.

NOTE: if you're starting with this notebook and didn't complete the Getting Started with Spark and Iceberg notebook, then you first need to create a `demo` database in the `polaris` catalog, for example `CREATE DATABASE polaris.demo`.

In [None]:
### More information can be found on the NYC Public Data website
### https://data.cityofnewyork.us/Transportation/For-Hire-Vehicles-FHV-Active/8wbx-tsch/about_data

### Recent file was downloaded and available for this workshop in the following bucket
### s3://upsolver-workshop-lake/workshop-samples/For_Hire_Vehicles_FHV_Active_20240915.csv

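# Drop any previous copy of the table; PURGE also deletes its data files
spark.sql('DROP TABLE IF EXISTS demo.vfh PURGE')

# Use the first row as column names and infer column types from the data
df = spark.read.format("csv") \
          .option("header", True) \
          .option("inferSchema", True) \
          .load("s3a://upsolver-workshop-lake/workshop-samples/For_Hire_Vehicles_FHV_Active_20240915.csv")

df.write.saveAsTable('demo.vfh')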

Check that the data was loaded into the Iceberg table successfully.

In [None]:
%%sql

SELECT * FROM demo.vfh limit 10

Count vehicles per company (base) with a simple aggregation; we'll use it to demonstrate working with branches.

In [None]:
%%sql

SELECT `Base Name`, `Base Number`, count(*) AS total_cars FROM demo.vfh 
GROUP BY 1,2
ORDER BY 3 DESC

### Creating a branch

You can learn more about branches in the Iceberg [documentation](https://iceberg.apache.org/docs/latest/spark-ddl/#branching-and-tagging-ddl).

In [None]:
%%sql

ALTER TABLE demo.vfh
CREATE OR REPLACE BRANCH `audit`
RETAIN 2 DAYS

### Working with data in a branch

To work within your branch, reference it by name using the prefix `branch_` followed by the branch name. The branch reference comes after the table name, like `dbName.tableName.branch_branchName`.
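To avoid typos when building these identifiers in Python, a small helper can assemble them. This is a sketch, and `branch_table` is a hypothetical name. Branch names containing characters such as `-` are not valid bare identifiers in Spark SQL, so the helper back-quotes the last identifier part in that case.

```python
import re


def branch_table(table, branch):
    """Return the identifier used to read or write an Iceberg branch from Spark SQL.

    Branches are addressed as `<table>.branch_<name>`. If the resulting part
    contains anything other than letters, digits and underscores, it is
    wrapped in backticks (with embedded backticks doubled, as Spark expects).
    """
    part = f"branch_{branch}"
    if not re.fullmatch(r"[A-Za-z0-9_]+", part):
        part = "`" + part.replace("`", "``") + "`"
    return f"{table}.{part}"
```

For example, `spark.sql(f"SELECT count(*) FROM {branch_table('demo.vfh', 'audit')}")` counts the rows in the `audit` branch.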

The first thing we'll do is delete some data. Because we're doing this within a branch, the main table that other users are querying is not affected.

In [None]:
%%sql

DELETE FROM demo.vfh.branch_audit
WHERE `Base Number` = 'B03557'

Rerun the aggregation query on the `audit` branch and confirm that SPACELINKS LLC (base number `B03557`) no longer appears in the results.

In [None]:
%%sql

SELECT `Base Name`, `Base Number`, count(*) AS total_cars 
FROM demo.vfh.branch_audit
GROUP BY 1,2 
ORDER BY 3 DESC

Rerun the same query against the main branch, as other users would, to confirm that the original data is unchanged.

In [None]:
%%sql

SELECT `Base Name`, `Base Number`, count(*) AS total_cars 
FROM demo.vfh
GROUP BY 1,2 
ORDER BY 3 DESC
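Eyeballing two aggregations works for a small change, but an audit step is easier to trust when it is checked in code. A minimal sketch, assuming you `collect()` both aggregation queries (main and `audit`) into lists of rows; `removed_keys` is a hypothetical helper that lists the base numbers present on `main` but missing from the branch.

```python
def removed_keys(main_rows, branch_rows, key="Base Number"):
    """Return the sorted values of `key` that appear in main_rows but not in branch_rows.

    Rows can be dicts or PySpark Row objects; both support row[key].
    """
    main_keys = {row[key] for row in main_rows}
    branch_keys = {row[key] for row in branch_rows}
    return sorted(main_keys - branch_keys)
```

After the delete above, you would expect `removed_keys(main_rows, audit_rows)` to return `['B03557']`; putting an `assert` on that result turns the audit into a repeatable check.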

Inspect the `refs` metadata table to see the snapshot that your branch points to.

In [None]:
%%sql

SELECT * FROM polaris.demo.vfh.refs

You can get the same view of the data with `VERSION AS OF <snapshot_id>`. Take the snapshot ID from the `refs` or `snapshots` metadata table.

In [None]:
%%sql

SELECT `Base Name`, `Base Number`, count(*) AS total_cars 
FROM demo.vfh VERSION AS OF 6215557498076574768  --change this to your snapshot id
GROUP BY 1,2 
ORDER BY 3 DESC
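Copying snapshot IDs by hand is error-prone. Here is a sketch of looking up the ID programmatically, assuming you collect the `refs` metadata table, whose rows include `name` and `snapshot_id` columns; `ref_snapshot_id` is a hypothetical helper name.

```python
def ref_snapshot_id(refs_rows, ref_name):
    """Return the snapshot ID that the named branch or tag currently points to.

    `refs_rows` is any iterable of dicts or PySpark Rows with `name` and
    `snapshot_id` fields, such as the collected `refs` metadata table.
    """
    for row in refs_rows:
        if row["name"] == ref_name:
            return row["snapshot_id"]
    raise LookupError(f"no ref named {ref_name!r}")
```

For example, `ref_snapshot_id(spark.sql('SELECT name, snapshot_id FROM polaris.demo.vfh.refs').collect(), 'audit')` returns the ID to use in `VERSION AS OF`. Recent Iceberg versions on Spark 3.3+ also accept a branch name for time travel, as in `VERSION AS OF 'audit'`, which avoids the lookup altogether.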

There are multiple ways to merge branches; read more in the [documentation](https://iceberg.apache.org/docs/latest/spark-procedures/#snapshot-management).

In this example we'll use the `cherrypick_snapshot` procedure to apply the `audit` branch's latest snapshot (the ID from the `refs` table) to `main`.

In [None]:
%%sql

CALL polaris.system.cherrypick_snapshot('demo.vfh', 6215557498076574768) -- substitute your snapshot ID
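Iceberg's documentation notes that only append and dynamic-overwrite snapshots can be cherry-picked; a snapshot whose parent is the current `main` snapshot can also be applied as a fast-forward. A `DELETE` is neither an append nor a dynamic overwrite, so the call above relies on the audit snapshot sitting directly on top of `main` (nothing else was committed to `main` after the branch was created). Recent Iceberg releases also provide a `fast_forward` procedure that moves one branch to the head of another by name, with no snapshot ID needed. The sketch below only builds that call; `fast_forward_sql` is a hypothetical helper, and you should confirm that your Iceberg version ships the procedure.

```python
def fast_forward_sql(table, branch, to_branch, catalog="polaris"):
    """Build a CALL that advances `branch` to the head snapshot of `to_branch`.

    Assumes the Iceberg `fast_forward` procedure is available; it only succeeds
    when the head of `branch` is an ancestor of the head of `to_branch`.
    """
    return f"CALL {catalog}.system.fast_forward('{table}', '{branch}', '{to_branch}')"
```

Running `spark.sql(fast_forward_sql('demo.vfh', 'main', 'audit'))` would publish the audited state of the `audit` branch to `main`.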

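Now that the merge is complete, you can either let the branch expire or drop it manually. Note that the `RETAIN 2 DAYS` setting is enforced when snapshot expiration runs (for example, via the `expire_snapshots` procedure), not on a timer.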

In [None]:
%%sql

ALTER TABLE demo.vfh
DROP BRANCH audit

## Working with WAP

WAP is a pattern that allows you to write some changes, audit them and then publish them to the rest of your users.

WAP is similar to branching, but less explicit. With branches, you create a branch, change data in it, and then publish or drop it yourself. With WAP, you enable it on the table and set a `spark.wap.id` session property; Spark then commits your writes as staged snapshots that are not part of the `main` branch's history. Other users don't see these changes unless they dig into the table's snapshot details. Once you're happy with your changes, you publish them to the `main` branch without having to manage any branches.

To set up WAP, first enable it on your table, then generate a unique ID; we use Python's `uuid` module for this.

Note that WAP is enabled on the table, which is why we set it in `TBLPROPERTIES`. The WAP ID, however, is a Spark session property that only persists within the active Spark session. If the session is closed, lost, or restarted, you'll need to regenerate the WAP ID and set it again.

In [None]:
import uuid
wap_id = uuid.uuid4().hex

spark.sql("ALTER TABLE demo.vfh SET TBLPROPERTIES ('write.wap.enabled'='true')")
spark.conf.set('spark.wap.id', wap_id)

Let's delete some data. Note that we don't reference a branch name here; Spark uses the WAP ID behind the scenes to track the changes.

In [None]:
%%sql

delete from demo.vfh
where `Base Number` = 'B03406'

Because WAP works behind the scenes, querying the base table shows no changes; all the data is still present. To view the changed dataset, query it with the `VERSION AS OF <snapshot_id>` syntax shown below. To find the latest snapshot ID that includes your changes, look at the `snapshots` metadata table.

When you inspect the `snapshots` metadata table, notice that the `summary` column includes a `wap.id` entry containing your WAP ID.

In [None]:
%%sql

SELECT `Base Name`, `Base Number`, count(*) AS total_cars 
FROM demo.vfh VERSION AS OF 2263443007379069011 -- substitute for your snapshot id
GROUP BY 1,2 
ORDER BY 3 desc
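Rather than scanning the `snapshots` table by eye, you can filter it on the WAP ID. A sketch, assuming the rows come from `spark.sql('SELECT snapshot_id, committed_at, summary FROM demo.vfh.snapshots').collect()`, where the `summary` map column is collected as a Python dict; `staged_snapshot_ids` is a hypothetical helper.

```python
def staged_snapshot_ids(snapshot_rows, wap_id):
    """Return the IDs of snapshots written under `wap_id`, oldest first.

    Each row needs `snapshot_id`, `committed_at` and a `summary` mapping;
    WAP writes record their ID in the summary under the `wap.id` key.
    """
    matches = [
        row for row in snapshot_rows
        if (row["summary"] or {}).get("wap.id") == wap_id
    ]
    matches.sort(key=lambda row: row["committed_at"])
    return [row["snapshot_id"] for row in matches]
```

The last element is the newest staged snapshot, the one to use with `VERSION AS OF`. The same filter can be written directly in SQL as `WHERE summary['wap.id'] = '<your WAP ID>'`.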

Now validate your data and make sure it is what you expect.

If you're happy with the result, publish your changes.

In [None]:
spark.sql(f"CALL polaris.system.publish_changes('demo.vfh', '{wap_id}')")

If you're not happy with your changes, don't publish them; instead, expire the staged snapshot that contains them.

In [None]:
## substitute your snapshot id

spark.sql("CALL polaris.system.expire_snapshots(table => 'demo.vfh', snapshot_ids => Array(2263443007379069011))")
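If you looked up the staged snapshot IDs programmatically, a small helper can build the `expire_snapshots` call from a list instead of pasting IDs into the string. This is a sketch; `expire_snapshots_sql` is a hypothetical name, and it only formats the same `table` and `snapshot_ids` arguments used in the cell above.

```python
def expire_snapshots_sql(table, snapshot_ids, catalog="polaris"):
    """Build an expire_snapshots CALL that removes only the listed snapshots."""
    if not snapshot_ids:
        raise ValueError("snapshot_ids must not be empty")
    # int() rejects anything that is not a number before it reaches the SQL text
    ids = ", ".join(str(int(s)) for s in snapshot_ids)
    return (
        f"CALL {catalog}.system.expire_snapshots("
        f"table => '{table}', snapshot_ids => array({ids}))"
    )
```

For example, `spark.sql(expire_snapshots_sql('demo.vfh', [2263443007379069011]))` issues the same call as the cell above.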

Once you're done, disable WAP on the table and unset the session property that holds your WAP ID.

In [None]:
spark.sql("ALTER TABLE demo.vfh SET TBLPROPERTIES ('write.wap.enabled'='false')")
spark.conf.unset('spark.wap.id')
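Because the WAP ID lives in the Spark session, it's easy to forget to unset it and have later writes staged unintentionally. In a scripted pipeline you could scope it with a context manager. A sketch, assuming `write.wap.enabled` is already `true` on the table; `wap_session` is a hypothetical helper that only uses the `spark.conf.set` and `spark.conf.unset` calls shown above.

```python
import uuid
from contextlib import contextmanager


@contextmanager
def wap_session(spark):
    """Set a fresh `spark.wap.id` for the duration of the block, then unset it.

    Yields the generated WAP ID so it can be passed to publish_changes later.
    The property is removed even if the block raises an exception.
    """
    wap_id = uuid.uuid4().hex
    spark.conf.set("spark.wap.id", wap_id)
    try:
        yield wap_id
    finally:
        spark.conf.unset("spark.wap.id")
```

Run your writes inside `with wap_session(spark) as wap_id:`; the property is unset when the block exits, and `wap_id` is still bound afterwards for the `publish_changes` call.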