# Prerequisites

In [1]:
import os
# Local JDK and Spark distribution that PySpark should use.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/home/jovyan/spark-3.0.2-bin-hadoop3.2",
})

# Arguments handed to spark-submit when PySpark launches the JVM:
#   * --packages pulls Delta Lake 0.8.0 and the Kafka source for Spark 3.0.2 (Scala 2.12);
#   * the two --conf entries register Delta's SQL extension and catalog;
#   * PySpark expects the argument string to end with "pyspark-shell".
os.environ["PYSPARK_SUBMIT_ARGS"] = " ".join([
    "--packages",
    ",".join([
        "io.delta:delta-core_2.12:0.8.0",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.2",
    ]),
    "--conf", "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension",
    "--conf", "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "pyspark-shell",
])

In [2]:
import findspark
# Make the Spark distribution importable from this kernel before `pyspark` is imported.
# With no argument, findspark presumably locates Spark through the SPARK_HOME
# environment variable set in the previous cell — confirm if Spark moves.
findspark.init()

# Set up Spark Session

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Reuse the kernel's active session if there is one, otherwise start a new one.
# The Delta and Kafka packages/confs come from PYSPARK_SUBMIT_ARGS, not from this builder.
spark = (
    SparkSession.builder
    .appName("Challenge")
    .getOrCreate()
)

spark

# Connection to Kafka

In [5]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType
import pyspark.sql.functions as f

# Flattened row payload for the customers topic (presumably produced by Debezium).
# Every column is read as a nullable string; `__deleted` carries the text flag that
# the merge compares against 'true'.
schema_customers = StructType(
    [StructField(column, StringType(), True)
     for column in ("id", "first_name", "last_name", "email", "__deleted")]
)

# Envelope of each Kafka message value: the connector's schema description
# (typed as a plain string here) plus the row payload above.
Schema = StructType([
    StructField("schema", StringType()),
    StructField("payload", schema_customers),
])


In [6]:
# Stream the CDC topic from its first offset and parse every message value with `Schema`.
df_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "dbserver1.inventory.customers")
    .option("startingOffsets", "earliest")
    .load()
    # If the connector emits tombstones (messages with a null value), they would
    # parse to an all-null payload downstream; drop them here.
    .where(f.col("value").isNotNull())
    .select(f.from_json(f.col("value").cast("string"), Schema).alias("parsed_value"))
)

# printSchema() prints the tree itself and returns None, so it is not wrapped in print().
df_stream.printSchema()

root
 |-- parsed_value: struct (nullable = true)
 |    |-- schema: string (nullable = true)
 |    |-- payload: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- first_name: string (nullable = true)
 |    |    |-- last_name: string (nullable = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- __deleted: string (nullable = true)

None


In [7]:
# Mirror the parsed payload rows into an in-memory table named "Customers_query"
# for quick inspection. The query has no aggregation, so "append" emits exactly the
# same rows as "update" and is a mode the memory sink documents.
# Keep the StreamingQuery handle so it can be inspected or stopped (.stop()) later.
customers_memory_query = (
    df_stream.select("parsed_value.payload.*")
    .writeStream
    .outputMode("append")
    .format("memory")
    .queryName("Customers_query")
    .start()
)

customers_memory_query

21/08/12 08:22:50 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a896a585-3aaf-4e88-b8e2-e3fbbda1c189. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7f792c62dfa0>

In [None]:
# Show every row the memory sink has collected so far, without truncating columns.
# Right after the query starts this may still be empty.
spark.table("Customers_query").show(truncate=False)

In [10]:
# Register the Delta table that the CDC upserts write into.
# IF NOT EXISTS keeps the cell re-runnable: a plain CREATE TABLE fails once
# `customers` is already registered in the catalog.
# NOTE(review): `id` is int and `__deleted` boolean here, while the stream parses
# every column as a string, and the merge never writes `__deleted`; confirm the
# intended column types.
spark.sql("""
CREATE TABLE IF NOT EXISTS customers
(id int, first_name string, last_name string, email string, __deleted boolean)
USING delta
LOCATION 'hdfs://namenode:9000/*****'
""")

                                                                                

DataFrame[]

In [11]:
from delta.tables import *

deltaTable_customers = DeltaTable.forPath(spark, "hdfs://namenode:9000/*****")


def upsertCustomersToDelta(microBatchOutputDF, batchId):
    """Upsert one streaming micro-batch of customer changes into the Delta table.

    Intended as a ``foreachBatch`` callback. For each ``id``:
      * ``__deleted == 'true'`` and the row exists  -> delete it;
      * otherwise, row exists                       -> overwrite its columns;
      * otherwise, row missing and not a delete     -> insert it.

    Args:
        microBatchOutputDF: micro-batch with the flattened payload columns
            ``id``, ``first_name``, ``last_name``, ``email`` and ``__deleted``
            (all strings).
        batchId: micro-batch id supplied by Spark; unused.
    """
    import pyspark.sql.functions as f
    from pyspark.sql.window import Window

    # Rows without a key (tombstones or unparsable values) can never match a
    # target row and would otherwise be inserted as all-NULL records.
    keyed = microBatchOutputDF.where(f.col("id").isNotNull())

    # Delta MERGE fails when several source rows match the same target row, and it
    # inserts duplicates when several source rows share a new id. Reading from
    # "earliest" makes that likely, so keep only the last change per id.
    # NOTE(review): "last" assumes rows arrive in Kafka offset order within a
    # partition and that all events for one id share a Kafka partition (the
    # connector keys messages by primary key) — confirm.
    latest = (
        keyed
        .withColumn("_arrival", f.monotonically_increasing_id())
        .withColumn(
            "_rank",
            f.row_number().over(
                Window.partitionBy("id").orderBy(f.col("_arrival").desc())
            ),
        )
        .where(f.col("_rank") == 1)
        .drop("_arrival", "_rank")
    )

    row_values = {
        "id": "s.id",
        "first_name": "s.first_name",
        "last_name": "s.last_name",
        "email": "s.email",
    }

    (
        deltaTable_customers.alias("t")
        .merge(latest.alias("s"), "s.id = t.id")
        .whenMatchedDelete(condition="s.__deleted = 'true'")
        .whenMatchedUpdate(set=row_values)
        # A delete for an id the table never had must not resurrect the row.
        .whenNotMatchedInsert(
            condition="coalesce(s.__deleted, 'false') != 'true'",
            values=row_values,
        )
        .execute()
    )

In [None]:
# Apply each micro-batch to the Delta table through upsertCustomersToDelta.
# foreachBatch replaces the streaming sink, so no format("delta") or output path
# is given: they would be ignored. The Delta write happens inside the callback.
# NOTE(review): the checkpoint directory must differ from the Delta table's
# location — confirm the masked path.
customers_delta_query = (
    df_stream.select("parsed_value.payload.*")
    .writeStream
    .foreachBatch(upsertCustomersToDelta)
    .outputMode("append")
    .option("checkpointLocation", "hdfs://namenode:9000/*****")
    .start()
)

customers_delta_query
    