# Using Apache Iceberg with Spark 3 in CML

The official documentation for using Apache Iceberg with Spark is available at [this link](https://iceberg.apache.org/#getting-started/#using-iceberg-in-spark-3).

For a full list of Apache Iceberg terms, see [this link](https://iceberg.apache.org/#terms/).

In [1]:
import cml.data_v1 as cmldata

# Sample in-code customization of spark configurations
#from pyspark import SparkContext
#SparkContext.setSystemProperty('spark.executor.cores', '1')
#SparkContext.setSystemProperty('spark.executor.memory', '2g')

CONNECTION_NAME = "go01-aw-dl"

conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

In [2]:
import os

username = os.environ["PROJECT_OWNER"]

#### You can use simple Spark SQL commands to create Spark tables as you always have. Just make sure to specify the USING iceberg clause.

In [4]:
spark.sql("CREATE TABLE IF NOT EXISTS default.{}_ice (id bigint, data string) USING iceberg".format(username))

DataFrame[]

#### To select a specific table snapshot or the snapshot at some time, Iceberg supports two Spark read options:

* snapshot-id selects a specific table snapshot by its ID
* as-of-timestamp selects the snapshot that was current at a given timestamp, expressed in milliseconds since the Unix epoch
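
Because as-of-timestamp expects milliseconds while Python's datetime works in seconds, a small conversion helper can be handy. The following is a minimal sketch: `to_epoch_millis` is a hypothetical helper name (it is not part of Iceberg or Spark), and the example date is arbitrary.

```python
from datetime import datetime, timezone


def to_epoch_millis(moment):
    """Convert a datetime to whole milliseconds since the Unix epoch.

    Hypothetical helper for illustration only. Naive datetimes are
    interpreted in the local time zone, as datetime.timestamp() does.
    """
    return int(moment.timestamp() * 1000)


# Arbitrary example value; using an explicit UTC time zone avoids ambiguity.
example = datetime(2024, 3, 16, 0, 55, 30, tzinfo=timezone.utc)
print(to_epoch_millis(example))
```

The resulting integer is the kind of value that can be passed to the as-of-timestamp read option.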

#### You can query the table like any other Spark table. Without .show(), Spark only returns the DataFrame and its schema

In [5]:
spark.sql("SELECT * FROM default.{}_ice".format(username))

DataFrame[id: bigint, data: string]
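
To list the snapshots themselves, Iceberg exposes a `snapshots` metadata table that can be loaded by appending `.snapshots` to the table name. The sketch below wraps that in a function; `snapshots_table` is a hypothetical helper name, and the function assumes it is given an Iceberg-enabled Spark session such as the `spark` object created earlier.

```python
def snapshots_table(spark, table_name):
    """Return a DataFrame over the `snapshots` metadata table of an Iceberg table.

    Hypothetical helper for illustration: `spark` is assumed to be an
    Iceberg-enabled SparkSession and `table_name` a name like "db.table".
    """
    return spark.read.format("iceberg").load("{}.snapshots".format(table_name))
```

With the session from this notebook, it could be called as `snapshots_table(spark, "default.{}_ice".format(username)).show(20, False)`.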

#### You can also view the full table version history

In [6]:
spark.read.format("iceberg").load("default.{}_ice.history".format(username)).show(20, False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2024-03-16 00:53:23.233|2952744008021118981|null               |true               |
|2024-03-16 00:55:07.633|4221471412694680144|2952744008021118981|true               |
|2024-03-16 00:55:26.894|2773036698405268957|4221471412694680144|true               |
|2024-03-16 00:55:28.269|5208843994265553390|2773036698405268957|true               |
|2024-03-16 00:55:29.615|3789197272971574187|5208843994265553390|true               |
|2024-03-16 00:55:31.048|4009769249131678601|3789197272971574187|true               |
|2024-03-16 00:55:32.441|428428724457144922 |4009769249131678601|true               |
|2024-03-16 00:55:33.823|7443296050635542790|428428724457144922 |true               |
|2024-03-16 00:55:35.212|7363979166116357902|744329605

##### A manifest file is a metadata file that lists a subset of data files that make up a snapshot.

##### Each data file in a manifest is stored with a partition tuple, column-level stats, and summary information used to prune splits during scan planning.
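
As a rough illustration of why column-level stats help during scan planning, the toy model below keeps only the files whose min/max range could contain a requested value. This is a simplified sketch, not Iceberg's implementation: `DataFileEntry`, `prune_files`, and the file names are all hypothetical, and real planning considers partition tuples and many more statistics.

```python
from dataclasses import dataclass


@dataclass
class DataFileEntry:
    """Toy stand-in for a manifest entry: a file path plus min/max stats for `id`."""
    path: str
    id_min: int
    id_max: int


def prune_files(entries, wanted_id):
    """Keep only files whose [id_min, id_max] range could contain wanted_id."""
    return [e for e in entries if e.id_min <= wanted_id <= e.id_max]


# Hypothetical entries; the paths and ranges are made up for the example.
entries = [
    DataFileEntry("file-a.parquet", 0, 3),
    DataFileEntry("file-b.parquet", 4, 7),
    DataFileEntry("file-c.parquet", 8, 10),
]

# Only files that might hold id = 5 need to be read.
survivors = prune_files(entries, 5)
print([e.path for e in survivors])
```

Files whose range excludes the value are skipped without being opened, which is the idea behind pruning with manifest statistics.
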

##### To show a table’s file manifests and each file’s metadata, run:

In [7]:
spark.read.format("iceberg").load("default.{}_ice.manifests".format(username)).show(5, False)

+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+------------------------+---------------------------+--------------------------+-------------------+
|content|path                                                                                                                                                                          |length|partition_spec_id|added_snapshot_id  |added_data_files_count|existing_data_files_count|deleted_data_files_count|added_delete_files_count|existing_delete_files_count|deleted_delete_files_count|partition_summaries|
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+-

## Time Travel

### Using the snapshots shown above, we can insert data into the table and then read the table as it was before the insert

In [8]:
# Insert using Iceberg format
spark.sql("INSERT INTO default.{}_ice VALUES (1, 'x'), (2, 'y'), (3, 'z')".format(username))

DataFrame[]

In [9]:
# Query using select
spark.sql("SELECT * FROM default.{}_ice".format(username)).show()

+---+----+
| id|data|
+---+----+
|  6|   w|
| 10|   S|
|  1|   d|
|  2|   e|
|  3|   f|
|  6|   N|
|  8|   I|
|  9|   Q|
|  0|   x|
|  8|   c|
|  3|   G|
|  3|   C|
|  7|   u|
|  9|   G|
|  0|   E|
|  1|   x|
|  2|   y|
|  3|   z|
|  5|   r|
|  4|   r|
+---+----+
only showing top 20 rows



In [10]:
from datetime import datetime

# Capture the current date and time as seconds since the Unix epoch
now = datetime.now()

timestamp = now.timestamp()
print("timestamp =", timestamp)

timestamp = 1712681548.088943


In [11]:
# Query the table as of the saved timestamp (the option expects milliseconds)
df = spark.read.option("as-of-timestamp", int(timestamp*1000)).format("iceberg").load("default.{}_ice".format(username))
df.show(100)

+---+----+
| id|data|
+---+----+
|  6|   w|
| 10|   S|
|  6|   N|
|  1|   d|
|  2|   e|
|  3|   f|
|  8|   I|
|  9|   Q|
|  3|   G|
|  0|   x|
|  8|   c|
|  3|   C|
|  9|   G|
|  7|   u|
|  4|   r|
|  0|   E|
|  1|   x|
|  2|   y|
|  3|   z|
|  5|   r|
|  3|   p|
|  6|   B|
|  6|   T|
|  6|   c|
|  6|   V|
|  5|   D|
|  5|   j|
|  7|   p|
|  6|   E|
|  1|   x|
|  2|   y|
|  3|   z|
|  9|   U|
| 10|   v|
+---+----+



In [12]:
# Insert using Iceberg format
spark.sql("INSERT INTO default.{}_ice VALUES (1, 'd'), (2, 'e'), (3, 'f')".format(username))

DataFrame[]

#### Let's insert more data into the table

In [13]:
# Insert using Iceberg format
import string
import random

for i in range(25):
    number = random.randint(0, 10)
    letter = random.choice(string.ascii_letters)
    spark.sql("INSERT INTO default.{}_ice VALUES ({}, '{}')".format(username, number, letter))

#### Now let's read the data again, using the same timestamp as before. Notice that the rows we just inserted do not appear, because the read resolves to the snapshot that was current at that timestamp.

In [14]:
# Query the table as of the same saved timestamp (the option expects milliseconds)
df = spark.read.option("as-of-timestamp", int(timestamp*1000)).format("iceberg").load("default.{}_ice".format(username))
df.show(100)

+---+----+
| id|data|
+---+----+
|  6|   w|
| 10|   S|
|  1|   d|
|  2|   e|
|  3|   f|
|  6|   N|
|  8|   I|
|  0|   x|
|  9|   Q|
|  3|   G|
|  8|   c|
|  3|   C|
|  7|   u|
|  9|   G|
|  0|   E|
|  4|   r|
|  1|   x|
|  2|   y|
|  3|   z|
|  5|   r|
|  3|   p|
|  6|   B|
|  6|   T|
|  6|   c|
|  5|   D|
|  6|   V|
|  5|   j|
|  7|   p|
|  1|   x|
|  2|   y|
|  3|   z|
|  6|   E|
|  9|   U|
| 10|   v|
+---+----+



### Observe that many new snapshots have been created, one for each INSERT

In [15]:
spark.read.format("iceberg").load("default.{}_ice.history".format(username)).show(10, False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2024-03-16 00:53:23.233|2952744008021118981|null               |true               |
|2024-03-16 00:55:07.633|4221471412694680144|2952744008021118981|true               |
|2024-03-16 00:55:26.894|2773036698405268957|4221471412694680144|true               |
|2024-03-16 00:55:28.269|5208843994265553390|2773036698405268957|true               |
|2024-03-16 00:55:29.615|3789197272971574187|5208843994265553390|true               |
|2024-03-16 00:55:31.048|4009769249131678601|3789197272971574187|true               |
|2024-03-16 00:55:32.441|428428724457144922 |4009769249131678601|true               |
|2024-03-16 00:55:33.823|7443296050635542790|428428724457144922 |true               |
|2024-03-16 00:55:35.212|7363979166116357902|744329605
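
The history rows can also be collected into Python, for instance to pick a snapshot ID programmatically instead of reading it off the printed table. The following is a minimal sketch: `history_snapshot_ids` is a hypothetical helper name, and it assumes an Iceberg-enabled Spark session like the `spark` object used in this notebook.

```python
def history_snapshot_ids(spark, table_name):
    """Return the table's snapshot IDs, oldest first, from the history metadata table.

    Hypothetical helper for illustration: `spark` is assumed to be an
    Iceberg-enabled SparkSession and `table_name` a name like "db.table".
    """
    history = spark.read.format("iceberg").load("{}.history".format(table_name))
    # Sort explicitly, since Spark does not guarantee row order otherwise.
    ordered = history.orderBy("made_current_at").select("snapshot_id")
    return [row["snapshot_id"] for row in ordered.collect()]
```

For example, `history_snapshot_ids(spark, "default.{}_ice".format(username))[0]` would give the ID of the earliest snapshot in the history.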

### You can also query the table in its previous state as of a specific snapshot.

#### Copy a snapshot_id from the history output above and paste it into the next Spark command

In [17]:
spark.read\
    .option("snapshot-id", 3223179238839827744)\
    .table("default.{}_ice".format(username)).show()

+---+----+
| id|data|
+---+----+
|  6|   B|
|  5|   r|
|  1|   d|
|  2|   e|
|  3|   f|
|  9|   G|
|  6|   E|
|  1|   x|
|  2|   y|
|  9|   Q|
|  3|   z|
|  9|   U|
|  5|   D|
| 10|   v|
+---+----+
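
The same kind of read can also be expressed in SQL. The sketch below only builds the query strings and assumes Spark 3.3 or later with an Iceberg runtime that supports the `VERSION AS OF` and `TIMESTAMP AS OF` clauses, which has not been verified for this CML environment. `time_travel_query` and the table name `default.alice_ice` are hypothetical.

```python
def time_travel_query(table_name, snapshot_id=None, timestamp=None):
    """Build a SELECT that reads a table at a snapshot ID or at a timestamp string.

    Hypothetical helper for illustration; exactly one of `snapshot_id`
    or `timestamp` must be given.
    """
    if (snapshot_id is None) == (timestamp is None):
        raise ValueError("pass exactly one of snapshot_id or timestamp")
    if snapshot_id is not None:
        return "SELECT * FROM {} VERSION AS OF {}".format(table_name, int(snapshot_id))
    return "SELECT * FROM {} TIMESTAMP AS OF '{}'".format(table_name, timestamp)


# Example table name and values are placeholders.
print(time_travel_query("default.alice_ice", snapshot_id=3223179238839827744))
print(time_travel_query("default.alice_ice", timestamp="2024-03-16 00:55:30"))
```

Under those assumptions, either string could be passed to `spark.sql(...)` in place of the read options used above.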



### The Iceberg API allows you to create tables from Spark DataFrames, and more

In [18]:
new_df = spark.sql("SELECT * FROM default.{}_ice".format(username)).sample(fraction=0.5, seed=3)

In [19]:
new_df.dtypes

[('id', 'bigint'), ('data', 'string')]

In [20]:
new_df.show(10)

+---+----+
| id|data|
+---+----+
|  6|   w|
|  8|   I|
|  3|   G|
|  6|   B|
|  6|   E|
|  3|   f|
|  7|   R|
|  7|   T|
|  2|   e|
|  9|   Q|
+---+----+
only showing top 10 rows

