##### ICEBERG - SCHEMA EVOLUTION
##### Objective: Test schema evolution by adding, updating, and deleting columns, and by changing column data types
##### Dataset format: A nested JSON dataset of meteorite landings, downloaded from the public source below
##### Dataset Source: https://catalog.data.gov/dataset/

##### 1.1 Configure Spark Iceberg Runtime package and other settings

In [2]:
spark.conf.set("spark.sql.catalog.mycatalog.type", "Hadoop")

In [3]:
spark.conf.set("spark-sql.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2")
spark.conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
spark.conf.set("spark.sql.catalog.mycatalog", "org.apache.iceberg.spark.SparkSessionCatalog")

org.apache.spark.sql.AnalysisException:  Cannot modify the value of a static config: spark.sql.extensions.

##### Packages that need to be downloaded and used during the Spark session
###### spark.conf.set("spark-sql.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2")

##### This specifies any SQL extensions that should be present in the Spark session
###### spark.conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")

##### The settings below configure a specific catalog, which can be registered under a name of your choosing, e.g. here it is mycatalog
##### This specifies that the catalog uses the Apache Iceberg SparkSessionCatalog class.
###### spark.conf.set("spark.sql.catalog.mycatalog", "org.apache.iceberg.spark.SparkSessionCatalog")

##### This sets the type of catalog in use; possible values include Hadoop (for an HDFS/file-system catalog) and Hive
###### spark.conf.set("spark.sql.catalog.mycatalog.type", "Hadoop")

##### 1.2 Setting spark.sql.extensions at runtime may fail with "Cannot modify the value of a static config: spark.sql.extensions", because static configs cannot be changed once the session is running. That does not necessarily mean the value is unset: when checked with the configuration getter, it displays the value as set above, i.e. IcebergSparkSessionExtensions. The cell below reads back the catalog type the same way.

In [4]:
spark.conf.get("spark.sql.catalog.mycatalog.type")

res3: String = Hadoop
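Because spark.sql.extensions is a static config, one option is to supply these settings when the session is built instead of through spark.conf.set. The following is a minimal sketch under several assumptions: it runs as a fresh application or kernel (getOrCreate returns an existing session unchanged), the Iceberg runtime jar is already on the classpath (for example via --packages at launch, where the corresponding Spark property is spark.jars.packages), and the warehouse path, the local[*] master and the name icebergSession are placeholders. It also swaps in org.apache.iceberg.spark.SparkCatalog for the custom catalog name, since Iceberg documents SparkSessionCatalog as a wrapper for the built-in spark_catalog.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: static settings are passed before the session exists.
// Assumes the iceberg-spark-runtime jar was supplied at launch (e.g. --packages).
val icebergSession = SparkSession.builder()
  .appName("iceberg-schema-evolution")
  .master("local[*]") // assumption: local run; drop this when the launcher sets the master
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.mycatalog", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.mycatalog.type", "hadoop")
  // Hypothetical warehouse location; a hadoop-type catalog needs one.
  .config("spark.sql.catalog.mycatalog.warehouse", "/tmp/iceberg-warehouse")
  .getOrCreate()

// Read a setting back, as in cell In [4].
val catalogType = icebergSession.conf.get("spark.sql.catalog.mycatalog.type")
```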


##### 2.0 Read the nested json dataset and verify its schema

In [8]:
val jsonDF =  spark.read.option("multiline", "true").json("../Data/Json/meteorite_landing.json")

jsonDF: org.apache.spark.sql.DataFrame = [data: array<array<string>>, meta: struct<view: struct<approvals: array<struct<reviewedAt:bigint,reviewedAutomatically:boolean,state:string,submissionDetails:struct<permissionType:string>,submissionId:bigint,submissionObject:string,submissionOutcome:string,submissionOutcomeApplication:struct<failureCount:bigint,status:string>,submittedAt:bigint,submitter:struct<displayName:string,id:string>,targetAudience:string,workflowId:bigint>>, assetType: string ... 38 more fields>>]


In [10]:
jsonDF.printSchema

root
 |-- data: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- meta: struct (nullable = true)
 |    |-- view: struct (nullable = true)
 |    |    |-- approvals: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- reviewedAt: long (nullable = true)
 |    |    |    |    |-- reviewedAutomatically: boolean (nullable = true)
 |    |    |    |    |-- state: string (nullable = true)
 |    |    |    |    |-- submissionDetails: struct (nullable = true)
 |    |    |    |    |    |-- permissionType: string (nullable = true)
 |    |    |    |    |-- submissionId: long (nullable = true)
 |    |    |    |    |-- submissionObject: string (nullable = true)
 |    |    |    |    |-- submissionOutcome: string (nullable = true)
 |    |    |    |    |-- submissionOutcomeApplication: struct (nullable = true)
 |    |    |    |    |    |-- failureCount: long (nullable = true

##### source: https://stackoverflow.com/questions/61863489/flatten-nested-json-in-scala-spark-dataframe/61863579#61863579
##### 2.1 The Scala code below is adapted from the above source and expands the nested JSON into individual columns. Since not all of the nested fields are arrays, the explode function alone cannot be used, and handling each column by hand would be cumbersome. The code dynamically splits array and struct types into individual columns while keeping the hierarchy intact. For example, if c is a nested child of b, which is a child of a (i.e. a.b.c), the code produces a column named a_b_c. This also prevents duplicate column names when the nested JSON repeats a property name, because each name now carries its own hierarchy.

In [11]:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try

implicit class DFHelpers(df: DataFrame) {
    // Select list for one flattening pass: each struct column is expanded one
    // level and aliased as <parent>_<child>; all other columns pass through.
    def columns = {
      df.schema.fields.flatMap { data =>
        data match {
          case column if column.dataType.isInstanceOf[StructType] => {
            column.dataType.asInstanceOf[StructType].fields.map { field =>
              val columnName = column.name
              val fieldName = field.name
              col(s"${columnName}.${fieldName}").as(s"${columnName}_${fieldName}")
            }.toList
          }
          case column => List(col(s"${column.name}"))
        }
      }
    }

    // Repeat the one-level expansion until no struct columns remain.
    def flatten: DataFrame = {
      val empty = df.schema.filter(_.dataType.isInstanceOf[StructType]).isEmpty
      empty match {
        case false =>
          df.select(columns: _*).flatten
        case _ => df
      }
    }

    // Explode every array column into rows (explode_outer keeps rows whose
    // array is null or empty), flatten the structs this exposes, and repeat
    // until no array columns remain.
    def explodeColumns = {
      @tailrec
      def columns(cdf: DataFrame):DataFrame = cdf.schema.fields.filter(_.dataType.typeName == "array") match {
        case c if !c.isEmpty => columns(c.foldLeft(cdf)((dfa,field) => {
          dfa.withColumn(field.name,explode_outer(col(s"${field.name}"))).flatten
        }))
        case _ => cdf
      }
      columns(df.flatten)
    }
}



import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try
defined class DFHelpers
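The naming rule used by the helper above can be illustrated without Spark. This is a minimal sketch, not the helper itself: flattenKeys and the nested Map are hypothetical stand-ins for a JSON object, and arrays are left out.

```scala
// Hypothetical stand-in for a nested JSON object: values are either
// leaves or nested maps. Keys are joined with '_' along the path.
def flattenKeys(node: Map[String, Any], prefix: String = ""): Map[String, Any] =
  node.flatMap {
    case (key, child: Map[_, _]) =>
      // Descend into the nested object, carrying the path so far.
      flattenKeys(child.asInstanceOf[Map[String, Any]], s"$prefix${key}_")
    case (key, leaf) =>
      Map(s"$prefix$key" -> leaf)
  }

val nested: Map[String, Any] = Map(
  "a" -> Map("b" -> Map("c" -> 1, "flags" -> "x")),
  "flags" -> "y"
)

// The two 'flags' properties stay distinct because each keeps its path.
val flat = flattenKeys(nested)
```

Here flat has the keys a_b_c, a_b_flags and flags, so a property name repeated at different levels does not collide.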


##### 2.2 Call the above function with the dataframe variable

In [12]:
val flattenedJsonDF = jsonDF.explodeColumns

flattenedJsonDF: org.apache.spark.sql.DataFrame = [data: string, meta_view_approvals_reviewedAt: bigint ... 100 more fields]


##### 2.3 Verify the schema after the function is run. It maintains the hierarchy by separating the names with an underscore ('_') between each level. This also prevents duplicate columns when the nested JSON repeats a property name, because each name now carries its own hierarchy.
##### For example, the property 'flags' is now prefixed with its hierarchy: meta_view_flags, meta_view_owner_flags, meta_view_columns_flags

In [13]:
flattenedJsonDF.printSchema

root
 |-- data: string (nullable = true)
 |-- meta_view_approvals_reviewedAt: long (nullable = true)
 |-- meta_view_approvals_reviewedAutomatically: boolean (nullable = true)
 |-- meta_view_approvals_state: string (nullable = true)
 |-- meta_view_approvals_submissionDetails_permissionType: string (nullable = true)
 |-- meta_view_approvals_submissionId: long (nullable = true)
 |-- meta_view_approvals_submissionObject: string (nullable = true)
 |-- meta_view_approvals_submissionOutcome: string (nullable = true)
 |-- meta_view_approvals_submissionOutcomeApplication_failureCount: long (nullable = true)
 |-- meta_view_approvals_submissionOutcomeApplication_status: string (nullable = true)
 |-- meta_view_approvals_submittedAt: long (nullable = true)
 |-- meta_view_approvals_submitter_displayName: string (nullable = true)
 |-- meta_view_approvals_submitter_id: string (nullable = true)
 |-- meta_view_approvals_targetAudience: string (nullable = true)
 |-- meta_view_approvals_workflowId: long (

In [14]:
flattenedJsonDF.show(2)

+--------------------+------------------------------+-----------------------------------------+-------------------------+----------------------------------------------------+--------------------------------+------------------------------------+-------------------------------------+-------------------------------------------------------------+-------------------------------------------------------+-------------------------------+-----------------------------------------+--------------------------------+----------------------------------+------------------------------+-------------------+-------------------------+-----------------------+------------------+----------------------------------------------+----------------------------------------+--------------------------------------------+--------------------------------------+----------------------------------------+-----------------------------------------+-------------------------------------+-----------------------------------------+---

##### 3. Finally :-), the core step: create an Iceberg table and write the JSON data to it.
##### Once the catalog is set up, later writes can use append to add to an existing table, create for a new table, or replace to overwrite an existing one. createOrReplace is the safer choice: it creates the table if it does not exist and replaces it if it does.

In [16]:
val icedTable = flattenedJsonDF.writeTo("iceberg.mydb.meteorites").using("iceberg").createOrReplace()

icedTable: Unit = ()
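The write modes mentioned in step 3 map onto Spark's DataFrameWriterV2 actions. The following is a rough sketch; writeIceberg is a hypothetical helper name, and the string mode names are only for illustration.

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical helper: picks a DataFrameWriterV2 action by name.
def writeIceberg(df: DataFrame, table: String, mode: String): Unit =
  mode match {
    // Fails if the table already exists.
    case "create"          => df.writeTo(table).using("iceberg").create()
    // Fails if the table does not exist yet.
    case "replace"         => df.writeTo(table).using("iceberg").replace()
    // Creates the table, or replaces it when it is already there.
    case "createOrReplace" => df.writeTo(table).using("iceberg").createOrReplace()
    // Adds rows to an existing table; fails if the table is missing.
    case "append"          => df.writeTo(table).append()
    case other             => throw new IllegalArgumentException(s"Unknown write mode: $other")
  }

// Usage with the names from this notebook:
// writeIceberg(flattenedJsonDF, "iceberg.mydb.meteorites", "createOrReplace")
```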


##### 3.1 Verify if the table is created using iceberg format

In [17]:
val meteorTable = spark.read.format("iceberg").load("iceberg.mydb.meteorites")

meteorTable: org.apache.spark.sql.DataFrame = [data: string, meta_view_approvals_reviewedAt: bigint ... 100 more fields]


##### 3.2 Display result

In [18]:
meteorTable.show(2)

+--------------------+------------------------------+-----------------------------------------+-------------------------+----------------------------------------------------+--------------------------------+------------------------------------+-------------------------------------+-------------------------------------------------------------+-------------------------------------------------------+-------------------------------+-----------------------------------------+--------------------------------+----------------------------------+------------------------------+-------------------+-------------------------+-----------------------+------------------+----------------------------------------------+----------------------------------------+--------------------------------------------+--------------------------------------+----------------------------------------+-----------------------------------------+-------------------------------------+-----------------------------------------+---

##### 3.3 Check the count to test later when we append the data again

In [19]:
meteorTable.count()

res11: Long = 542500


##### 3.4 Check the number of columns, to compare against after the delete steps are executed

In [20]:
meteorTable.columns.length

res12: Int = 102


##### 4 The Schema Evolution feature test begins here. Not really :-p, it is a test before the actual test
##### These 4.x steps check whether any of the schema evolution cases work by default, before any schema-evolution-specific settings are configured.
##### The results are kept to compare behavior before and after that configuration.

##### 4.1 Starting from the original dataset, I added a new integer column and saved the result as a new dataset, so that it can be written back to the Iceberg table whose schema was fixed by the first write
##### Added new integer column: testintproperty
##### Steps: read the modified JSON dataset, explode it, and then append it to the existing Iceberg table

In [26]:
val addnewintcolDF =  spark.read.option("multiline", "true").json("../Data/Json/meteorite_landing_addnew_int.json")
val flattenedsinglechangeDF = addnewintcolDF.explodeColumns
val icedsinglechangedTable = flattenedsinglechangeDF.writeTo("iceberg.mydb.meteorites").append()

org.apache.spark.sql.AnalysisException:  [INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS] Cannot write to `demo`.`iceberg`.`mydb`.`meteorites`, the reason is too many data columns:
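One way to see which columns trigger this mismatch is to compare the column names of the target table and the incoming DataFrame. A minimal sketch in plain Scala; columnDiff is a hypothetical helper, and the short column lists are illustrative stand-ins for the real 102-column schema.

```scala
// Returns (columns only in the source, columns only in the target).
def columnDiff(target: Seq[String], source: Seq[String]): (Seq[String], Seq[String]) =
  (source.diff(target), target.diff(source))

// Illustrative column lists, not the full schema.
val targetCols = Seq("data", "meta_view_approvals_state")
val sourceCols = Seq("data", "meta_view_approvals_state", "meta_view_testintproperty")

val (addedCols, missingCols) = columnDiff(targetCols, sourceCols)

// In the notebook this could be called as:
// columnDiff(meteorTable.columns.toSeq, flattenedsinglechangeDF.columns.toSeq)
```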

##### 4.2 Starting from the original dataset, I added new integer, double, boolean and string columns and saved the result as a new dataset, so that it can be written back to the Iceberg table whose schema was fixed by the first write
##### New Integer Column: testintproperty
##### New Double Column: testdoubleprop
##### New String Column: teststring
##### New Bool Column: testflag
##### Steps: read the modified JSON dataset, explode it, and then append it to the existing Iceberg table

In [27]:
val addnewmultiplecolDF =  spark.read.option("multiline", "true").json("../Data/Json/meteorite_landing_addnew.json")
val flattenedallchangesDF = addnewmultiplecolDF.explodeColumns
val icedallchangedTable = flattenedallchangesDF.writeTo("iceberg.mydb.meteorites").append()

org.apache.spark.sql.AnalysisException:  [INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS] Cannot write to `demo`.`iceberg`.`mydb`.`meteorites`, the reason is too many data columns:

##### 5.0 SCHEMA EVOLUTION TEST (actually) BEGINS HERE
##### Source: https://iceberg.apache.org/docs/latest/spark-writes/#schema-merge
##### Set the table property to accept any schema, as per the Apache documentation source above, before appending the modified datasets

##### The documentation at the above link states the following
##### Schema Merge
##### While inserting or updating Iceberg is capable of resolving schema mismatch at runtime. 
##### If configured, Iceberg will perform an automatic schema evolution as follows:
##### A new column is present in the source but not in the target table.
##### The new column is added to the target table. Column values are set to NULL in all the rows already present in the table
##### A column is present in the target but not in the source.
##### The target column value is set to NULL when inserting or left unchanged when updating the row.
##### The target table must be configured to accept any schema change by setting the property write.spark.accept-any-schema to true.
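The merge rules quoted above can be pictured with a toy model in plain Scala. This is not Iceberg code: ToyTable and appendWithMerge are hypothetical names, None stands in for SQL NULL, and only the insert case is modelled.

```scala
// Toy model: a table is a column list plus rows; None stands in for NULL.
final case class ToyTable(columns: Vector[String], rows: Vector[Map[String, Option[Any]]])

def appendWithMerge(target: ToyTable,
                    sourceColumns: Vector[String],
                    sourceRows: Vector[Map[String, Any]]): ToyTable = {
  // Columns present only in the source are added to the target schema.
  val mergedColumns = target.columns ++ sourceColumns.filterNot(c => target.columns.contains(c))
  // Pad a row to the merged schema; absent columns become None.
  def pad(row: Map[String, Option[Any]]): Map[String, Option[Any]] =
    mergedColumns.map(c => c -> row.getOrElse(c, None)).toMap
  // Existing rows get NULL in the newly added columns.
  val existing = target.rows.map(pad)
  // Inserted rows get NULL in target columns the source lacks.
  val incoming = sourceRows.map(r => pad(r.map { case (k, v) => k -> Option(v) }))
  ToyTable(mergedColumns, existing ++ incoming)
}

val before = ToyTable(
  Vector("data", "meta_view_approvals_state"),
  Vector(Map[String, Option[Any]]("data" -> Some("x"), "meta_view_approvals_state" -> Some("approved")))
)
val after = appendWithMerge(
  before,
  Vector("data", "testintproperty"),
  Vector(Map[String, Any]("data" -> "y", "testintproperty" -> 7))
)
```

In this model the old row ends up with None for testintproperty, and the new row ends up with None for meta_view_approvals_state.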

In [29]:
spark.sql("ALTER TABLE iceberg.mydb.meteorites SET TBLPROPERTIES ('write.spark.accept-any-schema'='true')")

res14: org.apache.spark.sql.DataFrame = []


##### 5.1 This step is the same as 4.1. Starting from the original dataset, I added a new integer column and saved the result as a new dataset, so that it can be written back to the Iceberg table whose schema was fixed by the first write
##### Added new integer column: testintproperty
##### Steps: read the modified JSON dataset, explode it, and then append it to the existing Iceberg table

In [30]:
val addnewintcolDF =  spark.read.option("multiline", "true").json("../Data/Json/meteorite_landing_addnew_int.json")
val flattenedsinglechangeDF = addnewintcolDF.explodeColumns
val icedsinglechangedTable = flattenedsinglechangeDF.writeTo("iceberg.mydb.meteorites").append()

java.lang.IllegalArgumentException:  Field meta_view_testintproperty not found in source schema

In [31]:
val addnewmultiplecolDF =  spark.read.option("multiline", "true").json("../Data/Json/meteorite_landing_addnew.json")
val flattenedallchangesDF = addnewmultiplecolDF.explodeColumns
val icedallchangedTable = flattenedallchangesDF.writeTo("iceberg.mydb.meteorites").append()

java.lang.IllegalArgumentException:  Field meta_view_approvals_testdoubleprop not found in source schema
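The Schema Merge section of the Iceberg documentation linked in 5.0 also says that, besides the table property, the writer must enable the mergeSchema option. The appends in In [30] and In [31] do not set it, which may be why they still fail with "not found in source schema". A minimal sketch that has not been run against this dataset; appendWithSchemaMerge is a hypothetical helper name.

```scala
import org.apache.spark.sql.DataFrame

// Sketch: append with schema merging enabled on the writer.
// Assumes the target table already has 'write.spark.accept-any-schema'='true'.
def appendWithSchemaMerge(df: DataFrame, table: String): Unit =
  df.writeTo(table)
    .option("mergeSchema", "true") // writer-side option named in the Iceberg docs
    .append()

// Usage with the names from the cells above:
// appendWithSchemaMerge(flattenedsinglechangeDF, "iceberg.mydb.meteorites")
```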