#### Streaming Log Processing

This is a simple exercise in log processing. The log files come from several servers and cover successive time periods.
Each record in a log file has the form

```serverID,severity,timestamp```

* serverID is a string unique to the server
* severity is 2 (no error, just a service call), 1 (minor error), or 0 (fatal/severe error)
* timestamp is an integer starting at 1 (bigger numbers mean later)
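To make the record format concrete, here is a minimal plain-Python sketch that parses one line into typed fields. `LogRecord` and `parse_record` are hypothetical names, and the sketch assumes there is no header row and no quoting, which fits the three-field layout above but is not stated explicitly.

```python
from typing import NamedTuple


class LogRecord(NamedTuple):
    """One log line of the form serverID,severity,timestamp (hypothetical helper type)."""
    server_id: str
    severity: int   # 2 = service call, 1 = minor error, 0 = fatal/severe error
    timestamp: int  # starts at 1; bigger numbers mean later


def parse_record(line: str) -> LogRecord:
    """Parse one CSV line, assuming no header row and no quoted fields."""
    server_id, severity, timestamp = line.strip().split(",")
    record = LogRecord(server_id, int(severity), int(timestamp))
    if record.severity not in (0, 1, 2):
        raise ValueError(f"unexpected severity in {line!r}")
    if record.timestamp < 1:
        raise ValueError(f"timestamps start at 1: {line!r}")
    return record


print(parse_record("s1,2,3"))  # LogRecord(server_id='s1', severity=2, timestamp=3)
```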

For this exercise these log files will be "delivered" by being placed in a directory, for example ```/FileStore/tables/logdata-live```.
The log files for this small example cover two servers, s1 and s2, with log records for times 1 through 10.
Each file holds one server's records for five time units. For example, the file s115.csv has records for server s1 for times 1 through 5.

You want to process new records incrementally and produce two "reports":

* The *volume report* gives, for each server, the number of SEV2 events divided by the number of time units, where the number of time units is (max(timestamp) - min(timestamp)) + 1. This report is updated in place rather than appended to: every time new log data comes in, the mapping from server to SEV2 volume is recomputed over all data received so far.
* The *sev0 log* is a sequence of records of the form ```serverID timestamp```, each recording a SEV0 event reported by a server. This report grows over time: each time a new log file is processed, new records are appended to the end.
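As a worked example of the two definitions, the plain-Python sketch below computes both reports over a small, made-up list of records. `volume_report` and `sev0_log` are hypothetical names, and the sketch assumes, as the Spark code in this notebook does, that min and max are taken over the SEV2 records only; the definition above does not say which records bound the time range. For s1, three SEV2 events over times 1 through 4 give 3 / 4 = 0.75.

```python
# Hypothetical sample records (serverID, severity, timestamp); not the exercise's data.
RECORDS = [
    ("s1", 2, 1), ("s1", 2, 2), ("s1", 0, 3), ("s1", 2, 4),
    ("s2", 1, 1), ("s2", 2, 3), ("s2", 0, 5),
]


def volume_report(records):
    """serverID -> SEV2 count / ((max ts - min ts) + 1), with ts taken from SEV2 records only."""
    stats = {}  # serverID -> (count, min_ts, max_ts)
    for server, severity, ts in records:
        if severity != 2:
            continue
        n, lo, hi = stats.get(server, (0, ts, ts))
        stats[server] = (n + 1, min(lo, ts), max(hi, ts))
    return {server: n / (hi - lo + 1) for server, (n, lo, hi) in stats.items()}


def sev0_log(records):
    """(serverID, timestamp) for every SEV0 record, in input order."""
    return [(server, ts) for server, severity, ts in records if severity == 0]


print(volume_report(RECORDS))  # {'s1': 0.75, 's2': 1.0}
print(sev0_log(RECORDS))       # [('s1', 3), ('s2', 5)]
```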

Your final result should be two streaming queries:
* One that *modifies* the volume report, which is stored in memory
* One that *appends to* the sev0 log, which is stored as a Parquet file
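The difference between the two queries is their output mode. The plain-Python simulation below (no Spark; `run_batches` and the sample batches are hypothetical) replays micro-batches: the volume table is rebuilt in full after each batch, as a `complete`-mode memory sink does, while SEV0 rows are only ever added, as an `append`-mode Parquet sink does. It keeps per-server count/min/max state instead of all raw records, in the spirit of Spark's stateful aggregation, though Spark's real state store is more involved.

```python
def run_batches(batches):
    """Replay micro-batches of (serverID, severity, timestamp) rows.

    Returns, after each batch, a snapshot of (volume table, sev0 log).
    """
    state = {}      # aggregation state: serverID -> (SEV2 count, min ts, max ts)
    sev0_sink = []  # append-mode sink: rows are only ever added, never changed
    snapshots = []
    for batch in batches:
        for server, severity, ts in batch:
            if severity == 2:
                n, lo, hi = state.get(server, (0, ts, ts))
                state[server] = (n + 1, min(lo, ts), max(hi, ts))
            elif severity == 0:
                sev0_sink.append((server, ts))
        # complete mode: the whole result table is recomputed and replaced
        volume_table = {s: n / (hi - lo + 1) for s, (n, lo, hi) in state.items()}
        snapshots.append((volume_table, list(sev0_sink)))
    return snapshots


batches = [
    [("s1", 2, 1), ("s1", 0, 2), ("s1", 2, 3)],  # hypothetical first delivery
    [("s1", 2, 7), ("s2", 0, 8)],                # hypothetical second delivery
]
for volume_table, sev0_rows in run_batches(batches):
    print(volume_table, sev0_rows)
```

In Spark itself, `append` is not available for the volume query: a streaming aggregation without a watermark cannot use append mode, which is why that query uses `complete`.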

In [0]:
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql.functions import col, min, max, count  # note: min/max here shadow Python's built-ins

In [0]:
# Use a handful of shuffle partitions; the default (200) is far more than this tiny dataset needs
spark.conf.set("spark.sql.shuffle.partitions", 5)

In [0]:
LOG_DATA_PATH_SOURCE = "/FileStore/tables/LogData-source/"

LOG_DATA_PATH = "/FileStore/tables/LogData/"

SEV0_OUTPUT = "/FileStore/tables/LogData-output/"
SEV0_CHECKPOINTS = "/FileStore/tables/LogData-checkpoints/"

In [0]:
# Verify that there are four log files in your staging directory
# If using EMR notebook, do this at the AWS console and leave this cell blank
dbutils.fs.ls(LOG_DATA_PATH_SOURCE)

In [0]:
# Create the schema for the log files
logSchema = StructType([
    StructField("serverID", StringType(), True),
    StructField("severity", LongType(), True),
    StructField("timestamp", LongType(), True)
])

In [0]:
# Create the streaming DataFrame (readStream) on your log directory, using the schema you just created.
# maxFilesPerTrigger=1 makes each micro-batch read at most one new file.
logStream = spark.readStream \
    .schema(logSchema) \
    .option("maxFilesPerTrigger", 1) \
    .csv(LOG_DATA_PATH)
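With `maxFilesPerTrigger` set to 1, each micro-batch reads at most one new file, so copying two files at once takes two triggers to show up fully in the reports. The plain-Python model below (`plan_triggers` is a hypothetical name) illustrates that queueing. It orders new files by name for simplicity; Spark's file source by default processes the oldest files first by modification time, and Spark generally skips a trigger that finds no new data rather than running an empty batch.

```python
def plan_triggers(listings, max_files_per_trigger=1):
    """Return the files each trigger would read, given the directory listing seen at each trigger.

    Hypothetical model: new files are queued in name order and each trigger
    takes at most max_files_per_trigger of them; files are never read twice.
    """
    seen = set()
    pending = []
    plan = []
    for listing in listings:
        for name in sorted(listing):
            if name not in seen:
                seen.add(name)
                pending.append(name)
        batch = pending[:max_files_per_trigger]
        pending = pending[max_files_per_trigger:]
        plan.append(batch)
    return plan


first = ["s115.csv", "s215.csv"]
both = first + ["s1610.csv", "s2610.csv"]
print(plan_triggers([[], first, first, both, both, both]))
# [[], ['s115.csv'], ['s215.csv'], ['s1610.csv'], ['s2610.csv'], []]
```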

In [0]:
# Use the data frame you just created to create another data frame with the
# sev2 volume report. It should have columns 'serverID' and 'avgVolume'.
# The aggregates are aliased instead of relying on generated names like 'count(timestamp)'.
volumeStream = logStream \
    .filter(col("severity") == 2) \
    .groupBy("serverID") \
    .agg(min("timestamp").alias("minTime"),
         max("timestamp").alias("maxTime"),
         count("timestamp").alias("sev2Count")) \
    .withColumn("avgVolume", col("sev2Count") / (col("maxTime") - col("minTime") + 1)) \
    .select("serverID", "avgVolume")

In [0]:
# Create and start a query (writeStream) that writes the sev2 report to an in-memory sink.
# Complete mode rewrites the whole result table after each batch.
volumeQuery = volumeStream.writeStream \
    .queryName("sev2_volume") \
    .format("memory") \
    .outputMode("complete") \
    .start()

In [0]:
# Empty the 'live' directory (delete it and recreate it) so the report starts with no data
dbutils.fs.rm(LOG_DATA_PATH, recurse=True)
dbutils.fs.mkdirs(LOG_DATA_PATH)

In [0]:
# Write a (very simple) Spark SQL query to show the contents of the in-memory table. It should initially be empty.
spark.sql("SELECT * FROM sev2_volume").show()

In [0]:
# Copy the two files for times 1 through 5 (one per server) into your 'live data' directory
# If using EMR notebook, do this at the AWS console and leave this cell blank
dbutils.fs.cp(LOG_DATA_PATH_SOURCE + "s115.csv", LOG_DATA_PATH + "s115.csv")
dbutils.fs.cp(LOG_DATA_PATH_SOURCE + "s215.csv", LOG_DATA_PATH + "s215.csv")

In [0]:
# Rerun the query to show that the sev2 volume report has been updated
spark.sql("SELECT * FROM sev2_volume").show()

In [0]:
# Now copy the log files for times 6 to 10
# If using EMR notebook, do this at the AWS console and leave this cell blank
dbutils.fs.cp(LOG_DATA_PATH_SOURCE + "s1610.csv", LOG_DATA_PATH + "s1610.csv")
dbutils.fs.cp(LOG_DATA_PATH_SOURCE + "s2610.csv", LOG_DATA_PATH + "s2610.csv")

In [0]:
# Run the query again to verify that the report was updated. Wait a few seconds first:
# with maxFilesPerTrigger=1 the stream needs one trigger per new file.
spark.sql("SELECT * FROM sev2_volume").show()
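Instead of waiting "a little while", the notebook can wait on the query itself: `volumeQuery.processAllAvailable()` blocks until all input available at the time of the call has been processed (the PySpark documentation describes `processAllAvailable` as intended mainly for testing). If you would rather bound the wait, a small polling helper also works. `wait_until` below is a hypothetical plain-Python sketch; in the notebook its predicate could, for example, compare `spark.sql("SELECT * FROM sev2_volume").count()` with the number of servers you expect to see.

```python
import time


def wait_until(predicate, timeout=30.0, interval=0.5):
    """Call predicate() every `interval` seconds until it returns True or `timeout` seconds pass.

    Returns True if the predicate became true, False on timeout (hypothetical helper).
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


# Stand-in condition for illustration: true on the third check.
checks = []
print(wait_until(lambda: checks.append(1) or len(checks) >= 3, timeout=5, interval=0.01))  # True
```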

#### The SEV0 log

In [0]:
# Clean up: empty the "live" directory, the SEV0 output directory, and its checkpoint directory
# If using EMR notebook, do this at the AWS console and leave this cell blank
dbutils.fs.rm(LOG_DATA_PATH, recurse=True)
dbutils.fs.mkdirs(LOG_DATA_PATH)

dbutils.fs.rm(SEV0_OUTPUT, recurse=True)
dbutils.fs.mkdirs(SEV0_OUTPUT)

dbutils.fs.rm(SEV0_CHECKPOINTS, recurse=True)
dbutils.fs.mkdirs(SEV0_CHECKPOINTS)

In [0]:
# Build the sev0 data frame on top of the original raw-data stream (logStream);
# it keeps only SEV0 records and just the columns <serverID> <timestamp>
errorStream = logStream \
    .filter(col("severity") == 0) \
    .select(col("serverID"), col("timestamp"))

In [0]:
# Create a query on your sev0 data frame that writes it to Parquet, appending new records
# on each batch (the output path is a directory of Parquet part files)
# https://stackoverflow.com/questions/55859868/pyspark-structured-streaming-write-to-parquet-in-batches
errorQuery = errorStream.writeStream \
    .queryName("sev0_log") \
    .format("parquet") \
    .outputMode("append") \
    .option("path", SEV0_OUTPUT) \
    .option("checkpointLocation", SEV0_CHECKPOINTS) \
    .start()
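The `checkpointLocation` option is what lets the SEV0 query stop and restart without appending the same records twice: Spark records in the checkpoint which input it has already committed, and a restarted query resumes from there. The sketch below models only that idea in plain Python, with a JSON list of file names standing in for the checkpoint; `run_query`, the `processed.json` layout, and the sample rows are hypothetical, and the real mechanism (offset and commit logs, plus the Parquet sink's own `_spark_metadata` log) is more involved.

```python
import json
import os
import tempfile


def run_query(input_files, checkpoint_dir, sink):
    """Append the rows of every input file not yet recorded in the checkpoint.

    Hypothetical model: the checkpoint is a JSON list of processed file names.
    """
    path = os.path.join(checkpoint_dir, "processed.json")
    processed = []
    if os.path.exists(path):
        with open(path) as f:
            processed = json.load(f)
    for name, rows in input_files:
        if name in processed:
            continue  # already committed by an earlier run: skip, no duplicates
        sink.extend(rows)  # append-only sink
        processed.append(name)
        with open(path, "w") as f:
            json.dump(processed, f)  # record progress after each file
    return sink


# Hypothetical SEV0 rows per input file.
files = [("s115.csv", [("s1", 3)]), ("s215.csv", [("s2", 5)])]
with tempfile.TemporaryDirectory() as checkpoint:
    sink = []
    run_query(files, checkpoint, sink)
    run_query(files, checkpoint, sink)  # "restart" with the same checkpoint
print(sink)  # [('s1', 3), ('s2', 5)]
```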

In [0]:
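# Display the query content by reading the Parquet output (it should be empty).
# Pass the schema explicitly: with no Parquet files written yet, schema inference would fail.
spark.read.schema(errorStream.schema).parquet(SEV0_OUTPUT).show()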

In [0]:
# Copy in the files for timestamp 1 through 5
# If using EMR notebook, do this at the AWS console and leave this cell blank
dbutils.fs.cp(LOG_DATA_PATH_SOURCE + "s115.csv", LOG_DATA_PATH + "s115.csv")
dbutils.fs.cp(LOG_DATA_PATH_SOURCE + "s215.csv", LOG_DATA_PATH + "s215.csv")

In [0]:
# Display the query again by reading the Parquet output. Are there new records?
spark.read.parquet(SEV0_OUTPUT).show()

In [0]:
# Copy in the files for timestamp 6 through 10
# If using EMR notebook, do this at the AWS console and leave this cell blank
dbutils.fs.cp(LOG_DATA_PATH_SOURCE + "s1610.csv", LOG_DATA_PATH + "s1610.csv")
dbutils.fs.cp(LOG_DATA_PATH_SOURCE + "s2610.csv", LOG_DATA_PATH + "s2610.csv")
dbutils.fs.ls(LOG_DATA_PATH)

In [0]:
# Display the query again by reading the Parquet output. Are there new records?
# (Optional: dbutils.fs.ls(SEV0_OUTPUT) lists the part files written so far.)
spark.read.parquet(SEV0_OUTPUT).show()

In [0]:
# Be tidy, stop all your streaming queries!
volumeQuery.stop()
errorQuery.stop()

In [0]:
# Verify that there are no active streams
spark.streams.active