# Writing a Kafka Stream to Delta Lake with Spark Structured Streaming

In [None]:
# Streaming source: subscribe to the Kafka topic named by `subscribeTopic`
# on the configured bootstrap servers.
df = (
    spark.readStream.format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": "host1:port1,host2:port2",
            "subscribe": subscribeTopic,
        }
    )
    .load()
)

In [None]:
# Schema of the JSON payload: every field is read as a string.
schema = StructType(
    [
        StructField(field_name, StringType())
        for field_name in ("student_name", "graduation_year", "major")
    ]
)


In [None]:
def with_normalized_names(df, schema):
    """Parse the JSON in ``value`` and split the student name in two.

    Args:
        df: DataFrame with a ``value`` column; it is cast to string and
            parsed as JSON.
        schema: Schema handed to ``from_json``. It must provide the
            ``student_name``, ``graduation_year`` and ``major`` fields.

    Returns:
        The input DataFrame with ``value`` and ``student_name`` removed and
        ``graduation_year``, ``major``, ``first_name`` and ``last_name``
        added. All other input columns are kept.
    """
    payload = from_json(col("value").cast("string"), schema)

    # Lift each JSON field to a top-level column, then discard the raw
    # payload and the temporary struct column.
    flattened = df.withColumn("json_data", payload)
    for field_name in ("student_name", "graduation_year", "major"):
        flattened = flattened.withColumn(field_name, col("json_data." + field_name))
    flattened = flattened.drop(col("json_data")).drop(col("value"))

    # The name is split on the literal "XX"; item 0 becomes the first name
    # and item 1 the last name.
    name_parts = split(flattened["student_name"], "XX")
    with_first = flattened.withColumn("first_name", name_parts.getItem(0))
    with_both = with_first.withColumn("last_name", name_parts.getItem(1))
    return with_both.drop("student_name")


In [None]:
def perform_available_now_update():
    """Start a streaming write of the normalized Kafka data to Delta Lake.

    The query uses the ``availableNow`` trigger, so it is intended to process
    the data that is currently available and then stop on its own.

    Returns:
        The streaming query handle returned by ``start``. The query runs
        asynchronously; call ``awaitTermination()`` on the handle to block
        until it has finished.
    """
    checkpoint_path = "data/tmp_students_checkpoint/"
    delta_path = "data/tmp_students_delta"

    # with_normalized_names takes (df, schema); the schema has to be passed
    # explicitly, otherwise the transform fails with a TypeError.
    normalized = df.transform(lambda frame: with_normalized_names(frame, schema))

    return (
        normalized.writeStream.trigger(availableNow=True)
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .start(delta_path)
    )


In [None]:
# start() returns immediately, so wait for the query to finish before
# reading the table it writes.
perform_available_now_update().awaitTermination()

# The output path is a local variable inside perform_available_now_update,
# so it is repeated here rather than referenced by name.
spark.read.format("delta").load("data/tmp_students_delta").show()


In [None]:
## Read Kafka stream and write to a Delta Lake table continuously
def perform_trigger_fixed_interval_update():
    """Start a continuous streaming write of the normalized Kafka data.

    The query uses a fixed processing-time trigger of 10 seconds and keeps
    running until it is stopped or fails.

    Returns:
        The streaming query handle returned by ``start``; use it to monitor
        or stop the query.
    """
    checkpoint_path = "data/tmp_students_checkpoint/"
    delta_path = "data/tmp_students_delta"

    # with_normalized_names takes (df, schema); the schema has to be passed
    # explicitly, otherwise the transform fails with a TypeError.
    normalized = df.transform(lambda frame: with_normalized_names(frame, schema))

    return (
        normalized.writeStream.trigger(processingTime='10 seconds')
        .format("delta")
        .option("checkpointLocation", checkpoint_path)
        .start(delta_path)
    )
