# ASSIGNMENT 2 B - SPARK STRUCTURED STREAMING
### Sandeep Sethumadhavan
### 30980283



Importing required libraries and setting up environment

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, decode, expr, explode, split
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import datetime as dt
from datetime import datetime
from pyspark.sql.functions import concat,col,lit
from pyspark.ml import PipelineModel
import calendar
from datetime import timedelta

Creating spark session using sparkconf object

In [2]:
# Configure a local Spark master with two threads, then pin the session
# time zone to UTC so timestamps are rendered consistently.
conf = SparkConf()
conf.setMaster("local[2]")
conf.setAppName("Spark Stream Assignment")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
spark.conf.set("spark.sql.session.timeZone", "UTC")

Ingesting data from the kafka producers

In [3]:
topic1 = "Memory_Producer"
topic2 = "Process_Producer"

# Memory producer: streaming read of topic1 from the local Kafka broker.
memory_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic1)
    .load()
)

# Process producer: streaming read of topic2 from the same broker.
process_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic2)
    .load()
)

Flattening the dataframe

In [4]:
# Only the Kafka message value is used downstream, so decode it to a string
# once and drop the key straight away (the original selected the key only to
# discard it, and cast the value to string twice).
process_df = process_stream.select(F.col("value").cast("string").alias("value_process"))
memory_df = memory_stream.select(F.col("value").cast("string").alias("value_memory"))

Defining two schemas for data preprocessing before setting up the actual schema.

In [5]:
# Schema of one process message: a JSON array of records. Every field is read
# as a nullable string except `ts`, which is read as a nullable integer.
schema1 = ArrayType(StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            'sequence', 'machine', 'PID', 'TRUN', 'TSLPI', 'TSLPU', 'POLI', 'NICE',
            'PRI', 'RTPR', 'CPUNR', 'Status', 'EXC', 'State', 'CPU', 'CMD',
        )
    ]
    + [StructField('ts', IntegerType(), True)]
))

# Schema of one memory message, built the same way.
schema2 = ArrayType(StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in (
            'sequence', 'machine', 'PID', 'MINFLT', 'MAJFLT', 'VSTEXT', 'VSIZE',
            'RSIZE', 'VGROW', 'RGROW', 'MEM', 'CMD',
        )
    ]
    + [StructField('ts', IntegerType(), True)]
))

Flattening the dataframe again

In [6]:
# Parse each JSON array with its schema, then emit one row per array element.
process_df = process_df.select(
    F.from_json(F.col("value_process"), schema1).alias('parsed_value')
)
memory_df = memory_df.select(
    F.from_json(F.col("value_memory"), schema2).alias('parsed_value')
)
process_df = process_df.select(F.explode(F.col("parsed_value")).alias('unnested_value'))
memory_df = memory_df.select(F.explode(F.col("parsed_value")).alias('unnested_value'))

# Promote every field of the struct to a top-level column. Star expansion
# keeps the field names and the order declared in the schema.
process_df_formatted = process_df.select("unnested_value.*")
memory_df_formatted = memory_df.select("unnested_value.*")

The dataframes have been flattened and have the required column names. The next step is to preprocess the data before assigning the final schema.

In [7]:
# Inspect the flattened memory schema (all strings except ts at this point).
memory_df_formatted.printSchema()

root
 |-- sequence: string (nullable = true)
 |-- machine: string (nullable = true)
 |-- PID: string (nullable = true)
 |-- MINFLT: string (nullable = true)
 |-- MAJFLT: string (nullable = true)
 |-- VSTEXT: string (nullable = true)
 |-- VSIZE: string (nullable = true)
 |-- RSIZE: string (nullable = true)
 |-- VGROW: string (nullable = true)
 |-- RGROW: string (nullable = true)
 |-- MEM: string (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: integer (nullable = true)



In [8]:
# Inspect the flattened process schema (all strings except ts at this point).
process_df_formatted.printSchema()

root
 |-- sequence: string (nullable = true)
 |-- machine: string (nullable = true)
 |-- PID: string (nullable = true)
 |-- TRUN: string (nullable = true)
 |-- TSLPI: string (nullable = true)
 |-- TSLPU: string (nullable = true)
 |-- POLI: string (nullable = true)
 |-- NICE: string (nullable = true)
 |-- PRI: string (nullable = true)
 |-- RTPR: string (nullable = true)
 |-- CPUNR: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- EXC: string (nullable = true)
 |-- State: string (nullable = true)
 |-- CPU: string (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: integer (nullable = true)



Creating user defined functions for cleaning the data

In [9]:
def _strip_spaces(s):
    """Return `s` with every space removed; a null (None) input stays null."""
    return None if s is None else s.replace(" ", "")


def _expand_unit_suffix(s, suffix, factor):
    """Expand a unit suffix into the number it stands for.

    "132K" -> "132000" and "1.5K" -> "1500". Plain textual replacement would
    turn "1.5K" into "1.5000", which later casts to 1.5 instead of 1500.

    Args:
        s: raw field value, or None for a null field.
        suffix: the unit letter to expand ("K", "M" or "G").
        factor: the power of ten the suffix stands for.

    Returns:
        None for a null input; `s` unchanged when it does not contain
        `suffix`; otherwise the scaled number as a plain decimal string. If
        the remainder is not a finite number, falls back to replacing the
        suffix with the matching run of zeros (the previous behaviour).
    """
    # Imported locally so the helper is self-contained when shipped to workers.
    from decimal import Decimal, InvalidOperation

    if s is None:
        return None
    if suffix not in s:
        return s
    textual_fallback = s.replace(suffix, str(factor)[1:])
    try:
        scaled = Decimal(s.replace(suffix, "")) * factor
    except InvalidOperation:
        return textual_fallback
    if not scaled.is_finite():
        return textual_fallback
    if scaled == scaled.to_integral_value():
        return str(int(scaled))
    return format(scaled, "f")


# The UDFs keep their original names and (string -> string) contract, but are
# now null-safe: a missing JSON field no longer raises inside the Python worker.
spaceDeleteUDF = udf(_strip_spaces, StringType())  # removes white spaces
changekUDF = udf(lambda s: _expand_unit_suffix(s, "K", 10 ** 3), StringType())  # K -> x1,000
changeMUDF = udf(lambda s: _expand_unit_suffix(s, "M", 10 ** 6), StringType())  # M -> x1,000,000
changeGUDF = udf(lambda s: _expand_unit_suffix(s, "G", 10 ** 9), StringType())  # G -> x1,000,000,000

Applying the UDFs to all the required columns

In [10]:
# Strip spaces from the numeric-looking process columns.
for column_name in ("sequence", "PID", "TRUN", "TSLPI", "TSLPU", "NICE",
                    "PRI", "RTPR", "CPUNR", "EXC", "CPU"):
    process_df_formatted = process_df_formatted.withColumn(
        column_name, spaceDeleteUDF(column_name)
    )

# Strip spaces from the numeric-looking memory columns.
for column_name in ("VGROW", "RGROW", "RSIZE", "VSTEXT", "MAJFLT", "MINFLT",
                    "sequence", "PID", "VSIZE", "MEM"):
    memory_df_formatted = memory_df_formatted.withColumn(
        column_name, spaceDeleteUDF(column_name)
    )


In [11]:
# Apply the K-suffix conversion to the numeric-looking process columns.
for column_name in ("sequence", "PID", "TRUN", "TSLPI", "TSLPU", "NICE",
                    "PRI", "RTPR", "CPUNR", "EXC", "CPU"):
    process_df_formatted = process_df_formatted.withColumn(
        column_name, changekUDF(column_name)
    )

# Apply the K-suffix conversion to the numeric-looking memory columns.
for column_name in ("VGROW", "RGROW", "RSIZE", "VSTEXT", "MAJFLT", "MINFLT",
                    "sequence", "PID", "VSIZE", "MEM"):
    memory_df_formatted = memory_df_formatted.withColumn(
        column_name, changekUDF(column_name)
    )


In [12]:
# Apply the M-suffix conversion to the numeric-looking process columns.
for column_name in ("sequence", "PID", "TRUN", "TSLPI", "TSLPU", "NICE",
                    "PRI", "RTPR", "CPUNR", "EXC", "CPU"):
    process_df_formatted = process_df_formatted.withColumn(
        column_name, changeMUDF(column_name)
    )

# Apply the M-suffix conversion to the numeric-looking memory columns.
for column_name in ("VGROW", "RGROW", "RSIZE", "VSTEXT", "MAJFLT", "MINFLT",
                    "sequence", "PID", "VSIZE", "MEM"):
    memory_df_formatted = memory_df_formatted.withColumn(
        column_name, changeMUDF(column_name)
    )


In [13]:
# Apply the G-suffix conversion to the numeric-looking process columns.
for column_name in ("sequence", "PID", "TRUN", "TSLPI", "TSLPU", "NICE",
                    "PRI", "RTPR", "CPUNR", "EXC", "CPU"):
    process_df_formatted = process_df_formatted.withColumn(
        column_name, changeGUDF(column_name)
    )

# Apply the G-suffix conversion to the numeric-looking memory columns.
for column_name in ("VGROW", "RGROW", "RSIZE", "VSTEXT", "MAJFLT", "MINFLT",
                    "sequence", "PID", "VSIZE", "MEM"):
    memory_df_formatted = memory_df_formatted.withColumn(
        column_name, changeGUDF(column_name)
    )



Now transforming data into the required schema

In [16]:
# Process stream: cast every column to its final type, one column at a time.
for column_name, target_type in [
    ("sequence", IntegerType()),
    ("machine", IntegerType()),
    ("PID", IntegerType()),
    ("TRUN", IntegerType()),
    ("TSLPI", IntegerType()),
    ("TSLPU", IntegerType()),
    ("POLI", StringType()),
    ("NICE", IntegerType()),
    ("PRI", IntegerType()),
    ("RTPR", IntegerType()),
    ("CPUNR", IntegerType()),
    ("Status", StringType()),
    ("EXC", IntegerType()),
    ("State", StringType()),
    ("CPU", DoubleType()),
    ("CMD", StringType()),
    ("ts", IntegerType()),
]:
    process_df_formatted = process_df_formatted.withColumn(
        column_name, process_df_formatted[column_name].cast(target_type)
    )

# Memory stream: same treatment with its own column/type table.
for column_name, target_type in [
    ("sequence", IntegerType()),
    ("machine", IntegerType()),
    ("PID", IntegerType()),
    ("MINFLT", IntegerType()),
    ("MAJFLT", IntegerType()),
    ("VSTEXT", IntegerType()),
    ("VSIZE", DoubleType()),
    ("RSIZE", DoubleType()),
    ("VGROW", DoubleType()),
    ("RGROW", DoubleType()),
    ("MEM", DoubleType()),
    ("CMD", StringType()),
    ("ts", IntegerType()),
]:
    memory_df_formatted = memory_df_formatted.withColumn(
        column_name, memory_df_formatted[column_name].cast(target_type)
    )

Updating NICE according to value of PRI

In [17]:
# Overwrite NICE on every row with PRI - 120.
process_df_formatted = process_df_formatted.withColumn("NICE", F.col("PRI") - 120)

Now querying the created dataframes to see the output

In [18]:
# Print the cleaned memory stream to the console in 5-second micro-batches.
query = (
    memory_df_formatted.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime='5 seconds')
    .start()
)

In [19]:
# Stop the console query that is currently bound to `query`.
query.stop()

In [20]:
# Print the cleaned process stream to the console in 5-second micro-batches.
query = (
    process_df_formatted.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime='5 seconds')
    .start()
)

In [21]:
# Stop the console query that is currently bound to `query`.
query.stop()

Checking that the dataframes' schemas follow the requirements

In [22]:
# Print both schemas after the casts so the final column types can be checked.
process_df_formatted.printSchema()
memory_df_formatted.printSchema()

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- TRUN: integer (nullable = true)
 |-- TSLPI: integer (nullable = true)
 |-- TSLPU: integer (nullable = true)
 |-- POLI: string (nullable = true)
 |-- NICE: integer (nullable = true)
 |-- PRI: integer (nullable = true)
 |-- RTPR: integer (nullable = true)
 |-- CPUNR: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- EXC: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- CPU: double (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: integer (nullable = true)

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- MINFLT: integer (nullable = true)
 |-- MAJFLT: integer (nullable = true)
 |-- VSTEXT: integer (nullable = true)
 |-- VSIZE: double (nullable = true)
 |-- RSIZE: double (nullable = true)
 |-- VGROW: double (nullable = true)
 |-- RGROW: do

Creating new column CMD_PID

In [23]:
# CMD_PID = "<CMD>_<PID>"; it is the key used to match process and memory rows.
process_df_formatted = process_df_formatted.withColumn(
    'CMD_PID', F.concat(F.col("CMD"), F.lit("_"), F.col("PID"))
)
memory_df_formatted = memory_df_formatted.withColumn(
    'CMD_PID', F.concat(F.col("CMD"), F.lit("_"), F.col("PID"))
)

Creating new column event_time 

In [24]:
# Derive event_time from the unix timestamp in `ts`. The session time zone is
# set to UTC, so the resulting timestamp is a UTC date-time.
# (The earlier event_time = ts assignments were dead code: they were
# overwritten immediately by the conversion below.)
process_df_formatted = process_df_formatted.withColumn(
    'event_time', F.from_unixtime('ts').cast(TimestampType())
)

memory_df_formatted = memory_df_formatted.withColumn(
    'event_time', F.from_unixtime('ts').cast(TimestampType())
)


Allowing 20 second delay on event_time using watermark

In [25]:
# Late arrivals of up to 20 seconds (measured on event_time) are allowed on
# both streams.
process_df_formatted = process_df_formatted.withWatermark(
    eventTime="event_time", delayThreshold="20 seconds"
)
memory_df_formatted = memory_df_formatted.withWatermark(
    eventTime="event_time", delayThreshold="20 seconds"
)


Now querying the updated dataframes to see the output

In [26]:
# Console view of the process stream, now including CMD_PID and event_time.
query = (
    process_df_formatted.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime='5 seconds')
    .start()
)

In [27]:
# Stop the console query that is currently bound to `query`.
query.stop()

In [28]:
# Console view of the memory stream, now including CMD_PID and event_time.
query = (
    memory_df_formatted.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime='5 seconds')
    .start()
)

In [29]:
# Stop the console query that is currently bound to `query`.
query.stop()

Persisting both streams to disk as Parquet files

In [30]:
# Continuously append the cleaned process stream to Parquet files under
# process.parquet; the checkpoint is kept in a sub-directory of that path.
process_parquet = (
    process_df_formatted.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "process.parquet")
    .option("checkpointLocation", "process.parquet/checkpoint")
    .start()
)

In [31]:
# Stop the process Parquet sink before reading its files back.
process_parquet.stop()

In [32]:
# Continuously append the cleaned memory stream to Parquet files under
# memory.parquet; the checkpoint is kept in a sub-directory of that path.
memory_parquet = (
    memory_df_formatted.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "memory.parquet")
    .option("checkpointLocation", "memory.parquet/checkpoint")
    .start()
)

In [33]:
# Stop the memory Parquet sink before reading its files back.
memory_parquet.stop()

Loading the parquet data

In [34]:
# Batch-read the persisted process data and show its schema and first rows
# to confirm what was written.
process_parquet_df = spark.read.parquet("process.parquet")
process_parquet_df.printSchema()
process_parquet_df.show()

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- TRUN: integer (nullable = true)
 |-- TSLPI: integer (nullable = true)
 |-- TSLPU: integer (nullable = true)
 |-- POLI: string (nullable = true)
 |-- NICE: integer (nullable = true)
 |-- PRI: integer (nullable = true)
 |-- RTPR: integer (nullable = true)
 |-- CPUNR: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- EXC: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- CPU: double (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: integer (nullable = true)
 |-- CMD_PID: string (nullable = true)
 |-- event_time: timestamp (nullable = true)

+--------+-------+-----+----+-----+-----+----+----+---+----+-----+------+---+-----+----+--------------+----------+-------------------+-------------------+
|sequence|machine|  PID|TRUN|TSLPI|TSLPU|POLI|NICE|PRI|RTPR|CPUNR|Status|EXC|State| CPU|           CMD|        ts|           

In [35]:
# Batch-read the persisted memory data and show its schema and first rows
# to confirm what was written.
memory_parquet_df = spark.read.parquet("memory.parquet")
memory_parquet_df.printSchema()
memory_parquet_df.show()

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- MINFLT: integer (nullable = true)
 |-- MAJFLT: integer (nullable = true)
 |-- VSTEXT: integer (nullable = true)
 |-- VSIZE: double (nullable = true)
 |-- RSIZE: double (nullable = true)
 |-- VGROW: double (nullable = true)
 |-- RGROW: double (nullable = true)
 |-- MEM: double (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: integer (nullable = true)
 |-- CMD_PID: string (nullable = true)
 |-- event_time: timestamp (nullable = true)

+--------+-------+-----+------+------+------+-------+-------+-----+-----+----+--------------+----------+--------------------+-------------------+
|sequence|machine|  PID|MINFLT|MAJFLT|VSTEXT|  VSIZE|  RSIZE|VGROW|RGROW| MEM|           CMD|        ts|             CMD_PID|         event_time|
+--------+-------+-----+------+------+------+-------+-------+-----+-----+----+--------------+----------+--------------------+------

Loading the machine learning models to predict 

In [36]:
# Load the two saved pipeline models from the filesystem (paths are relative
# to the working directory): one for process activity, one for memory activity.

process_pipelineModel = PipelineModel.load('process_pipeline_model')

memory_pipelineModel = PipelineModel.load('memory_pipeline_model')


Predicting for both process and memory

In [37]:
# Run each streaming dataframe through its pipeline model; the result is a
# streaming dataframe with the pipeline's output columns appended.
process_predictions = process_pipelineModel.transform(process_df_formatted)
memory_predictions = memory_pipelineModel.transform(memory_df_formatted)

Viewing predictions in real time in the console

In [38]:
# Console view of the process predictions in 5-second micro-batches.
query = (
    process_predictions.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime='5 seconds')
    .start()
)

In [39]:
# Stop the console query that is currently bound to `query`.
query.stop()

In [40]:
# Console view of the memory predictions, registered as query "memory_pred".
query = (
    memory_predictions.writeStream
    .outputMode("append")
    .format("console")
    .queryName("memory_pred")
    .trigger(processingTime='5 seconds')
    .start()
)

In [41]:
# Stop the console query that is currently bound to `query`.
query.stop()

If a program on one machine, having the same “CMD” and “PID” in both the process and memory streaming data, is predicted as an attack in BOTH the process and the memory activity predictions, then it is considered an attack event. Find the streaming events that fulfil these criteria, create a new column to record the processing time, and persist them as Parquet.

In [42]:
# Work on separate names so the prediction dataframes stay bound to their
# original variables.
for_process_pred, for_memory_pred = process_predictions, memory_predictions

Getting the names of all the columns in both the predictions dataframes

In [43]:
# Column names of each prediction dataframe, in schema order.
column1 = for_process_pred.columns
column2 = for_memory_pred.columns

Updating the column names for joining purposes

In [44]:
# Prefix every process column with "Process_" so names stay unique after the join.
for_process_pred = for_process_pred.select(
    *(F.col(name).alias(f"Process_{name}") for name in column1)
)

In [45]:
# Prefix every memory column with "Memory_" so names stay unique after the join.
for_memory_pred = for_memory_pred.select(
    *(F.col(name).alias(f"Memory_{name}") for name in column2)
)

Joining the two 

In [46]:
# Stream-stream inner join on CMD_PID, bounded in event time.
# Without a time-range condition Spark must keep every row of both streams in
# join state forever, and the 20-second watermarks never get to expire anything.
# The +/-30 second window is a superset of the "delta_time < 30" filter applied
# later, so the final result is unchanged for on-time data; rows arriving later
# than the watermark allows may now be dropped instead of joined.
joinb = for_process_pred.join(
    for_memory_pred,
    (for_process_pred.Process_CMD_PID == for_memory_pred.Memory_CMD_PID)
    & F.expr(
        "Memory_event_time BETWEEN Process_event_time - INTERVAL 30 SECONDS "
        "AND Process_event_time + INTERVAL 30 SECONDS"
    ),
    "inner",
)

In [47]:
# List the joined column names to decide which ones to drop next.
joinb.schema.names

['Process_sequence',
 'Process_machine',
 'Process_PID',
 'Process_TRUN',
 'Process_TSLPI',
 'Process_TSLPU',
 'Process_POLI',
 'Process_NICE',
 'Process_PRI',
 'Process_RTPR',
 'Process_CPUNR',
 'Process_Status',
 'Process_EXC',
 'Process_State',
 'Process_CPU',
 'Process_CMD',
 'Process_ts',
 'Process_CMD_PID',
 'Process_event_time',
 'Process_POLI_idx',
 'Process_Status_idx',
 'Process_State_idx',
 'Process_POLI_vec',
 'Process_Status_vec',
 'Process_State_vec',
 'Process_tmp_CMD',
 'Process_CMD_vec',
 'Process_features',
 'Process_scaled_features',
 'Process_rawPrediction',
 'Process_probability',
 'Process_prediction',
 'Memory_sequence',
 'Memory_machine',
 'Memory_PID',
 'Memory_MINFLT',
 'Memory_MAJFLT',
 'Memory_VSTEXT',
 'Memory_VSIZE',
 'Memory_RSIZE',
 'Memory_VGROW',
 'Memory_RGROW',
 'Memory_MEM',
 'Memory_CMD',
 'Memory_ts',
 'Memory_CMD_PID',
 'Memory_event_time',
 'Memory_tmp_CMD',
 'Memory_CMD_vec',
 'Memory_features',
 'Memory_scaled_features',
 'Memory_rawPrediction

Dropping unnecessary columns

In [48]:
# Intermediate columns that appear on both sides of the join.
_shared_stage_cols = (
    'tmp_CMD', 'CMD_vec', 'features', 'scaled_features',
    'rawPrediction', 'probability',
)
# The process side additionally carries the index/vector columns for
# POLI, Status and State.
_process_only_cols = (
    'POLI_idx', 'Status_idx', 'State_idx',
    'POLI_vec', 'Status_vec', 'State_vec',
)
cols_to_drop = (
    [f"Process_{suffix}" for suffix in _process_only_cols + _shared_stage_cols]
    + [f"Memory_{suffix}" for suffix in _shared_stage_cols]
)
joinb = joinb.drop(*cols_to_drop)

Keeping only the rows where both the memory and the process models predict an attack for the same machine and CMD_PID, and the time lag between the two readings is less than 30 seconds

In [49]:
# Absolute difference, in seconds, between the process and memory timestamps.
joinb = joinb.withColumn(
    "delta_time", F.abs(F.col("Process_ts") - F.col("Memory_ts"))
)

In [50]:
# Keep only rows where BOTH models predict an attack (prediction == 1).
# Uses the exact column name `Memory_prediction`; the old `Memory_Prediction`
# spelling resolved only because Spark is case-insensitive by default.
# Testing each column explicitly also avoids relying on the sum being 2.
joinb = joinb.filter(
    (col("Process_prediction") == 1) & (col("Memory_prediction") == 1)
)

In [51]:
# Both readings must come from the same machine.
joinb = joinb.where(F.col("Process_machine") == F.col("Memory_machine"))


In [52]:
# Keep only pairs whose process and memory readings are less than 30 seconds
# apart. delta_time is an absolute difference, so no lower bound is needed.
joinb=joinb.filter(col("delta_time")<30)

In [53]:
# Record when each attack event was processed.
joinb = joinb.withColumn('Processing_time', F.current_timestamp())

Viewing the output

In [54]:
# Console view of the detected attack events, with full (untruncated) values.
# Output mode is spelled "append" to match every other query in the notebook.
query = (
    joinb.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime='5 seconds')
    .option("truncate", "false")
    .start()
)

In [55]:
# Stop the console query that is currently bound to `query`.
query.stop()

Persisting the attack events to disk as Parquet files

In [60]:
# Continuously append the detected attack events to Parquet files under
# process_memory_attack.parquet; the checkpoint sits in a sub-directory.
process_memory_attack = (
    joinb.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "process_memory_attack.parquet")
    .option("checkpointLocation", "process_memory_attack.parquet/checkpoint")
    .start()
)

In [61]:
# Stop the attack-event Parquet sink before reading its files back.
process_memory_attack.stop()

In [62]:
# Batch-read the persisted attack events (joined process + memory predictions)
# and show their schema and first rows.
# NOTE: this rebinds `process_memory_attack` from the streaming query handle
# to a static DataFrame.
process_memory_attack = spark.read.parquet("process_memory_attack.parquet")
process_memory_attack.printSchema()
process_memory_attack.show()

root
 |-- Process_sequence: integer (nullable = true)
 |-- Process_machine: integer (nullable = true)
 |-- Process_PID: integer (nullable = true)
 |-- Process_TRUN: integer (nullable = true)
 |-- Process_TSLPI: integer (nullable = true)
 |-- Process_TSLPU: integer (nullable = true)
 |-- Process_POLI: string (nullable = true)
 |-- Process_NICE: integer (nullable = true)
 |-- Process_PRI: integer (nullable = true)
 |-- Process_RTPR: integer (nullable = true)
 |-- Process_CPUNR: integer (nullable = true)
 |-- Process_Status: string (nullable = true)
 |-- Process_EXC: integer (nullable = true)
 |-- Process_State: string (nullable = true)
 |-- Process_CPU: double (nullable = true)
 |-- Process_CMD: string (nullable = true)
 |-- Process_ts: integer (nullable = true)
 |-- Process_CMD_PID: string (nullable = true)
 |-- Process_event_time: timestamp (nullable = true)
 |-- Process_prediction: double (nullable = false)
 |-- Memory_sequence: integer (nullable = true)
 |-- Memory_machine: integer (