# FIT5202 - Data processing for Big Data
Name : Vaibhavi Bhardwaj

Student Id : 30154987

Email id : vbha0006@student.monash.edu

## 3. Streaming application using Spark Structured Streaming (60%)

### 1. SparkSession is created using a SparkConf object, which would use two local cores with a proper application name, and use UTC as the timezone (4%)

In [57]:
# Initialising Spark and importing files
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.types import *

# The task requires the session to be built from a SparkConf object that uses
# two local cores, a descriptive application name, and UTC as the time zone.
# spark.sql.session.timeZone is the zone Spark SQL uses when rendering/parsing
# timestamps, so pinning it to UTC keeps the event_time values derived from
# unix time independent of the host machine's local zone.
spark_conf = SparkConf() \
    .setMaster("local[2]") \
    .setAppName("Streaming application Analysis in Spark") \
    .set("spark.sql.session.timeZone", "UTC")

spark = SparkSession \
    .builder \
    .config(conf=spark_conf) \
    .getOrCreate()

### 2. From the Kafka producers in Task 1.1 and 1.2, ingest the streaming data into Spark Streaming for both process and memory activities (3%)

**Ingesting Process Data from producer**

In [58]:
# Kafka topic the process producer publishes to
topic = "A2BprocessStream"

# Subscribe to the topic on the local broker. Each Kafka record arrives with a
# binary key/value plus topic/partition/offset/timestamp metadata columns.
kafka_options = {
    "kafka.bootstrap.servers": "127.0.0.1:9092",
    "subscribe": topic,
}
df_process = spark.readStream.format("kafka").options(**kafka_options).load()

In [59]:
# Inspect the raw Kafka source schema (value is still binary; it is cast to string in step 3)
df_process.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



**Ingesting Memory Data from producer**

In [60]:
# Kafka topic the memory producer publishes to
topic = "A2BmemoryStream"

# Same Kafka source as the process stream, subscribed to the memory topic
memory_reader = spark.readStream.format("kafka")
memory_reader = memory_reader.option("kafka.bootstrap.servers", "127.0.0.1:9092")
memory_reader = memory_reader.option("subscribe", topic)
df_memory = memory_reader.load()

In [61]:
# Inspect the raw Kafka source schema (value is still binary; it is cast to string in step 3)
df_memory.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



**Step 3**

**3. Then the streaming data format should be transformed into the proper formats following the metadata file schema for both process and memory, similar to assignment 2A (3%)**


**Process**

In [62]:
# Step 3 (process): decode the Kafka payload and flatten it to one row per record.
# Each Kafka message value is a JSON array of process records. Every field is
# parsed as a string here and cast to its proper type in the step-4 cell.
# The intermediate frames get new names instead of reassigning df_process, so
# this cell can be re-run without failing on a missing `value` column.
process_json = df_process.selectExpr("CAST(value AS STRING)")

# Field names in the order given by the process metadata schema
process_field_names = ['sequence', 'machine', 'PID', 'TRUN', 'TSLPI', 'TSLPU',
                       'POLI', 'NICE', 'PRI', 'RTPR', 'CPUNR', 'Status', 'EXC',
                       'State', 'CPU', 'CMD', 'ts']

# Define the schema for the structured datastream received
schema_process = ArrayType(StructType(
    [StructField(name, StringType(), True) for name in process_field_names]))

# Parse the JSON array, then explode it so each array element becomes its own row
process_records = process_json \
    .select(F.from_json(F.col("value"), schema_process).alias('parsed_value')) \
    .select(F.explode(F.col("parsed_value")).alias('unnested_value'))

# Promote the struct fields to top-level columns, keeping their original names
df_formated_process = process_records.select(
    [F.col("unnested_value." + name).alias(name) for name in process_field_names])

**Memory**

In [63]:
# Step 3 (memory): decode the Kafka payload and flatten it to one row per record.
# Each Kafka message value is a JSON array of memory records. Every field is
# parsed as a string here and converted to numbers in the step-4 cell.
# NOTE: df_memory is reassigned, so re-running this cell requires re-running
# the memory ingestion cell first.
df_memory = df_memory.selectExpr("CAST(value AS STRING)")

# Field names in the order given by the memory metadata schema
memory_field_names = ['sequence', 'machine', 'PID', 'MINFLT', 'MAJFLT', 'VSTEXT',
                      'VSIZE', 'RSIZE', 'VGROW', 'RGROW', 'MEM', 'CMD', 'ts']

# Memory-specific schema; it has its own name so the process schema defined
# in the process cell is not overwritten.
schema_memory = ArrayType(StructType(
    [StructField(name, StringType(), True) for name in memory_field_names]))

# Parse the JSON array, then explode it so each array element becomes its own row
df_memory = df_memory.select(F.from_json(F.col("value"), schema_memory).alias('parsed_value'))
df_memory = df_memory.select(F.explode(F.col("parsed_value")).alias('unnested_value'))

# Promote the struct fields to top-level columns, keeping their original names
df_memory = df_memory.select(
    [F.col("unnested_value." + name).alias(name) for name in memory_field_names])

### 3. Then the streaming data format should be transformed into the proper formats following the metadata file schema for both process and memory, similar to assignment 2A (3%)

### 4. For process and memory, respectively, create a new column “CMD_PID” concatenating “CMD” and “PID” columns, and a new column “event_time” as timestamp format based on the unix time in “ts” column (5%)

**Process**

In [64]:
#Transforming the data 
from pyspark.sql.functions import concat, col, lit
from pyspark.sql.functions import from_unixtime

# ---------------- Step 3: typed process columns ----------------
# POLI, Status, State and CMD stay strings; the remaining fields are integers.
# withColumn on an existing name keeps the column in place, so the column
# order is unchanged.
process_int_columns = ['sequence', 'machine', 'PID', 'TRUN', 'TSLPI', 'TSLPU',
                       'NICE', 'PRI', 'RTPR', 'CPUNR', 'EXC', 'CPU', 'ts']
df_formated_process1 = df_formated_process
for column_name in process_int_columns:
    df_formated_process1 = df_formated_process1.withColumn(
        column_name, F.col(column_name).cast(IntegerType()))

# printing the changed schema
df_formated_process1.printSchema()

# Fix the relationship between NICE and PRI:
# NICE = PRI - 120 when PRI is non-zero, otherwise NICE = 0
df_formated_process1 = df_formated_process1.withColumn(
    "NICE", F.when(F.col("PRI") == 0, 0).otherwise(F.col("PRI") - 120))

# ---------------- Step 4: derived columns ----------------
# CMD_PID = CMD concatenated with PID (the integer PID is implicitly cast to string)
df_formated_process1 = df_formated_process1.withColumn(
    "CMD_PID", concat(F.col('CMD'), F.col("PID")))

# event_time from the unix seconds in ts. Casting the integer directly to a
# timestamp avoids the from_unixtime -> string -> timestamp round trip, which
# is evaluated in the session time zone and is ambiguous in DST fall-back hours.
df_formated_process1 = df_formated_process1.withColumn(
    'event_time', F.col('ts').cast(TimestampType()))

# with 20 second water mark on event_time
df_formated_process1 = df_formated_process1.withWatermark("event_time", "20 seconds")

root
 |-- sequence: integer (nullable = true)
 |-- machine: integer (nullable = true)
 |-- PID: integer (nullable = true)
 |-- TRUN: integer (nullable = true)
 |-- TSLPI: integer (nullable = true)
 |-- TSLPU: integer (nullable = true)
 |-- POLI: string (nullable = true)
 |-- NICE: integer (nullable = true)
 |-- PRI: integer (nullable = true)
 |-- RTPR: integer (nullable = true)
 |-- CPUNR: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- EXC: integer (nullable = true)
 |-- State: string (nullable = true)
 |-- CPU: integer (nullable = true)
 |-- CMD: string (nullable = true)
 |-- ts: integer (nullable = true)



**Memory**

In [65]:
from pyspark.sql.functions import concat, col, lit,when , expr
from pyspark.sql.functions import from_unixtime

# ---------------- Step 3: numeric memory columns ----------------
# Counters and sizes in the memory stream may carry a unit suffix
# (e.g. "12K", "3.4M"). Convention: K = 1000, M = 1000000, G = 1000000000.
UNIT_MULTIPLIERS = [('K', 1000), ('M', 1000000), ('G', 1000000000)]
SUFFIXED_MEMORY_COLUMNS = ['MINFLT', 'MAJFLT', 'VSTEXT', 'VSIZE', 'RSIZE', 'VGROW', 'RGROW']


def _expand_unit_suffix(column_name):
    """Return `column_name` as a double Column, expanding a trailing K/M/G suffix.

    Values without one of the suffixes are cast straight to double.
    """
    raw = col(column_name)
    # Everything but the last character: the number in front of the suffix
    mantissa = raw.substr(F.lit(1), F.length(raw) - 1).cast('double')
    converted = None
    for suffix, multiplier in UNIT_MULTIPLIERS:
        condition = raw.endswith(suffix)
        if converted is None:
            converted = when(condition, mantissa * multiplier)
        else:
            converted = converted.when(condition, mantissa * multiplier)
    return converted.otherwise(raw.cast('double'))


# Each column is converted into ITSELF. The result gets a new name so that
# df_memory (the parsed strings) is left untouched and this cell stays re-runnable.
df_memory_numeric = df_memory
for column_name in SUFFIXED_MEMORY_COLUMNS:
    df_memory_numeric = df_memory_numeric.withColumn(column_name, _expand_unit_suffix(column_name))

# Now changing the datatypes of the numeric fields (string or double) to integer.
# NOTE(review): IntegerType tops out at ~2.1e9, so very large sizes (e.g. "3G")
# will not fit. Consider LongType if the downstream models accept it.
memory_int_columns = ['sequence', 'machine', 'PID', 'MINFLT', 'MAJFLT', 'VSTEXT',
                      'VSIZE', 'RSIZE', 'VGROW', 'RGROW', 'MEM', 'ts']
df_memory_formatted1 = df_memory_numeric
for column_name in memory_int_columns:
    df_memory_formatted1 = df_memory_formatted1.withColumn(
        column_name, F.col(column_name).cast(IntegerType()))

# ---------------- Step 4: derived columns ----------------
# event_time from the unix seconds in ts. The direct integer -> timestamp cast
# avoids a string round trip through the session time zone, which is
# ambiguous in DST fall-back hours.
df_memory_formatted1 = df_memory_formatted1.withColumn(
    'event_time', F.col('ts').cast(TimestampType()))
# assigning a 20 second water mark to event_time
df_memory_formatted1 = df_memory_formatted1.withWatermark("event_time", "20 seconds")
# CMD_PID = CMD concatenated with PID (the integer PID is implicitly cast to string)
df_memory_formatted1 = df_memory_formatted1.withColumn(
    "CMD_PID", concat(F.col('CMD'), F.col("PID")))

In [66]:
# Debug sink: print every 20-second micro-batch of the formatted memory
# stream to the console to eyeball the step-3/4 transformations.
console_writer = df_memory_formatted1.writeStream \
    .format("console") \
    .outputMode("append") \
    .trigger(processingTime='20 seconds')
query = console_writer.start()

In [67]:
# Stop the memory console debug query started in the previous cell
query.stop()

### 5. Persist the transformed streaming data in parquet format for both process and memory (5%). The process data should be stored in “process.parquet” in the same folder as your notebook, and the memory data should be stored in “memory.parquet” in the same folder as your notebook.

**Process**

In [68]:
# Step 5 (process): persist the transformed process stream as parquet in
# "process.parquet" next to the notebook, one micro-batch every 20 seconds.
# The checkpoint is kept in its own directory rather than inside the dataset
# folder. Tools that read the dataset by listing the folder (e.g. pyarrow/pandas)
# would otherwise try to read Spark's checkpoint files as parquet.
query = df_formated_process1 \
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", "process.parquet") \
    .option("checkpointLocation", "checkpoints/process") \
    .trigger(processingTime='20 seconds') \
    .start()

In [69]:
# Stop the process parquet-sink query started in the previous cell
query.stop()

**Memory**

In [70]:
# Step 5 (memory): persist the transformed memory stream as parquet in
# "memory.parquet" next to the notebook, one micro-batch every 20 seconds.
# The checkpoint is kept in its own directory rather than inside the dataset
# folder. Tools that read the dataset by listing the folder (e.g. pyarrow/pandas)
# would otherwise try to read Spark's checkpoint files as parquet.
query = df_memory_formatted1 \
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", "memory.parquet") \
    .option("checkpointLocation", "checkpoints/memory") \
    .trigger(processingTime='20 seconds') \
    .start()

In [71]:
# Stop the memory parquet-sink query started in the previous cell
query.stop()

#### 6. Load the machine learning models given, and use the models to predict whether each process or memory streaming record is an attack event, respectively (5%)

In [72]:
# Step 6: load the two pre-trained Spark ML pipelines saved in the notebook folder
from pyspark.ml import PipelineModel

model_dirs = ('process_pipeline_model', 'memory_pipeline_model')
processModel, memoryModel = [PipelineModel.load(model_dir) for model_dir in model_dirs]

In [73]:
# Score every streaming record with its pipeline. The resulting frames carry
# the `prediction` column used by the monitoring queries in step 7.
processPredictions, memoryPredictions = (
    processModel.transform(df_formated_process1),
    memoryModel.transform(df_memory_formatted1),
)

#### 7. Using the prediction result, and monitor the data following the requirements below (30%)

a. If any program in one machine is predicted as an attack in EITHER process or memory activity prediction, it could be a false alarm or a potential attack. Keep track of the approximate count of such events in every 2-min window for each machine for process and memory, respectively, and write the stream into Spark Memory sink using complete mode.

- Your aggregated result should include machine ID, the time window, and the counts.
- Note that if there is more than one entry having the SAME “CMD_PID” in a 2-min window, get the approximate distinct count.
- For example, if two or more records of “atop” program with the exact same “CMD_PID” are predicted as an attack in the process between 2020-10-10 10:10:10 and 2020-10-10 10:11:09, only need to count this “atop” program attack once.

In [74]:
# 7.a Memory 
# importing libraries needed by this cell (and reused by the 7a process cell)
from pyspark.sql.functions import col, window 
import pyspark.sql.functions as func

# For each machine and 2-minute event-time window, approximately count the
# distinct programs (CMD_PID) that the memory model flags as an attack
# (prediction == 1). Several records with the same CMD_PID in one window
# therefore count once. The CMD_PID column built in step 4 is used, as in the
# process query.
windowedCountsmemory = memoryPredictions \
    .where(col('prediction') == 1) \
    .groupBy(window(memoryPredictions.event_time, "120 seconds"), 'machine') \
    .agg(func.approx_count_distinct(F.col('CMD_PID')).alias("total")) \
    .select("window", 'machine', 'total')

# Complete mode re-emits the whole aggregate table on every trigger into the
# in-memory table "7aquerym", which can be queried with Spark SQL.
query7amemory = windowedCountsmemory \
    .writeStream \
    .format("memory") \
    .outputMode("complete") \
    .queryName("7aquerym") \
    .start()

In [75]:
# Stop the 7a memory-sink query (query7amemory) started in the previous cell
query7amemory.stop()

In [76]:
# 7.a Process
# Same aggregation as the memory query, applied to the process predictions:
# approximate distinct CMD_PID count of predicted attacks per machine per 2-min window.
flagged_process = processPredictions.filter(F.col('prediction') == 1)
process_window = F.window(processPredictions.event_time, "120 seconds")
windowedCountsprocess = flagged_process \
    .groupBy(process_window, 'machine') \
    .agg(F.approx_count_distinct(F.col('CMD_PID')).alias("total")) \
    .select("window", 'machine', 'total')

# Write the full aggregate table to the in-memory table "7aqueryp" on every trigger
query7aprocess = windowedCountsprocess.writeStream \
    .queryName("7aqueryp") \
    .outputMode("complete") \
    .format("memory") \
    .option("truncate", "false") \
    .start()

In [77]:
# Stop the 7a process-sink query (query7aprocess) started in the previous cell
query7aprocess.stop()

b. If a program in one machine, having the same “CMD” and “PID” in both process and memory streaming data, is predicted as an attack in BOTH process and memory activity prediction, then this is considered as an attack event. Find the streaming events fulfilling the criteria, create a new column to record the processing time, and persist them in parquet.

- Note the program with the same “CMD” and “PID” might not be generated at the exact same event time. If the difference between the event times in process and memory is less than 30 seconds and the program fulfills the criteria of matching “CMD” and “PID”, then you should include them for the above checking.
- If there are multiple entries fulfilling the above criteria in process or memory, do not remove the extra entries.
- Persist the program’s relevant information (including process & memory data, process & memory’s event and processing timestamp, …)

In [78]:
# 7.b 
# Prepare both prediction streams for the stream-stream join.
# (Event-time watermarks were applied to event_time upstream in step 4.)
from pyspark.sql.functions import expr

# Process side: rename the columns it shares with the memory stream
# (CMD, PID, event_time, ts, prediction) so the join condition is unambiguous.
processWithWatermark = processPredictions \
  .selectExpr("CMD AS CMDpre", "PID AS PIDpre", "event_time AS event_timePre", 'TRUN',
              'TSLPI', 'TSLPU', 'POLI', 'NICE', 'PRI', 'RTPR', 'CPUNR', 'Status', 'EXC', 'State', 'CPU',
              'ts AS tsPre', 'CMD_PID', 'prediction AS proPrediction', 'sequence', 'machine')

# Memory side: keep CMD/PID/event_time/prediction under their own names and
# rename sequence/machine/ts. event_time is selected exactly once; listing it
# twice would yield two columns with the same name.
memoryWithWatermark = memoryPredictions \
  .selectExpr("CMD", "PID", "event_time", 'sequence AS sequenceMem', 'machine AS machineMem', 'MINFLT',
              'MAJFLT', 'VSTEXT', 'VSIZE', 'RSIZE', 'VGROW', 'RGROW', 'MEM', 'ts AS tsMem', 'prediction')


In [79]:
# Inner join of the two prediction streams. A row is an attack event when:
#   * it is the same program on the same machine (machine, CMD and PID match),
#   * BOTH models predict an attack (prediction == 1 on each side), and
#   * the two event times are less than 30 seconds apart.
# The time condition must be a two-sided bound joined with AND. The looser
# "a <= b + 30s OR b <= a + 30s" is true for every pair of timestamps. With
# the bound, the 20-second watermarks on both inputs should let Spark purge
# old join state instead of keeping it forever.
TEST = processWithWatermark.join(memoryWithWatermark, expr("""
      machine = machineMem AND
      CMDpre = CMD AND
      PIDpre = PID AND
      proPrediction = 1 AND
      prediction = 1 AND
      event_timePre < event_time + interval 30 seconds AND
      event_time < event_timePre + interval 30 seconds
      """))

In [80]:
# Debug only: print the key columns of the joined stream to the console
# every 20 seconds to check that matches are being produced.
debug_columns = ["event_time", "CMD", "PID", "event_timePre"]
query = TEST.select(*debug_columns) \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='20 seconds') \
    .start()

In [81]:
# Stop the joined-stream console debug query started in the previous cell
query.stop()

In [82]:
# Step 7b: record when each joined attack row is processed, in a new processing_time column
from pyspark.sql.functions import current_timestamp

processing_stamp = current_timestamp()
TEST = TEST.withColumn('processing_time', processing_stamp)

In [83]:
# Persist the confirmed attack events (flagged by BOTH models) to parquet.
# Kept: CMD/PID, both streams' sequence/machine/ts/event_time, every process
# and memory metric, the memory prediction and the processing time.
attack_columns = ["CMD", "PID", 'sequenceMem', 'machineMem', 'machine', 'sequence', 'MINFLT', 'TRUN',
                  'TSLPI', 'TSLPU', 'POLI', 'NICE', 'PRI', 'RTPR', 'CPUNR', 'Status', 'EXC', 'State', 'CPU', 'CMD_PID',
                  'MAJFLT', 'VSTEXT', 'VSIZE', 'RSIZE', 'VGROW', 'RGROW', 'MEM', 'tsMem', 'tsPre', 'event_time', 'event_timePre',
                  'prediction', 'processing_time']

# The checkpoint is kept outside the dataset folder. Tools that read the
# dataset by listing the folder (e.g. pyarrow/pandas) would otherwise try to
# read Spark's checkpoint files as parquet.
query = TEST.select(*attack_columns) \
    .writeStream \
    .format("parquet") \
    .outputMode("append") \
    .option("path", "process_memory_attack.parquet") \
    .option("checkpointLocation", "checkpoints/process_memory_attack") \
    .trigger(processingTime='20 seconds') \
    .start()

In [84]:
# Stop the attack-event parquet-sink query started in the previous cell
query.stop()

In [85]:
# Print the same attack-event columns to the console every 20 seconds to
# check the structure of the rows that are being persisted.
console_columns = ["CMD", "PID", 'sequenceMem', 'machineMem', 'machine', 'sequence', 'MINFLT', 'TRUN',
                   'TSLPI', 'TSLPU', 'POLI', 'NICE', 'PRI', 'RTPR', 'CPUNR', 'Status', 'EXC', 'State', 'CPU', 'CMD_PID',
                   'MAJFLT', 'VSTEXT', 'VSIZE', 'RSIZE', 'VGROW', 'RGROW', 'MEM', 'tsMem', 'tsPre', 'event_time', 'event_timePre',
                   'prediction', 'processing_time']
attack_console_writer = TEST.select(*console_columns).writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='20 seconds')
query = attack_console_writer.start()

In [86]:
# Stop the attack-event console query started in the previous cell
query.stop()

#### 8. Visualise the data in line charts for step 7a (5%)

- For the count of suspect attacks for each machine in step 7a, use Spark SQL to query the data from Spark Memory sink, and prepare a line chart plot for showing the count of suspect attacks for each machine at each 2-min window from the start to the most recent, and refresh the plot every 10 minutes.
- Hint: the x-axis can be used to represent the timeline, while the y-axis can be used to represent the count; each machine’s line data can be represented in different color legends.


In [56]:
def init_plots(title='Count of suspect attacks per machine (2-min windows)',
               xlabel='Time', ylabel='Count of suspect attacks'):
    """Create and display an empty figure for the live step-8 line chart.

    Parameters
    ----------
    title : str
        Figure title.
    xlabel, ylabel : str
        Axis labels.

    Returns
    -------
    tuple
        ``(fig, ax)``: the matplotlib figure and its single axes, which the
        refresh loop clears and redraws.

    Errors are not swallowed. A failure raises here instead of returning None,
    which would otherwise break ``fig, ax = init_plots()`` with an unrelated
    unpacking error.
    """
    # Local import so the function does not depend on a later cell having imported plt
    import matplotlib.pyplot as plt

    fig, ax = plt.subplots(figsize=(9.5, 6))
    fig.suptitle(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    fig.show()          # display the (empty) figure
    fig.canvas.draw()   # render it once so later redraws update in place
    return fig, ax

In [None]:
import time
import matplotlib.pyplot as plt
%matplotlib notebook

fig, ax = init_plots()

while True:
    df = spark.sql("select * from 7aqueryp").toPandas()
    x4 =df[df['machine']==4] 
   
    x = df['minute_bin'].to_list()
    y = df['Total Impressions'].to_list() 
    ax.clear()
    print(x)
    print(y)
    ax.plot(x, y)
    ax.set_xlabel('Time')
    ax.set_ylabel('Impressions')    
    fig.canvas.draw()
   
        
    time.sleep(1)
    