In [1]:
import findspark
findspark.init()
import json
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import VectorAssembler, StandardScaler 
from pyspark.sql.functions import from_utc_timestamp

In [2]:
# Spark session & context
spark = (SparkSession
         .builder
         .master('local')
         .appName('OutlierDetection')
         # Add kafka package
         .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1")
         .config("spark.mongodb.input.uri","mongodb://127.0.0.1/RizkyApp.data")
         .config("spark.mongodb.output.uri","mongodb://127.0.0.1/RizkyApp.data")
         .getOrCreate())
spark.conf.set("spark.sql.caseSensitive", "true")
sc = spark.sparkContext

In [3]:
#normaliza using data benign.csv
df_train = spark.read.csv('data/Benign.csv', header="true", inferSchema =True)
df_train = df_train.select([F.col(column).cast('double') for column in df_train.columns])

In [4]:
df_train = df_train.drop('_c0')

In [5]:
vector_assembler = VectorAssembler(inputCols=df_train.columns, outputCol="SS_features")
df_train = vector_assembler.transform(df_train)
scaler = StandardScaler(inputCol="SS_features", outputCol="scaledFeatures", withStd=True, withMean=True)


In [6]:
normalize_df_benign = scaler.fit(df_train).transform(df_train)

In [7]:
import numpy as np
import pandas as pd

In [8]:
benign_resample_1400 = pd.read_csv('data/Benign_resample_1400.csv')
benign_resample_1400.drop(['Label'], axis=1, inplace=True)

In [9]:
from sklearn.neighbors import LocalOutlierFactor

clf = LocalOutlierFactor(n_neighbors=2, novelty=True, contamination=0.1)
clf.fit(benign_resample_1400)

LocalOutlierFactor(contamination=0.1, n_neighbors=2, novelty=True)

In [10]:
jsonFormatSchema = spark.read.json("schema/schema.json")

In [11]:
rawData = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092") # kafka server
  .option("subscribe", "netflowmeter") # topic
  .option("startingOffsets", "latest") 
  .load())

In [12]:
parsedData = rawData.selectExpr("cast (value as string) as json").select(F.from_json("json",jsonFormatSchema.schema).alias("data")).select("data.*")

In [13]:
featureExtraction = parsedData.select(F.col('flow_id'), F.col('src_ip'), F.col('src_port'), F.col('dst_ip'), F.col('dst_port'), F.col('protocol'), F.col('timestamp'),F.col("extractFeature.*"))

In [14]:
def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']

    flat_df = nested_df.select(flat_cols +
                               [F.col(nc+'.'+c).alias(nc+'_'+c)
                                for nc in nested_cols
                                for c in nested_df.select(nc+'.*').columns])
    return flat_df

In [15]:
data_flat = flatten_df(featureExtraction)


In [16]:
duplicate_columns = ['ActivePacket_max', 'ActivePacket_mean', 'ActivePacket_min', 'ActivePacket_std', 'IdlePacket_max', 'IdlePacket_mean', 'IdlePacket_min','IdlePacket_std',]
data_flat = data_flat.drop(*duplicate_columns)

In [17]:
vector_assembler_1 = VectorAssembler(inputCols=df_train.columns, outputCol="SS_features")
data_flat = vector_assembler.transform(data_flat)

In [18]:
#normalize data sensor
data_flat = scaler.fit(df_train).transform(data_flat)

In [19]:
data_flat.printSchema()

root
 |-- flow_id: string (nullable = true)
 |-- src_ip: string (nullable = true)
 |-- src_port: long (nullable = true)
 |-- dst_ip: string (nullable = true)
 |-- dst_port: long (nullable = true)
 |-- protocol: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- average_packet_size: double (nullable = true)
 |-- bwd_IAT_total: double (nullable = true)
 |-- bwd_PSH_flags: double (nullable = true)
 |-- bwd_URG_flags: double (nullable = true)
 |-- bwd_header_length: double (nullable = true)
 |-- bwd_packets_per_second: double (nullable = true)
 |-- bwd_segment_size_avg: double (nullable = true)
 |-- bwd_win_bytes: long (nullable = true)
 |-- download_upload_ratio: double (nullable = true)
 |-- flow_bytes_per_second: double (nullable = true)
 |-- flow_duration: long (nullable = true)
 |-- flow_pkts_per_second: double (nullable = true)
 |-- fwd_IAT_total: double (nullable = true)
 |-- fwd_PSH_flags: double (nullable = true)
 |-- fwd_URG_flags: double (nullable = true)
 |-- 

In [20]:
def predict_udf(x):
  newlist = [x]
  z = clf.predict(newlist)
  label = int(z[0])
  if  label == 1:
      predict = 'Benign'
      return predict
  else:
      predict = 'Anomaly'
      return predict

In [21]:
label_udf = F.udf(predict_udf, StringType())
data = data_flat.withColumn('Label', label_udf(data_flat['scaledFeatures']))

In [22]:
from datetime import datetime 
from decimal import Decimal

def date_udf(x):
  dec = Decimal(x)
  c = datetime.fromtimestamp(int(dec)/1000).strftime("%Y-%m-%d %H:%M:%S")
  return c

In [23]:
datetime_udf = F.udf(date_udf, StringType())
pred_df= data.withColumn('datetime', datetime_udf(data['timestamp']))

In [24]:
kolom = ['SS_features','scaledFeatures','timestamp']
pred_df = pred_df.drop(*kolom)

In [25]:
# pred_df_1 = data.select(F.col('flow_id'), F.col('src_ip'), F.col('src_port'), F.col('dst_ip'), F.col('dst_port'), F.col('protocol'), F.col('timestamp'), F.col('Label'))

In [26]:
pred_df.printSchema()

root
 |-- flow_id: string (nullable = true)
 |-- src_ip: string (nullable = true)
 |-- src_port: long (nullable = true)
 |-- dst_ip: string (nullable = true)
 |-- dst_port: long (nullable = true)
 |-- protocol: long (nullable = true)
 |-- average_packet_size: double (nullable = true)
 |-- bwd_IAT_total: double (nullable = true)
 |-- bwd_PSH_flags: double (nullable = true)
 |-- bwd_URG_flags: double (nullable = true)
 |-- bwd_header_length: double (nullable = true)
 |-- bwd_packets_per_second: double (nullable = true)
 |-- bwd_segment_size_avg: double (nullable = true)
 |-- bwd_win_bytes: long (nullable = true)
 |-- download_upload_ratio: double (nullable = true)
 |-- flow_bytes_per_second: double (nullable = true)
 |-- flow_duration: long (nullable = true)
 |-- flow_pkts_per_second: double (nullable = true)
 |-- fwd_IAT_total: double (nullable = true)
 |-- fwd_PSH_flags: double (nullable = true)
 |-- fwd_URG_flags: double (nullable = true)
 |-- fwd_act_data_pkts: long (nullable = true)

In [56]:
def write_mongo_row(df, epoch_id):
    df.write.format("mongo").mode("append").option("database","RizkyApp").option("collection", "data").save()
    pass

query=pred_df.writeStream.foreachBatch(write_mongo_row).start()
query.awaitTermination()

StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/opt/spark/python/pyspark/sql/utils.py", line 196, in call
    raise e
  File "/opt/spark/python/pyspark/sql/utils.py", line 193, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "<ipython-input-56-c570824c13f6>", line 2, in write_mongo_row
    df.write.format("mongo").mode("append").option("database","RizkyApp").option("collection", "data").save()
  File "/opt/spark/python/pyspark/sql/readwriter.py", line 1107, in save
    self._jwrite.save()
  File "/opt/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/opt/spark/python/pyspark/sql/utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.IllegalArgumentException: requirement failed: Invalid uri: 'mongodb+srv://admin:admin@cluster0.px47h.mongodb.net/RizkyApp?retryWrites=true&w=majority'

=== Streaming Query ===
Identifier: [id = 8111595e-405b-4253-855d-da1e6a996e17, runId = a7769eaf-5d91-4f5f-b185-491c3f02dda1]
Current Committed Offsets: {}
Current Available Offsets: {KafkaV2[Subscribe[netflowmeter]]: {"netflowmeter":{"0":14599}}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [flow_id#3691, src_ip#3694, src_port#3695L, dst_ip#3688, dst_port#3689L, protocol#3693L, average_packet_size#3709, bwd_IAT_total#3711, bwd_PSH_flags#3712, bwd_URG_flags#3713, bwd_header_length#3715, bwd_packets_per_second#3717, bwd_segment_size_avg#3718, bwd_win_bytes#3720L, download_upload_ratio#3721, flow_bytes_per_second#3724, flow_duration#3725L, flow_pkts_per_second#3726, fwd_IAT_total#3728, fwd_PSH_flags#3729, fwd_URG_flags#3730, fwd_act_data_pkts#3731L, fwd_header_length#3733, fwd_packets_per_second#3735, ... 60 more fields]
+- Project [flow_id#3691, src_ip#3694, src_port#3695L, dst_ip#3688, dst_port#3689L, protocol#3693L, timestamp#3696, average_packet_size#3709, bwd_IAT_total#3711, bwd_PSH_flags#3712, bwd_URG_flags#3713, bwd_header_length#3715, bwd_packets_per_second#3717, bwd_segment_size_avg#3718, bwd_win_bytes#3720L, download_upload_ratio#3721, flow_bytes_per_second#3724, flow_duration#3725L, flow_pkts_per_second#3726, fwd_IAT_total#3728, fwd_PSH_flags#3729, fwd_URG_flags#3730, fwd_act_data_pkts#3731L, fwd_header_length#3733, ... 63 more fields]
   +- Project [flow_id#3691, src_ip#3694, src_port#3695L, dst_ip#3688, dst_port#3689L, protocol#3693L, timestamp#3696, average_packet_size#3709, bwd_IAT_total#3711, bwd_PSH_flags#3712, bwd_URG_flags#3713, bwd_header_length#3715, bwd_packets_per_second#3717, bwd_segment_size_avg#3718, bwd_win_bytes#3720L, download_upload_ratio#3721, flow_bytes_per_second#3724, flow_duration#3725L, flow_pkts_per_second#3726, fwd_IAT_total#3728, fwd_PSH_flags#3729, fwd_URG_flags#3730, fwd_act_data_pkts#3731L, fwd_header_length#3733, ... 62 more fields]
      +- Project [flow_id#3691, src_ip#3694, src_port#3695L, dst_ip#3688, dst_port#3689L, protocol#3693L, timestamp#3696, average_packet_size#3709, bwd_IAT_total#3711, bwd_PSH_flags#3712, bwd_URG_flags#3713, bwd_header_length#3715, bwd_packets_per_second#3717, bwd_segment_size_avg#3718, bwd_win_bytes#3720L, download_upload_ratio#3721, flow_bytes_per_second#3724, flow_duration#3725L, flow_pkts_per_second#3726, fwd_IAT_total#3728, fwd_PSH_flags#3729, fwd_URG_flags#3730, fwd_act_data_pkts#3731L, fwd_header_length#3733, ... 61 more fields]
         +- Project [flow_id#3691, src_ip#3694, src_port#3695L, dst_ip#3688, dst_port#3689L, protocol#3693L, timestamp#3696, average_packet_size#3709, bwd_IAT_total#3711, bwd_PSH_flags#3712, bwd_URG_flags#3713, bwd_header_length#3715, bwd_packets_per_second#3717, bwd_segment_size_avg#3718, bwd_win_bytes#3720L, download_upload_ratio#3721, flow_bytes_per_second#3724, flow_duration#3725L, flow_pkts_per_second#3726, fwd_IAT_total#3728, fwd_PSH_flags#3729, fwd_URG_flags#3730, fwd_act_data_pkts#3731L, fwd_header_length#3733, ... 60 more fields]
            +- Project [flow_id#3691, src_ip#3694, src_port#3695L, dst_ip#3688, dst_port#3689L, protocol#3693L, timestamp#3696, average_packet_size#3709, bwd_IAT_total#3711, bwd_PSH_flags#3712, bwd_URG_flags#3713, bwd_header_length#3715, bwd_packets_per_second#3717, bwd_segment_size_avg#3718, bwd_win_bytes#3720L, download_upload_ratio#3721, flow_bytes_per_second#3724, flow_duration#3725L, flow_pkts_per_second#3726, fwd_IAT_total#3728, fwd_PSH_flags#3729, fwd_URG_flags#3730, fwd_act_data_pkts#3731L, fwd_header_length#3733, ... 59 more fields]
               +- Project [flow_id#3691, src_ip#3694, src_port#3695L, dst_ip#3688, dst_port#3689L, protocol#3693L, timestamp#3696, average_packet_size#3709, bwd_IAT_total#3711, bwd_PSH_flags#3712, bwd_URG_flags#3713, bwd_header_length#3715, bwd_packets_per_second#3717, bwd_segment_size_avg#3718, bwd_win_bytes#3720L, download_upload_ratio#3721, flow_bytes_per_second#3724, flow_duration#3725L, flow_pkts_per_second#3726, fwd_IAT_total#3728, fwd_PSH_flags#3729, fwd_URG_flags#3730, fwd_act_data_pkts#3731L, fwd_header_length#3733, ... 67 more fields]
                  +- Project [flow_id#3691, src_ip#3694, src_port#3695L, dst_ip#3688, dst_port#3689L, protocol#3693L, timestamp#3696, extractFeature#3690.ActivePacket AS ActivePacket#3706, extractFeature#3690.IdlePacket AS IdlePacket#3707, extractFeature#3690.activePacket AS activePacket#3708, extractFeature#3690.average_packet_size AS average_packet_size#3709, extractFeature#3690.bwd_IAT AS bwd_IAT#3710, extractFeature#3690.bwd_IAT_total AS bwd_IAT_total#3711, extractFeature#3690.bwd_PSH_flags AS bwd_PSH_flags#3712, extractFeature#3690.bwd_URG_flags AS bwd_URG_flags#3713, extractFeature#3690.bwd_bulk AS bwd_bulk#3714, extractFeature#3690.bwd_header_length AS bwd_header_length#3715, extractFeature#3690.bwd_packet_length AS bwd_packet_length#3716, extractFeature#3690.bwd_packets_per_second AS bwd_packets_per_second#3717, extractFeature#3690.bwd_segment_size_avg AS bwd_segment_size_avg#3718, extractFeature#3690.bwd_subflow AS bwd_subflow#3719, extractFeature#3690.bwd_win_bytes AS bwd_win_bytes#3720L, extractFeature#3690.download_upload_ratio AS download_upload_ratio#3721, extractFeature#3690.flagCount AS flagCount#3722, ... 21 more fields]
                     +- Project [data#3686.dst_ip AS dst_ip#3688, data#3686.dst_port AS dst_port#3689L, data#3686.extractFeature AS extractFeature#3690, data#3686.flow_id AS flow_id#3691, data#3686.label AS label#3692, data#3686.protocol AS protocol#3693L, data#3686.src_ip AS src_ip#3694, data#3686.src_port AS src_port#3695L, data#3686.timestamp AS timestamp#3696]
                        +- Project [from_json(StructField(dst_ip,StringType,true), StructField(dst_port,LongType,true), StructField(extractFeature,StructType(StructField(ActivePacket,StructType(StructField(max,DoubleType,true), StructField(mean,DoubleType,true), StructField(min,DoubleType,true), StructField(std,DoubleType,true)),true), StructField(IdlePacket,StructType(StructField(max,DoubleType,true), StructField(mean,DoubleType,true), StructField(min,DoubleType,true), StructField(std,DoubleType,true)),true), StructField(activePacket,StructType(StructField(max,DoubleType,true), StructField(mean,DoubleType,true), StructField(min,DoubleType,true), StructField(std,DoubleType,true)),true), StructField(average_packet_size,DoubleType,true), StructField(bwd_IAT,StructType(StructField(max,DoubleType,true), StructField(mean,DoubleType,true), StructField(min,DoubleType,true), StructField(std,DoubleType,true)),true), StructField(bwd_IAT_total,DoubleType,true), StructField(bwd_PSH_flags,DoubleType,true), StructField(bwd_URG_flags,DoubleType,true), StructField(bwd_bulk,StructType(StructField(bulk_rate,LongType,true), StructField(bytes_per_bulk,LongType,true), StructField(packet_per_bulk,LongType,true)),true), StructField(bwd_header_length,DoubleType,true), StructField(bwd_packet_length,StructType(StructField(max,DoubleType,true), StructField(mean,DoubleType,true), StructField(min,DoubleType,true), StructField(std,DoubleType,true)),true), StructField(bwd_packets_per_second,DoubleType,true), StructField(bwd_segment_size_avg,DoubleType,true), StructField(bwd_subflow,StructType(StructField(subflow_bytes,LongType,true), StructField(subflow_packets,LongType,true)),true), StructField(bwd_win_bytes,LongType,true), StructField(download_upload_ratio,DoubleType,true), StructField(flagCount,StructType(StructField(ack,LongType,true), StructField(cwr,LongType,true), StructField(ece,LongType,true), StructField(fin,LongType,true), StructField(psh,LongType,true), StructField(rst,LongType,true), StructField(syn,LongType,true), StructField(ugr,LongType,true)),true), StructField(flow_IAT,StructType(StructField(max,DoubleType,true), StructField(mean,DoubleType,true), StructField(min,DoubleType,true), StructField(std,DoubleType,true)),true), StructField(flow_bytes_per_second,DoubleType,true), StructField(flow_duration,LongType,true), StructField(flow_pkts_per_second,DoubleType,true), StructField(fwd_IAT,StructType(StructField(max,DoubleType,true), StructField(mean,DoubleType,true), StructField(min,DoubleType,true), StructField(std,DoubleType,true)),true), StructField(fwd_IAT_total,DoubleType,true), StructField(fwd_PSH_flags,DoubleType,true), StructField(fwd_URG_flags,DoubleType,true), StructField(fwd_act_data_pkts,LongType,true), StructField(fwd_bulk,StructType(StructField(bulk_rate,LongType,true), StructField(bytes_per_bulk,LongType,true), StructField(packet_per_bulk,LongType,true)),true), StructField(fwd_header_length,DoubleType,true), StructField(fwd_packet_length,StructType(StructField(max,DoubleType,true), StructField(mean,DoubleType,true), StructField(min,DoubleType,true), StructField(std,DoubleType,true)),true), StructField(fwd_packets_per_second,DoubleType,true), StructField(fwd_seg_size_min,LongType,true), StructField(fwd_segment_size_avg,DoubleType,true), StructField(fwd_subflow,StructType(StructField(subflow_bytes,LongType,true), StructField(subflow_packets,LongType,true)),true), StructField(fwd_win_bytes,LongType,true), StructField(idlePacket,StructType(StructField(max,DoubleType,true), StructField(mean,DoubleType,true), StructField(min,DoubleType,true), StructField(std,DoubleType,true)),true), StructField(packet_lenght,StructType(StructField(max,DoubleType,true), StructField(mean,DoubleType,true), StructField(min,DoubleType,true), StructField(std,DoubleType,true)),true), StructField(packet_length_variance,DoubleType,true), StructField(totalPacketFeature,StructType(StructField(backward,LongType,true), StructField(forward,LongType,true), StructField(length_of_backward,DoubleType,true), StructField(length_of_forward,DoubleType,true)),true)),true), StructField(flow_id,StringType,true), StructField(label,StringType,true), StructField(protocol,LongType,true), StructField(src_ip,StringType,true), StructField(src_port,LongType,true), StructField(timestamp,StringType,true), json#3684, Some(Asia/Jakarta)) AS data#3686]
                           +- Project [cast(value#3671 as string) AS json#3684]
                              +- StreamingDataSourceV2Relation [key#3670, value#3671, topic#3672, partition#3673, offset#3674L, timestamp#3675, timestampType#3676], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@aa84d55, KafkaV2[Subscribe[netflowmeter]]
