In [1]:
# Create the Spark Session
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Streaming Process Files") \
    .config("spark.streaming.stopGracefullyOnShutdown", True) \
    .master("local[*]") \
    .getOrCreate()

spark

In [2]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType

In [3]:
# To allow automatic schemaInference while reading
spark.conf.set("spark.sql.streaming.schemaInference", True)

# Create the streaming_df to read from input directory
streaming_df = spark.readStream\
    .format("json") \
    .option("cleanSource", "archive") \
    .option("sourceArchiveDir", "data/archive/device_data/") \
    .option("maxFilesPerTrigger", 1) \
    .load("C:/Users/kevin/Documents/news")

In [None]:
streaming_df.writeStream.format("console").outputMode("append").start().awaitTermination()
streaming_df.printSchema()
streaming_df.show(truncate=False)

In [None]:
# Lets explode the data as devices contains list/array of device reading
from pyspark.sql.functions import explode, col

exploded_df = streaming_df \
    .select("customerId", "eventId", "eventOffset", "eventPublisher", "eventTime", "data") \
    .withColumn("devices", explode("data.devices")) \
    .drop("data")