In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Writing to Multiple Sinks")
    # Extra packages: the Kafka source for Structured Streaming and the
    # PostgreSQL JDBC driver used by the JDBC sink further down.
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0,"
        "org.postgresql:postgresql:42.7.3",
    )
    .config("spark.sql.shuffle.partitions", 8)
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .getOrCreate()
)

spark

In [6]:
# from py4j.java_gateway import java_import
# from pyspark.sql import SparkSession

# spark = SparkSession.builder.getOrCreate()

# # Import Scala JavaConversions
# java_import(spark._jvm, "scala.collection.JavaConverters")

# # Get the jars as a Scala Seq
# jars_seq = spark.sparkContext._jsc.sc().listJars()

# # Convert Scala Seq to Python list
# jars_list = list(spark._jvm.scala.collection.JavaConverters.seqAsJavaList(jars_seq))
# for jar in jars_list:
#     print(jar)


In [7]:
# Create kafka_df: a streaming read of the "device-data" Kafka topic

kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "ed-kafka:29092",
        "subscribe": "device-data",
        "startingOffsets": "earliest",
    })
    .load()
)

In [8]:
# View the schema of the raw kafka_df (key/value arrive as binary columns)
kafka_df.printSchema()
# show() is left disabled: kafka_df is built with readStream, and a streaming
# DataFrame has to be run through writeStream.start() instead.
#kafka_df.show()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [9]:
# parse value from binary to string into kafka_json_df
from pyspark.sql.functions import expr
kafka_json_df = kafka_df.withColumn(
    "value",
    # The Kafka `value` column is binary; cast it so it can be parsed as JSON text.
    expr("cast(value as string)"),
)

In [10]:
#kafka_json_df.show()

In [11]:
# Schema of the payload

from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

# Expected shape of one event: top-level event metadata plus a nested
# data.devices array holding one struct per device reading.
json_schema = (
    StructType()
    .add("customerId", StringType(), True)
    .add(
        "data",
        StructType().add(
            "devices",
            ArrayType(
                StructType()
                .add("deviceId", StringType(), True)
                .add("measure", StringType(), True)
                .add("status", StringType(), True)
                .add("temperature", LongType(), True),
                True,
            ),
            True,
        ),
        True,
    )
    .add("eventId", StringType(), True)
    .add("eventOffset", LongType(), True)
    .add("eventPublisher", StringType(), True)
    .add("eventTime", StringType(), True)
)

In [12]:
# apply the schema to payload to read the data
from pyspark.sql.functions import from_json,col
streaming_df = (
    kafka_json_df
    # Parse the JSON text in `value` into a struct using json_schema ...
    .withColumn("values_json", from_json(col("value"), json_schema))
    # ... and keep only the parsed fields, promoted to top-level columns.
    .selectExpr("values_json.*")
)

In [13]:
# Check the schema of the parsed data. To preview actual rows with show(),
# place a sample JSON file and change readStream to read.
streaming_df.printSchema()
#streaming_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



In [14]:
# Lets explode the data as devices contains list/array of device reading
from pyspark.sql.functions import explode

# One output row per element of data.devices; the element lands in `data_devices`.
exploded_df = streaming_df.select(
    "*",
    explode("data.devices").alias("data_devices"),
)

In [15]:
# Check the schema of exploded_df. To preview actual rows with show(),
# place a sample JSON file and change readStream to read.
exploded_df.printSchema()
#exploded_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- data_devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



In [16]:
# Flatten the exploded df
from pyspark.sql.functions import col

flattened_df = (
    exploded_df
    # Promote every field of the exploded struct (deviceId, measure, status,
    # temperature) to a top-level column, appended after the existing ones.
    .select("*", "data_devices.*")
    # The nested originals are no longer needed once flattened.
    .drop("data", "data_devices")
)

In [17]:
# Check the schema of flattened_df. To preview actual rows with show(),
# place a sample JSON file and change readStream to read.
flattened_df.printSchema()
#flattened_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)



In [18]:
# python function to write to multiple sinks
def device_data_output(df, batch_id):
    """Write one micro-batch to both the Parquet sink and the Postgres table.

    Meant to be used as the ``foreachBatch`` callback of a streaming query.

    Args:
        df: The micro-batch DataFrame handed over by ``foreachBatch``.
        batch_id: Identifier of the micro-batch; only printed for tracing.

    The batch is persisted while the two writes run so the second write can
    reuse the already computed rows instead of recomputing the batch, and it
    is always unpersisted afterwards, even when one of the writes raises.

    The JDBC user and password default to the original ``sqlpad`` values and
    can be overridden with the ``DEVICE_DB_USER`` / ``DEVICE_DB_PASSWORD``
    environment variables so real credentials need not live in the notebook.
    """
    import os  # local import keeps this cell runnable on its own

    print("batch id:"+str(batch_id))

    # Cache once, write twice: without this each write re-evaluates the batch.
    df.persist()
    try:
        # write to parquet
        df.write.format("parquet").mode("append").save("data/output/device_data.parquet/")

        # write to jdbc postgres
        (
            df.write
            .mode("append")
            .format("jdbc")
            .option("driver", "org.postgresql.Driver")
            .option("url", "jdbc:postgresql://postgres-db:5432/sqlpad")
            .option("dbtable", "device_data")
            .option("user", os.environ.get("DEVICE_DB_USER", "sqlpad"))
            .option("password", os.environ.get("DEVICE_DB_PASSWORD", "sqlpad"))
            .save()
        )
    finally:
        # Release the cached batch whether or not the writes succeeded.
        df.unpersist()

In [None]:
# Running foreachBatch
# Write the output to Multiple Sinks

(
    flattened_df.writeStream
    # Progress is tracked in this checkpoint directory.
    .option("checkpointLocation", "checkpoint_dir_kafka")
    # Kick off a micro-batch every 10 seconds.
    .trigger(processingTime="10 seconds")
    # Each micro-batch is handed to device_data_output, which writes both sinks.
    .foreachBatch(device_data_output)
    .start()
    # Block this cell until the query stops or fails.
    .awaitTermination()
)

batch id:5
batch id:6
batch id:7
batch id:8
batch id:9
batch id:10
batch id:11
