# Using Apache Iceberg with Spark 3 in CML

The official documentation for Apache Iceberg with Spark is located at [this link](https://iceberg.apache.org/#getting-started/#using-iceberg-in-spark-3)

For a full list of Apache Iceberg terms, please visit [this link](https://iceberg.apache.org/#terms/)

In [2]:
#spark.sql("DROP TABLE IF EXISTS spark_catalog.testdb.newtesttable")
#spark.sql("DROP TABLE IF EXISTS spark_catalog.testdb.secondtesttable")

In [60]:
spark.stop()

### Start a PySpark Session as shown below. You will want to set the Spark Catalog configurations as shown

In [61]:
"""SimpleApp.py"""
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder\
  .appName("1.1 - Ingest") \
  .config("spark.hadoop.fs.s3a.s3guard.ddb.region", "us-east-2")\
  .config("spark.yarn.access.hadoopFileSystems", "s3a://demo-aws-go02")\
  .config("spark.jars","/home/cdsw/lib/iceberg-spark3-runtime-0.9.1.1.13.317211.0-9.jar") \
  .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
  .config("spark.sql.catalog.spark_catalog","org.apache.iceberg.spark.SparkSessionCatalog") \
  .config("spark.sql.catalog.local","org.apache.iceberg.spark.SparkCatalog") \
  .config("spark.sql.catalog.local.type","hadoop") \
  .config("spark.sql.catalog.spark_catalog.type","hive") \
  .getOrCreate()

### Iceberg comes with catalogs that enable SQL commands to manage tables and load them by name. 
### Catalogs are configured using properties under spark.sql.catalog.(catalog_name).

In [62]:
  # Using a local Spark Catalog

#spark.sql("CREATE DATABASE IF NOT EXISTS spark_catalog.newjar")
spark.sql("USE spark_catalog.testdb")
spark.sql("SHOW CURRENT NAMESPACE").show()
#spark.sql("DROP TABLE testtable")

+-------------+---------+
|      catalog|namespace|
+-------------+---------+
|spark_catalog|   testdb|
+-------------+---------+



### You can use simple Spark SQL commands to create Spark tables as you always have. Just make sure to specify the USING iceberg clause.

In [63]:
spark.sql("CREATE TABLE IF NOT EXISTS newtesttable (id bigint, data string) USING iceberg")

DataFrame[]

### To select a specific table snapshot or the snapshot at some time, Iceberg supports two Spark read options:

* snapshot-id selects a specific table snapshot
* as-of-timestamp selects the current snapshot at a timestamp, in milliseconds

#### You can view all snapshots associated with the table

In [64]:
spark.sql("SELECT * FROM spark_catalog.testdb.newtesttable")

DataFrame[id: bigint, data: string]

In [65]:
spark.read.format("iceberg").load("spark_catalog.testdb.newtesttable.snapshots").show(20, False)

+-----------------------+-------------------+-------------------+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id          |operation|manifest_list                                                                                                                                              |summary                                                                                                                                                                                                                                   

#### Or a full table version history 

In [66]:
spark.read.format("iceberg").load("spark_catalog.testdb.newtesttable.history").show(20, False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2022-05-19 00:27:36.444|5797822168349485877|null               |true               |
|2022-05-19 00:28:16.897|3326689113563627160|5797822168349485877|true               |
|2022-05-19 00:36:41.604|2080039523045202901|3326689113563627160|true               |
|2022-05-19 00:36:42.87 |8726800525946754873|2080039523045202901|true               |
|2022-05-19 00:36:43.934|5250560437891035012|8726800525946754873|true               |
|2022-05-19 00:36:44.966|3502850421386520639|5250560437891035012|true               |
|2022-05-19 00:36:46.076|6484488418513084975|3502850421386520639|true               |
|2022-05-19 00:36:47.109|6131843803655349628|6484488418513084975|true               |
|2022-05-19 00:36:48.125|9149412625213001646|613184380

#### To show a table’s data files and each file’s metadata, run:

In [67]:
spark.read.format("iceberg").load("spark_catalog.testdb.newtesttable.files").show(20, False)

+-------+----------------------------------------------------------------------------------------------------------------------------------------------+-----------+------------+------------------+------------------+----------------+-----------------+----------------+-----------------------+-----------------------+------------+-------------+------------+
|content|file_path                                                                                                                                     |file_format|record_count|file_size_in_bytes|column_sizes      |value_counts    |null_value_counts|nan_value_counts|lower_bounds           |upper_bounds           |key_metadata|split_offsets|equality_ids|
+-------+----------------------------------------------------------------------------------------------------------------------------------------------+-----------+------------+------------------+------------------+----------------+-----------------+----------------+---------------------

### A manifest file is a metadata file that lists a subset of data files that make up a snapshot.

### Each data file in a manifest is stored with a partition tuple, column-level stats, and summary information used to prune splits during scan planning.

#### To show a table’s file manifests and each file’s metadata, run:

In [68]:
spark.read.format("iceberg").load("spark_catalog.testdb.newtesttable.manifests").show(20, False)

+-----------------------------------------------------------------------------------------------------------------------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+-------------------+
|path                                                                                                                               |length|partition_spec_id|added_snapshot_id  |added_data_files_count|existing_data_files_count|deleted_data_files_count|partition_summaries|
+-----------------------------------------------------------------------------------------------------------------------------------+------+-----------------+-------------------+----------------------+-------------------------+------------------------+-------------------+
|s3a://demo-aws-go02/warehouse/tablespace/external/hive/testdb.db/newtesttable/metadata/939c9bae-bb1f-4a5e-a792-11c5900a55ab-m0.avro|5599  |0                |7365436161241313619|1  

## Time Travel

### Using snapshots as shown above, we can insert some data into the table and roll back to its original state

In [69]:
# Insert using Iceberg format
spark.sql("INSERT INTO spark_catalog.testdb.newtesttable VALUES (1, 'x'), (2, 'y'), (3, 'z')")

DataFrame[]

In [70]:
# Query using select
spark.sql("SELECT * FROM spark_catalog.testdb.newtesttable").show()

+---+----+
| id|data|
+---+----+
| 10|   M|
|  1|   d|
|  2|   e|
|  3|   f|
|  9|   x|
|  6|   u|
|  3|   g|
|  5|   D|
|  9|   D|
|  6|   F|
|  8|   G|
|  8|   Q|
|  2|   X|
|  6|   R|
|  1|   x|
|  2|   y|
|  3|   z|
|  9|   j|
|  0|   g|
|  0|   i|
+---+----+
only showing top 20 rows



In [71]:
# Query using DF - All Data
df = spark.table("spark_catalog.testdb.newtesttable")
df.show(100)

+---+----+
| id|data|
+---+----+
| 10|   M|
|  1|   d|
|  2|   e|
|  3|   f|
|  9|   x|
|  6|   u|
|  3|   g|
|  5|   D|
|  9|   D|
|  6|   F|
|  8|   G|
|  8|   Q|
|  2|   X|
|  6|   R|
|  1|   x|
|  2|   y|
|  3|   z|
|  9|   j|
|  0|   g|
|  0|   i|
|  7|   K|
| 10|   K|
|  5|   p|
|  7|   v|
|  7|   V|
|  5|   M|
|  3|   X|
|  4|   P|
|  8|   y|
|  6|   H|
|  0|   Y|
|  9|   i|
|  1|   x|
|  2|   y|
|  3|   z|
| 10|   V|
|  4|   P|
|  1|   x|
|  2|   y|
|  3|   z|
|  6|   z|
|  2|   X|
|  4|   q|
|  0|   Z|
|  5|   X|
|  7|   w|
|  5|   k|
|  3|   P|
|  0|   K|
|  3|   a|
|  2|   U|
|  8|   R|
|  4|   j|
|  8|   p|
|  1|   x|
|  2|   y|
|  3|   z|
|  2|   G|
|  0|   Q|
| 10|   e|
|  1|   d|
|  2|   e|
|  3|   f|
|  0|   O|
|  1|   w|
|  7|   v|
| 10|   w|
|  3|   z|
+---+----+



In [72]:
from datetime import datetime

# current date and time
now = datetime.now()

timestamp = datetime.timestamp(now)
print("timestamp =", timestamp)

timestamp = 1653348163.46654


#### Timestamps can be tricky. Please make sure to round your timestamp as shown below.

In [73]:
# Query using a point in time
df = spark.read.option("as-of-timestamp", int(timestamp*1000)).format("iceberg").load("spark_catalog.testdb.newtesttable")
df.show(100)

+---+----+
| id|data|
+---+----+
| 10|   M|
|  1|   d|
|  2|   e|
|  3|   f|
|  9|   x|
|  6|   u|
|  3|   g|
|  5|   D|
|  9|   D|
|  6|   F|
|  8|   G|
|  8|   Q|
|  2|   X|
|  6|   R|
|  1|   x|
|  2|   y|
|  3|   z|
|  9|   j|
|  0|   g|
|  0|   i|
|  7|   K|
| 10|   K|
|  5|   p|
|  7|   v|
|  7|   V|
|  5|   M|
|  3|   X|
|  4|   P|
|  8|   y|
|  6|   H|
|  0|   Y|
|  9|   i|
|  1|   x|
|  2|   y|
|  3|   z|
| 10|   V|
|  4|   P|
|  1|   x|
|  2|   y|
|  3|   z|
|  6|   z|
|  2|   X|
|  4|   q|
|  0|   Z|
|  5|   X|
|  7|   w|
|  5|   k|
|  3|   P|
|  0|   K|
|  3|   a|
|  2|   U|
|  8|   R|
|  4|   j|
|  8|   p|
|  1|   x|
|  2|   y|
|  3|   z|
|  2|   G|
|  0|   Q|
| 10|   e|
|  1|   d|
|  2|   e|
|  3|   f|
|  0|   O|
|  1|   w|
|  7|   v|
| 10|   w|
|  3|   z|
+---+----+



In [74]:
# Insert using Iceberg format
spark.sql("INSERT INTO spark_catalog.testdb.newtesttable VALUES (1, 'd'), (2, 'e'), (3, 'f')")

DataFrame[]

### Let's insert more data into the table

In [75]:
# Insert using Iceberg format
import string
import random

for i in range(25):
    number = random.randint(0, 10)
    letter = random.choice(string.ascii_letters)
    spark.sql("INSERT INTO spark_catalog.testdb.newtesttable VALUES ({}, '{}')".format(number, letter))

### Now let's access the data again. Let's access it with the same timestemp as before. Notice we have a smaller number of rows than we just inserted.

In [76]:
# Query using a point in time
df = spark.read.option("as-of-timestamp", int(timestamp*1000)).format("iceberg").load("spark_catalog.testdb.newtesttable")
df.show(100)

+---+----+
| id|data|
+---+----+
| 10|   M|
|  1|   d|
|  2|   e|
|  3|   f|
|  9|   x|
|  6|   u|
|  3|   g|
|  5|   D|
|  9|   D|
|  6|   F|
|  8|   G|
|  8|   Q|
|  2|   X|
|  6|   R|
|  1|   x|
|  2|   y|
|  3|   z|
|  9|   j|
|  0|   g|
|  0|   i|
|  7|   K|
| 10|   K|
|  5|   p|
|  7|   v|
|  7|   V|
|  5|   M|
|  3|   X|
|  4|   P|
|  8|   y|
|  6|   H|
|  0|   Y|
|  9|   i|
|  1|   x|
|  2|   y|
|  3|   z|
| 10|   V|
|  4|   P|
|  1|   x|
|  2|   y|
|  3|   z|
|  6|   z|
|  2|   X|
|  4|   q|
|  0|   Z|
|  5|   X|
|  7|   w|
|  5|   k|
|  3|   P|
|  0|   K|
|  3|   a|
|  2|   U|
|  8|   R|
|  4|   j|
|  8|   p|
|  1|   x|
|  2|   y|
|  3|   z|
|  2|   G|
|  0|   Q|
| 10|   e|
|  1|   d|
|  2|   e|
|  3|   f|
|  0|   O|
|  1|   w|
|  7|   v|
| 10|   w|
|  3|   z|
+---+----+



### Observe that many new Snapshots have been created

In [77]:
spark.read.format("iceberg").load("spark_catalog.testdb.newtesttable.history").show(20, False)

+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2022-05-19 00:27:36.444|5797822168349485877|null               |true               |
|2022-05-19 00:28:16.897|3326689113563627160|5797822168349485877|true               |
|2022-05-19 00:36:41.604|2080039523045202901|3326689113563627160|true               |
|2022-05-19 00:36:42.87 |8726800525946754873|2080039523045202901|true               |
|2022-05-19 00:36:43.934|5250560437891035012|8726800525946754873|true               |
|2022-05-19 00:36:44.966|3502850421386520639|5250560437891035012|true               |
|2022-05-19 00:36:46.076|6484488418513084975|3502850421386520639|true               |
|2022-05-19 00:36:47.109|6131843803655349628|6484488418513084975|true               |
|2022-05-19 00:36:48.125|9149412625213001646|613184380

### You can also query the table in its previous state as of a specific partition.

#### Copy paste a partition_id from above and paste it in the next Spark command

In [78]:
spark.read\
    .option("snapshot-id", 6484488418513084975)\
    .table("spark_catalog.testdb.newtesttable").show()

+---+----+
| id|data|
+---+----+
|  8|   p|
|  1|   d|
|  2|   e|
|  3|   f|
|  1|   w|
|  7|   v|
|  0|   g|
|  1|   x|
|  2|   y|
|  3|   z|
|  7|   v|
+---+----+



### The Iceberg API allows you to create tables from Spark Dataframes, and more

In [79]:
new_df = spark.sql("SELECT * FROM spark_catalog.testdb.newtesttable").sample(fraction=0.5, seed=3)

In [80]:
new_df.dtypes

[('id', 'bigint'), ('data', 'string')]

In [81]:
new_df.show()

+---+----+
| id|data|
+---+----+
| 10|   M|
|  9|   x|
|  6|   u|
|  5|   D|
|  6|   F|
|  6|   R|
|  8|   r|
|  1|   x|
|  9|   j|
|  0|   i|
| 10|   K|
|  5|   p|
|  3|   j|
|  6|   G|
|  7|   V|
|  5|   M|
|  3|   X|
|  6|   O|
|  5|   i|
|  1|   d|
+---+----+
only showing top 20 rows



#### Creating a new Spark Table with the API and Loading It

In [82]:
#spark.sql("DROP TABLE IF EXISTS spark_catalog.testdb.secondtesttable")

In [29]:
new_df.writeTo("spark_catalog.testdb.secondtesttable").create()

In [52]:
new_df_nodups = new_df.dropDuplicates(["id"])

In [53]:
new_df_nodups.show()

+---+----+
| id|data|
+---+----+
|  0|   Z|
|  7|   K|
|  6|   z|
|  9|   x|
|  5|   M|
|  1|   x|
| 10|   e|
|  3|   z|
|  8|   p|
|  2|   y|
|  4|   P|
+---+----+



#### More ETL SQL Operations

In [31]:
spark.sql("INSERT INTO spark_catalog.testdb.secondtesttable SELECT * FROM spark_catalog.testdb.newtesttable")

DataFrame[]

In [43]:
new_df.count()

33

In [40]:
second_df = spark.sql("SELECT * FROM spark_catalog.testdb.secondtesttable") 

In [42]:
second_df.count()

98

In [44]:
second_df=second_df.withColumn("id", second_df.id*3)

In [54]:
sec_df_nodups = second_df.dropDuplicates(["id"])

In [55]:
sec_df_nodups.show()

+---+----+
| id|data|
+---+----+
|  0|   g|
|  6|   y|
|  9|   f|
| 27|   x|
|  3|   d|
| 12|   P|
| 18|   u|
| 21|   K|
| 15|   D|
| 30|MMMM|
| 24|   G|
+---+----+



In [56]:
new_df_nodups.writeTo("spark_catalog.testdb.new_df_nodups").create()

In [57]:
sec_df_nodups.writeTo("spark_catalog.testdb.sec_df_nodups").create()

#### Update and Merge Into SQL Operations

In [94]:
#spark.sql("UPDATE spark_catalog.testdb.new_df_nodups SET data = '?' WHERE id = (SELECT id FROM spark_catalog.testdb.sec_df_nodups)")

In [None]:
#spark.sql(
#"MERGE INTO spark_catalog.testdb.new_df_nodups t USING (SELECT * FROM spark_catalog.testdb.sec_df_nodups) u ON t.id = u.id \
#WHEN MATCHED THEN UPDATE SET t.data = u.data + t.data \
#WHEN NOT MATCHED THEN INSERT *")

In [None]:
spark.sql("SELECT count(*) FROM spark_catalog.testdb.new_df_nodups").show() 

In [None]:
spark.sql("SELECT * FROM spark_catalog.testdb.new_df_nodups").show() 