In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Create (or reuse) the notebook's Spark session on the "local" master.
# NOTE(review): the variable is named `sc`, but it holds a SparkSession,
# not a SparkContext; later cells rely on this name.
session_builder = SparkSession.builder.master("local").appName("word-count")
sc = session_builder.getOrCreate()

# Session-level SQL settings used by the streaming query below:
# - schemaInference lets the file stream source infer the JSON schema,
#   since no explicit schema is passed to the reader.
# - shuffle.partitions is lowered to 5 for this small local run.
sc.conf.set("spark.sql.streaming.schemaInference", True)
sc.conf.set("spark.sql.shuffle.partitions", 5)

In [2]:
# Configure a streaming JSON file source:
# - maxFilesPerTrigger=1 caps each trigger at one new input file.
# - cleanSource="archive" together with sourceArchiveDir asks Spark to move
#   fully processed input files into "archived_files".
event_reader = (
    sc.readStream
    .format("json")
    .option("maxFilesPerTrigger", 1)
    .option("cleanSource", "archive")
    .option("sourceArchiveDir", "archived_files")
)

# Attach the reader to the input directory; the schema is inferred from the
# files already present there (see the printed schema below).
df = event_reader.load("data/event_data/")
df.printSchema()

root
 |-- customerId: string (nullable = true)
 |-- data: struct (nullable = true)
 |    |-- devices: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- deviceId: string (nullable = true)
 |    |    |    |-- measure: string (nullable = true)
 |    |    |    |-- status: string (nullable = true)
 |    |    |    |-- temperature: long (nullable = true)
 |-- eventId: string (nullable = true)
 |-- eventOffset: long (nullable = true)
 |-- eventPublisher: string (nullable = true)
 |-- eventTime: string (nullable = true)



In [3]:
from pyspark.sql.functions import *
# Turn the `data.devices` array into one row per array element, stored in a
# new `devices` struct column, then discard the original nested `data` column.
exploded_devices = explode(col("data.devices"))
mid_flat_df = (
    df
    .withColumn("devices", exploded_devices)
    .drop(col("data"))
)

In [4]:
# Promote each field of the per-row `devices` struct to a top-level column
# (appended in this order), then drop the struct itself.
flat_df = mid_flat_df
for device_field in ("deviceId", "measure", "status", "temperature"):
    flat_df = flat_df.withColumn(device_field, col(f"devices.{device_field}"))
flat_df = flat_df.drop("devices")

In [None]:
# Debugging alternative: send the stream to the console instead of CSV, e.g.
#   flat_df.writeStream.format("console").outputMode("append").start().awaitTermination()

# Append-mode CSV sink. The checkpoint location stores the query's progress.
# NOTE(review): Spark treats "path" as an output directory, so
# "device_data.csv" will be a folder of part files, not a single CSV file.
csv_writer = (
    flat_df.writeStream
    .format("csv")
    .outputMode("append")
    .option("path", "data/output/device_data.csv")
    .option("checkpointLocation", "checkpointDir")
)

# Start the query and block this cell until the stream stops or fails.
streaming_query = csv_writer.start()
streaming_query.awaitTermination()