# Import Necessary Modules

In [None]:
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool

// Build (or reuse) the notebook's SparkSession: "local[1]" master, S3A file system,
// Kryo serializer, Hudi catalog + SQL extension, Hive support enabled.
val spark = {
  // Plain key/value settings applied to the builder in the order listed.
  val settings = Seq(
    "spark.hadoop.fs.s3a.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "aws.region" -> "ap-southeast-1",
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.extensions" -> "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
  )

  val baseBuilder = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExample")

  settings
    .foldLeft(baseBuilder) { case (builder, (key, value)) => builder.config(key, value) }
    .enableHiveSupport()
    .getOrCreate()
}

# Generate Big Data

## Generate Insert Data

In [None]:
// Generate the insert data set: 100 Parquet batches of 100,000 rows each, written to
// .../append_bigdata2/start_<first id of the batch>. Ids are contiguous across batches.

// col/rand come from Spark's functions object, which the notebook's import cell does not
// bring in; importing them here keeps this cell runnable on its own.
import org.apache.spark.sql.functions.{col, rand}

var start_cursor: Long = 0
for (_ <- 1 to 100) {
    val batchSize = 100000L
    println(start_cursor)
    val basePath = s"s3a://sellsuki-data-lake-dev/temp/test_hudi/append_bigdata2/start_$start_cursor"
    println(basePath)

    // Ids [start_cursor, start_cursor + batchSize) plus five random payload columns.
    val append_df = spark.range(start_cursor, start_cursor + batchSize)
      .withColumn("col1", rand())
      .withColumn("col2", rand())
      .withColumn("col3", rand())
      .withColumn("col4", rand())
      .withColumn("col5", rand())

    // Partition columns are integer buckets derived from col1
    // (rand() is in [0, 1), so Partition falls in 0-99 and Partition2 in 0-9).
    val append_dfWithPartition = append_df
        .withColumn("Partition", (col("col1") * 1000000 / 10000).cast("Integer"))
        .withColumn("Partition2", (col("col1") * 1000000 / 100000).cast("Integer"))

    // NOTE(review): no save mode is set, so the write fails if basePath already exists
    // (e.g. when the cell is re-run) - confirm that is intended.
    append_dfWithPartition.write.parquet(basePath)

    // The range is contiguous, so the next batch starts right after this one; no Spark
    // job is needed to look up the maximum id.
    start_cursor += batchSize
}

## Generate Update Data

In [None]:
// Generate the update data set: 10 Parquet batches of 100,000 rows each, written to
// .../append_bigdata3/start_<first id of the batch>. Ids start again at 0, so they
// overlap the first ids of the insert data set.

// col/rand come from Spark's functions object, which the notebook's import cell does not
// bring in; importing them here keeps this cell runnable on its own.
import org.apache.spark.sql.functions.{col, rand}

var start_cursor: Long = 0
for (_ <- 1 to 10) {
    val batchSize = 100000L
    println(start_cursor)
    val basePath = s"s3a://sellsuki-data-lake-dev/temp/test_hudi/append_bigdata3/start_$start_cursor"
    println(basePath)

    // Ids [start_cursor, start_cursor + batchSize) plus five random payload columns.
    val append_df = spark.range(start_cursor, start_cursor + batchSize)
      .withColumn("col1", rand())
      .withColumn("col2", rand())
      .withColumn("col3", rand())
      .withColumn("col4", rand())
      .withColumn("col5", rand())

    // Partition columns are integer buckets derived from col1
    // (rand() is in [0, 1), so Partition falls in 0-99 and Partition2 in 0-9).
    val append_dfWithPartition = append_df
        .withColumn("Partition", (col("col1") * 1000000 / 10000).cast("Integer"))
        .withColumn("Partition2", (col("col1") * 1000000 / 100000).cast("Integer"))

    // NOTE(review): no save mode is set, so the write fails if basePath already exists
    // (e.g. when the cell is re-run) - confirm that is intended.
    append_dfWithPartition.write.parquet(basePath)

    // The range is contiguous, so the next batch starts right after this one; no Spark
    // job is needed to look up the maximum id.
    start_cursor += batchSize
}

In [None]:
// Generate the smaller (1M-row) data set: 100 Parquet batches of 10,000 rows each, written
// to .../append_bigdata2_1/start_<first id of the batch>. Ids are contiguous across batches.

// col/rand come from Spark's functions object, which the notebook's import cell does not
// bring in; importing them here keeps this cell runnable on its own.
import org.apache.spark.sql.functions.{col, rand}

var start_cursor: Long = 0
for (_ <- 1 to 100) {
    val batchSize = 10000L
    println(start_cursor)
    val basePath = s"s3a://sellsuki-data-lake-dev/temp/test_hudi/append_bigdata2_1/start_$start_cursor"
    println(basePath)

    // Ids [start_cursor, start_cursor + batchSize) plus five random payload columns.
    val append_df = spark.range(start_cursor, start_cursor + batchSize)
      .withColumn("col1", rand())
      .withColumn("col2", rand())
      .withColumn("col3", rand())
      .withColumn("col4", rand())
      .withColumn("col5", rand())

    // Partition columns are integer buckets derived from col1
    // (rand() is in [0, 1), so Partition falls in 0-99 and Partition2 in 0-9).
    val append_dfWithPartition = append_df
        .withColumn("Partition", (col("col1") * 1000000 / 10000).cast("Integer"))
        .withColumn("Partition2", (col("col1") * 1000000 / 100000).cast("Integer"))

    // NOTE(review): no save mode is set, so the write fails if basePath already exists
    // (e.g. when the cell is re-run) - confirm that is intended.
    append_dfWithPartition.write.parquet(basePath)

    // The range is contiguous, so the next batch starts right after this one; no Spark
    // job is needed to look up the maximum id.
    start_cursor += batchSize
}

# Write MOR Hudi Table

## 10M Records

In [None]:
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool

// Obtain the SparkSession for this benchmark cell: "local[1]" master, S3A file system,
// Kryo serializer, Hudi catalog + SQL extension, Hive support enabled.
val spark = {
  // Plain key/value settings applied to the builder in the order listed.
  val settings = Seq(
    "spark.hadoop.fs.s3a.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "aws.region" -> "ap-southeast-1",
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.extensions" -> "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
  )

  val baseBuilder = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExample")

  settings
    .foldLeft(baseBuilder) { case (builder, (key, value)) => builder.config(key, value) }
    .enableHiveSupport()
    .getOrCreate()
}

// Benchmark: bulk-insert 100 Parquet batches (.../append_bigdata2/start_<n * 100000>) into a
// single merge-on-read Hudi table, one append per batch, and print the total wall-clock time.
// Each batch is read inside the loop; nothing is loaded up front.
var start_cursor: Long = 0
val startTime_3 = System.currentTimeMillis()
for (_ <- 1 to 100) {
    println(start_cursor)
    // Source folders are named after the first id they contain: start_0, start_100000, ...
    val index = start_cursor * 100000
    val tableName = "big_data_mor_hudi"
    val tableBase = "s3a://data-lake-dev/temp/test_hudi/v5_big_data_mor_hudi"
    // NOTE(review): the data-generation cell writes to the "sellsuki-data-lake-dev" bucket,
    // while this cell reads from "data-lake-dev" - confirm the bucket name.
    val basePath = s"s3a://data-lake-dev/temp/test_hudi/append_bigdata2/start_$index"
    println(basePath)

    val batchDF = spark.read.parquet(basePath)

    // NOTE(review): the precombine field is the same column as the partition path - confirm.
    // NOTE(review): "hoodie.compact.inline.max.delta.seconds" presumably only matters when
    // inline compaction with a time-based trigger is enabled - confirm.
    batchDF.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
      .option("hoodie.compact.inline.max.delta.seconds", "86400")
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
      .option(PRECOMBINE_FIELD_OPT_KEY, "Partition2")
      .option(PARTITIONPATH_FIELD_OPT_KEY, "Partition2")
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option("hoodie.table.name", tableName)
      .option("hoodie.datasource.write.hive_style_partitioning", "true")
      .mode(Append)
      .save(tableBase)

    start_cursor = start_cursor + 1
}

val endTime_3 = System.currentTimeMillis()
val executionTime_3 = endTime_3 - startTime_3
println(s"Append MOR Execution time: $executionTime_3 ms")

- Append MOR Execution time: 2328205 ms   ~ 38.80341667 min   846.8 MB

## 1M Records

In [None]:
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool

// Obtain the SparkSession for this benchmark cell: "local[1]" master, S3A file system,
// Kryo serializer, Hudi catalog + SQL extension, Hive support enabled.
val spark = {
  // Plain key/value settings applied to the builder in the order listed.
  val settings = Seq(
    "spark.hadoop.fs.s3a.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "aws.region" -> "ap-southeast-1",
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.extensions" -> "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
  )

  val baseBuilder = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExample")

  settings
    .foldLeft(baseBuilder) { case (builder, (key, value)) => builder.config(key, value) }
    .enableHiveSupport()
    .getOrCreate()
}

// Benchmark: bulk-insert 100 Parquet batches (.../append_bigdata2_1/start_<n * 10000>) into a
// single merge-on-read Hudi table, one append per batch, and print the total wall-clock time.
// Each batch is read inside the loop; nothing is loaded up front.
var start_cursor: Long = 0
val startTime_3 = System.currentTimeMillis()
for (_ <- 1 to 100) {
    println(start_cursor)
    // Source folders are named after the first id they contain: start_0, start_10000, ...
    val index = start_cursor * 10000
    val tableName = "big_data_mor_hudi"
    val tableBase = "s3a://sellsuki-data-lake-dev/temp/test_hudi/v5_big_data_mor_hudi2_1"
    val basePath = s"s3a://sellsuki-data-lake-dev/temp/test_hudi/append_bigdata2_1/start_$index"
    println(basePath)

    val batchDF = spark.read.parquet(basePath)

    // NOTE(review): the precombine field is the same column as the partition path - confirm.
    // NOTE(review): "hoodie.compact.inline.max.delta.seconds" presumably only matters when
    // inline compaction with a time-based trigger is enabled - confirm.
    batchDF.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
      .option("hoodie.compact.inline.max.delta.seconds", "86400")
      .option(TABLE_TYPE.key, MOR_TABLE_TYPE_OPT_VAL)
      .option(PRECOMBINE_FIELD_OPT_KEY, "Partition2")
      .option(PARTITIONPATH_FIELD_OPT_KEY, "Partition2")
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option("hoodie.table.name", tableName)
      .option("hoodie.datasource.write.hive_style_partitioning", "true")
      .mode(Append)
      .save(tableBase)

    start_cursor = start_cursor + 1
}

val endTime_3 = System.currentTimeMillis()
val executionTime_3 = endTime_3 - startTime_3
println(s"Append MOR Execution time: $executionTime_3 ms")

# Write COW Hudi Table

## 10M Records

In [None]:
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool

// Obtain the SparkSession for this benchmark cell: "local[1]" master, S3A file system,
// Kryo serializer, Hudi catalog + SQL extension, Hive support enabled.
val spark = {
  // Plain key/value settings applied to the builder in the order listed.
  val settings = Seq(
    "spark.hadoop.fs.s3a.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "aws.region" -> "ap-southeast-1",
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.extensions" -> "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
  )

  val baseBuilder = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExample")

  settings
    .foldLeft(baseBuilder) { case (builder, (key, value)) => builder.config(key, value) }
    .enableHiveSupport()
    .getOrCreate()
}

// Benchmark: bulk-insert 100 Parquet batches (.../append_bigdata2/start_<n * 100000>) into a
// single copy-on-write Hudi table, one append per batch, and print the total wall-clock time.
// Each batch is read inside the loop; nothing is loaded up front.
var start_cursor: Long = 0

val startTime_3 = System.currentTimeMillis()
for (_ <- 1 to 100) {
    println(start_cursor)
    // Source folders are named after the first id they contain: start_0, start_100000, ...
    val index = start_cursor * 100000
    val tableName = "big_data_cow_hudi"
    val tableBase = "s3a://data-lake-dev/temp/test_hudi/v5_big_data_cow_hudi2"
    // NOTE(review): the data-generation cell writes to the "sellsuki-data-lake-dev" bucket,
    // while this cell reads from "data-lake-dev" - confirm the bucket name.
    val basePath = s"s3a://data-lake-dev/temp/test_hudi/append_bigdata2/start_$index"
    println(basePath)

    val batchDF = spark.read.parquet(basePath)

    // NOTE(review): the precombine field is the same column as the partition path - confirm.
    batchDF.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
      .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
      .option(PRECOMBINE_FIELD_OPT_KEY, "Partition2")
      .option(PARTITIONPATH_FIELD_OPT_KEY, "Partition2")
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option("hoodie.table.name", tableName)
      .option("hoodie.datasource.write.hive_style_partitioning", "true")
      .mode(Append)
      .save(tableBase)

    start_cursor = start_cursor + 1
}
val endTime_3 = System.currentTimeMillis()
val executionTime_3 = endTime_3 - startTime_3
println(s"Append COW Execution time: $executionTime_3 ms")

- Append COW Execution time: 2147978 ms   ~ 35.79963333 min   846.8 MB

## 1M Records

In [None]:
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool

// Obtain the SparkSession for this benchmark cell: "local[1]" master, S3A file system,
// Kryo serializer, Hudi catalog + SQL extension, Hive support enabled.
val spark = {
  // Plain key/value settings applied to the builder in the order listed.
  val settings = Seq(
    "spark.hadoop.fs.s3a.impl" -> "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "aws.region" -> "ap-southeast-1",
    "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.catalog.spark_catalog" -> "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    "spark.sql.extensions" -> "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
  )

  val baseBuilder = SparkSession.builder()
    .master("local[1]")
    .appName("SparkByExample")

  settings
    .foldLeft(baseBuilder) { case (builder, (key, value)) => builder.config(key, value) }
    .enableHiveSupport()
    .getOrCreate()
}

// Benchmark: bulk-insert 100 Parquet batches (.../append_bigdata2_1/start_<n * 10000>) into a
// single copy-on-write Hudi table, one append per batch, and print the total wall-clock time.
// Each batch is read inside the loop; nothing is loaded up front.
var start_cursor: Long = 0

val startTime_3 = System.currentTimeMillis()
for (_ <- 1 to 100) {
    println(start_cursor)
    // Source folders are named after the first id they contain: start_0, start_10000, ...
    val index = start_cursor * 10000
    val tableName = "big_data_cow_hudi"
    val tableBase = "s3a://sellsuki-data-lake-dev/temp/test_hudi/v5_big_data_cow_hudi2_1"
    val basePath = s"s3a://sellsuki-data-lake-dev/temp/test_hudi/append_bigdata2_1/start_$index"
    println(basePath)

    val batchDF = spark.read.parquet(basePath)

    // NOTE(review): the precombine field is the same column as the partition path - confirm.
    batchDF.write.format("hudi")
      .options(getQuickstartWriteConfigs)
      .option(OPERATION_OPT_KEY, BULK_INSERT_OPERATION_OPT_VAL)
      .option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.SimpleKeyGenerator")
      .option(TABLE_TYPE.key, COW_TABLE_TYPE_OPT_VAL)
      .option(PRECOMBINE_FIELD_OPT_KEY, "Partition2")
      .option(PARTITIONPATH_FIELD_OPT_KEY, "Partition2")
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option("hoodie.table.name", tableName)
      .option("hoodie.datasource.write.hive_style_partitioning", "true")
      .mode(Append)
      .save(tableBase)

    start_cursor = start_cursor + 1
}
val endTime_3 = System.currentTimeMillis()
val executionTime_3 = endTime_3 - startTime_3
println(s"Append COW Execution time: $executionTime_3 ms")