In [61]:
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, row_number
from pyspark.sql import Window

In [62]:
# Local jars shipped to the cluster: the Kafka connector for Structured
# Streaming plus Delta Lake. Built from a list so the comma-separated
# `spark.jars` value contains no stray newlines or indentation.
jar_dir = '/opt/workspace/utils/jars'
jar_names = [
    'spark-sql-kafka-0-10_2.12-3.4.0.jar',
    'kafka-clients-3.3.2.jar',
    'spark-token-provider-kafka-0-10_2.12-3.4.0.jar',
    'commons-pool2-2.11.1.jar',
    'delta-core_2.12-2.4.0.jar',
    'delta-storage-2.4.0.jar',
]
jars = ','.join(f'{jar_dir}/{name}' for name in jar_names)

# Session attached to the standalone master, with Delta Lake registered as the
# SQL extension and as the default catalog implementation.
builder = (
    SparkSession
    .builder
    .master("spark://spark:7077")
    .appName("cleaned")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.jars", jars)
)

# NOTE(review): configure_spark_with_delta_pip also adds the Delta Maven package
# matching the pip-installed delta-spark to `spark.jars.packages`; this may
# duplicate the local delta jars above — confirm the versions agree (2.4.0).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [63]:
%%markdown

Read the raw Delta table as a stream


Read the raw Delta table as a stream


In [64]:
# Open the raw Delta table as a streaming DataFrame.
df = spark.readStream.load(path='/opt/workspace/data/raw/test/', format='delta')

In [65]:
%%markdown

Transform the data: parse the JSON change events into flat columns


Transform the data: parse the JSON change events into flat columns


In [66]:
# DDL schema for the JSON document in the `value` column, used by `from_json`
# in the transform cell. Only `payload` is declared. `payload.after` is parsed
# into a struct of the fields kept downstream. `payload.before` is declared as
# STRING, so it is not split into fields.
# NOTE(review): the payload/before/after/op layout looks like a Debezium change
# event envelope — confirm against the producer.
schema = """
payload STRUCT<
    before:STRING,
    after:STRUCT<
        id: STRING, 
        first_name: STRING, 
        last_name: STRING, 
        email: STRING
    >, 
    op: STRING
>
"""

In [67]:
# Parse the JSON change event in `value` and flatten the fields kept downstream.
# A single select replaces a chain of withColumn calls, each of which would add
# its own projection to the plan; output columns and order are unchanged.
# `value` is cast to string explicitly because from_json expects a string
# column. A raw table copied straight from Kafka would hold `value` as binary
# (assumption — confirm against the raw table schema); for a string column the
# cast is a no-op.
# NOTE: this rebinds `df`, so re-running this cell on its own fails (`value` is
# gone) — re-run from the read cell instead.
df = (
    df
    .withColumn('json', from_json(col('value').cast('string'), schema))
    .select(
        col('json.payload.after.id').alias('id'),
        col('json.payload.after.first_name').alias('first_name'),
        col('json.payload.after.last_name').alias('last_name'),
        col('json.payload.after.email').alias('email'),
        col('json.payload.op').alias('op'),
        'timestamp',
    )
)

In [68]:
%%markdown

Upsert each micro-batch into the cleaned Delta table


Upsert each micro-batch into the cleaned Delta table


In [69]:
def merge(micro_df, batch_id, target_path='/opt/workspace/data/cleaned/test/'):
    """Upsert one streaming micro-batch into the cleaned Delta table.

    Used as the ``foreachBatch`` callback of the streaming query. Rows with a
    null ``id`` are dropped. Within the batch only the most recent row per
    ``id`` (by ``timestamp``) is kept. That row is then merged into
    ``target_path`` keyed on ``id``: matching rows are overwritten and new ids
    are inserted. If no Delta table exists at ``target_path`` yet, the batch is
    written as a new table.

    Args:
        micro_df: The micro-batch DataFrame supplied by Structured Streaming.
        batch_id: The micro-batch id (unused; required by the ``foreachBatch``
            callback signature).
        target_path: Location of the cleaned Delta table.
    """
    # Use the batch's own session rather than relying on a notebook global.
    session = micro_df.sparkSession

    # Rows without an id cannot be keyed: the MERGE condition `t.id = s.id`
    # never matches NULL, so they would be inserted as fresh all-null rows on
    # every batch. They appear when `payload.after` is null (presumably
    # delete/tombstone events — confirm).
    # TODO: propagate deletes; that needs the id parsed from `payload.before`.
    keyed_df = micro_df.filter(col("id").isNotNull())

    # Keep only the latest change per id. Delta's MERGE fails when several
    # source rows match the same target row.
    # NOTE: rows with equal timestamps for one id are ranked arbitrarily.
    latest_first = Window.partitionBy("id").orderBy(col("timestamp").desc())
    deduped_df = (
        keyed_df
        .withColumn("rn", row_number().over(latest_first))
        .filter(col("rn") == 1)
        .drop("rn")
    )

    if DeltaTable.isDeltaTable(session, target_path):
        (
            DeltaTable.forPath(session, target_path).alias('t')
            .merge(deduped_df.alias('s'), 't.id = s.id')
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()  # without execute() the merge is only built, never run
        )
    else:
        deduped_df.write.format('delta').save(target_path)

In [70]:
# One-shot streaming run: process what is available in the raw table since the
# last checkpoint, hand each micro-batch to `merge`, then stop.
# With foreachBatch the sink is the callback itself, so no format or output path
# is set here (Spark would ignore them) — `merge` decides where data is written.
# NOTE(review): the checkpoint lives inside the cleaned table's directory. A
# sibling location would be cleaner, but moving it discards existing progress.
# NOTE(review): trigger(once=True) is deprecated since Spark 3.4 in favour of
# trigger(availableNow=True), which may split the backlog into several batches.
query = (
    df
    .writeStream
    .foreachBatch(merge)
    .option("checkpointLocation", '/opt/workspace/data/cleaned/test/checkpoint/')
    .trigger(once=True)
    .start()
)

query.awaitTermination()

In [56]:
# Stop the SparkSession now that the one-shot streaming query has terminated.
spark.stop()