# Iceberg Lab 
## Unit 7: Snapshot Management

In the previous unit, we -
1. Learned how to "Time Travel" in Iceberg Tables

In this unit, we will-
1. Learn about managing table snapshots 


### 1. Imports

In [2]:
from pyspark.sql.functions import col
from pyspark.sql import SparkSession

import warnings
warnings.filterwarnings('ignore')

### 2. Create a Spark session powered by Cloud Dataproc 

In [3]:
spark = SparkSession.builder.appName('Loan Analysis').getOrCreate()
spark.sparkContext.setLogLevel("WARN")
spark

### 3. Declare variables

In [4]:
project_id_output = !gcloud config list --format "value(core.project)" 2>/dev/null
PROJECT_ID = project_id_output[0]
print("PROJECT_ID: ", PROJECT_ID)

PROJECT_ID:  nikhim-iceberg-lab


In [5]:
project_name_output = !gcloud projects describe $PROJECT_ID | grep name | cut -d':' -f2 | xargs
PROJECT_NAME = project_name_output[0]
print("PROJECT_NAME: ", PROJECT_NAME)

PROJECT_NAME:  nikhim-iceberg-lab


In [6]:
project_number_output = !gcloud projects describe $PROJECT_ID | grep projectNumber | cut -d':' -f2 | xargs
PROJECT_NUMBER = project_number_output[0]
print("PROJECT_NUMBER: ", PROJECT_NUMBER)

PROJECT_NUMBER:  928505941962


In [7]:
DPMS_NAME=f"iceberg-hms-{PROJECT_NUMBER}"
LOCATION="us-central1"

metastore_dir = !gcloud metastore services describe $DPMS_NAME --location $LOCATION |grep 'hive.metastore.warehouse.dir'| cut -d':' -f2- | xargs 
HIVE_METASTORE_WAREHOUSE_DIR = metastore_dir[0]
print("HIVE_METASTORE_WAREHOUSE_DIR",HIVE_METASTORE_WAREHOUSE_DIR)

HIVE_METASTORE_WAREHOUSE_DIR gs://gcs-bucket-iceberg-hms-928505941962-71d67f3e-cf27-4b25-a996-86a/hive-warehouse


In [8]:
TABLE_NAME="loans_by_state_iceberg"
DB_NAME="loan_db"

#fully qualified table name
FQTN=f"{DB_NAME}.{TABLE_NAME}"

print("Fully quailified table name :",FQTN)

Fully quailified table name : loan_db.loans_by_state_iceberg


### 4. Snapshot Management

In [9]:
#Listing currently available snapshots for the table

spark.table("loan_db.loans_by_state_iceberg.snapshots").show()

+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|          parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+-------------------+---------+--------------------+--------------------+
|2023-02-10 15:26:...|3648627921780331930|               null|   append|gs://gcs-bucket-i...|{spark.app.id -> ...|
|2023-02-10 22:29:...|5222601969543758311|3648627921780331930|overwrite|gs://gcs-bucket-i...|{spark.app.id -> ...|
|2023-02-10 22:33:...|9145457862466461068|5222601969543758311|   append|gs://gcs-bucket-i...|{spark.app.id -> ...|
|2023-02-10 22:35:...|8627182030940064924|9145457862466461068|overwrite|gs://gcs-bucket-i...|{spark.app.id -> ...|
|2023-02-10 22:36:...|2697368997376323351|8627182030940064924|overwrite|gs://gcs-bucket-i...|{spark.app.id -> ...|
|2023-02-10 22:44:...|5865803199727045458|2697368997376323351|overwrite|gs://gcs

In [10]:
#Checking the Table state with few sample records

#Fetch current values for 4 states
spark.sql("SELECT * FROM loan_db.loans_by_state_iceberg where addr_state in ('IA','AZ','CA','IN')").show(truncate=False)


ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR,/etc/spark/conf/ivysettings.xml will be used
[Stage 2:>                                                          (0 + 1) / 1]

+----------+----------+
|addr_state|loan_count|
+----------+----------+
|CA        |11111     |
|IA        |11111     |
|IN        |11111     |
|AZ        |11111     |
+----------+----------+



                                                                                

#### a. rollback_to_snapshot

In [1]:
#Fetch the 5th snapshot update for this example
ROLLBACK_SNAPSHOT_ID = spark.sql(f"SELECT snapshot_id FROM \
(SELECT snapshot_id, ROW_NUMBER() OVER(ORDER BY committed_at ASC) rownum FROM {FQTN}.snapshots) \
a where a.rownum =5").collect()[0][0]

print("ROLLBACK_SNAPSHOT_ID ", ROLLBACK_SNAPSHOT_ID)


23/02/11 04:42:53 INFO metastore: Trying to connect to metastore with URI thrift://10.93.64.15:9080
23/02/11 04:42:53 INFO metastore: Opened a connection to metastore, current connections: 1
23/02/11 04:42:53 INFO metastore: Connected to metastore.
23/02/11 04:42:53 INFO BaseMetastoreTableOperations: Refreshing table metadata from new version: gs://gcs-bucket-iceberg-hms-928505941962-71d67f3e-cf27-4b25-a996-86a/hive-warehouse/loan_db.db/loans_by_state_iceberg/metadata/00008-e22e9032-f45b-41ba-a815-c7fbbb913536.metadata.json
23/02/11 04:42:54 INFO BaseMetastoreCatalog: Table loaded by catalog: spark_catalog.loan_db.loans_by_state_iceberg
23/02/11 04:42:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/11 04:42:57 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 0:>               

ROLLBACK_SNAPSHOT_ID  2697368997376323351


                                                                                

In [14]:
#build snapshot rollback statement

SNPSHT_RLBK_STMNT = f"CALL spark_catalog.system.rollback_to_snapshot('{FQTN}',{ROLLBACK_SNAPSHOT_ID})"
print(SNPSHT_RLBK_STMNT)

spark.sql(f"{SNPSHT_RLBK_STMNT}").show()

In [15]:
#Table current state has been updated to an older snaphsot version
spark.sql("SELECT * FROM loan_db.loans_by_state_iceberg where addr_state in ('IA','AZ','CA','IN')").show(truncate=False)


+----------+----------+
|addr_state|loan_count|
+----------+----------+
|IN        |7511      |
|AZ        |11111     |
+----------+----------+



#### b. rollback_to_timestamp

In [16]:
#Fetch a specific timestamp to revert table state to

timestamp_val = spark.sql("select committed_at from (SELECT committed_at, ROW_NUMBER() OVER(ORDER BY committed_at ASC) rownum from loan_db.loans_by_state_iceberg.snapshots) a where a.rownum =3").collect()[0][0]
print("Rolling back to timestamp", timestamp_val)

23/02/11 04:45:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/11 04:45:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 5:>                                                          (0 + 1) / 1]

Rolling back to timestamp 2023-02-10 22:33:06.455000


                                                                                

In [17]:
tmstmp_rlbk_stmnt = f"CALL spark_catalog.system.rollback_to_timestamp('loan_db.loans_by_state_iceberg',TIMESTAMP '{timestamp_val}')"
print(tmstmp_rlbk_stmnt)

spark.sql(tmstmp_rlbk_stmnt).show()

CALL spark_catalog.system.rollback_to_timestamp('loan_db.loans_by_state_iceberg',TIMESTAMP '2023-02-10 22:33:06.455000')
+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
| 2697368997376323351|5222601969543758311|
+--------------------+-------------------+



In [18]:
#Table current state
spark.sql("SELECT * FROM loan_db.loans_by_state_iceberg where addr_state in ('IA','AZ','CA','IN')").show(truncate=False)


[Stage 7:>                                                          (0 + 1) / 1]

+----------+----------+
|addr_state|loan_count|
+----------+----------+
|CA        |62090     |
|IN        |7511      |
|IA        |1         |
+----------+----------+



                                                                                

**NOTE:**


Both rollback_to_snapshot and rollback_to_timestamp can only switch the snapshot if the updating snapshot_id is an ancestor of the current snapshot (older than current_snapshot). 


In [19]:
#Fetching a younger snapshot which is not an ancestor of the current snapshot

newer_snpsht_id = spark.sql("select snapshot_id from (SELECT snapshot_id, ROW_NUMBER() OVER(ORDER BY committed_at DESC) rownum from loan_db.loans_by_state_iceberg.snapshots) a where a.rownum =1").collect()[0][0]
print("Rolling back to snapshot id ", newer_snpsht_id)


23/02/11 04:47:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/02/11 04:47:54 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


Rolling back to snapshot id  5865803199727045458


In [20]:
snpsht_rlbk_stmmt = f"CALL spark_catalog.system.rollback_to_snapshot('loan_db.loans_by_state_iceberg',{newer_snpsht_id})"
print(snpsht_rlbk_stmmt)

spark.sql(snpsht_rlbk_stmmt).show()

CALL spark_catalog.system.rollback_to_snapshot('loan_db.loans_by_state_iceberg',5865803199727045458)


Py4JJavaError: An error occurred while calling o83.sql.
: org.apache.iceberg.exceptions.ValidationException: Cannot roll back to snapshot, not an ancestor of the current state: 5865803199727045458
	at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
	at org.apache.iceberg.SetSnapshotOperation.rollbackTo(SetSnapshotOperation.java:84)
	at org.apache.iceberg.SnapshotManager.rollbackTo(SnapshotManager.java:59)
	at org.apache.iceberg.spark.procedures.RollbackToSnapshotProcedure.lambda$call$0(RollbackToSnapshotProcedure.java:88)
	at org.apache.iceberg.spark.procedures.BaseProcedure.execute(BaseProcedure.java:100)
	at org.apache.iceberg.spark.procedures.BaseProcedure.modifyIcebergTable(BaseProcedure.java:81)
	at org.apache.iceberg.spark.procedures.RollbackToSnapshotProcedure.call(RollbackToSnapshotProcedure.java:83)
	at org.apache.spark.sql.execution.datasources.v2.CallExec.run(CallExec.scala:34)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


As seen above we get the error _"Cannot roll back to snapshot, not an ancestor of the current state"_

We can workaround this problem using another procedure called **_set_current_snapshot_**

#### c. set_current_snapshot

In [21]:
set_snpsht_stmnt = f"CALL spark_catalog.system.set_current_snapshot('loan_db.loans_by_state_iceberg',{newer_snpsht_id})"
print(set_snpsht_stmnt)

spark.sql(set_snpsht_stmnt).show(truncate=False)

CALL spark_catalog.system.set_current_snapshot('loan_db.loans_by_state_iceberg',5865803199727045458)
+--------------------+-------------------+
|previous_snapshot_id|current_snapshot_id|
+--------------------+-------------------+
|5222601969543758311 |5865803199727045458|
+--------------------+-------------------+



In [22]:
#Table current state modified to newer_snapshot_id
spark.sql("SELECT * FROM loan_db.loans_by_state_iceberg where addr_state in ('IA','AZ','CA','IN')").show(truncate=False)


+----------+----------+
|addr_state|loan_count|
+----------+----------+
|CA        |11111     |
|IA        |11111     |
|IN        |11111     |
|AZ        |11111     |
+----------+----------+



In [23]:
!gsutil ls -r  {HIVE_METASTORE_WAREHOUSE_DIR}/loan_db.db/{TABLE_NAME}/

gs://gcs-bucket-iceberg-hms-928505941962-71d67f3e-cf27-4b25-a996-86a/hive-warehouse/loan_db.db/loans_by_state_iceberg/:

gs://gcs-bucket-iceberg-hms-928505941962-71d67f3e-cf27-4b25-a996-86a/hive-warehouse/loan_db.db/loans_by_state_iceberg/data/:
gs://gcs-bucket-iceberg-hms-928505941962-71d67f3e-cf27-4b25-a996-86a/hive-warehouse/loan_db.db/loans_by_state_iceberg/data/00000-2-b2d1021e-d3ad-4aff-baa7-1c740e8a3144-00001.parquet
gs://gcs-bucket-iceberg-hms-928505941962-71d67f3e-cf27-4b25-a996-86a/hive-warehouse/loan_db.db/loans_by_state_iceberg/data/00000-205-c4327bde-de7b-4bb7-a2fe-cb90b1c77623-00001.parquet
gs://gcs-bucket-iceberg-hms-928505941962-71d67f3e-cf27-4b25-a996-86a/hive-warehouse/loan_db.db/loans_by_state_iceberg/data/00000-410-3265a2ac-0164-4598-a86e-03a63e191071-00001.parquet
gs://gcs-bucket-iceberg-hms-928505941962-71d67f3e-cf27-4b25-a996-86a/hive-warehouse/loan_db.db/loans_by_state_iceberg/data/00000-457-db97e1c8-4cc1-4098-94d7-a1172491a65c-00001.parquet
gs://gcs-bucket-iceb

### THIS CONCLUDES THIS LAB. PROCEED TO THE NEXT NOTEBOOK.