![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)

### [Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!](https://tabular.io/blog/docker-spark-and-iceberg/)

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Jupyter")\
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")\
        .getOrCreate()

spark

24/09/01 07:12:45 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
24/09/01 07:12:46 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


## Load One Month of NYC Taxi/Limousine Trip Data

For this notebook, we will use the New York City Taxi and Limousine Commission Trip Record Data that's available on the AWS Open Data Registry. It contains records of trips taken by taxis and for-hire vehicles in New York City. We'll save this into an Iceberg table called `taxis`.

To be able to rerun the notebook several times, let's drop the table if it exists to start fresh.

In [3]:
%%sql

CREATE DATABASE IF NOT EXISTS nyc

In [4]:
%%sql

DROP TABLE IF EXISTS nyc.taxis

24/09/01 04:49:13 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/09/01 04:49:13 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/09/01 04:49:15 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


In [5]:
df = spark.read.parquet("/home/iceberg/data/yellow_tripdata_2021-04.parquet")
df.write.saveAsTable("nyc.taxis")

                                                                                

In [2]:
%%sql

DESCRIBE EXTENDED nyc.taxis

24/09/01 05:44:06 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/09/01 05:44:06 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/09/01 05:44:07 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


col_name,data_type,comment
VendorID,bigint,
tpep_pickup_datetime,timestamp_ntz,
tpep_dropoff_datetime,timestamp_ntz,
passenger_count,double,
distance,double,The elapsed trip distance in miles reported by the taximeter.
RatecodeID,double,
store_and_fwd_flag,string,
PULocationID,bigint,
DOLocationID,bigint,
payment_type,bigint,


In [7]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis

cnt
2171187


## Schema Evolution

Adding, dropping, renaming, or altering columns is easy and safe in Iceberg. In this example, we'll rename `fare_amount` to `fare` and `trip_distance` to `distance`. We'll also add a float column `fare_per_distance_unit` immediately after `distance`.
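
One reason these changes are safe is that Iceberg identifies each column by a numeric field ID rather than by its name or position, so a rename only touches table metadata. The following is a toy model in plain Python, not Iceberg's actual implementation: the `schema` and `data_file_row` dictionaries and the `rename_column` and `read_row` helpers are hypothetical, and the field IDs (1, 5, 11) are assumed to follow the column order of this table.

```python
# Toy model: a schema maps field IDs to column names, while data files
# store values keyed by field ID. All names and IDs here are illustrative.
schema = {1: "VendorID", 5: "trip_distance", 11: "fare_amount"}

# One row as it might be laid out in an already-written data file.
data_file_row = {1: 1, 5: 8.4, 11: 25.5}


def rename_column(schema, old_name, new_name):
    """Return a new schema with one column renamed; field IDs are unchanged."""
    if new_name in schema.values():
        raise ValueError(f"column {new_name!r} already exists")
    if old_name not in schema.values():
        raise KeyError(old_name)
    return {fid: (new_name if name == old_name else name)
            for fid, name in schema.items()}


def read_row(schema, row):
    """Resolve a stored row against the current schema by field ID."""
    return {name: row.get(fid) for fid, name in schema.items()}


evolved = rename_column(schema, "fare_amount", "fare")
evolved = rename_column(evolved, "trip_distance", "distance")

# The data file was never rewritten, yet it reads back under the new names.
print(read_row(evolved, data_file_row))
# {'VendorID': 1, 'distance': 8.4, 'fare': 25.5}
```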

In [8]:
%%sql

ALTER TABLE nyc.taxis RENAME COLUMN fare_amount TO fare

In [9]:
%%sql

ALTER TABLE nyc.taxis RENAME COLUMN trip_distance TO distance

In [10]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance COMMENT 'The elapsed trip distance in miles reported by the taximeter.'

In [11]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance TYPE double;

In [2]:
%%sql

ALTER TABLE nyc.taxis ALTER COLUMN distance AFTER fare;

24/09/01 07:13:05 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/09/01 07:13:05 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/09/01 07:13:06 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
24/09/01 07:13:08 ERROR HiveTableOperations: Cannot tell if commit to nyc.taxis succeeded, attempting to reconnect and check.
InvalidOperationException(message:The following columns have types incompatible with the existing columns in their respective positions :
PULocationID)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:59744)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:59730)
	at org.apache.hadoop.hive.metastore.api

Py4JJavaError: An error occurred while calling o40.sql.
: org.apache.iceberg.exceptions.CommitStateUnknownException: The following columns have types incompatible with the existing columns in their respective positions :
PULocationID
Cannot determine whether the commit was successful or not, the underlying data files may or may not be needed. Manual intervention via the Remove Orphan Files Action can remove these files when a connection to the Catalog can be re-established if the commit was actually unsuccessful.
Please check to see whether or not your commit was successful before retrying this commit. Retrying an already successful operation will result in duplicate records or unintentional modifications.
At this time no files will be deleted including possibly unused manifest lists.
	at org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:300)
	at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:128)
	at org.apache.iceberg.BaseTransaction.lambda$commitSimpleTransaction$3(BaseTransaction.java:427)
	at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
	at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
	at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
	at org.apache.iceberg.BaseTransaction.commitSimpleTransaction(BaseTransaction.java:423)
	at org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:318)
	at org.apache.iceberg.spark.SparkCatalog.commitChanges(SparkCatalog.java:824)
	at org.apache.iceberg.spark.SparkCatalog.alterTable(SparkCatalog.java:345)
	at org.apache.spark.sql.execution.datasources.v2.AlterTableExec.run(AlterTableExec.scala:37)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
	at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:638)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:629)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:659)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: InvalidOperationException(message:The following columns have types incompatible with the existing columns in their respective positions :
PULocationID)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:59744)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result$alter_table_with_environment_context_resultStandardScheme.read(ThriftHiveMetastore.java:59730)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$alter_table_with_environment_context_result.read(ThriftHiveMetastore.java:59672)
	at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:88)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_alter_table_with_environment_context(ThriftHiveMetastore.java:1693)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.alter_table_with_environment_context(ThriftHiveMetastore.java:1677)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.alter_table_with_environmentContext(HiveMetaStoreClient.java:373)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:169)
	at com.sun.proxy.$Proxy40.alter_table_with_environmentContext(Unknown Source)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:62)
	at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:74)
	at org.apache.iceberg.hive.MetastoreUtil.alterTable(MetastoreUtil.java:78)
	at org.apache.iceberg.hive.HiveOperationsBase.lambda$persistTable$1(HiveOperationsBase.java:139)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:72)
	at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:65)
	at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
	at org.apache.iceberg.hive.HiveOperationsBase.persistTable(HiveOperationsBase.java:137)
	at org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:244)
	... 54 more


In [3]:
%%sql

ALTER TABLE nyc.taxis
ADD COLUMN fare_per_distance_unit float AFTER distance

AnalysisException: [FIELDS_ALREADY_EXISTS] Cannot add column, because `fare_per_distance_unit` already exists in "STRUCT<VendorID: BIGINT, tpep_pickup_datetime: TIMESTAMP_NTZ, tpep_dropoff_datetime: TIMESTAMP_NTZ, passenger_count: DOUBLE, distance: DOUBLE COMMENT 'The elapsed trip distance in miles reported by the taximeter.', RatecodeID: DOUBLE, store_and_fwd_flag: STRING, PULocationID: BIGINT, DOLocationID: BIGINT, payment_type: BIGINT, fare: DOUBLE, extra: DOUBLE, mta_tax: DOUBLE, tip_amount: DOUBLE, tolls_amount: DOUBLE, improvement_surcharge: DOUBLE, total_amount: DOUBLE, congestion_surcharge: DOUBLE, airport_fee: DOUBLE, fare_per_distance_unit: FLOAT>".; line 2 pos 0;
AddColumns [QualifiedColType(None,fare_per_distance_unit,FloatType,true,None,Some(resolvedfieldposition(AFTER distance)),None)]
+- ResolvedTable org.apache.iceberg.spark.SparkCatalog@6be3bb59, nyc.taxis, hive_prd.nyc.taxis, [VendorID#20L, tpep_pickup_datetime#21, tpep_dropoff_datetime#22, passenger_count#23, distance#24, RatecodeID#25, store_and_fwd_flag#26, PULocationID#27L, DOLocationID#28L, payment_type#29L, fare#30, extra#31, mta_tax#32, tip_amount#33, tolls_amount#34, improvement_surcharge#35, total_amount#36, congestion_surcharge#37, airport_fee#38, fare_per_distance_unit#39]


Let's update the new `fare_per_distance_unit` to equal `fare` divided by `distance`.

In [16]:
%%sql

ALTER TABLE nyc.taxis
ADD COLUMN fare_per_distance_unit float

In [17]:
%%sql

UPDATE nyc.taxis
SET fare_per_distance_unit = fare/distance

                                                                                

In [18]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,fare,distance,fare_per_distance_unit
1,2021-04-01 00:00:18,2021-04-01 00:21:54,25.5,8.4,3.0357143878936768
1,2021-04-01 00:42:37,2021-04-01 00:46:23,5.0,0.9,5.55555534362793
1,2021-04-01 00:57:56,2021-04-01 01:08:22,11.5,3.4,3.382352828979492
1,2021-04-01 00:01:58,2021-04-01 00:54:27,44.2,0.0,
2,2021-04-01 00:24:55,2021-04-01 00:34:33,9.0,1.96,4.591836929321289
2,2021-04-01 00:19:16,2021-04-01 00:21:46,4.5,0.77,5.844155788421631
2,2021-04-01 00:25:11,2021-04-01 00:31:53,11.5,3.65,3.1506848335266118
1,2021-04-01 00:27:53,2021-04-01 00:47:03,26.5,8.9,2.9775280952453613
2,2021-04-01 00:24:24,2021-04-01 00:37:50,12.0,2.98,4.026845455169678
1,2021-04-01 00:19:18,2021-04-01 00:41:25,28.0,8.9,3.146067380905152
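
The long decimal tails in `fare_per_distance_unit` come from the column being a 4-byte `float`: the double-precision quotient is rounded to single precision when stored, then widened back to a double for display. Below is a minimal sketch of that rounding in plain Python, using the first and fourth rows of the output. `to_float32` and `fare_per_distance` are hypothetical helpers, and returning `None` for a zero distance simply mirrors the empty value seen in the output rather than modelling Spark's division semantics in general.

```python
import struct


def to_float32(value):
    """Round a Python float (a double) to the nearest 32-bit float."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


def fare_per_distance(fare, distance):
    """Return fare/distance as a 32-bit float, or None when distance is 0."""
    if distance == 0:
        return None  # mirrors the empty (null) value in the output
    return to_float32(fare / distance)


print(fare_per_distance(25.5, 8.4))  # close to 3.0357143, not exactly 25.5/8.4
print(fare_per_distance(44.2, 0.0))  # None
```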


## Expressive SQL for Row Level Changes
With Iceberg tables, `DELETE` queries can be used to perform row-level deletes. This is as simple as providing the table name and a `WHERE` predicate. If the filter matches an entire partition of the table, Iceberg will intelligently perform a metadata-only operation where it simply deletes the metadata for that partition.

Let's perform a row-level delete for all rows that have a `fare_per_distance_unit` greater than 4 or a `distance` greater than 2. This should leave us with relatively short trips that have a relatively low fare per distance traveled.

In [19]:
%%sql

DELETE FROM nyc.taxis
WHERE fare_per_distance_unit > 4.0 OR distance > 2.0

                                                                                

There are some fares that have a `null` for `fare_per_distance_unit` due to the distance being `0`. Let's remove those as well.

In [20]:
%%sql

DELETE FROM nyc.taxis
WHERE fare_per_distance_unit is null
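
The first `DELETE` left those rows in place because SQL uses three-valued logic: a comparison with `null` evaluates to `null`, and `DELETE` removes only rows whose predicate is true. Here is a small sketch in plain Python, with `None` standing in for SQL `null`. `sql_gt`, `sql_or`, and `first_delete_matches` are hypothetical helpers, not part of any Spark or Iceberg API.

```python
def sql_gt(left, right):
    """SQL-style '>' where None stands in for NULL."""
    if left is None or right is None:
        return None
    return left > right


def sql_or(a, b):
    """SQL-style OR: TRUE wins, otherwise NULL if either side is NULL."""
    if a is True or b is True:
        return True
    if a is None or b is None:
        return None
    return False


def first_delete_matches(fare_per_distance_unit, distance):
    """Predicate of the first DELETE: fare_per_distance_unit > 4.0 OR distance > 2.0."""
    return sql_or(sql_gt(fare_per_distance_unit, 4.0), sql_gt(distance, 2.0))


# (fare_per_distance_unit, distance) pairs; the second has a zero distance.
rows = [(5.5555553, 0.9), (None, 0.0), (3.8860104, 1.93)]

# A row is deleted only when the predicate is exactly True.
survivors = [row for row in rows if first_delete_matches(*row) is not True]
print(survivors)
# [(None, 0.0), (3.8860104, 1.93)]
```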

In [21]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,fare,distance,fare_per_distance_unit
2,2021-04-01 00:53:54,2021-04-01 00:58:37,7.5,1.93,3.886010408401489
2,2021-04-01 00:05:45,2021-04-01 00:15:21,-9.0,1.96,-4.591836929321289
2,2021-04-01 00:16:50,2021-04-01 00:22:58,7.5,1.95,3.846153736114502
2,2021-04-01 00:22:33,2021-04-01 00:23:00,-2.5,0.03,-83.33333587646484
1,2021-04-01 00:48:37,2021-04-01 00:54:30,7.5,1.9,3.9473683834075928
2,2021-04-01 00:26:08,2021-04-01 00:29:36,6.5,1.79,3.631284952163696
2,2021-04-01 00:10:58,2021-04-01 00:16:00,6.5,1.66,3.9156627655029297
2,2021-04-01 00:13:14,2021-04-01 00:16:13,-4.5,0.73,-6.164383411407471
1,2021-04-01 00:20:52,2021-04-01 00:27:13,8.0,2.0,4.0
2,2021-04-01 01:12:20,2021-04-01 01:17:19,7.0,1.92,3.6458332538604736


In [22]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis

cnt
17703


## Partitioning

A table’s partitioning can be updated in place and applied only to newly written data. Query plans are then split, using the old partition scheme for data written before the partition scheme was changed, and using the new partition scheme for data written after. People querying the table don’t even have to be aware of this split. Simple predicates in WHERE clauses are automatically converted to partition filters that prune out files with no matches. This is what’s referred to in Iceberg as *Hidden Partitioning*.
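
To make the split planning concrete, here is a toy model in plain Python of how files written under two partition specs might be filtered for the predicate `VendorID = 2`. This is only a sketch under simplifying assumptions, not Iceberg's planner: the `DataFile` class, the file names, and `plan_scan` are all hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class DataFile:
    path: str
    spec_id: int               # which partition spec the file was written under
    vendor_id: Optional[int]   # partition value; None for unpartitioned files


# Spec 0: unpartitioned (data written before the partition field was added).
# Spec 1: partitioned by VendorID (data written afterwards).
files = [
    DataFile("old-00001.parquet", spec_id=0, vendor_id=None),
    DataFile("new-vendor1.parquet", spec_id=1, vendor_id=1),
    DataFile("new-vendor2.parquet", spec_id=1, vendor_id=2),
]


def plan_scan(files, wanted_vendor_id):
    """Keep files that might contain rows with the wanted VendorID."""
    selected = []
    for f in files:
        if f.vendor_id is None:
            # No partition value to test, so the file cannot be pruned here.
            selected.append(f.path)
        elif f.vendor_id == wanted_vendor_id:
            selected.append(f.path)
    return selected


print(plan_scan(files, wanted_vendor_id=2))
# ['old-00001.parquet', 'new-vendor2.parquet']
```

The query itself only says `WHERE VendorID = 2`; which files are read is decided from metadata, so readers never need to know which spec a file was written under.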

In [8]:
spark.conf.get("spark.sql.extensions")
# spark.conf.get("spark.app.name")

In [2]:
%%sql

ALTER TABLE nyc.taxis
ADD PARTITION FIELD VendorID

In [27]:
%%sql

CREATE TABLE nyc.sample (
    id bigint,
    data string,
    category string)
USING iceberg
PARTITIONED BY (category);

## Metadata Tables

Iceberg tables contain very rich metadata that can be easily queried. For example, you can retrieve the manifest list for any snapshot, simply by querying the table's `snapshots` table.

In [4]:
%%sql

SELECT snapshot_id, manifest_list
FROM nyc.taxis.snapshots

snapshot_id,manifest_list
3983422392546513524,s3a://warehouse/wh/nyc.db/taxis/metadata/snap-3983422392546513524-1-7ddc94f6-d1ad-4579-8c31-215f5cfc64f3.avro
8483294036336372047,s3a://warehouse/wh/nyc.db/taxis/metadata/snap-8483294036336372047-1-445ce58e-25be-4449-ae69-bdd743f24dea.avro
2884711803897636007,s3a://warehouse/wh/nyc.db/taxis/metadata/snap-2884711803897636007-1-5c1791d5-1b6c-4aa7-97ae-60e416217fac.avro
1429449267781306858,s3a://warehouse/wh/nyc.db/taxis/metadata/snap-1429449267781306858-1-e0d103ab-ca59-411b-9268-e98ba277e8a6.avro


The `files` table contains loads of information on data files, including column level statistics such as null counts, lower bounds, and upper bounds.

In [5]:
%%sql

SELECT file_path, file_format, record_count, null_value_counts, lower_bounds, upper_bounds
FROM nyc.taxis.files

file_path,file_format,record_count,null_value_counts,lower_bounds,upper_bounds
s3a://warehouse/wh/nyc.db/taxis/data/00000-15-6d3573e0-9628-4028-a5c5-2be7859d2bf7-0-00001.parquet,PARQUET,17703,"{1: 0, 2: 0, 3: 0, 4: 299, 5: 0, 6: 299, 7: 299, 8: 0, 9: 0, 10: 0, 11: 0, 12: 0, 13: 0, 14: 0, 15: 0, 16: 0, 17: 0, 18: 299, 19: 299, 20: 0}","{1: bytearray(b'\x01\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'@\x88-\xfe\xdd\xbe\x05\x00'), 3: bytearray(b'@\x98\x82 \xde\xbe\x05\x00'), 4: bytearray(b'\x00\x00\x00\x00\x00\x00\x00\x00'), 5: bytearray(b'{\x14\xaeG\xe1z\x84?'), 6: bytearray(b'\x00\x00\x00\x00\x00\x00\xf0?'), 7: bytearray(b'N'), 8: bytearray(b'\x01\x00\x00\x00\x00\x00\x00\x00'), 9: bytearray(b'\x01\x00\x00\x00\x00\x00\x00\x00'), 10: bytearray(b'\x00\x00\x00\x00\x00\x00\x00\x00'), 11: bytearray(b'\x00\x00\x00\x00\x00 l\xc0'), 12: bytearray(b'\x00\x00\x00\x00\x00\x00\x12\xc0'), 13: bytearray(b'\x00\x00\x00\x00\x00\x00\xe0\xbf'), 14: bytearray(b'\x85\xebQ\xb8\x1e\xd5t\xc0'), 15: bytearray(b'\x00\x00\x00\x00\x00\x00\x04\xc0'), 16: bytearray(b'333333\xd3\xbf'), 17: bytearray(b'R\xb8\x1e\x85\xebYu\xc0'), 18: bytearray(b'\x00\x00\x00\x00\x00\x00\x04\xc0'), 19: bytearray(b'\x00\x00\x00\x00\x00\x00\xf4\xbf'), 20: bytearray(b'\x00\x00\xfa\xc5')}","{1: bytearray(b'\x06\x00\x00\x00\x00\x00\x00\x00'), 2: bytearray(b'@\xa72h9\xc1\x05\x00'), 3: bytearray(b'\xc0X\xfclC\xc1\x05\x00'), 4: bytearray(b'\x00\x00\x00\x00\x00\x00\x18@'), 5: bytearray(b'\x00\x00\x00\x00\x00\x00\x00@'), 6: bytearray(b'\x00\x00\x00\x00\x00\xc0X@'), 7: bytearray(b'Y'), 8: bytearray(b'\t\x01\x00\x00\x00\x00\x00\x00'), 9: bytearray(b'\t\x01\x00\x00\x00\x00\x00\x00'), 10: bytearray(b'\x04\x00\x00\x00\x00\x00\x00\x00'), 11: bytearray(b'\x00\x00\x00\x00\x00\x00 @'), 12: bytearray(b'\x00\x00\x00\x00\x00\x00\x16@'), 13: bytearray(b'\x00\x00\x00\x00\x00\x00\xe0?'), 14: bytearray(b'\x00\x00\x00\x00\x00@U@'), 15: bytearray(b'\x00\x00\x00\x00\x00\x80;@'), 16: bytearray(b'333333\xd3?'), 17: bytearray(b'33333SU@'), 18: bytearray(b'\x00\x00\x00\x00\x00\x00\x04@'), 19: 
bytearray(b'\x00\x00\x00\x00\x00\x00\xf4?'), 20: bytearray(b'\x00\x00\x80@')}"
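
The `lower_bounds` and `upper_bounds` maps are keyed by field ID, and each value is the bound serialized to bytes. Per the Iceberg spec's single-value serialization, `long`, `double`, and `float` values are little-endian and strings are UTF-8. Here is a hedged sketch of decoding a few entries from the output above with Python's `struct` module. The mapping of field IDs to columns (1 = `VendorID`, 5 = `distance`, 7 = `store_and_fwd_flag`, 20 = `fare_per_distance_unit`) is an assumption based on column order, and `decode_bound` is a hypothetical helper.

```python
import struct

# struct format codes for the fixed-width types used in this table.
_FORMATS = {"long": "<q", "double": "<d", "float": "<f"}


def decode_bound(raw, iceberg_type):
    """Decode one serialized bound value into a Python value."""
    if iceberg_type == "string":
        return bytes(raw).decode("utf-8")
    return struct.unpack(_FORMATS[iceberg_type], bytes(raw))[0]


# Values copied from the lower_bounds / upper_bounds output above.
print(decode_bound(b"\x01\x00\x00\x00\x00\x00\x00\x00", "long"))  # field 1 lower: 1
print(decode_bound(b"\x06\x00\x00\x00\x00\x00\x00\x00", "long"))  # field 1 upper: 6
print(decode_bound(b"\x00\x00\x00\x00\x00\x00\x00@", "double"))   # field 5 upper: 2.0
print(decode_bound(b"N", "string"))                               # field 7 lower: N
print(decode_bound(b"\x00\x00\x80@", "float"))                    # field 20 upper: 4.0
```

The decoded upper bounds of 2.0 and 4.0 agree with the `DELETE` predicate applied earlier, which removed every row with a larger `distance` or `fare_per_distance_unit`.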


## Time Travel

The `history` table lists all snapshots and which parent snapshot they derive from. The `is_current_ancestor` flag lets you know if a snapshot is part of the linear history of the current snapshot of the table.

In [6]:
%%sql

SELECT *
FROM nyc.taxis.history

made_current_at,snapshot_id,parent_id,is_current_ancestor
2024-09-01 04:50:02.594000,3983422392546513524,,True
2024-09-01 04:56:11.449000,8483294036336372047,3.9834223925465134e+18,True
2024-09-01 04:56:26.891000,2884711803897636007,8.483294036336372e+18,True
2024-09-01 04:56:32.571000,1429449267781306858,2.884711803897636e+18,True
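
Note that `parent_id` is displayed in scientific notation. This is most likely a display artifact rather than how Iceberg stores the value: snapshot IDs are 64-bit integers, and the column appears to have been converted to floating point for rendering (an assumption; a null in an integer column is a common cause). A double cannot represent every 64-bit integer, so the displayed value should not be copied back into a query. A quick check in plain Python:

```python
snapshot_id = 3983422392546513524  # first snapshot ID from the output above

as_float = float(snapshot_id)      # what a float64 column would hold
round_tripped = int(as_float)

print(as_float)                      # scientific notation, as in parent_id
print(round_tripped == snapshot_id)  # False: the low digits were lost
```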


You can time-travel by pointing the table's `current-snapshot-id` at any snapshot in the table's history, which is what the `rollback_to_snapshot` procedure does. Let's revert the table to its original state by rolling back to the very first snapshot ID.

In [7]:
%%sql --var df

SELECT *
FROM nyc.taxis.history

made_current_at,snapshot_id,parent_id,is_current_ancestor
2024-09-01 04:50:02.594000,3983422392546513524,,True
2024-09-01 04:56:11.449000,8483294036336372047,3.9834223925465134e+18,True
2024-09-01 04:56:26.891000,2884711803897636007,8.483294036336372e+18,True
2024-09-01 04:56:32.571000,1429449267781306858,2.884711803897636e+18,True


In [8]:
original_snapshot = df.head().snapshot_id
spark.sql(f"CALL system.rollback_to_snapshot('nyc.taxis', {original_snapshot})")
original_snapshot

3983422392546513524
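
Rolling back changes the table's current state for every reader. For a read-only look at an older snapshot, Spark SQL with Iceberg also supports `VERSION AS OF` in a `SELECT`, assuming Spark 3.3 or later. The sketch below only builds the statement; `time_travel_query` is a hypothetical helper, and running the query is left to the notebook's existing `spark` session.

```python
def time_travel_query(table, snapshot_id, columns="*"):
    """Build a read-only time-travel SELECT for one snapshot.

    snapshot_id is coerced to int so only a numeric literal reaches the SQL.
    """
    return f"SELECT {columns} FROM {table} VERSION AS OF {int(snapshot_id)}"


query = time_travel_query("nyc.taxis", 3983422392546513524, "COUNT(*) AS cnt")
print(query)
# SELECT COUNT(*) AS cnt FROM nyc.taxis VERSION AS OF 3983422392546513524

# In the notebook this could then be run with: spark.sql(query).show()
```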

In [9]:
%%sql

SELECT
VendorID
,tpep_pickup_datetime
,tpep_dropoff_datetime
,fare
,distance
,fare_per_distance_unit
FROM nyc.taxis

                                                                                

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,fare,distance,fare_per_distance_unit
1,2021-04-01 00:00:18,2021-04-01 00:21:54,25.5,8.4,
1,2021-04-01 00:42:37,2021-04-01 00:46:23,5.0,0.9,
1,2021-04-01 00:57:56,2021-04-01 01:08:22,11.5,3.4,
1,2021-04-01 00:01:58,2021-04-01 00:54:27,44.2,0.0,
2,2021-04-01 00:24:55,2021-04-01 00:34:33,9.0,1.96,
2,2021-04-01 00:19:16,2021-04-01 00:21:46,4.5,0.77,
2,2021-04-01 00:25:11,2021-04-01 00:31:53,11.5,3.65,
1,2021-04-01 00:27:53,2021-04-01 00:47:03,26.5,8.9,
2,2021-04-01 00:24:24,2021-04-01 00:37:50,12.0,2.98,
1,2021-04-01 00:19:18,2021-04-01 00:41:25,28.0,8.9,


Another look at the `history` table shows that the original state of the table has been added as a new entry with the original snapshot ID.

In [10]:
%%sql

SELECT *
FROM nyc.taxis.history

made_current_at,snapshot_id,parent_id,is_current_ancestor
2024-09-01 04:50:02.594000,3983422392546513524,,True
2024-09-01 04:56:11.449000,8483294036336372047,3.9834223925465134e+18,False
2024-09-01 04:56:26.891000,2884711803897636007,8.483294036336372e+18,False
2024-09-01 04:56:32.571000,1429449267781306858,2.884711803897636e+18,False
2024-09-01 07:21:45.485000,3983422392546513524,,True
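
The `is_current_ancestor` values above can be reproduced with a toy model: the history is a log of which snapshot became current, and a snapshot is a current ancestor if it is reachable from the current snapshot by following parent links. This is a plain-Python sketch using the snapshot IDs from this notebook; `parents`, `history_log`, and `current_ancestors` are hypothetical names, not Iceberg APIs.

```python
# Parent of each snapshot (None for the first one), from the history output.
parents = {
    3983422392546513524: None,
    8483294036336372047: 3983422392546513524,
    2884711803897636007: 8483294036336372047,
    1429449267781306858: 2884711803897636007,
}

# Snapshots in the order they became current; the rollback appends the
# original snapshot again instead of rewriting earlier entries.
history_log = [
    3983422392546513524,
    8483294036336372047,
    2884711803897636007,
    1429449267781306858,
    3983422392546513524,  # after rollback_to_snapshot
]


def current_ancestors(parents, current):
    """Return the current snapshot plus every snapshot reachable via parents."""
    seen = set()
    while current is not None:
        seen.add(current)
        current = parents[current]
    return seen


ancestors = current_ancestors(parents, history_log[-1])
flags = [snapshot_id in ancestors for snapshot_id in history_log]
print(flags)
# [True, False, False, False, True]
```

The three later snapshots are still listed and still exist in the table's metadata; they are simply no longer on the path from the current snapshot back to the first one.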


In [11]:
%%sql

SELECT COUNT(*) as cnt
FROM nyc.taxis

cnt
2171187
