# Create Spark Session

In [2]:
# Build (or reuse, via getOrCreate) a local Spark session running on all
# available cores, for the file-streaming pipeline below.
# NOTE(review): "spark.streaming.stopGracefullyOnShutdown" is documented for the
# DStream API; confirm it has any effect on Structured Streaming queries.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Streaming Process Files")
    .config("spark.streaming.stopGracefullyOnShutdown", True)
    .getOrCreate()
)

spark

# Create the streaming DataFrame with DataStreamReader

In [None]:
# File-based streaming sources require an explicit schema unless this setting
# is enabled; with it on, Spark infers the JSON schema when the query starts.
# (The JSON reader has no "inferSchema" option, so schema inference is
# controlled only by this config.)
spark.conf.set("spark.sql.streaming.schemaInference", True)

# Stream JSON files from the input directory:
#   cleanSource=archive     -> processed input files are moved to sourceArchiveDir
#   maxFilesPerTrigger=1    -> at most one new file per micro-batch
#   multiline=true          -> parse each file as a whole, so a JSON record may
#                              span several lines
streaming_df = spark.readStream \
    .format("json") \
    .option("cleanSource", "archive") \
    .option("sourceArchiveDir", "data/archive/") \
    .option("maxFilesPerTrigger", 1) \
    .option("multiline", "true") \
    .load("data/input-05111940000188/")

In [None]:
# Inspect the schema inferred for the input data.
# printSchema() works on a streaming DataFrame, but show() is a batch action and
# raises AnalysisException on one. To preview rows, temporarily switch the
# reader above from spark.readStream to spark.read with a sample JSON file in the
# input directory; the guard below then lets show() run.
streaming_df.printSchema()
if not streaming_df.isStreaming:
    streaming_df.show(truncate=False)

# Flatten the data

In [5]:
# Keep only the article fields used downstream. All six are selected directly
# by top-level name, so no explode step is applied here.
# (explode/col are imported for use by later transformations.)
from pyspark.sql.functions import explode, col

exploded_df = streaming_df \
    .select("link", "headline", "category", "short_description", "authors", "date")

In [6]:
# Project the same six columns via SQL expressions; this is the DataFrame
# handed to the streaming sink in the next cell.
flattened_df = exploded_df.selectExpr(
    "link",
    "headline",
    "category",
    "short_description",
    "authors",
    "date",
)

In [None]:
# Write each micro-batch as JSON files into the output directory (a file sink,
# not the console). Spark keeps the query's progress under checkpoint_dir so a
# restarted query can resume instead of starting over.

# How long this cell blocks on the running query, in seconds.
# None (the default) blocks until the query stops; set a number (e.g. 60) so
# that "Run All" can continue to the verification cell below.
AWAIT_TIMEOUT_SECONDS = None

writing_df = flattened_df.writeStream \
    .format("json") \
    .option("path", "data/output-05111940000188") \
    .option("checkpointLocation", "checkpoint_dir") \
    .outputMode("append") \
    .start()

# Block this cell until one of the following happens:
# 1. Exception in the running query
# 2. Manual interruption
# 3. AWAIT_TIMEOUT_SECONDS elapses (only if set; the query keeps running afterwards)
writing_df.awaitTermination(AWAIT_TIMEOUT_SECONDS)

In [3]:
# Verify the pipeline: batch-read the JSON files written by the streaming
# query and print every row without truncating long column values.
out_df = spark.read.format("json").load("data/output-05111940000188/")
out_df.show(truncate=False)

+---------------------------------------+---------+----------+----------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|authors                                |category |date      |headline                                                                    |link                                                                                                          |short_description                                                                                                                                                                                       |
+---------------------------------------+---------+----------+----------------------------------