# Spark-Iceberg Evaluation



# Index

1) [Configuring Iceberg](#configure_iceberg) <br>
2) [Evaluating Iceberg for Spark](#test) <br>
&emsp;&emsp;&emsp;&emsp;a) [Regular Dataframe behavior](#normaldftest) <br>
&emsp;&emsp;&emsp;&emsp;b) [Pure Dataframe + Iceberg](#dftest) <br>
&emsp;&emsp;&emsp;&emsp;c) [SparkSQL + Iceberg](#sparksqltest) <br>
3) [Hybrid tests with Dataframe and Datasets](#hybrid) <br>
&emsp;&emsp;&emsp;&emsp;a) [SparkSQL + Iceberg + Dataframe](#sparksqldftest) <br>
&emsp;&emsp;&emsp;&emsp;b) [SparkSQL + Iceberg + Dataset](#sparksqldstest) <br>
4) [Perform transformations with UDFs and Dataset APIs](#udf) <br>
5) [Known Issues](#knownissues) <br>
6) [Key Takeaways](#summary) <br>


<a id="configure_iceberg"></a>
# Configuring Iceberg on Spark session

This notebook uses 3 * r4.4xlarge EMR 6.2 cluster. Iceberg 0.11 only works with Spark 3.0.1 due to this issue: https://github.com/apache/iceberg/issues/2335. Iceberg 0.12, yet to be released, will support Spark 3.1 on EMR 6.3.


In [35]:
%%configure -f
{
    "conf":  { 
             "spark.jars.packages":"org.apache.iceberg:iceberg-spark3-runtime:0.11.0",
             "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
             "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog",
             "spark.sql.catalog.spark_catalog.type":"hive",
             "spark.sql.catalog.local":"org.apache.iceberg.spark.SparkCatalog",
             "spark.sql.catalog.local.type":"hadoop",
             "spark.sql.catalog.local.warehouse":"s3://vasveena-test-demo/iceberg/catalog/tables/"
           } 
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
16,application_1619633879001_0026,spark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
16,application_1619633879001_0026,spark,idle,Link,Link,✔


In [36]:
spark.version

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res1: String = 3.0.1-amzn-0


<a id="test"></a>
# Evaluating Iceberg for Spark
<a id="normaldftest"></a>
## Regular Dataframe Read/Write test with Parquet Files

### Read Input data from S3

Let's read our 100M record dataset

In [43]:
val input_df = spark.read.parquet("s3://neilawstmp2/tmp/hudi-perf/input/")
input_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

input_df: org.apache.spark.sql.DataFrame = [id: bigint, month: bigint ... 5 more fields]
res9: Long = 100000000


In [45]:
input_df.printSchema()
input_df.rdd.getNumPartitions

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: struct (nullable = true)
 |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)

res12: Int = 25


### Transform input data
Performing some transformations on the input

In [46]:
import org.apache.spark.sql.functions._

val input_df2=(input_df.withColumn("z", substring(md5(concat($"id")),1,1))
                       .withColumn("schema-v", lit("v1")).withColumn("data-v", lit("v2"))
                       .withColumn("trade_dt", substring($"modified_timestamp",1,10)))
input_df2.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.functions._
input_df2: org.apache.spark.sql.DataFrame = [id: bigint, month: bigint ... 9 more fields]
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|     id|month|     sk|txt|                uuid|year| modified_timestamp|  z|schema-v|data-v|  trade_dt|
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|4000000|    3|4000000|[E]|6e505939-f5fd-4ab...|2019|2021-04-02 00:05:02|  9|      v1|    v2|2021-04-02|
|4000001|    9|4000001|[F]|20486aca-2759-43f...|2019|2021-04-02 00:05:02|  d|      v1|    v2|2021-04-02|
|4000002|   11|4000002|[G]|42962a21-a2dc-40d...|2019|2021-04-02 00:05:02|  d|      v1|    v2|2021-04-02|
|4000003|    9|4000003|[H]|9841ad6d-1532-496...|2019|2021-04-02 00:05:02|  c|      v1|    v2|2021-04-02|
|4000004|    4|4000004|[I]|ff1a855a-cced-495...|2019|2021-04-02 00:05:02|  4|      v1|    v2|2021-04-02|
+-------+-----+-------+---+---

### Write data back to S3 in normal Parquet format 

In [23]:
val t1 = System.nanoTime

val s3_location=s"s3://vasveena-test-demo/temp6/parquet-perf/catalog/example_iceberg_perf_test_6"

input_df2.write.mode("OVERWRITE").partitionBy("z","schema-v","data-v","trade_dt").parquet(s3_location)

val duration = (System.nanoTime - t1) / 1e9d + " seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 1649793133537949
s3_location: String = s3://vasveena-test-demo/temp6/parquet-perf/catalog/example_iceberg_perf_test_6
duration: String = 97.146739323 seconds


### Write data back to S3 in Spark table format (Parquet)

In [24]:
val t1 = System.nanoTime

val s3_location_ndf=s"s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_normaldf"

(input_df2.write.mode("OVERWRITE")
    .option("path", s3_location_ndf)
    .partitionBy("z","schema-v","data-v","trade_dt").format("parquet")
    .saveAsTable("iceberg_table_normaldf"))

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 1650088339766019
s3_location_ndf: String = s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_normaldf
duration: String = 100.121327602seconds


### Results

Reading output data

In [25]:
println("Reading direct Parquet output")
spark.read.parquet("s3://vasveena-test-demo/temp6/parquet-perf/catalog/example_iceberg_perf_test_6/").show(5)
println("Reading Spark table Parquet output")
spark.sql("select * from iceberg_table_normaldf limit 5").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Reading direct Parquet output
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|     id|month|     sk|txt|                uuid|year| modified_timestamp|  z|schema-v|data-v|  trade_dt|
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|4000011|    2|4000011|[P]|dbf28c6d-6f94-449...|2019|2021-04-02 00:05:02|  3|      v1|    v2|2021-04-02|
|4000017|    1|4000017|[V]|96fa033f-0fb0-4d5...|2019|2021-04-02 00:05:02|  3|      v1|    v2|2021-04-02|
|4000047|   10|4000047|[Z]|93c488d8-b882-442...|2019|2021-04-02 00:05:02|  3|      v1|    v2|2021-04-02|
|4000051|    2|4000051|[D]|4464b11b-14aa-4ca...|2019|2021-04-02 00:05:02|  3|      v1|    v2|2021-04-02|
|4000071|    4|4000071|[X]|bdb89389-2406-47b...|2019|2021-04-02 00:05:02|  3|      v1|    v2|2021-04-02|
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
only showing top 5 rows



Now lets go ahead and list the S3 location where our DF was written in regular Parquet format

In [26]:
%%sh

aws s3 ls s3://vasveena-test-demo/temp6/parquet-perf/catalog/example_iceberg_perf_test_6/

                           PRE z=0/
                           PRE z=1/
                           PRE z=2/
                           PRE z=3/
                           PRE z=4/
                           PRE z=5/
                           PRE z=6/
                           PRE z=7/
                           PRE z=8/
                           PRE z=9/
                           PRE z=a/
                           PRE z=b/
                           PRE z=c/
                           PRE z=d/
                           PRE z=e/
                           PRE z=f/
2021-05-17 20:28:35          0 _SUCCESS


Checking the S3 location where our DF was written in regular Spark table format

In [27]:
%%sh

aws s3 ls s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_normaldf/

                           PRE z=0/
                           PRE z=1/
                           PRE z=2/
                           PRE z=3/
                           PRE z=4/
                           PRE z=5/
                           PRE z=6/
                           PRE z=7/
                           PRE z=8/
                           PRE z=9/
                           PRE z=a/
                           PRE z=b/
                           PRE z=c/
                           PRE z=d/
                           PRE z=e/
                           PRE z=f/
2021-05-17 20:33:28          0 _SUCCESS


### Note

As expected, the S3 partitions are not hashed in both Spark table format and direct Parquet writes

<a id="dftest"></a>
## Dataframe Read/Write tests with Iceberg format

### Bulk insert our input dataframe in Iceberg format

Now lets try to write the transformed dataframe in Iceberg format

In [29]:
val t1 = System.nanoTime

val s3_location_idf=s"s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_dfonly"
val data_location_idf=s"s3://vasveena-test-hmswarehouse/"

//orderBy clause is required to avoid error "java.lang.IllegalStateException: Already closed files for partition: z=6/schema_v=v1/data_v=v2/trade_dt=trade_dt"
(input_df2.orderBy("z","`schema-v`","`data-v`","trade_dt").write.mode("OVERWRITE")
    .option("path", s3_location_idf)
    .option("write.object-storage.enabled",true)
    .option("write.object-storage.path",data_location_idf)
    .partitionBy("z","`schema-v`","`data-v`","trade_dt").format("iceberg")
    .saveAsTable("iceberg_table_dfonly22"))

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 1650425623897933
s3_location_idf: String = s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_dfonly
data_location_idf: String = s3://vasveena-test-hmswarehouse/
duration: String = 171.271756516seconds


### Results

Reading output data

In [37]:
spark.read.format("iceberg").load("iceberg_table_dfonly22").printSchema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: struct (nullable = true)
 |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)
 |-- z: string (nullable = true)
 |-- schema-v: string (nullable = true)
 |-- data-v: string (nullable = true)
 |-- trade_dt: string (nullable = true)



In [38]:
println("Reading data in iceberg format")
spark.read.format("iceberg").load("iceberg_table_dfonly22").show(5)
println("Using SparkSQL")
spark.sql("select * from iceberg_table_dfonly22 limit 5").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Reading data in iceberg format
+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+
|      id|month|      sk|txt|                uuid|year| modified_timestamp|  z|schema-v|data-v|  trade_dt|
+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+
|44000005|    6|44000005|[X]|879bf2e9-92f1-44d...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|44000023|    7|44000023|[P]|141ee5fc-0222-40e...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|44000070|    6|44000070|[K]|fd1b1435-78a6-4f0...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|44000091|    2|44000091|[F]|0ccdc268-8dd9-4ba...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|44000099|    8|44000099|[N]|2271f8df-4eff-445...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+
only s

Listing S3 location of Iceberg data and metadata

In [39]:
%%sh

#Metastore location is empty. i.e, no pre-hash
aws s3 ls "s3://vasveena-test-hmswarehouse/"

#Listing table location
aws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_dfonly/"

#Lists iceberg metadata
aws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_dfonly/metadata/"

                           PRE data/
                           PRE metadata/
2021-05-17 20:40:19       3604 00000-a606ccd1-7242-4963-81a0-19f85b7997a9.metadata.json
2021-05-17 20:40:18       9457 53e4ba81-c785-4b3b-8a08-4de084692421-m0.avro
2021-05-17 20:40:19       3565 snap-7125006042262973350-1-53e4ba81-c785-4b3b-8a08-4de084692421.avro


### Note

S3 partitions are not hashed when using pure dataframe with iceberg

<a id="sparksqltest"></a>
## SparkSQL + Iceberg Read/Write test with Parquet Files

### Creating an output table in SparkSQL

In [40]:
spark.sql("""CREATE TABLE iceberg_table_sparksql22 (id bigint,
                                       month bigint,
                                       sk bigint,
                                       txt struct<key1:string>,
                                       uuid string,
                                       year string,
                                       modified_timestamp timestamp,
                                       z string,
                                       `schema_v` string,   --had to rename these columns due to error
                                       `data_v` string,
                                       trade_dt string)
USING iceberg
OPTIONS ( 'write.object-storage.enabled'=true,
          'write.object-storage.path'='s3://vasveena-test-hmswarehouse/')
PARTITIONED BY (z,`schema_v`,`data_v`,trade_dt)
LOCATION 's3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksql22'""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res7: org.apache.spark.sql.DataFrame = []


During insert with input_df2, we will get an error "Possibly unquoted identifier schema-v detected" which happened even if those fields are quoted properly on both source and destination. So, changing col name from data-v, schema-v to data_v and schema_v on both source and destination tables.

In [47]:
val input_df4 = input_df2.withColumnRenamed("schema-v", "schema_v").withColumnRenamed("data-v", "data_v")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

input_df4: org.apache.spark.sql.DataFrame = [id: bigint, month: bigint ... 9 more fields]


In [48]:
input_df4.printSchema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: struct (nullable = true)
 |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)
 |-- z: string (nullable = true)
 |-- schema_v: string (nullable = false)
 |-- data_v: string (nullable = false)
 |-- trade_dt: string (nullable = true)



In [49]:
input_df4.registerTempTable("inputdf4")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [55]:
spark.sql("""select * from inputdf4 limit 5""").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+
|      id|month|      sk|txt|                uuid|year| modified_timestamp|  z|schema_v|data_v|  trade_dt|
+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+
|15000000|    7|15000000|[C]|d413c3ff-031f-490...|2019|2021-04-02 00:05:02|  e|      v1|    v2|2021-04-02|
|15000001|    1|15000001|[D]|56137d34-8d22-46e...|2019|2021-04-02 00:05:02|  3|      v1|    v2|2021-04-02|
|15000002|   11|15000002|[E]|042a812f-1009-403...|2019|2021-04-02 00:05:02|  c|      v1|    v2|2021-04-02|
|15000003|    1|15000003|[F]|9a3d5aad-8d18-4d8...|2019|2021-04-02 00:05:02|  5|      v1|    v2|2021-04-02|
|15000004|   12|15000004|[G]|0c8708bb-558f-4fd...|2019|2021-04-02 00:05:02|  f|      v1|    v2|2021-04-02|
+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+



### Bulk insert data with iceberg format

Now, lets do a bulk insert on iceberg table using pure SparkSQL

In [56]:
val t1 = System.nanoTime

spark.sql("""insert overwrite table iceberg_table_sparksql22
             partition (z,`schema_v`,`data_v`,trade_dt) 
             select id, month, sk, txt, uuid,
             year, modified_timestamp, z,
             `schema_v`, `data_v`,trade_dt
             from inputdf4
             order by z,`schema_v`,`data_v`,trade_dt """) //order by clause is required to avoid error "java.lang.IllegalStateException: Already closed files for partition: z=6/schema_v=v1/data_v=v2/trade_dt=trade_dt"

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 1663700435514845
res25: org.apache.spark.sql.DataFrame = []
duration: String = 136.060916253seconds


### Results

Reading output data

In [57]:
spark.sql("""select * from default.iceberg_table_sparksql22""").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|     id|month|     sk|txt|                uuid|year| modified_timestamp|  z|schema_v|data_v|  trade_dt|
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|7000019|    7|7000019|[N]|b74241b1-ff28-4d6...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|7000047|    5|7000047|[P]|c0ffb8e0-f38c-45f...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|7000050|    3|7000050|[S]|446d3ea6-1d15-467...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|7000059|    3|7000059|[B]|9b9f4dd5-6cf7-4a3...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|7000069|    2|7000069|[L]|563a3c50-bac2-40d...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
only showing top 5 rows



Listing write object storage path. As we can see, the hashes are created.

In [59]:
%%sh

aws s3 ls "s3://vasveena-test-hmswarehouse/" | head -n 20

                           PRE 0196c656/
                           PRE 058e9a6a/
                           PRE 0a8968ba/
                           PRE 0aea53a0/
                           PRE 0f459a4a/
                           PRE 11aa143d/
                           PRE 196efb11/
                           PRE 1aefec80/
                           PRE 1bfd75f9/
                           PRE 2022376d/
                           PRE 233ddc65/
                           PRE 25e13848/
                           PRE 27c516c7/
                           PRE 2ee645fd/
                           PRE 3345d8c1/
                           PRE 3686753d/
                           PRE 3b8b1575/
                           PRE 3f1efbc8/
                           PRE 4783b0e6/
                           PRE 488a6052/



[Errno 32] Broken pipe


If we list one of the above hashes, we get the following:

```
aws s3 ls s3://vasveena-test-hmswarehouse/7e5ce3b1/catalog/example_iceberg_perf_test_sparksql22/z=2/schema_v=v1/data_v=v2/trade_dt=2021-04-02/
2021-05-17 20:20:58  163809081 00002-213-ebfd0f70-dcd9-4359-ad57-5a54446e87c0-00001.parquet

```

Similarly, we can list the S3 metadata location


In [60]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksql22/metadata/"

2021-05-17 23:55:53       2917 00000-d8bff426-01b5-4469-b30b-ac91083ecc3f.metadata.json
2021-05-18 00:09:52       3946 00001-34c06a63-a841-43b9-a59c-c6b206a2b575.metadata.json
2021-05-18 00:21:01       5122 00002-6f56828d-3d39-4a9b-abf7-e71f4d4f1fad.metadata.json
2021-05-18 00:09:52       9523 5588c135-8b04-4425-8747-65f431a15cb1-m0.avro
2021-05-18 00:21:00       9521 c520445a-6898-47a4-a846-be7b6b0d8836-m0.avro
2021-05-18 00:21:01       9484 c520445a-6898-47a4-a846-be7b6b0d8836-m1.avro
2021-05-18 00:21:01       3615 snap-169749560302296372-1-c520445a-6898-47a4-a846-be7b6b0d8836.avro
2021-05-18 00:09:52       3568 snap-5010499931560731436-1-5588c135-8b04-4425-8747-65f431a15cb1.avro


### Note

Spark + Iceberg integration with write object storage path works as documented while using pure SparkSQL. This is true for both Scala/PySpark implementations

Table created via DF API and SparkSQL look identical based on diffchecker. Checked "show create table" from Hive catalog. This is something that needs to be checked.

<a id="hybrid"></a>
# Hybrid implementation with DF + DS

Now that we have established pure SparkSQL works as expected, we will look into extending this implementation to DF + DS. The reason is, FINRA uses Datasets currently to read/write. 

FINRA's current workloads look like following

&emsp;&emsp;&emsp;&emsp;1) Read BZ2/Parquet input files from S3 into dataset <br>
&emsp;&emsp;&emsp;&emsp;2) Perform transformations on input dataset <br>
&emsp;&emsp;&emsp;&emsp;3) Store intermediate results in HDFS<br>
&emsp;&emsp;&emsp;&emsp;4) Do 2 and 3 for all N steps <br>
&emsp;&emsp;&emsp;&emsp;5) Combine all HDFS output and write to S3 location (final destination)<br>

Idea is to do something like following

&emsp;&emsp;&emsp;&emsp;1) Create iceberg tables for input/output data in Hive catalog<br>
&emsp;&emsp;&emsp;&emsp;2) Read input data from input iceberg table from S3 into a dataframe and cast to Dataset<br>
&emsp;&emsp;&emsp;&emsp;3) Perform transformations on top of dataset <br>
&emsp;&emsp;&emsp;&emsp;4) Store intermediate results in HDFS <br>
&emsp;&emsp;&emsp;&emsp;5) Do 3 and 4 for all N steps<br>
&emsp;&emsp;&emsp;&emsp;6) Combine all HDFS output and write to S3 location in Iceberg table format (final destination)<br>
&emsp;&emsp;&emsp;&emsp;7) (Optional) Drop input and output Iceberg tables but keep the S3 data

<a id="sparksqldftest"></a>
## Hybrid tests with Dataframes + SparkSQL

In our previous cases, the input was a Dataframe but the resultant table was created and inserted to via SparkSQL. Let us try an hybrid implementation where input is dataframe and the output table is created from SparkSQL. However, lets do the insert via DF operation. 

### Creating a new iceberg table 

There must be other ways to achieve this with non-SQL syntax (for type safety)

In [61]:
spark.sql("""CREATE TABLE iceberg_table_sparksqldf22 (id bigint,
                                       month bigint,
                                       sk bigint,
                                       txt struct<key1:string>,
                                       uuid string,
                                       year string,
                                       modified_timestamp timestamp,
                                       z string,
                                       `schema_v` string,   --had to rename these columns due to error
                                       `data_v` string,
                                       trade_dt string)
USING iceberg
OPTIONS ( 'write.object-storage.enabled'=true,
          'write.object-storage.path'='s3://vasveena-test-hmswarehouse/')
PARTITIONED BY (z,`schema_v`,`data_v`,trade_dt)
LOCATION 's3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqldf22'""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res28: org.apache.spark.sql.DataFrame = []


### Bulk inserting data into the iceberg table

This time lets insert using insertInto on input DF. 

#### Gotcha 

Partition fields of the input dataframe should be at end to match with the output iceberg table

In [62]:
val t1 = System.nanoTime

//order by clause is needed to avoid error "Caused by: java.lang.IllegalStateException: Already closed files for partition: z=c/schema_v=v1/data_v=v2/trade_dt=2021-04-02"
input_df4.orderBy("z","`schema-v`","`data-v`","trade_dt").write.mode("overwrite").insertInto("iceberg_table_sparksqldf22")

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 1664066758504441
duration: String = 160.410501777seconds


### Reading data from table

In [205]:
spark.table("iceberg_table_sparksqldf22").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+
|      id|month|      sk|txt|                uuid|year| modified_timestamp|  z|schema_v|data_v|  trade_dt|
+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+
|38000025|    3|38000025|[L]|60098a8c-0bf9-47f...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000031|    3|38000031|[R]|a4bf6610-36d0-43b...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000043|    6|38000043|[D]|495d0f6d-dc23-4e6...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000069|    6|38000069|[D]|9b85c17c-74bd-403...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000075|   12|38000075|[J]|64ab0d92-7084-4ca...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+
only showing top 5 rows



### Listing S3 locations

Now lets recursively check one of the hashes under "write.object-storage.enabled" where the Datafrane + SparkSQL table data resides.

In [86]:
%%sh

aws s3 ls "s3://vasveena-test-hmswarehouse/2b857635/catalog/example_iceberg_perf_test_sparksqldf22/" --recursive | head -n 20

2021-05-18 00:27:11  163906720 2b857635/catalog/example_iceberg_perf_test_sparksqldf22/z=e/schema_v=v1/data_v=v2/trade_dt=2021-04-02/00014-293-0f52bdc9-82ad-480a-bd2a-bde2e25d22ed-00001.parquet


Listing metadata on S3

In [63]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqldf22/metadata/"

2021-05-18 00:24:27       2919 00000-09eac564-e370-4c54-931b-0fd1579a943a.metadata.json
2021-05-18 00:27:31       3948 00001-e3bae3ac-cd00-46a3-a097-9075c4a88992.metadata.json
2021-05-18 00:27:31       9499 c88893f2-523c-4a14-879e-827f8b7a61ca-m0.avro
2021-05-18 00:27:31       3571 snap-868207853291015711-1-c88893f2-523c-4a14-879e-827f8b7a61ca.avro


<a id="sparksqldstest"></a>
## Hybrid tests with Dataset + SparkSQL

Now lets try something similar with dataset. We will create a dataset object.

### Creating Dataset from Schema

In [64]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row

/* val schema = (new StructType()
               .add("id", IntegerType)
               .add("month",IntegerType)
               .add("sk",IntegerType)
               .add("txt",new StructType()
                    .add("key1",StringType))
               .add("uuid",StringType)
               .add("year",StringType)
               .add("modified_timestamp",TimestampType)
               .add("z",StringType)
               .add("schema_v", StringType)
               .add("data_v", StringType)
               .add("trade_dt", StringType)) */

case class TxtStruct(key1: String)

case class Random(id: BigInt, 
                  month: BigInt, 
                  sk: BigInt, 
                  txt: Seq[TxtStruct], 
                  uuid: String, 
                  year: String,
                  modified_timestamp: java.sql.Timestamp, 
                  z: String, 
                  schema_v: String, 
                  data_v: String, 
                  trade_dt: String)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.types._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
defined class TxtStruct
defined class Random


In [65]:
val ds_input = input_df4.select($"id",$"month",$"sk"
                                //,$"txt".cast("array<struct<key1:string>>")
                                ,(array($"txt")) as "txt"
                                ,$"uuid",$"year",$"modified_timestamp",$"z"
                                ,$"schema_v",$"data_v",$"trade_dt").as[Random]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ds_input: org.apache.spark.sql.Dataset[Random] = [id: bigint, month: bigint ... 9 more fields]


In [66]:
ds_input.printSchema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)
 |-- z: string (nullable = true)
 |-- schema_v: string (nullable = false)
 |-- data_v: string (nullable = false)
 |-- trade_dt: string (nullable = true)



In [84]:
ds_input.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----+-------+-----+--------------------+----+-------------------+---+--------+------+----------+
|     id|month|     sk|  txt|                uuid|year| modified_timestamp|  z|schema_v|data_v|  trade_dt|
+-------+-----+-------+-----+--------------------+----+-------------------+---+--------+------+----------+
|4000000|    3|4000000|[[E]]|6e505939-f5fd-4ab...|2019|2021-04-02 00:05:02|  9|      v1|    v2|2021-04-02|
|4000001|    9|4000001|[[F]]|20486aca-2759-43f...|2019|2021-04-02 00:05:02|  d|      v1|    v2|2021-04-02|
|4000002|   11|4000002|[[G]]|42962a21-a2dc-40d...|2019|2021-04-02 00:05:02|  d|      v1|    v2|2021-04-02|
|4000003|    9|4000003|[[H]]|9841ad6d-1532-496...|2019|2021-04-02 00:05:02|  c|      v1|    v2|2021-04-02|
|4000004|    4|4000004|[[I]]|ff1a855a-cced-495...|2019|2021-04-02 00:05:02|  4|      v1|    v2|2021-04-02|
+-------+-----+-------+-----+--------------------+----+-------------------+---+--------+------+----------+
only showing top 5 rows



### Creating a new iceberg table

In [81]:
spark.sql("""CREATE TABLE iceberg_table_sparksqlds66(id bigint,
                                       month bigint,
                                       sk bigint,
                                       -- txt struct<key1:string>,
                                       txt array<struct<key1:string>>,
                                       uuid string,
                                       year string,
                                       modified_timestamp timestamp,
                                       z string,
                                       `schema_v` string,   --had to rename these columns due to error
                                       `data_v` string,
                                       trade_dt string)
USING iceberg
OPTIONS ( 'write.object-storage.enabled'=true,
          'write.object-storage.path'='s3://vasveena-test-hmswarehouse/')
PARTITIONED BY (z,`schema_v`,`data_v`,trade_dt)
LOCATION 's3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqlds66'""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res61: org.apache.spark.sql.DataFrame = []


### Bulk insert dataset to iceberg table 

In [82]:
val t1 = System.nanoTime

//order by clause is needed to avoid error "Caused by: java.lang.IllegalStateException: Already closed files for partition: z=c/schema_v=v1/data_v=v2/trade_dt=2021-04-02"
ds_input.orderBy("z","`schema-v`","`data-v`","trade_dt").write.mode("overwrite").insertInto("iceberg_table_sparksqlds66")

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 1666192491945114
duration: String = 175.917568684seconds


### Reading data

In [83]:
spark.table("iceberg_table_sparksqlds66").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----+-------+-----+--------------------+----+-------------------+---+--------+------+----------+
|     id|month|     sk|  txt|                uuid|year| modified_timestamp|  z|schema_v|data_v|  trade_dt|
+-------+-----+-------+-----+--------------------+----+-------------------+---+--------+------+----------+
|6000003|    6|6000003|[[J]]|6ed8ea5f-1693-441...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|6000006|    7|6000006|[[M]]|d8c42b7a-168f-477...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|6000008|    2|6000008|[[O]]|dad5d203-faf0-413...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|6000046|    3|6000046|[[A]]|b8ec2d30-2ea8-414...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|6000052|   11|6000052|[[G]]|61cb5988-97c0-49e...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
+-------+-----+-------+-----+--------------------+----+-------------------+---+--------+------+----------+
only showing top 5 rows



### Listing S3 locations

Now lets recursively check one of the hashes under "write.object-storage.enabled" where the Dataset + SparkSQL table data resides.

In [87]:
%%sh

aws s3 ls "s3://vasveena-test-hmswarehouse/2d875426/catalog/example_iceberg_perf_test_sparksqlds66/" --recursive

2021-05-18 00:58:59  163824894 2d875426/catalog/example_iceberg_perf_test_sparksqlds66/z=d/schema_v=v1/data_v=v2/trade_dt=2021-04-02/00013-497-e87978c0-22cb-4940-bece-31fc29a6e684-00001.parquet


Listing metadata on S3 location

In [88]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqlds66/metadata/"

2021-05-18 00:56:20       3053 00000-0fdb41e8-e76e-4a2f-b685-c08c6e69886d.metadata.json
2021-05-18 00:59:15       4086 00001-ef435d4d-0c42-46aa-b211-f1ac6c5ef494.metadata.json
2021-05-18 00:59:15       9547 8826bf95-fa26-4470-8e5c-d5c1ee46df89-m0.avro
2021-05-18 00:59:15       3571 snap-2984059586568082337-1-8826bf95-fa26-4470-8e5c-d5c1ee46df89.avro


<a id="udf"></a>
## Calling UDFs

Since we are able to read data from Iceberg table into Dataframe/Dataset, we can also manipulate data using UDF / Dataset APIs such as copy or flatMapGroups. These APIs are heavily leveraged by FINRA Catlinker. 

### Dataset APIs

Let us first try Dataset APIs

In [89]:
val ds = spark.table("iceberg_table_sparksqlds66").as[Random]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ds: org.apache.spark.sql.Dataset[Random] = [id: bigint, month: bigint ... 9 more fields]


In [90]:
ds.printSchema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)
 |-- z: string (nullable = true)
 |-- schema_v: string (nullable = true)
 |-- data_v: string (nullable = true)
 |-- trade_dt: string (nullable = true)



We can also create a dataset by creating a new Iceberg table in case the table is deleted from metastore 

In [None]:
val ds = spark.read.format("iceberg").option("path", "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqlds66").as[Random]

#### Creating a random function

In [91]:
def somefunc(trade_dt:String, values:Iterator[Random]): TraversableOnce[(String,Random)] = {
  println("Print from somefunc")
  return values.map(x => (trade_dt, x))
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

somefunc: (trade_dt: String, values: Iterator[Random])TraversableOnce[(String, Random)]


Use flatMapGroups to call somefunc over trade_dt groups

In [95]:
val udf_ds = (ds.groupByKey(t => t.trade_dt)
              .flatMapGroups(somefunc).withColumnRenamed("_1","trade_dt").withColumnRenamed("_2","vector"))

udf_ds.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

udf_ds: org.apache.spark.sql.DataFrame = [trade_dt: string, vector: struct<id: decimal(38,0), month: decimal(38,0) ... 9 more fields>]
+----------+--------------------+
|  trade_dt|              vector|
+----------+--------------------+
|2021-04-02|[44000000, 3, 440...|
|2021-04-02|[36291570, 6, 362...|
|2021-04-02|[44000028, 7, 440...|
|2021-04-02|[36291582, 11, 36...|
|2021-04-02|[44000032, 6, 440...|
+----------+--------------------+
only showing top 5 rows



#### We can write this result back into a new Iceberg table

In [105]:
udf_ds.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- trade_dt: string (nullable = true)
 |-- vector: struct (nullable = true)
 |    |-- id: decimal(38,0) (nullable = true)
 |    |-- month: decimal(38,0) (nullable = true)
 |    |-- sk: decimal(38,0) (nullable = true)
 |    |-- txt: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- key1: string (nullable = true)
 |    |-- uuid: string (nullable = true)
 |    |-- year: string (nullable = true)
 |    |-- modified_timestamp: timestamp (nullable = true)
 |    |-- z: string (nullable = true)
 |    |-- schema_v: string (nullable = true)
 |    |-- data_v: string (nullable = true)
 |    |-- trade_dt: string (nullable = true)



In [104]:
spark.sql("""CREATE TABLE iceberg_table_sparksqldsudf22 (
                                    trade_dt string,
                                    vector struct<id: bigint,
                                       month:bigint,
                                       sk:bigint,
                                       txt:array<struct<key1:string>>,
                                       uuid:string,
                                       year:string,
                                       modified_timestamp:timestamp,
                                       z:string,
                                       schema_v:string,   --had to rename these columns due to error
                                       data_v:string,
                                       trade_dt:string>)
USING iceberg
OPTIONS ( 'write.object-storage.enabled'=true,
          'write.object-storage.path'='s3://vasveena-test-hmswarehouse/')
LOCATION 's3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqldsudf22'""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res84: org.apache.spark.sql.DataFrame = []


In [108]:
val t1 = System.nanoTime

//no order by clause needed here since the dataset does not have a partition
udf_ds.write.mode("overwrite").insertInto("iceberg_table_sparksqldsudf22")

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 1667999152218613
duration: String = 1172.648418599seconds


In [109]:
spark.table("iceberg_table_sparksqldsudf22").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+
|  trade_dt|              vector|
+----------+--------------------+
|2021-04-02|[18271721, 2, 182...|
|2021-04-02|[24120637, 3, 241...|
|2021-04-02|[18271722, 12, 18...|
|2021-04-02|[24120638, 4, 241...|
|2021-04-02|[18271728, 2, 182...|
+----------+--------------------+
only showing top 5 rows



#### Listing warehouse for the above write under one of the hashes

```
aws s3 ls s3://vasveena-test-hmswarehouse/53e6b258/catalog/example_iceberg_perf_test_sparksqldsudf22/

2021-05-17 21:34:34 2640936849 00001-637-c6e4e0fe-a115-4b3d-b083-0899c7d490c1-00001.parquet
```

#### We can also write into an HDFS location leveraged by FINRA for storing checkpoint data 

In [121]:
val t1 = System.nanoTime

//write to HDFS
udf_ds.write.mode("overwrite").parquet("/user/hadoop/checkpoint/example_iceberg_perf_test_sparksqldsudf22/")

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 1672027455932338
duration: String = 840.13282666seconds


In [123]:
val hdfsdf = spark.read.parquet("/user/hadoop/checkpoint/example_iceberg_perf_test_sparksqldsudf22/")
hdfsdf.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

hdfsdf: org.apache.spark.sql.DataFrame = [trade_dt: string, vector: struct<id: decimal(38,0), month: decimal(38,0) ... 9 more fields>]
+----------+--------------------+
|  trade_dt|              vector|
+----------+--------------------+
|2021-04-02|[6000003, 6, 6000...|
|2021-04-02|[28719690, 1, 287...|
|2021-04-02|[6000006, 7, 6000...|
|2021-04-02|[28719705, 6, 287...|
|2021-04-02|[6000008, 2, 6000...|
+----------+--------------------+
only showing top 5 rows



#### Listing HDFS location within EMR 

```

[hadoop@ip-172-31-41-141 ~]$ hdfs dfs -ls /user/hadoop/checkpoint/example_iceberg_perf_test_sparksqldsudf22/
Found 3 items
-rw-r--r--   1 livy hadoop          0 2021-05-18 02:51 /user/hadoop/checkpoint/example_iceberg_perf_test_sparksqldsudf22/_SUCCESS
-rw-r--r--   1 livy hadoop       1514 2021-05-18 02:38 /user/hadoop/checkpoint/example_iceberg_perf_test_sparksqldsudf22/part-00000-d72eb454-b9fd-4530-8b41-6d291cd0a20e-c000.snappy.parquet
-rw-r--r--   1 livy hadoop 5419772564 2021-05-18 02:51 /user/hadoop/checkpoint/example_iceberg_perf_test_sparksqldsudf22/part-00001-d72eb454-b9fd-4530-8b41-6d291cd0a20e-c000.snappy.parquet

```

### Dataframe UDFs

In [111]:
val df = spark.table("iceberg_table_sparksqlds66") //Reading above table into a Dataframe. Can use any method to load dataframe

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

df: org.apache.spark.sql.DataFrame = [id: bigint, month: bigint ... 9 more fields]


Creating a test UDF to call on this dataframe

In [112]:
def someudf = (sk:java.math.BigDecimal) => {
  println("Print from UDF")
  ((sk+"1")).toString
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

someudf: java.math.BigDecimal => String


In [113]:
import org.apache.spark.sql.functions.udf
val someUDF = udf(someudf)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.functions.udf
someUDF: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$5520/989180592@3406cac9,StringType,List(Some(class[value[0]: decimal(38,18)])),None,true,true)


In [124]:
val udf_df = df.withColumn("udfcol",someUDF($"id"))
udf_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

udf_df: org.apache.spark.sql.DataFrame = [id: bigint, month: bigint ... 10 more fields]
+-------+-----+-------+-----+--------------------+----+-------------------+---+--------+------+----------+--------------------+
|     id|month|     sk|  txt|                uuid|year| modified_timestamp|  z|schema_v|data_v|  trade_dt|              udfcol|
+-------+-----+-------+-----+--------------------+----+-------------------+---+--------+------+----------+--------------------+
|6000003|    6|6000003|[[J]]|6ed8ea5f-1693-441...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|6000003.000000000...|
|6000006|    7|6000006|[[M]]|d8c42b7a-168f-477...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|6000006.000000000...|
|6000008|    2|6000008|[[O]]|dad5d203-faf0-413...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|6000008.000000000...|
|6000046|    3|6000046|[[A]]|b8ec2d30-2ea8-414...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|6000046.000000000...|
|6000052|   11|6

In [125]:
spark.sql("""CREATE TABLE iceberg_table_sparksqldfudf24(id bigint,
                                       month bigint,
                                       sk bigint,
                                       -- txt struct<key1:string>,
                                       txt array<struct<key1:string>>,
                                       uuid string,
                                       year string,
                                       modified_timestamp timestamp,
                                       udfcol string,
                                       z string,
                                       `schema_v` string,   --had to rename these columns due to error
                                       `data_v` string,
                                       trade_dt string)
USING iceberg
OPTIONS ( 'write.object-storage.enabled'=true,
          'write.object-storage.path'='s3://vasveena-test-hmswarehouse/')
PARTITIONED BY (z,`schema_v`,`data_v`,trade_dt)
LOCATION 's3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqldfudf24'""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res107: org.apache.spark.sql.DataFrame = []


In [119]:
udf_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)
 |-- z: string (nullable = true)
 |-- schema_v: string (nullable = true)
 |-- data_v: string (nullable = true)
 |-- trade_dt: string (nullable = true)
 |-- udfcol: string (nullable = true)



In [127]:
udf_df.select($"id",$"month",$"sk",$"txt",$"uuid",$"year",$"modified_timestamp",$"udfcol",$"z",$"schema_v",$"data_v",$"trade_dt").orderBy("z","`schema_v`","`data_v`","trade_dt").write.mode("overwrite").insertInto("iceberg_table_sparksqldfudf24")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Listing warehouse location for above table on one of the hashes

```

aws s3 ls s3://vasveena-test-hmswarehouse/18742155/catalog/example_iceberg_perf_test_sparksqldfudf24/ --recursive
2021-05-17 23:09:15  183401535 18742155/catalog/example_iceberg_perf_test_sparksqldfudf24/z=c/schema_v=v1/data_v=v2/trade_dt=2021-04-02/00012-799-c1b939a2-e9c1-4aab-92fc-5d6d12f1718d-00001.parquet

```

### Listing table S3 location

In [129]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqldfudf24/" --recursive

2021-05-18 02:52:21       3159 iceberg/tables/catalog/example_iceberg_perf_test_sparksqldfudf24/metadata/00000-68ad005f-e320-471c-a8fc-6bd959ad9018.metadata.json
2021-05-18 03:09:21       4198 iceberg/tables/catalog/example_iceberg_perf_test_sparksqldfudf24/metadata/00001-78697895-d347-4c96-863a-4c8e06a43652.metadata.json
2021-05-18 03:09:21       9880 iceberg/tables/catalog/example_iceberg_perf_test_sparksqldfudf24/metadata/87e22fa9-87da-4dc5-8c34-b06e8eb25db3-m0.avro
2021-05-18 03:09:21       3575 iceberg/tables/catalog/example_iceberg_perf_test_sparksqldfudf24/metadata/snap-4899277091392625360-1-87e22fa9-87da-4dc5-8c34-b06e8eb25db3.avro


Result is as expected. So we should be able to transform intermediate data and write the output to an existing Iceberg table or onto HDFS staging location.

<a id="knownissues"></a>
## Known Issues

### 1) Already closed files for partition

While bulk inserting data to Iceberg tables, if the table size is big, then more than 50% of the time, we may run into this failure. Smaller tables would work fine. 

```
Caused by: java.lang.IllegalStateException: Already closed files for partition: z=c/schema-v=v1/data-v=v2/trade_dt=2021-04-02
  at org.apache.iceberg.io.PartitionedWriter.write(PartitionedWriter.java:69)
  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$7(WriteToDataSourceV2Exec.scala:441)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
  at org.apache.spark.scheduler.Task.run(Task.scala:127)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
  ... 3 more

```

This is due to the following issue:

https://github.com/apache/iceberg/issues/508 <br>
https://lists.apache.org/thread.html/e54d90b11d94e1bf93f1c750939271f79a96dfa94ef9c2c10f586546@%3Cdev.iceberg.apache.org%3E

To workaround, all partition fields should be sorted prior to write to avoid this issue for large datasets (especially during bulk insert). 

```
ds_input.orderBy("z","schema-v","data-v","trade_dt").write.mode("overwrite").insertInto("iceberg_table_sparksqlds6") → like this
```

### 2) Unquoted Identifier

While writing data from Dataframe/Dataset to Iceberg table, ran into the below issue since field names had “-” in them. 

An error was encountered:

```
org.apache.spark.sql.catalyst.parser.ParseException:
Possibly unquoted identifier schema-v detected. Please consider quoting it with back-quotes as `schema-v`(line 1, pos 6)

== SQL ==
schema-v
------^^^

```

It kept complaining of this error even in the presence of quoting (`) on source and destination. So, had to change "-" to "_". 

### 3) Drop table does not work 

Unable to drop tables from Iceberg HMS catalog. Drop will happen successfully but the table will still be present in the metastore. Spark by default does a table exist check. 

Issue:
https://github.com/apache/iceberg/issues/2374

Workaround attempt.

Start spark-shell in EMR master node using following command (usually JARs are loaded when they are on classpath but I had to explicitly specify iceberg JAR for some reason)

spark-shell --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf spark.sql.catalog.spark_catalog.type=hive --conf spark.sql.catalog.spark_catalog.uri=thrift://localhost:9083 —conf spark.hadoop.hive.metastore.uris=thrift://localhost:9083 —jars /usr/lib/spark/jars/iceberg-spark3-0.11.0.jar

import org.apache.iceberg.spark.SparkSessionCatalog
import org.apache.spark.sql.connector.catalog.Identifier
val db = spark.catalog.currentDatabase
spark.sessionState.catalogManager.catalog("hive").asInstanceOf[SparkSessionCatalog[_]].dropTable(db, table = "iceberg_table_sparksql", ignoreIfNotExists = false, purge = true)

The above does not work. It expects an Identifier (https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/connector/catalog/Identifier.html) in dropTable API. 

<a id="summary"></a>
## Key takeaways

1) Destination Iceberg table needs to be created prior to write for ObjectStoreLocationProvider to work as expected. It does not matter if the write is performed from SparkSQL/DataframeWriter. 

2) Input S3/HDFS data can be read into a dataset (cast) in Iceberg format. Transformations can be performed on top of this dataset (tested UDF and a dataset API) and the transformed dataset can be written to the output Iceberg table or temporary HDFS location.

3) Direct Parquet write (without Iceberg) took ~ 95 seconds for 100M records. Bulk insert in iceberg for same amount of data took anywhere from ~140-160 seconds (with sort) using SparkSQL. It took ~160 seconds with insertInto API using Dataframe/Datasets (with sort). FINRA’s workloads always does bulk inserts i.e., no incremental processing so this increase in write cost (sort with TBs of data) should be taken into consideration until we workaround the issue "Already closed files for partition".

4) There are some corner case errors we are running into which may impact customer experience (some of the gotchas stated below). For example, all partition columns must be sorted prior to write to avoid a random error. This could become costly for TBs of data. 
