The cell below downloads the PostgreSQL JDBC driver JAR that Spark needs in order to write to Postgres.

In [None]:
%%sh
# Stop at the first failing command. Without this, a failed download is only
# noticed indirectly (mv complains) and the cell still exits 0 because of ls.
set -e
# Download the PostgreSQL JDBC driver into the current directory.
wget https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.20/postgresql-42.2.20.jar
# Move it to the path that the SparkSession's spark.jars setting points at.
mkdir -p /home/jovyan/.ivy2/jars/
mv postgresql-42.2.20.jar /home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.20.jar
# List the directory so the result is visible in the cell output.
ls -ltr /home/jovyan/.ivy2/jars/

Run the SQL query below to create the required table in the Postgres DB. Make sure to select Postgres as the connection from the dropdown. You can use SQLPad to query the Postgres DB at `https://localhost:3000`. SQLPad login credentials: `USER: admin@sqlpad.com PASSWORD: admin`
```sql
-- Target table for the JDBC sink used later in this notebook (dbtable "device_data").
-- Every column is declared varchar, including eventoffset and temperature,
-- which the notebook's flattened DataFrame carries as long.
CREATE TABLE public.device_data (
	customerid varchar,
	eventid varchar,
	eventoffset varchar,
	eventpublisher varchar,
	eventtime varchar,
	deviceid varchar,
	measure varchar,
	status varchar,
	temperature varchar
);
```

In [1]:
# Build (or reuse) the SparkSession used by every cell below
from pyspark.sql import SparkSession

session_builder = (
    SparkSession.builder
    .appName("Writing to Multiple Sinks")
    .master("local[*]")
)

# Settings: graceful streaming shutdown, the Kafka source package,
# the Postgres JDBC driver JAR, and the shuffle partition count.
for conf_key, conf_value in [
    ("spark.streaming.stopGracefullyOnShutdown", True),
    ('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0'),
    ('spark.jars', '/home/jovyan/.ivy2/jars/org.postgresql_postgresql-42.2.20.jar'),
    ("spark.sql.shuffle.partitions", 8),
]:
    session_builder = session_builder.config(conf_key, conf_value)

spark = session_builder.getOrCreate()

# Last expression: render the session summary as the cell output
spark

In [2]:
# Streaming reader over the Kafka topic; the result is kafka_df

# Broker address, topic to subscribe to, and where to start reading from
kafka_source_options = {
    "kafka.bootstrap.servers": "ed-kafka:29092",
    "subscribe": "device-data",
    "startingOffsets": "earliest",
}

kafka_df = spark.readStream.format("kafka").options(**kafka_source_options).load()


In [3]:
# Print the schema of the raw Kafka DataFrame
# (key, value, topic, partition, offset, timestamp, timestampType).
kafka_df.printSchema()
# Debugging helpers, left disabled:
#kafka_df.show()
#kafka_df.rdd.getNumPartitions()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [4]:
# Convert the Kafka `value` column from binary to string; the result is kafka_json_df
from pyspark.sql.functions import expr

# SQL expression that reinterprets the raw bytes as text
value_as_string = expr("cast(value as string)")

kafka_json_df = kafka_df.withColumn("value", value_as_string)

In [5]:
# Schema of the JSON payload carried in the Kafka message value

from pyspark.sql.types import StringType, StructField, StructType, ArrayType, LongType

# A single device reading inside data.devices
device_schema = StructType([
    StructField('deviceId', StringType(), True),
    StructField('measure', StringType(), True),
    StructField('status', StringType(), True),
    StructField('temperature', LongType(), True),
])

# The `data` object, which wraps the array of device readings
data_schema = StructType([
    StructField('devices', ArrayType(device_schema, True), True),
])

# Top-level event: identifiers and metadata plus the nested `data` object
json_schema = StructType([
    StructField('customerId', StringType(), True),
    StructField('data', data_schema, True),
    StructField('eventId', StringType(), True),
    StructField('eventOffset', LongType(), True),
    StructField('eventPublisher', StringType(), True),
    StructField('eventTime', StringType(), True),
])

In [6]:
# Parse the JSON string in `value` with json_schema and lift the parsed fields to top-level columns
from pyspark.sql.functions import from_json,col

# Struct column holding the parsed payload
parsed_payload = from_json(col("value"), json_schema)

streaming_df = (
    kafka_json_df
    .withColumn("values_json", parsed_payload)
    .selectExpr("values_json.*")
)

In [7]:
# Print the schema of the parsed payload.
# To inspect actual rows with show(), place a sample JSON file and change readStream to read.
streaming_df.printSchema()
#streaming_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



In [8]:
# `data.devices` is an array of device readings; explode it so each reading gets its own row
from pyspark.sql.functions import explode

# One output row per element of the devices array
device_reading = explode("data.devices")

exploded_df = streaming_df.withColumn("data_devices", device_reading)


In [9]:
# Print the schema of exploded_df; it now carries the extra `data_devices` struct column.
# To inspect actual rows with show(), place a sample JSON file and change readStream to read.
exploded_df.printSchema()
#exploded_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- data_devices: struct (nullable = true)
 |    |-- deviceId: string (nullable = true)
 |    |-- measure: string (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- temperature: long (nullable = true)



In [18]:
# Flatten exploded_df: promote each field of `data_devices` to its own top-level column
from pyspark.sql.functions import col

# The nested `data` struct is no longer needed once the readings are exploded
flattened_df = exploded_df.drop("data")

# Copy each reading field out of the struct, in this order, under the same name
for field_name in ("deviceId", "measure", "status", "temperature"):
    flattened_df = flattened_df.withColumn(field_name, col(f"data_devices.{field_name}"))

# Drop the struct now that its fields are top-level columns
flattened_df = flattened_df.drop("data_devices")


In [11]:
# Print the schema of flattened_df; all columns are now top-level.
# To inspect actual rows with show(), place a sample JSON file and change readStream to read.
flattened_df.printSchema()
#flattened_df.show(truncate=False)

root
 |-- customerId: string (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)
 |-- deviceId: string (nullable = true)
 |-- measure: string (nullable = true)
 |-- status: string (nullable = true)
 |-- temperature: long (nullable = true)



In [None]:
# Python function to write to multiple sinks
def device_data_output(df, batch_id):
    """Write one micro-batch to parquet and to Postgres, then display it.

    Intended to be passed to ``writeStream.foreachBatch``.

    Args:
        df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; only printed for tracing.

    Notes:
        The batch is used by three separate actions (two writes and a show),
        so it is persisted up front and released in ``finally``; otherwise
        each action would recompute the batch from the source.
        Both writes use append mode, so running the same batch again would
        add its rows again.
        The database user and password are read from the ``POSTGRES_USER``
        and ``POSTGRES_PASSWORD`` environment variables, defaulting to the
        values previously hardcoded here.
    """
    import os  # local import keeps this cell runnable on its own

    print("Batch id: "+ str(batch_id))

    # Cache once so the parquet write, the JDBC write and show() share one computation
    df.persist()
    try:
        # Write to parquet
        df.write.format("parquet").mode("append").save("data/output/device_data.parquet/")

        # Write to JDBC Postgres
        (
            df.write
            .mode("append")
            .format("jdbc")
            .option("driver", "org.postgresql.Driver")
            .option("url", "jdbc:postgresql://postgres-db:5432/sqlpad")
            .option("dbtable", "device_data")
            .option("user", os.environ.get("POSTGRES_USER", "sqlpad"))
            .option("password", os.environ.get("POSTGRES_PASSWORD", "sqlpad"))
            .save()
        )

        # Display
        df.show()
    finally:
        # Always release the cached batch, even when one of the sinks fails
        df.unpersist()
    



In [None]:
# Start the streaming query: each micro-batch is handed to device_data_output via foreachBatch,
# which writes it to multiple sinks
device_data_writer = (
    flattened_df
    .writeStream
    .foreachBatch(device_data_output)
    .trigger(processingTime='10 seconds')
    .option("checkpointLocation", "checkpoint_dir_kafka")
)

device_data_query = device_data_writer.start()

# Block this cell until the query stops or fails
device_data_query.awaitTermination()

Batch id: 0
+----------+-------+-----------+--------------+---------+--------+-------+------+-----------+
|customerId|eventId|eventOffset|eventPublisher|eventTime|deviceId|measure|status|temperature|
+----------+-------+-----------+--------------+---------+--------+-------+------+-----------+
+----------+-------+-----------+--------------+---------+--------+-------+------+-----------+

Batch id: 1
+----------+--------------------+-----------+--------------+--------------------+--------+-------+-------+-----------+
|customerId|             eventId|eventOffset|eventPublisher|           eventTime|deviceId|measure| status|temperature|
+----------+--------------------+-----------+--------------+--------------------+--------+-------+-------+-----------+
|   CI00118|209cab2d-7934-4ad...|      10010|        device|2023-01-05 11:13:...|    D002|      C|SUCCESS|         27|
|   CI00118|209cab2d-7934-4ad...|      10010|        device|2023-01-05 11:13:...|    D005|      C|STANDBY|         23|
+---