# Spark-Iceberg Evaluation



# Index

1) [Configuring Iceberg](#configure_iceberg) <br>
2) [Evaluating Iceberg for Spark](#test) <br>
&emsp;&emsp;&emsp;&emsp;a) [Regular Dataframe behavior](#normaldftest) <br>
&emsp;&emsp;&emsp;&emsp;b) [Pure Dataframe + Iceberg](#dftest) <br>
&emsp;&emsp;&emsp;&emsp;c) [SparkSQL + Iceberg](#sparksqltest) <br>
3) [Hybrid tests with Dataframe and Datasets](#hybrid) <br>
&emsp;&emsp;&emsp;&emsp;a) [SparkSQL + Iceberg + Dataframe](#sparksqldftest) <br>
&emsp;&emsp;&emsp;&emsp;b) [SparkSQL + Iceberg + Dataset](#sparksqldstest) <br>
4) [Perform transformations with UDFs and Dataset APIs](#udf) <br>

<a id="configure_iceberg"></a>
# Configuring Iceberg on Spark session

In [1]:
%%configure -f
{
    "conf":  { 
             "spark.jars.packages":"org.apache.iceberg:iceberg-spark3-runtime:0.11.0",
             "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
             "spark.sql.catalog.spark_catalog":"org.apache.iceberg.spark.SparkSessionCatalog",
             "spark.sql.catalog.spark_catalog.type":"hive",
             "spark.sql.catalog.local":"org.apache.iceberg.spark.SparkCatalog",
             "spark.sql.catalog.local.type":"hadoop",
             "spark.sql.catalog.local.warehouse":"s3://vasveena-test-demo/iceberg/catalog/tables/"
           } 
}

In [2]:
spark.version

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
8,application_1619633879001_0010,spark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res1: String = 3.0.1-amzn-0


<a id="test"></a>
# Evaluating Iceberg for Spark
<a id="normaldftest"></a>
## Regular Dataframe Read/Write test with Parquet Files

### Read Input data from S3

Let's read our 100M record dataset

In [3]:
val input_df = spark.read.parquet("s3://neilawstmp2/tmp/hudi-perf/input/")
input_df.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

input_df: org.apache.spark.sql.DataFrame = [id: bigint, month: bigint ... 5 more fields]
res2: Long = 100000000


In [4]:
input_df.createOrReplaceTempView("inputdf")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
input_df.printSchema()
input_df.rdd.getNumPartitions

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: struct (nullable = true)
 |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)

res7: Int = 45


In [7]:
%%sh
aws s3 ls s3://neilawstmp2/tmp/hudi-perf/input/ | wc -l

38


### Transform input data
Performing some transformations on the input

In [26]:
import org.apache.spark.sql.functions._

val input_df2=(input_df.withColumn("z", substring(md5(concat($"id")),1,1))
                       .withColumn("schema-v", lit("v1")).withColumn("data-v", lit("v2"))
                       .withColumn("trade_dt", substring($"modified_timestamp",1,10)))
input_df2.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.functions._
input_df2: org.apache.spark.sql.DataFrame = [id: bigint, month: bigint ... 9 more fields]
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|     id|month|     sk|txt|                uuid|year| modified_timestamp|  z|schema-v|data-v|  trade_dt|
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|4000000|    3|4000000|[E]|6e505939-f5fd-4ab...|2019|2021-04-02 00:05:02|  9|      v1|    v2|2021-04-02|
|4000001|    9|4000001|[F]|20486aca-2759-43f...|2019|2021-04-02 00:05:02|  d|      v1|    v2|2021-04-02|
|4000002|   11|4000002|[G]|42962a21-a2dc-40d...|2019|2021-04-02 00:05:02|  d|      v1|    v2|2021-04-02|
|4000003|    9|4000003|[H]|9841ad6d-1532-496...|2019|2021-04-02 00:05:02|  c|      v1|    v2|2021-04-02|
|4000004|    4|4000004|[I]|ff1a855a-cced-495...|2019|2021-04-02 00:05:02|  4|      v1|    v2|2021-04-02|
+-------+-----+-------+---+---

### Write data back to S3 in normal Parquet format 

In [37]:
val t1 = System.nanoTime

val s3_location=s"s3://vasveena-test-demo/temp6/parquet-perf/catalog/example_iceberg_perf_test_6"

input_df2.write.mode("OVERWRITE").partitionBy("z","schema-v","data-v","trade_dt").parquet(s3_location)

val duration = (System.nanoTime - t1) / 1e9d + " seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 639228646763777
s3_location: String = s3://vasveena-test-demo/temp6/parquet-perf/catalog/example_iceberg_perf_test_6
duration: String = 74.901509932 seconds


### Write data back to S3 in Spark table format (Parquet)

In [42]:
val t1 = System.nanoTime

val s3_location_ndf=s"s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_normaldf"

(input_df2.write.mode("OVERWRITE")
    .option("path", s3_location_ndf)
    .partitionBy("z","schema-v","data-v","trade_dt").format("parquet")
    .saveAsTable("iceberg_table_normaldf"))

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 639530210694707
s3_location_ndf: String = s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_normaldf
duration: String = 97.160976689seconds


### Results

Reading output data

In [51]:
println("Reading direct Parquet output")
spark.read.parquet("s3://vasveena-test-demo/temp6/parquet-perf/catalog/example_iceberg_perf_test_6/").show(5)
println("Reading Spark table Parquet output")
spark.sql("select * from iceberg_table_normaldf limit 5").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Reading direct Parquet output
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|     id|month|     sk|txt|                uuid|year| modified_timestamp|  z|schema-v|data-v|  trade_dt|
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|4000011|    2|4000011|[P]|dbf28c6d-6f94-449...|2019|2021-04-02 00:05:02|  3|      v1|    v2|2021-04-02|
|4000017|    1|4000017|[V]|96fa033f-0fb0-4d5...|2019|2021-04-02 00:05:02|  3|      v1|    v2|2021-04-02|
|4000047|   10|4000047|[Z]|93c488d8-b882-442...|2019|2021-04-02 00:05:02|  3|      v1|    v2|2021-04-02|
|4000051|    2|4000051|[D]|4464b11b-14aa-4ca...|2019|2021-04-02 00:05:02|  3|      v1|    v2|2021-04-02|
|4000071|    4|4000071|[X]|bdb89389-2406-47b...|2019|2021-04-02 00:05:02|  3|      v1|    v2|2021-04-02|
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
only showing top 5 rows



Now lets go ahead and list the S3 location where our DF was written in regular Parquet format

In [44]:
%%sh

aws s3 ls s3://vasveena-test-demo/temp6/parquet-perf/catalog/example_iceberg_perf_test_6/

                           PRE z=0/
                           PRE z=1/
                           PRE z=2/
                           PRE z=3/
                           PRE z=4/
                           PRE z=5/
                           PRE z=6/
                           PRE z=7/
                           PRE z=8/
                           PRE z=9/
                           PRE z=a/
                           PRE z=b/
                           PRE z=c/
                           PRE z=d/
                           PRE z=e/
                           PRE z=f/
2021-05-06 03:45:28          0 _SUCCESS


Checking the S3 location where our DF was written in regular Spark table format

In [46]:
%%sh

aws s3 ls s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_normaldf/

                           PRE z=0/
                           PRE z=1/
                           PRE z=2/
                           PRE z=3/
                           PRE z=4/
                           PRE z=5/
                           PRE z=6/
                           PRE z=7/
                           PRE z=8/
                           PRE z=9/
                           PRE z=a/
                           PRE z=b/
                           PRE z=c/
                           PRE z=d/
                           PRE z=e/
                           PRE z=f/
2021-05-06 03:50:47          0 _SUCCESS


### Note

As expected, the S3 partitions are not hashed in both Spark table format and direct Parquet writes

<a id="dftest"></a>
## Dataframe Read/Write tests with Iceberg format

### Bulk insert our input dataframe in Iceberg format

Now lets try to write the transformed dataframe in Iceberg format

In [84]:
val t1 = System.nanoTime

val s3_location_idf=s"s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_dfonly2"
val data_location_idf=s"s3://vasveena-test-demo/iceberg/tables/data/dfonly2/"

//orderBy clause is required to avoid error "java.lang.IllegalStateException: Already closed files for partition: z=6/schema_v=v1/data_v=v2/trade_dt=trade_dt"
(input_df2.orderBy("z","`schema-v`","`data-v`","trade_dt").write.mode("OVERWRITE")
    .option("path", s3_location_idf)
    .option("write.object-storage.enabled",true)
    .option("write.object-storage.path",data_location_idf)
    .partitionBy("z","`schema-v`","`data-v`","trade_dt").format("iceberg")
    .saveAsTable("iceberg_table_dfonly2"))

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 642391499352314
s3_location_idf: String = s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_dfonly2
data_location_idf: String = s3://vasveena-test-demo/iceberg/tables/data/dfonly2/
duration: String = 139.448695328seconds


### Results

Reading output data

In [85]:
println("Reading data in iceberg format")
spark.read.format("iceberg").load("iceberg_table_dfonly2").show(5)
println("Using SparkSQL")
spark.sql("select * from iceberg_table_dfonly2 limit 5").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Reading data in iceberg format
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|     id|month|     sk|txt|                uuid|year| modified_timestamp|  z|schema-v|data-v|  trade_dt|
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
|6000003|    6|6000003|[J]|6ed8ea5f-1693-441...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|6000006|    7|6000006|[M]|d8c42b7a-168f-477...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|6000008|    2|6000008|[O]|dad5d203-faf0-413...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|6000046|    3|6000046|[A]|b8ec2d30-2ea8-414...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|6000052|   11|6000052|[G]|61cb5988-97c0-49e...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
+-------+-----+-------+---+--------------------+----+-------------------+---+--------+------+----------+
only showing top 5 rows


Listing S3 location of Iceberg data and metadata

In [194]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_dfonly/data/"
#Partitions listed but it is not stored in a location that is pre-hashed

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_dfonly/metadata/"
#Lists iceberg metadata

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/data/dfonly/"
#This location is empty. Hence, throws error.

                           PRE z=0/
                           PRE z=1/
                           PRE z=2/
                           PRE z=3/
                           PRE z=4/
                           PRE z=5/
                           PRE z=6/
                           PRE z=7/
                           PRE z=8/
                           PRE z=9/
                           PRE z=a/
                           PRE z=b/
                           PRE z=c/
                           PRE z=d/
                           PRE z=e/
                           PRE z=f/
2021-05-06 04:11:55       3604 00000-10a945d8-68ad-4336-92ce-cf5afdf77353.metadata.json
2021-05-06 04:35:50       4524 00001-339f14f3-a0e0-4945-89ab-c545afa9f79d.metadata.json
2021-05-06 04:11:54       9461 78b1612f-01f7-4f40-95ae-08cb525823a6-m0.avro
2021-05-06 04:35:50       9454 b9200e4f-3018-4b14-9ade-d243869fa0f4-m0.avro
2021-05-06 04:11:55       3566 snap-2533887051401835083-1-78b1612f-01f7-4f40-95ae-08cb525823a6.a

CalledProcessError: Command 'b'\naws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_dfonly/data/"\n#Partitions listed but it is not stored in a location that is pre-hashed\n\naws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_dfonly/metadata/"\n#Lists iceberg metadata\n\naws s3 ls "s3://vasveena-test-demo/iceberg/tables/data/dfonly/"\n#This location is empty. Hence, throws error.\n'' returned non-zero exit status 1.

### Note

S3 partitions are not hashed when using pure dataframe with iceberg

<a id="sparksqltest"></a>
## SparkSQL + Iceberg Read/Write test with Parquet Files

### Creating an output table in SparkSQL

In [126]:
spark.read.format("iceberg").load("iceberg_table_dfonly2").printSchema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: struct (nullable = true)
 |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)
 |-- z: string (nullable = true)
 |-- schema-v: string (nullable = true)
 |-- data-v: string (nullable = true)
 |-- trade_dt: string (nullable = true)



In [168]:
spark.sql("""CREATE TABLE iceberg_table_sparksql2 (id bigint,
                                       month bigint,
                                       sk bigint,
                                       txt struct<key1:string>,
                                       uuid string,
                                       year string,
                                       modified_timestamp timestamp,
                                       z string,
                                       `schema_v` string,   --had to rename these columns due to error
                                       `data_v` string,
                                       trade_dt string)
USING iceberg
OPTIONS ( 'write.object-storage.enabled'=true,
          'write.object-storage.path'='s3://vasveena-test-demo/iceberg/tables/data/sparksql2/')
PARTITIONED BY (z,`schema_v`,`data_v`,trade_dt)
LOCATION 's3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksql2'""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res191: org.apache.spark.sql.DataFrame = []


#### Gotcha

During insert with input_df2, we will get an error "Possibly unquoted identifier schema-v detected" which happened even if those fields are quoted properly on both source and destination. So, changing col name from data-v, schema-v to data_v and schema_v on both source and destination tables.

In [191]:
input_df2.printSchema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: struct (nullable = true)
 |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)
 |-- z: string (nullable = true)
 |-- schema-v: string (nullable = false)
 |-- data-v: string (nullable = false)
 |-- trade_dt: string (nullable = true)



In [181]:
val input_df4 = input_df2.withColumnRenamed("schema-v", "schema_v").withColumnRenamed("data-v", "data_v")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

input_df4: org.apache.spark.sql.DataFrame = [id: bigint, month: bigint ... 9 more fields]


In [182]:
input_df4.printSchema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: struct (nullable = true)
 |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)
 |-- z: string (nullable = true)
 |-- schema_v: string (nullable = false)
 |-- data_v: string (nullable = false)
 |-- trade_dt: string (nullable = true)



In [187]:
input_df4.registerTempTable("inputdf4")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



### Bulk insert data with iceberg format

Now, lets do a bulk insert on iceberg table using pure SparkSQL

In [192]:
val t1 = System.nanoTime

spark.sql("""insert overwrite table iceberg_table_sparksql2
             partition (z,`schema_v`,`data_v`,trade_dt) 
             select id, month, sk, txt, uuid,
             year, modified_timestamp, z,
             `schema_v`, `data_v`,'trade_dt'
             from inputdf4
             order by z,`schema_v`,`data_v`,trade_dt """) //order by clause is required to avoid error "java.lang.IllegalStateException: Already closed files for partition: z=6/schema_v=v1/data_v=v2/trade_dt=trade_dt"

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 650072184673361
res210: org.apache.spark.sql.DataFrame = []
duration: String = 140.217764581seconds


### Results

Reading output data

In [160]:
spark.sql("""select * from default.iceberg_table_sparksql""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res187: org.apache.spark.sql.DataFrame = [id: bigint, month: bigint ... 9 more fields]


Listing write object storage path. As we can see, the hashes are created.

In [193]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/data/sparksql2/" | head -n 20

                           PRE 002fb1ef/
                           PRE 00a4ce44/
                           PRE 00b54463/
                           PRE 00b9eb53/
                           PRE 010c1ca4/
                           PRE 0196f04c/
                           PRE 01f2075d/
                           PRE 026cc0ea/
                           PRE 02c481ec/
                           PRE 02e728f8/
                           PRE 0328bcc6/
                           PRE 03414b3b/
                           PRE 0373fea9/
                           PRE 03b68b4b/
                           PRE 04ec9c8e/
                           PRE 052673bc/
                           PRE 05ffdbe9/
                           PRE 061b80ad/
                           PRE 064f3f29/
                           PRE 0696b1cb/



[Errno 32] Broken pipe


If we list one of the above hashes, we get the following:

```
aws s3 ls s3://vasveena-test-demo/iceberg/tables/data/sparksql2/002fb1ef/catalog/example_iceberg_perf_test_sparksql2/z=c/schema_v=v1/data_v=v2/
2021-05-06 02:38:59          0 trade_dt=trade_dt_$folder$ 
```

Similarly, we can list the S3 metadata location


In [196]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksql2/metadata/"

2021-05-06 06:32:57       2938 00000-de09a45e-f0ee-4bf0-b75c-0c4bb7937ee6.metadata.json
2021-05-06 06:47:17       3965 00001-0d1176aa-13f0-45aa-9a63-acf219999cce.metadata.json
2021-05-06 06:47:16       9527 9f9249a7-8907-4ee3-a1db-e8b0043d359c-m0.avro
2021-05-06 06:47:17       3565 snap-8573294365950336448-1-9f9249a7-8907-4ee3-a1db-e8b0043d359c.avro


### Note

Spark + Iceberg integration with write object storage path works as documented while using pure SparkSQL. This is true for both Scala/PySpark implementations

Table created via DF API and SparkSQL look identical based on diffchecker. Checked "show create table" from Hive catalog. This is something that needs to be checked.

<a id="hybrid"></a>
# Hybrid implementation with DF + DS

Now that we have established pure SparkSQL works as expected, we will look into extending this implementation to DF + DS. 

Example workload could look like following

&emsp;&emsp;&emsp;&emsp;1) Read input files from S3 into dataset <br>
&emsp;&emsp;&emsp;&emsp;2) Perform transformations on input dataset <br>
&emsp;&emsp;&emsp;&emsp;3) Store intermediate output in HDFS<br>
&emsp;&emsp;&emsp;&emsp;4) Do 2 and 3 for all N steps <br>
&emsp;&emsp;&emsp;&emsp;5) Aggregate HDFS output and write to S3 location (final destination)<br>

Following approach leverages Iceberg in least disruptive manner

&emsp;&emsp;&emsp;&emsp;1) Maintain iceberg tables for input/output data in Hive catalog<br>
&emsp;&emsp;&emsp;&emsp;2) Read input data from input iceberg table from S3 into a dataframe and cast to Dataset<br>
&emsp;&emsp;&emsp;&emsp;3) Perform transformations on top of dataset <br>
&emsp;&emsp;&emsp;&emsp;4) Store intermediate result in HDFS <br>
&emsp;&emsp;&emsp;&emsp;5) Do 3 and 4 for all N steps<br>
&emsp;&emsp;&emsp;&emsp;6) Aggregate HDFS output and write to S3 location in Iceberg format (final destination)<br>

<a id="sparksqldftest"></a>
## Hybrid tests with Dataframes + SparkSQL

In our previous cases, the input was a Dataframe but the resultant table was created and inserted to via SparkSQL. Let us try an hybrid implementation where input is dataframe and the output table is created from SparkSQL. However, lets do the insert via DF operation. 

### Creating a new iceberg table 

There must be other ways to achieve this with non-SQL syntax (for type safety)

In [198]:
spark.sql("""CREATE TABLE iceberg_table_sparksqldf (id bigint,
                                       month bigint,
                                       sk bigint,
                                       txt struct<key1:string>,
                                       uuid string,
                                       year string,
                                       modified_timestamp timestamp,
                                       z string,
                                       `schema_v` string,   --had to rename these columns due to error
                                       `data_v` string,
                                       trade_dt string)
USING iceberg
OPTIONS ( 'write.object-storage.enabled'=true,
          'write.object-storage.path'='s3://vasveena-test-demo/iceberg/tables/data/sparksqldf/')
PARTITIONED BY (z,`schema_v`,`data_v`,trade_dt)
LOCATION 's3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqldf'""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res213: org.apache.spark.sql.DataFrame = []


### Bulk inserting data into the iceberg table

This time lets insert using insertInto on input DF. 

#### Gotcha 

Partition fields of the input dataframe should be at end to match with the output iceberg table

In [201]:
val t1 = System.nanoTime

//order by clause is needed to avoid error "Caused by: java.lang.IllegalStateException: Already closed files for partition: z=c/schema_v=v1/data_v=v2/trade_dt=2021-04-02"
input_df4.orderBy("z","`schema-v`","`data-v`","trade_dt").write.mode("overwrite").insertInto("iceberg_table_sparksqldf")

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 652865379993309
duration: String = 159.165016451seconds


### Reading data from table

In [205]:
spark.table("iceberg_table_sparksqldf").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+
|      id|month|      sk|txt|                uuid|year| modified_timestamp|  z|schema_v|data_v|  trade_dt|
+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+
|38000025|    3|38000025|[L]|60098a8c-0bf9-47f...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000031|    3|38000031|[R]|a4bf6610-36d0-43b...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000043|    6|38000043|[D]|495d0f6d-dc23-4e6...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000069|    6|38000069|[D]|9b85c17c-74bd-403...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000075|   12|38000075|[J]|64ab0d92-7084-4ca...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
+--------+-----+--------+---+--------------------+----+-------------------+---+--------+------+----------+
only showing top 5 rows



### Listing S3 locations

Now lets check the S3 location of "write.object-storage.enabled". We can see hashing has been taken care of. 

In [202]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/data/sparksqldf/" | head -n 20

                           PRE 0061f845/
                           PRE 00cb3cf0/
                           PRE 00cd5e67/
                           PRE 00eea961/
                           PRE 00f6a166/
                           PRE 0114e19d/
                           PRE 0133e402/
                           PRE 0134f2be/
                           PRE 013f809b/
                           PRE 01824c0d/
                           PRE 01d76c28/
                           PRE 0255b3ea/
                           PRE 0293050d/
                           PRE 02e7aef7/
                           PRE 033d490e/
                           PRE 040b36ee/
                           PRE 040dca1c/
                           PRE 041ac4ec/
                           PRE 044466aa/
                           PRE 0445d322/



[Errno 32] Broken pipe


Listing metadata on S3

In [207]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqldf/metadata/"

2021-05-06 07:23:11       2940 00000-c852f0b7-822e-4c93-927b-c110fe88d7de.metadata.json
2021-05-06 07:34:09       3969 00001-ef48b376-a213-4285-8bd4-1bf0038ad693.metadata.json
2021-05-06 07:34:09       9519 9b661ca2-2703-4eef-8258-25f3c8d9eda5-m0.avro
2021-05-06 07:34:09       3569 snap-1712768399067180034-1-9b661ca2-2703-4eef-8258-25f3c8d9eda5.avro


<a id="sparksqldstest"></a>
## Hybrid tests with Dataset + SparkSQL

Now lets try something similar with dataset. We will create a dataset object.

### Creating Dataset from Schema

In [275]:
import org.apache.spark.sql.types._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row

/* val schema = (new StructType()
               .add("id", IntegerType)
               .add("month",IntegerType)
               .add("sk",IntegerType)
               .add("txt",new StructType()
                    .add("key1",StringType))
               .add("uuid",StringType)
               .add("year",StringType)
               .add("modified_timestamp",TimestampType)
               .add("z",StringType)
               .add("schema_v", StringType)
               .add("data_v", StringType)
               .add("trade_dt", StringType)) */

case class TxtStruct(key1: String)

case class Random(id: BigInt, 
                  month: BigInt, 
                  sk: BigInt, 
                  txt: Seq[TxtStruct], 
                  uuid: String, 
                  year: String,
                  modified_timestamp: java.sql.Timestamp, 
                  z: String, 
                  schema_v: String, 
                  data_v: String, 
                  trade_dt: String)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.types._
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
defined class TxtStruct
defined class Random


In [282]:
val ds_input = input_df4.select($"id",$"month",$"sk"
                                //,$"txt".cast("array<struct<key1:string>>")
                                ,(array($"txt")) as "txt"
                                ,$"uuid",$"year",$"modified_timestamp",$"z"
                                ,$"schema_v",$"data_v",$"trade_dt").as[Random]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ds_input: org.apache.spark.sql.Dataset[Random] = [id: bigint, month: bigint ... 9 more fields]


In [296]:
ds_input.printSchema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: array (nullable = false)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)
 |-- z: string (nullable = true)
 |-- schema_v: string (nullable = false)
 |-- data_v: string (nullable = false)
 |-- trade_dt: string (nullable = true)



### Creating a new iceberg table

In [303]:
spark.sql("""CREATE TABLE iceberg_table_sparksqlds6(id bigint,
                                       month bigint,
                                       sk bigint,
                                       -- txt struct<key1:string>,
                                       txt array<struct<key1:string>>,
                                       uuid string,
                                       year string,
                                       modified_timestamp timestamp,
                                       z string,
                                       `schema_v` string,   --had to rename these columns due to error
                                       `data_v` string,
                                       trade_dt string)
USING iceberg
OPTIONS ( 'write.object-storage.enabled'=true,
          'write.object-storage.path'='s3://vasveena-test-demo/iceberg/tables/data/sparksqlds6/')
PARTITIONED BY (z,`schema_v`,`data_v`,trade_dt)
LOCATION 's3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqlds6'""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res384: org.apache.spark.sql.DataFrame = []


### Bulk insert dataset to iceberg table 

In [305]:
val t1 = System.nanoTime

//order by clause is needed to avoid error "Caused by: java.lang.IllegalStateException: Already closed files for partition: z=c/schema_v=v1/data_v=v2/trade_dt=2021-04-02"
ds_input.orderBy("z","`schema-v`","`data-v`","trade_dt").write.mode("overwrite").insertInto("iceberg_table_sparksqlds6")

val duration = (System.nanoTime - t1) / 1e9d + "seconds"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

t1: Long = 659476745464997
duration: String = 169.715551177seconds


### Reading data

In [310]:
spark.table("iceberg_table_sparksqlds6").show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----+--------+-----+--------------------+----+-------------------+---+--------+------+----------+
|      id|month|      sk|  txt|                uuid|year| modified_timestamp|  z|schema_v|data_v|  trade_dt|
+--------+-----+--------+-----+--------------------+----+-------------------+---+--------+------+----------+
|38000025|    3|38000025|[[L]]|60098a8c-0bf9-47f...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000031|    3|38000031|[[R]]|a4bf6610-36d0-43b...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000043|    6|38000043|[[D]]|495d0f6d-dc23-4e6...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000069|    6|38000069|[[D]]|9b85c17c-74bd-403...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
|38000075|   12|38000075|[[J]]|64ab0d92-7084-4ca...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|
+--------+-----+--------+-----+--------------------+----+-------------------+---+--------+------+----------+
only showing top 5 

### Listing S3 locations

Now lets check the S3 location of "write.object-storage.enabled". We can see hashing has been taken care of. 

In [306]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/data/sparksqlds6/"

                           PRE 0333adfe/
                           PRE 0a311088/
                           PRE 11115d68/
                           PRE 1c72097b/
                           PRE 1fe6cac8/
                           PRE 23c08f89/
                           PRE 2910e802/
                           PRE 2c6f894c/
                           PRE 3fed2e9d/
                           PRE 4564c4dc/
                           PRE 4d18cde7/
                           PRE 5fff9515/
                           PRE 63fcfe26/
                           PRE 751ff565/
                           PRE 75d0d81d/
                           PRE 7ad4c521/


In [307]:
ds_input

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res390: org.apache.spark.sql.Dataset[Random] = [id: bigint, month: bigint ... 9 more fields]


Listing metadata on S3

In [309]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqlds6/metadata/"

2021-05-06 09:20:46       3076 00000-66de4ce7-3ae1-4efb-b73d-e39cd4d65fc5.metadata.json
2021-05-06 09:24:30       4107 00001-f8969a4d-52c2-4f37-a8a6-9c7072fdb5c8.metadata.json
2021-05-06 09:24:30       9587 698b5c19-2324-47da-a0d4-3a207de7fb05-m0.avro
2021-05-06 09:24:30       3565 snap-9006403180404379398-1-698b5c19-2324-47da-a0d4-3a207de7fb05.avro


<a id="udf"></a>
## Calling UDFs

Since we are able to read data from Iceberg table into Dataframe/Dataset, we can also manipulate data using UDF / Dataset APIs such as copy or flatMapGroups. 

### Dataset APIs

Let us first try Dataset APIs

In [311]:
val ds = spark.table("iceberg_table_sparksqlds6").as[Random]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ds: org.apache.spark.sql.Dataset[Random] = [id: bigint, month: bigint ... 9 more fields]


In [312]:
ds.printSchema

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- id: long (nullable = true)
 |-- month: long (nullable = true)
 |-- sk: long (nullable = true)
 |-- txt: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key1: string (nullable = true)
 |-- uuid: string (nullable = true)
 |-- year: string (nullable = true)
 |-- modified_timestamp: timestamp (nullable = true)
 |-- z: string (nullable = true)
 |-- schema_v: string (nullable = true)
 |-- data_v: string (nullable = true)
 |-- trade_dt: string (nullable = true)



#### Creating a random function

In [335]:
def somefunc(trade_dt:String, values:Iterator[Random]): TraversableOnce[(String,Random)] = {
  println("Print from somefunc")
  return values.map(x => (trade_dt, x))
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

somefunc: (trade_dt: String, values: Iterator[Random])TraversableOnce[(String, Random)]


Use flatMapGroups to call somefunc over trade_dt groups

In [343]:
val udf_ds = (ds.groupByKey(t => t.trade_dt)
              .flatMapGroups(somefunc))

udf_ds.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

udf_ds: org.apache.spark.sql.Dataset[(String, Random)] = [_1: string, _2: struct<id: decimal(38,0), month: decimal(38,0) ... 9 more fields>]
+----------+--------------------+
|        _1|                  _2|
+----------+--------------------+
|2021-04-02|[43215124, 6, 432...|
|2021-04-02|[32324242, 3, 323...|
|2021-04-02|[43215133, 8, 432...|
|2021-04-02|[32324257, 1, 323...|
|2021-04-02|[43215141, 9, 432...|
+----------+--------------------+
only showing top 5 rows



We can write this result back into an existing Iceberg table if needed

### Dataframe UDFs

In [369]:
val df = spark.table("iceberg_table_sparksqlds6")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

df: org.apache.spark.sql.DataFrame = [id: bigint, month: bigint ... 9 more fields]


Creating a test UDF

In [370]:
def someudf = (sk:java.math.BigDecimal) => {
  println("Print from UDF")
  ((sk+"1")).toString
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

someudf: java.math.BigDecimal => String


In [371]:
import org.apache.spark.sql.functions.udf
val someUDF = udf(someudf)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.functions.udf
someUDF: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($$$82b5b23cea489b2712a1db46c77e458$$$$w$$Lambda$6134/2001105709@dc9b50b,StringType,List(Some(class[value[0]: decimal(38,18)])),None,true,true)


In [372]:
val udf_df = df.withColumn("udfcol",someUDF($"id"))
udf_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

udf_df: org.apache.spark.sql.DataFrame = [id: bigint, month: bigint ... 10 more fields]
+--------+-----+--------+-----+--------------------+----+-------------------+---+--------+------+----------+--------------------+
|      id|month|      sk|  txt|                uuid|year| modified_timestamp|  z|schema_v|data_v|  trade_dt|              udfcol|
+--------+-----+--------+-----+--------------------+----+-------------------+---+--------+------+----------+--------------------+
|38000025|    3|38000025|[[L]]|60098a8c-0bf9-47f...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|38000025.00000000...|
|38000031|    3|38000031|[[R]]|a4bf6610-36d0-43b...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|38000031.00000000...|
|38000043|    6|38000043|[[D]]|495d0f6d-dc23-4e6...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|38000043.00000000...|
|38000069|    6|38000069|[[D]]|9b85c17c-74bd-403...|2019|2021-04-02 00:05:02|  0|      v1|    v2|2021-04-02|38000069.00000000...|
|3

In [366]:
spark.sql("""CREATE TABLE iceberg_table_sparksqludf2(id bigint,
                                       month bigint,
                                       sk bigint,
                                       -- txt struct<key1:string>,
                                       txt array<struct<key1:string>>,
                                       uuid string,
                                       year string,
                                       modified_timestamp timestamp,
                                       udfcol string,
                                       z string,
                                       `schema_v` string,   --had to rename these columns due to error
                                       `data_v` string,
                                       trade_dt string)
USING iceberg
OPTIONS ( 'write.object-storage.enabled'=true,
          'write.object-storage.path'='s3://vasveena-test-demo/iceberg/tables/data/sparksqludf2/')
PARTITIONED BY (z,`schema_v`,`data_v`,trade_dt)
LOCATION 's3://vasveena-test-demo/iceberg/tables/catalog/example_iceberg_perf_test_sparksqludf2'""")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res409: org.apache.spark.sql.DataFrame = []


In [381]:
udf_df.orderBy("z","`schema_v`","`data_v`","trade_dt").write.mode("overwrite").insertInto("iceberg_table_sparksqludf2")

VBox()

An error was encountered:
Session 8 did not reach idle status in time. Current status is busy.


### Listing S3 location

In [377]:
%%sh

aws s3 ls "s3://vasveena-test-demo/iceberg/tables/data/sparksqludf2/"

CalledProcessError: Command 'b'\naws s3 ls "s3://vasveena-test-demo/iceberg/tables/data/sparksqludf2/"\n'' returned non-zero exit status 1.

Result is as expected. So we should be able to transform intermediate data as long as we write the output to an existing Iceberg table. 