In [None]:
# Task 3
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 pyspark-shell'

# importing required libraries
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.ml import PipelineModel
import datetime as dt
from pyspark.sql.functions import col, concat, expr, explode, split, regexp_replace
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, DoubleType, TimestampType

In [None]:
# Task 3.1
# Local Spark configuration: 2 logical cores, application name "Hacking Analysis"
master = "local[2]"
app_name = "Hacking Analysis"
spark_conf = (
    SparkConf()
    .setMaster(master)
    .setAppName(app_name)
)
# Build (or reuse) the SparkSession with the SQL session time zone pinned to UTC
spark = (
    SparkSession.builder
    .config(conf=spark_conf)
    .config("spark.sql.session.timeZone", 'UTC')
    .getOrCreate()
)

In [None]:
# Task 3.2
# Streaming source: subscribe to the "Memory" Kafka topic on the local broker
topic = "Memory"
memory_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic)
    .load()
)

In [None]:
# raw schema of the Kafka memory source, before the key/value casts in Task 3.3
memory_df.printSchema()

In [None]:
# Task 3.2
# Streaming source: subscribe to the "Process" Kafka topic on the local broker
topic = "Process"
process_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic)
    .load()
)

In [None]:
# raw schema of the Kafka process source, before the key/value casts in Task 3.3
process_df.printSchema()

In [None]:
# Task 3.3
# Keep only the Kafka key and value, both decoded to strings
memory_df = memory_df.select(
    F.col("key").cast("string").alias("key"),
    F.col("value").cast("string").alias("value"),
)

In [None]:
# schema after the cast in the previous cell: only `key` and `value`, both strings
memory_df.printSchema()

In [None]:
# Task 3.3
# Keep only the Kafka key and value, both decoded to strings
process_df = process_df.select(
    F.col("key").cast("string").alias("key"),
    F.col("value").cast("string").alias("value"),
)

In [None]:
# schema after the cast in the previous cell: only `key` and `value`, both strings
process_df.printSchema()

In [None]:
# Task 3.3
# Memory messages are JSON arrays of records. Every field is read as a nullable
# string (numeric conversion happens in later cells) except the timestamp `ts`.
_mem_string_fields = ["sequence", "machine", "PID", "MINFLT", "MAJFLT", "VSTEXT",
                      "VSIZE", "RSIZE", "VGROW", "RGROW", "MEM", "CMD"]
mem_schema = ArrayType(StructType(
    [StructField(field_name, StringType(), True) for field_name in _mem_string_fields]
    + [StructField("ts", TimestampType(), True)]
))

In [None]:
# Task 3.3
# Process messages are JSON arrays of records. Every field is read as a nullable
# string (numeric conversion happens in later cells) except the timestamp `ts`.
_pro_string_fields = ["sequence", "machine", "PID", "TRUN", "TSLPI", "TSLPU", "POLI",
                      "NICE", "PRI", "RTPR", "CPUNR", "Status", "EXC", "State", "CPU", "CMD"]
pro_schema = ArrayType(StructType(
    [StructField(field_name, StringType(), True) for field_name in _pro_string_fields]
    + [StructField("ts", TimestampType(), True)]
))

In [None]:
# Task 3.3
# Parse each Kafka value as a JSON array of memory records, emit one row per
# array element, then promote the struct's fields to top-level columns
# (same names and order as mem_schema).
memory_df = (
    memory_df
    .select(F.from_json(F.col("value").cast("string"), mem_schema).alias('parsed_value'))
    .select(F.explode(F.col("parsed_value")).alias('unnested_value'))
    .select("unnested_value.*")
)

In [None]:
# Task 3.3
# changing the datatype as per the metadata
# Only the plain numeric columns are cast here. MINFLT, MAJFLT, VSTEXT, RSIZE,
# VGROW and RGROW are deliberately left as strings: they can carry K/M magnitude
# suffixes (e.g. "12K") and embedded spaces that the suffix-expansion cell
# parses and casts itself. Casting them to numbers here first would turn every
# suffixed value into null (Spark's default cast of an unparsable string),
# so that expansion would have nothing left to parse.
memory_df = memory_df.withColumn("sequence", col("sequence").cast(IntegerType())) \
                    .withColumn("machine", col("machine").cast(IntegerType())) \
                    .withColumn("PID", col("PID").cast(IntegerType())) \
                    .withColumn("VSIZE", col("VSIZE").cast(DoubleType())) \
                    .withColumn("MEM", col("MEM").cast(DoubleType()))

In [None]:
# schema after the datatype casts in the previous cell
memory_df.printSchema()

In [None]:
# Task 3.3
# Parse each Kafka value as a JSON array of process records, emit one row per
# array element, then promote the struct's fields to top-level columns
# (same names and order as pro_schema).
process_df = (
    process_df
    .select(F.from_json(F.col("value").cast("string"), pro_schema).alias('parsed_value'))
    .select(F.explode(F.col("parsed_value")).alias('unnested_value'))
    .select("unnested_value.*")
)

In [None]:
# Task 3.3
# Cast the process columns to the types given in the metadata.
# POLI, Status, State and CMD stay strings; ts keeps its timestamp type.
_process_types = [
    ("sequence", IntegerType()), ("machine", IntegerType()), ("PID", IntegerType()),
    ("TRUN", IntegerType()), ("TSLPI", IntegerType()), ("TSLPU", IntegerType()),
    ("NICE", IntegerType()), ("PRI", IntegerType()), ("RTPR", IntegerType()),
    ("CPUNR", IntegerType()), ("EXC", IntegerType()), ("CPU", DoubleType()),
]
for _column, _dtype in _process_types:
    process_df = process_df.withColumn(_column, col(_column).cast(_dtype))

In [None]:
# schema after the datatype casts in the previous cell
process_df.printSchema()

In [None]:
# Task 3.3
# Expand magnitude suffixes in the memory columns. A trailing "K" means x1,000
# and a trailing "M" means x1,000,000 (e.g. "12K" -> 12000, "1.5M" -> 1500000).
def _expand_magnitude(column_name, result_type):
    """Return a Column parsing ``column_name`` as a number with an optional K/M suffix.

    Args:
        column_name: name of a column of memory_df. It is used both as a column
            reference and inside a SQL ``substring`` expression, so it must be a
            plain identifier.
        result_type: Spark type name the scaled value is cast to (e.g. 'int', 'double').

    The mantissa is cast to double *before* scaling, so a fractional value such as
    "1.5K" becomes 1500 instead of being truncated to 1000 by an integer cast.
    Values that still do not parse as numbers become null (Spark's default,
    non-ANSI cast behaviour).
    """
    raw = F.col(column_name)
    # the value without its last character (the K/M suffix)
    mantissa = F.expr(
        "substring({0}, 1, length({0}) - 1)".format(column_name)
    ).cast('double')
    scaled = (F.when(raw.endswith('M'), mantissa * 1000000)
               .when(raw.endswith('K'), mantissa * 1000)
               .otherwise(raw.cast('double')))
    return scaled.cast(result_type)


# VGROW may contain spaces between digits; strip them before parsing.
memory_df = memory_df.withColumn('VGROW', regexp_replace(col('VGROW'), ' ', ''))

# MINFLT ends up as an integer column; the fault/size/growth columns as doubles.
for _column, _result_type in [('MINFLT', 'int'), ('MAJFLT', 'double'), ('VSTEXT', 'double'),
                              ('RSIZE', 'double'), ('VGROW', 'double'), ('RGROW', 'double')]:
    memory_df = memory_df.withColumn(_column, _expand_magnitude(_column, _result_type))

In [None]:
# Task 3.3
# Derive NICE from the priority: NICE = PRI - 120 when PRI is non-zero,
# otherwise NICE = 0 (this also covers a null PRI).
process_df = process_df.withColumn(
    'NICE',
    F.expr("CASE WHEN PRI != 0 THEN PRI - 120 ELSE 0 END")
)

In [None]:
# Task 3.4
# Add a CMD_PID key of the form "<CMD>_<PID>" and expose ts as event_time
process_df = process_df \
    .withColumn('CMD_PID', F.concat(F.col('CMD'), F.lit('_'), F.col('PID'))) \
    .withColumn('event_time', F.col('ts'))

In [None]:
# Task 3.4
# Add a CMD_PID key of the form "<CMD>_<PID>" and expose ts as event_time
memory_df = memory_df \
    .withColumn('CMD_PID', F.concat(F.col('CMD'), F.lit('_'), F.col('PID'))) \
    .withColumn('event_time', F.col('ts'))

In [None]:
# Task 3.5
# Persist the parsed memory stream as Parquet (append mode). The streaming
# checkpoint is kept under the output directory.
memory_writer = (
    memory_df.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "parquet.memory")
    .option("checkpointLocation", "parquet.memory/checkpoint")
)
memory_sink = memory_writer.start()

In [None]:
# stopping the sink
# Stops the memory Parquet streaming query started in the previous cell
memory_sink.stop()

In [None]:
# Task 3.5
# Persist the parsed process stream as Parquet (append mode). The streaming
# checkpoint is kept under the output directory.
process_writer = (
    process_df.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "parquet.process")
    .option("checkpointLocation", "parquet.process/checkpoint")
)
process_sink = process_writer.start()

In [None]:
# stopping the sink
# Stops the process Parquet streaming query started in the previous cell
process_sink.stop()

In [None]:
# Task 3.6
# Load the pre-trained memory pipeline and apply it to the memory stream.
# Downstream cells filter its `prediction` column for value 1 (attacks).
memory_mod = PipelineModel.load('memory_pipeline_model')
memory_pred = memory_mod.transform(dataset=memory_df)

In [None]:
# Task 3.6
# Load the pre-trained process pipeline and apply it to the process stream.
# Downstream cells filter its `prediction` column for value 1 (attacks).
process_mod = PipelineModel.load('process_pipeline_model')
process_pred = process_mod.transform(dataset=process_df)

In [None]:
# Task 3.7 a
# Per machine, approximately count the distinct CMD_PID values whose process
# records were predicted as attacks (prediction == 1), in 2-minute tumbling
# windows of the event time `ts`, with a 120-second watermark on `ts`.
# approx_count_distinct is used because exact distinct aggregations are not
# supported on streaming DataFrames.
# The window column is taken from the frame being aggregated (process_pred),
# not from the upstream process_df.
# NOTE: the queries below use complete output mode, in which Spark keeps all
# aggregation state regardless of the watermark.
pro_count = (
    process_pred
    .withWatermark("ts", "120 seconds")
    .where('prediction==1')
    .groupBy(F.window(F.col("ts"), "120 seconds"), F.col('machine'))
    .agg(F.approx_count_distinct("CMD_PID").alias("total"))
    .select("window", "machine", "total")
)

In [None]:
# Task 3.7 a
# Print the running per-machine process attack counts to the console every 5 seconds
query = (
    pro_count.writeStream
    .format("console")
    .outputMode("Complete")
    .trigger(processingTime='5 seconds')
    .start()
)

In [None]:
# stopping the query
# Stops the console query printing the process attack counts (previous cell)
query.stop()

In [None]:
# Task 3.7 a
# writing the count to spark memory sink using complete output mode
mem_ac_sink = pro_count \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("mem_ac")\
    .trigger(processingTime='5 seconds') \
    .option("truncate","false")\
    .start()

In [None]:
# stopping the query
mem_ac_sink.stop()

In [None]:
# Task 3.7 a
# setting the watermark to 120 secs to count all entries having the SAME
# “CMD_PID” in a 2-min window
# checking for attacks only
# groupping by machine and ts
mem_count = memory_pred \
    .withWatermark("ts", "120 seconds") \
    .where('prediction==1')\
    .groupBy(F.window(memory_df.ts, "120 seconds"),F.col('machine'))\
    .agg(F.approx_count_distinct("CMD_PID").alias("total"))\
    .select("window","machine","total")

In [None]:
# Task 3.7 a
# viewing the output by writing to console
query = mem_count \
    .writeStream \
    .outputMode("Complete")\
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

In [None]:
# stopping the query
query.stop()

In [None]:
# Task 3.7 a
# writing the count to spark memory sink using complete output mode
pro_ac_sink = pro_count \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("pro_ac")\
    .trigger(processingTime='5 seconds') \
    .option("truncate","false")\
    .start()

In [None]:
# stopping the query
pro_ac_sink.stop()

In [None]:
# Task 3.7 b
# Project the memory predictions for the join. Join-relevant columns get an
# "M_" prefix, and a 40-second watermark is set on the memory event time M_ts.
memory_uni = (
    memory_pred
    .select(
        "MINFLT", "MAJFLT", "VSTEXT", "VSIZE", "RSIZE", "VGROW", "RGROW", "MEM",
        F.col("CMD_PID").alias("M_CMD_PID"),
        F.col("ts").alias("M_ts"),
        F.col("event_time").alias("M_event_time"),
        F.col("prediction").alias("M_prediction"),
    )
    .withWatermark("M_ts", "40 seconds")
)

In [None]:
# Task 3.7 b
# Project the process predictions for the join. Join-relevant columns get a
# "P_" prefix, and a 40-second watermark is set on the process event time P_ts.
process_uni = (
    process_pred
    .select(
        "TRUN", "TSLPI", "TSLPU", "POLI", "NICE", "PRI", "RTPR",
        "CPUNR", "Status", "EXC", "State", "CPU",
        F.col("CMD_PID").alias("P_CMD_PID"),
        F.col("ts").alias("P_ts"),
        F.col("event_time").alias("P_event_time"),
        F.col("prediction").alias("P_prediction"),
    )
    .withWatermark("P_ts", "40 seconds")
)

In [None]:
# Task 3.7 b
# Stream-stream inner join of process and memory records that:
#   * share the same CMD_PID,
#   * were both predicted as attacks (prediction == 1), and
#   * have event timestamps at most 30 seconds apart, in either direction
#     (M_ts - 30s <= P_ts <= M_ts + 30s).
# The previous lower bound was "M_ts + 30 seconds", which together with the
# upper bound only matched rows exactly 30 seconds apart.
# The two-sided time bound, combined with the watermarks on both inputs, lets
# Spark drop buffered join state once rows can no longer match.
# REF - https://databricks.com/blog/2018/03/13/introducing-stream-stream-joins-in-apache-spark-2-3.html
joint_df = process_uni.join(
    memory_uni,
    expr("""
        P_CMD_PID = M_CMD_PID AND
        P_ts >= M_ts - interval 30 seconds AND
        P_ts <= M_ts + interval 30 seconds AND
        P_prediction = 1.0 AND
        M_prediction = 1.0
    """)
)

In [None]:
# Task 3.7 b
# Stamp each joined row with its processing time, as epoch seconds.
# A Python datetime.now() wrapped in F.lit() is evaluated once on the driver
# when this cell runs, freezing the same value into every micro-batch.
# current_timestamp() is evaluated by Spark for each streaming micro-batch.
joint_df = joint_df.withColumn(
    'processing_time',
    F.unix_timestamp(F.current_timestamp())
)

In [None]:
# Task 3.7 b
# Print newly joined attack rows to the console every 5 seconds (append mode)
query = (
    joint_df.writeStream
    .format("console")
    .outputMode("Append")
    .trigger(processingTime='5 seconds')
    .start()
)

In [None]:
# stop the query
# Stops the console query printing the joined attack rows (previous cell)
query.stop()

In [None]:
# Task 3.7 b
# Persist every column of joint_df as Parquet (append mode). The streaming
# checkpoint is kept under the output directory.
joint_writer = (
    joint_df.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "process_memory_attack.parquet")
    .option("checkpointLocation", "process_memory_attack.parquet/checkpoint")
)
joint_sink = joint_writer.start()

In [None]:
# stopping the query
# Stops the joint Parquet streaming query started in the previous cell
joint_sink.stop()