# Building real time live incremental data lake on S3 using Apache Hudi + Spark structured streaming + Iceberg

In [None]:
%%configure -f
{
    "conf":  { 
             "spark.jars":"hdfs:///user/hadoop/aws-java-sdk-bundle-1.12.31.jar,hdfs:///user/hadoop/httpcore-4.4.11.jar,hdfs:///user/hadoop/httpclient-4.5.9.jar,hdfs:////user/hadoop/hudi-spark-bundle.jar,hdfs:///user/hadoop/spark-avro.jar",
             "spark.sql.hive.convertMetastoreParquet":"false", 
             "spark.serializer":"org.apache.spark.serializer.KryoSerializer"
           } 
}

You can also run the commands on Spark shell

spark-shell --jars hdfs:///user/hadoop/aws-java-sdk-bundle-1.12.31.jar,hdfs:///user/hadoop/httpcore-4.4.11.jar,hdfs:///user/hadoop/httpclient-4.5.9.jar,hdfs:////user/hadoop/hudi-spark-bundle.jar,hdfs:///user/hadoop/spark-avro.jar --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

In [None]:
// General Constants
val HUDI_FORMAT = "org.apache.hudi"
val TABLE_NAME = "hoodie.table.name"
val RECORDKEY_FIELD_OPT_KEY = "hoodie.datasource.write.recordkey.field"
val PRECOMBINE_FIELD_OPT_KEY = "hoodie.datasource.write.precombine.field"
val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
val BULK_INSERT_OPERATION_OPT_VAL = "bulk_insert"
val UPSERT_OPERATION_OPT_VAL = "upsert"
val BULK_INSERT_PARALLELISM = "hoodie.bulkinsert.shuffle.parallelism"
val UPSERT_PARALLELISM = "hoodie.upsert.shuffle.parallelism"
val S3_CONSISTENCY_CHECK = "hoodie.consistency.check.enabled"
val HUDI_CLEANER_POLICY = "hoodie.cleaner.policy"
val KEEP_LATEST_COMMITS = "KEEP_LATEST_COMMITS"
val HUDI_COMMITS_RETAINED = "hoodie.cleaner.commits.retained"
val PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
val EMPTY_PAYLOAD_CLASS_OPT_VAL = "org.apache.hudi.common.model.EmptyHoodieRecordPayload"
val TABLE_TYPE_OPT_KEY="hoodie.datasource.write.table.type"

// Hive Constants
val HIVE_SYNC_ENABLED_OPT_KEY="hoodie.datasource.hive_sync.enable"
val HIVE_PARTITION_FIELDS_OPT_KEY="hoodie.datasource.hive_sync.partition_fields"
val HIVE_ASSUME_DATE_PARTITION_OPT_KEY="hoodie.datasource.hive_sync.assume_date_partitioning"
val HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY="hoodie.datasource.hive_sync.partition_extractor_class"
val HIVE_TABLE_OPT_KEY="hoodie.datasource.hive_sync.table"

// Partition Constants
val NONPARTITION_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.NonPartitionedExtractor"
val MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL="org.apache.hudi.hive.MultiPartKeysValueExtractor"
val KEYGENERATOR_CLASS_OPT_KEY="hoodie.datasource.write.keygenerator.class"
val NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.keygen.NonpartitionedKeyGenerator"
val COMPLEX_KEYGENERATOR_CLASS_OPT_VAL="org.apache.hudi.ComplexKeyGenerator"
val PARTITIONPATH_FIELD_OPT_KEY="hoodie.datasource.write.partitionpath.field"

//Incremental Constants
val VIEW_TYPE_OPT_KEY="hoodie.datasource.view.type"
val BEGIN_INSTANTTIME_OPT_KEY="hoodie.datasource.read.begin.instanttime"
val VIEW_TYPE_INCREMENTAL_OPT_VAL="incremental"
val END_INSTANTTIME_OPT_KEY="hoodie.datasource.read.end.instanttime"

In [None]:
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import java.util.HashMap
import spark.implicits._
import org.apache.hudi._

In [None]:
val trip_update_topic = "trip_update_topic"
val trip_status_topic = "trip_status_topic"
val broker = "yourbootstrapbrokers"

In [None]:
object MTASubwayTripUpdates extends Serializable {

    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    @transient var producer : KafkaProducer[String, String] = null
    var msgId : Long = 1
    @transient var joined_query : StreamingQuery = null
    @transient var joined_query_s3 : StreamingQuery = null

    val spark = SparkSession.builder.appName("MSK streaming Example").getOrCreate()
    

    def start() = {
        //Start producer for kafka
        producer = new KafkaProducer[String, String](props)

        //Create a datastream from trip update topic
        val trip_update_df = spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", broker)
        .option("subscribe", trip_update_topic)
        .option("startingOffsets", "latest").option("failOnDataLoss","false").load()

        //Create a datastream from trip status topic
        val trip_status_df = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", broker)
        .option("subscribe", trip_status_topic)
        .option("startingOffsets", "latest").option("failOnDataLoss","false").load()

        // define schema of data

        val trip_update_schema = new StructType()
        .add("trip", new StructType().add("tripId","string").add("startTime","string").add("startDate","string").add("routeId","string"))
        .add("stopTimeUpdate",ArrayType(new StructType().add("arrival",new StructType().add("time","string")).add("stopId","string").add("departure",new StructType().add("time","string"))))

        val trip_status_schema = new StructType()
        .add("trip", new StructType().add("tripId","string").add("startTime","string").add("startDate","string").add("routeId","string")).add("currentStopSequence","integer").add("currentStatus", "string").add("timestamp", "string").add("stopId","string")

        // covert datastream into a datasets and apply schema
        val trip_update_ds = trip_update_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
        val trip_update_ds_schema = trip_update_ds
        .select(from_json($"value", trip_update_schema).as("data")).select("data.*")
        trip_update_ds_schema.printSchema()

        val trip_status_ds = trip_status_df
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
        val trip_status_ds_schema = trip_status_ds
        .select(from_json($"value", trip_status_schema).as("data")).select("data.*")
        trip_status_ds_schema.printSchema()

        val trip_status_ds_unnest = trip_status_ds_schema
        .select("trip.*","currentStopSequence","currentStatus","timestamp","stopId")

        val trip_update_ds_unnest = trip_update_ds_schema
        .select($"trip.*", $"stopTimeUpdate.arrival.time".as("arrivalTime"), 
                $"stopTimeUpdate.departure.time".as("depatureTime"), $"stopTimeUpdate.stopId")

        val trip_update_ds_unnest2 = trip_update_ds_unnest
        .withColumn("numOfFutureStops", size($"arrivalTime"))
        .withColumnRenamed("stopId","futureStopIds")

        val joined_ds = trip_update_ds_unnest2
        .join(trip_status_ds_unnest, Seq("tripId","routeId","startTime","startDate"))
        .withColumn("startTime",(col("startTime").cast("timestamp")))
        .withColumn("currentTs",from_unixtime($"timestamp".divide(1000)))
        .drop("startDate").drop("timestamp")

        joined_ds.printSchema()
        
        def myFunc( batchDF:DataFrame, batchID:Long ) : Unit = {
            batchDF.persist()
            batchDF.write.format("org.apache.hudi")
                .option(TABLE_TYPE_OPT_KEY, "COPY_ON_WRITE")
                .option(PRECOMBINE_FIELD_OPT_KEY, "currentTs")
                .option(RECORDKEY_FIELD_OPT_KEY, "tripId")
                .option(TABLE_NAME, "hudi_trips_streaming_table")
                .option(UPSERT_PARALLELISM, 200)
                .option(HUDI_CLEANER_POLICY, KEEP_LATEST_COMMITS)
                .option(S3_CONSISTENCY_CHECK, "true")
                .option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
                .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
                .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY,NONPARTITION_EXTRACTOR_CLASS_OPT_VAL)
                .option(KEYGENERATOR_CLASS_OPT_KEY,NONPARTITIONED_KEYGENERATOR_CLASS_OPT_VAL)
                .mode(SaveMode.Append)
                .save("s3://mrworkshop-youraccountID-dayone/hudi/streaming_output/")
            
            batchDF.unpersist()
        }
        
        val query = joined_ds.writeStream.queryName("lab3")
        .trigger(Trigger.ProcessingTime("30 seconds"))
        .foreachBatch(myFunc _)
    
      .option("checkpointLocation", "/user/hadoop/checkpoint")
      .start()
        
      query.awaitTermination()
        
    }
}

In [None]:
MTASubwayTripUpdates.start