# Building a real-time, incrementally updated data lake on S3 with Apache Iceberg and Spark Structured Streaming

In [1]:
%%configure -f
{
    "conf":  {       
             "spark.sql.catalog.glue_catalog1":"org.apache.iceberg.spark.SparkCatalog",
             "spark.sql.catalog.glue_catalog1.warehouse":"s3://vasveena-dremio/iceberg/glue_catalog1/tables/",
             "spark.sql.catalog.glue_catalog1.catalog-impl":"org.apache.iceberg.aws.glue.GlueCatalog",
             "spark.sql.catalog.glue_catalog1.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
             "spark.sql.catalog.glue_catalog1.lock-impl":"org.apache.iceberg.aws.glue.DynamoLockManager",
             "spark.sql.catalog.glue_catalog1.lock.table":"myGlueLockTable",
             "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
           } 
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1659608060840_0001,pyspark,idle,Link,Link,,


In [2]:
%%sql
-- Switch the session to the Glue-backed Iceberg catalog configured above as glue_catalog1.
USE glue_catalog1

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3,application_1659608060840_0004,spark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [3]:
%%sql
-- Create the streamdb database in the current catalog; a no-op if it already exists.
CREATE DATABASE IF NOT EXISTS streamdb

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [4]:
%%sql
-- Make streamdb the current database for unqualified table names.
USE streamdb

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [18]:
%%sql
-- Remove any previous copy of the target table so it can be recreated from scratch.
-- NOTE: this does not touch the streaming query's checkpoint directory; a restarted
-- query resumes from the Kafka offsets recorded there, not from "latest".
DROP TABLE IF EXISTS glue_catalog1.streamdb.streaming_trips_table

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [19]:
%%sql
-- Iceberg sink for the joined trip-update / trip-status stream.
-- Column names and order mirror the DataFrame produced by MTASubwayTripUpdates.start;
-- "depatureTime" (sic) is spelled this way deliberately so it matches that DataFrame.
CREATE TABLE glue_catalog1.streamdb.streaming_trips_table (
    tripId              STRING,
    routeId             STRING,
    startTime           TIMESTAMP,
    arrivalTime         ARRAY<STRING>,
    depatureTime        ARRAY<STRING>,
    futureStopIds       ARRAY<STRING>,
    numOfFutureStops    INT,
    currentStopSequence INT,
    currentStatus       STRING,
    stopId              STRING,
    currentTs           STRING
)
USING iceberg

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(), EncodingWidget(children=(VBox(children=(HTML(value='Encoding:'), Dropdown(description='…

Output()

In [20]:
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import java.util.HashMap
import spark.implicits._

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types._
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import java.util.HashMap
import spark.implicits._


In [21]:
// Kafka topic carrying trip-update messages (per-trip future stop times).
val trip_update_topic: String = "trip_update_topic"
// Kafka topic carrying trip-status messages (current stop / status of each trip).
val trip_status_topic: String = "trip_status_topic"
// Comma-separated Amazon MSK bootstrap broker list shared by the Kafka source and producer.
val broker: String = "b-1.test.1tklkx.c2.kafka.us-east-1.amazonaws.com:9092,b-3.test.1tklkx.c2.kafka.us-east-1.amazonaws.com:9092,b-2.test.1tklkx.c2.kafka.us-east-1.amazonaws.com:9092"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

trip_update_topic: String = trip_update_topic
trip_status_topic: String = trip_status_topic
broker: String = b-1.test.1tklkx.c2.kafka.us-east-1.amazonaws.com:9092,b-3.test.1tklkx.c2.kafka.us-east-1.amazonaws.com:9092,b-2.test.1tklkx.c2.kafka.us-east-1.amazonaws.com:9092


In [22]:
/**
 * Streaming job that reads MTA subway trip-update and trip-status messages from two Kafka
 * topics, joins them per trip, and appends the result to an Iceberg table in the Glue catalog.
 *
 * Relies on `broker`, `trip_update_topic` and `trip_status_topic` being defined in the notebook
 * session, and on `spark.implicits._` being imported there (for `$"col"` and `.as[...]`).
 */
object MTASubwayTripUpdates extends Serializable {

    /** Kafka producer configuration: bootstrap brokers plus String key/value serializers. */
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    /** Producer opened by `start()` and closed when `start()` returns or throws; null otherwise. */
    @transient var producer : KafkaProducer[String, String] = null
    // Not read or written anywhere in this object; kept for compatibility.
    var msgId : Long = 1
    /** Handle to the Iceberg sink query, assigned by `start()` once the query is launched. */
    @transient var joined_query : StreamingQuery = null
    // Not assigned anywhere in this object; kept for compatibility.
    @transient var joined_query_s3 : StreamingQuery = null

    val spark = SparkSession.builder.appName("MSK streaming Example").getOrCreate()

    /** Iceberg target table as catalog.database.table. */
    val targetTable: String = "glue_catalog1.streamdb.streaming_trips_table"

    /**
     * Streaming checkpoint directory. It has no URI scheme, so it resolves against the cluster's
     * default filesystem. Offsets saved here take precedence over `startingOffsets`: after
     * dropping/recreating the target table, clear this directory too or the query resumes from
     * the old offsets.
     */
    val checkpointLocation: String = "/user/hadoop/checkpoint"

    /** The `trip` descriptor struct present in both message types. */
    private val tripSchema = new StructType()
        .add("tripId", "string")
        .add("startTime", "string")
        .add("startDate", "string")
        .add("routeId", "string")

    /** Trip-update message: the trip plus one arrival/departure/stop entry per future stop. */
    private val tripUpdateSchema = new StructType()
        .add("trip", tripSchema)
        .add("stopTimeUpdate", ArrayType(new StructType()
            .add("arrival", new StructType().add("time", "string"))
            .add("stopId", "string")
            .add("departure", new StructType().add("time", "string"))))

    /** Trip-status message: the trip plus its current stop sequence, status, timestamp and stop. */
    private val tripStatusSchema = new StructType()
        .add("trip", tripSchema)
        .add("currentStopSequence", "integer")
        .add("currentStatus", "string")
        .add("timestamp", "string")
        .add("stopId", "string")

    /**
     * Subscribes to one Kafka topic (starting at the latest offsets, tolerating data loss) and
     * parses each record's value as JSON with `schema`, returning the parsed top-level fields.
     */
    private def readTopicAsJson(topic: String, schema: StructType) =
        spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", broker)
            .option("subscribe", topic)
            .option("startingOffsets", "latest")
            .option("failOnDataLoss", "false")
            .load()
            .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
            .as[(String, String)]
            .select(from_json($"value", schema).as("data"))
            .select("data.*")

    /** Builds the flattened, joined trip stream (printing the two parsed input schemas). */
    private def buildJoinedStream() = {
        val tripUpdates = readTopicAsJson(trip_update_topic, tripUpdateSchema)
        tripUpdates.printSchema()

        val tripStatuses = readTopicAsJson(trip_status_topic, tripStatusSchema)
        tripStatuses.printSchema()

        val statusFlat = tripStatuses
            .select("trip.*", "currentStopSequence", "currentStatus", "timestamp", "stopId")

        // Projecting fields through the stopTimeUpdate array yields one array per field.
        // "depatureTime" (sic) must match the column name of the target Iceberg table.
        val updateFlat = tripUpdates
            .select($"trip.*", $"stopTimeUpdate.arrival.time".as("arrivalTime"),
                    $"stopTimeUpdate.departure.time".as("depatureTime"), $"stopTimeUpdate.stopId")
            .withColumn("numOfFutureStops", size($"arrivalTime"))
            .withColumnRenamed("stopId", "futureStopIds")

        // NOTE(review): stream-stream inner join with no watermark on either side, so Spark keeps
        // every row of both inputs in join state indefinitely and state grows without bound on a
        // long-running feed. Consider withWatermark on an event-time column for both inputs plus
        // a time-range join condition.
        updateFlat
            .join(statusFlat, Seq("tripId", "routeId", "startTime", "startDate"))
            // The join compares the raw startTime strings; convert only afterwards.
            .withColumn("startTime", col("startTime").cast("timestamp"))
            // from_unixtime takes seconds; dividing by 1000 assumes `timestamp` is epoch
            // milliseconds — confirm against the producer.
            .withColumn("currentTs", from_unixtime($"timestamp".divide(1000)))
            .drop("startDate", "timestamp")
    }

    /**
     * Starts the Kafka-to-Iceberg streaming query (30 second micro-batches, append mode) and
     * blocks until it terminates. The producer opened here is closed on exit, including when the
     * query fails, and a producer left over from an earlier call is closed before a new one opens.
     */
    def start(): Unit = {
        Option(producer).foreach(_.close())
        producer = new KafkaProducer[String, String](props)

        try {
            val joined = buildJoinedStream()
            joined.printSchema()

            joined_query = joined.writeStream
                .queryName("iceberg_streaming_query")
                .format("iceberg")
                .outputMode("append")
                .trigger(Trigger.ProcessingTime("30 seconds"))
                .option("path", targetTable)
                .option("checkpointLocation", checkpointLocation)
                .start()

            joined_query.awaitTermination()
        } finally {
            producer.close()
            producer = null
        }
    }
}

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

defined object MTASubwayTripUpdates


In [None]:
// Launch the streaming job; start() blocks this cell until the query terminates.
MTASubwayTripUpdates.start()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…