### Required Libraries

In [None]:
%pyspark
from pyspark.sql.functions import *
from pyspark.ml.feature import *
from pyspark.ml import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.mllib.evaluation import MulticlassMetrics

Read the CSV files

In [1]:
%pyspark
# File location and type
file_location = "/user/vmehala/project/5560/"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
org_df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

CSV Schema

In [2]:
%pyspark
# Print the column names and (inferred, since inferSchema="true") types of the loaded data.
org_df.printSchema()

### Data Cleaning

In [4]:
%pyspark
cols_not_required = ("TotLen Fwd","Tot Bwd Pkts","Fwd Pkt Len Max","Fwd Pkt Len Min","Fwd Pkt Len Mean","Fwd Pkt Len Std","Bwd Pkt Len Max","Bwd Pkt Len Min","Bwd Pkt Len Mean","Bwd Pkt Len Std","Flow IAT Mean","Flow IAT Std","Flow IAT Min","Flow IAT Max","Bwd IAT Tot","Bwd IAT Mean","Bwd IAT Std","Bwd IAT Max","Bwd IAT Min","Pkt Len Min","Pkt Len Max","Pkt Len Mean","Pkt Len Std","Fwd IAT Mean","Fwd IAT Std","Fwd IAT Max","Fwd IAT Min")

raw_df=org_df.drop(*cols_not_required)

from pyspark.sql.functions import isnan, when, count, col


#### Indexing the String Label Column as Numeric Values

In [5]:
%pyspark
strIdx = StringIndexer(inputCol = "Label", handleInvalid='skip',outputCol = "lableIdx").fit(raw_df)
clean_df = strIdx.transform(raw_df)

In [6]:
%pyspark
lable_dict={'ddos':0.0, 'Benign':1.0}

In [7]:
%pyspark
#z.show(clean_df.summary())
#z.show(clean_df.select([count(when(isnan(c), c)).alias(c) for c in clean_df.columns]))

### Data sampling

In [9]:
%pyspark
v1_seed=55601
v2_seed=55602
sample_dateset_v1 = clean_df.sampleBy(col("Label"), fractions={"Benign": 0.015,"ddos": 0.015}, seed=v1_seed)

In [10]:
%pyspark

#print("Sample size % : ",((sample_dateset_v1.count()/raw_df.count())*100))

### Model 1: Logistic Regression

#### Model Data Preparation

In [13]:
%pyspark
splits_v2 = sample_dateset_v1.randomSplit([0.7, 0.3])
train_v1 = splits_v2[0]
test_v1 = splits_v2[1].withColumnRenamed("stx_label", "stx_trueLabel")


In [14]:
%pyspark
assembler_m1 = VectorAssembler(inputCols =['Flow Duration', 'Tot Fwd Pkts', 'TotLen Fwd Pkts', 'TotLen Bwd Pkts', 'Flow Byts/s', 'Flow Pkts/s', 'Fwd IAT Tot', 'Fwd PSH Flags', 'Fwd Header Len', 'Bwd Header Len', 'Fwd Pkts/s', 'Bwd Pkts/s', 'Pkt Len Var', 'FIN Flag Cnt', 'SYN Flag Cnt', 'RST Flag Cnt', 'PSH Flag Cnt', 'ACK Flag Cnt', 'URG Flag Cnt', 'CWE Flag Count', 'ECE Flag Cnt', 'Down/Up Ratio', 'Pkt Size Avg', 'Fwd Seg Size Avg', 'Bwd Seg Size Avg', 'Subflow Fwd Pkts', 'Subflow Fwd Byts', 'Subflow Bwd Pkts', 'Subflow Bwd Byts', 'Init Fwd Win Byts', 'Init Bwd Win Byts', 'Fwd Act Data Pkts', 'Active Mean', 'Active Std', 'Active Max', 'Active Min', 'Idle Mean', 'Idle Std', 'Idle Max', 'Idle Min'], handleInvalid="skip",outputCol="features")

# assembler_m1 = VectorAssembler(inputCols =['Flow Duration','Fwd Seg Size Min', 'Src Port','Tot Fwd Pkts','Init Bwd Win Byts'], handleInvalid="skip",outputCol="features")

In [15]:
%pyspark
training_m1 = assembler_m1.transform(train_v1).select(col("features"),(col("lableIdx").cast("Int")))
lr_m1 = LogisticRegression(labelCol="lableIdx",featuresCol="features",maxIter=10,regParam=0.3)


In [16]:
%pyspark
pipeline_m1 = Pipeline(stages=[assembler_m1,lr_m1])

In [17]:
%pyspark
# Fit both pipeline stages (feature assembly, then logistic regression) on the training split.
model_m1 = pipeline_m1.fit(train_v1)

In [18]:
%pyspark
# Score the held-out split; the output adds the model's columns, including "prediction".
preduction_m1 = model_m1.transform(test_v1)

### Model 1: Logistic Regression Results

In [19]:
%pyspark
lr_metric_m1 = MulticlassMetrics(preduction_m1['lableIdx','prediction'].rdd)

In [20]:
%pyspark
# Report overall accuracy, plus precision/recall/F1 for the class whose index is 1.0.
# NOTE(review): which Label string maps to 1.0 depends on the StringIndexer fit
# (strIdx.labels). Confirm it is the intended positive class.
print("Accuracy:",lr_metric_m1.accuracy)
print("Precision:",lr_metric_m1.precision(1.0))
print("Recall:",lr_metric_m1.recall(1.0))
print("F1Score:",lr_metric_m1.fMeasure(1.0))

SVM


### Model 2: Linear SVM (LinearSVC)

In [23]:
%pyspark
assembler_m2 = VectorAssembler(inputCols =['Flow Duration', 'Tot Fwd Pkts', 'TotLen Fwd Pkts', 'TotLen Bwd Pkts', 'Flow Byts/s', 'Flow Pkts/s', 'Fwd IAT Tot', 'Fwd PSH Flags', 'Fwd Header Len', 'Bwd Header Len', 'Fwd Pkts/s', 'Bwd Pkts/s', 'Pkt Len Var', 'FIN Flag Cnt', 'SYN Flag Cnt', 'RST Flag Cnt', 'PSH Flag Cnt', 'ACK Flag Cnt', 'URG Flag Cnt', 'CWE Flag Count', 'ECE Flag Cnt', 'Down/Up Ratio', 'Pkt Size Avg', 'Fwd Seg Size Avg', 'Bwd Seg Size Avg', 'Subflow Fwd Pkts', 'Subflow Fwd Byts', 'Subflow Bwd Pkts', 'Subflow Bwd Byts', 'Init Fwd Win Byts', 'Init Bwd Win Byts', 'Fwd Act Data Pkts', 'Active Mean', 'Active Std', 'Active Max', 'Active Min', 'Idle Mean', 'Idle Std', 'Idle Max', 'Idle Min'], handleInvalid="skip",outputCol="features")


In [24]:
%pyspark
training_m2 = assembler_m2.transform(train_v1).select(col("features"),(col("lableIdx").cast("Int")))
lsvc_m2 = LinearSVC(labelCol="lableIdx", maxIter=50)

In [25]:
%pyspark
pipeline_m2 = Pipeline(stages=[assembler_m2,lsvc_m2])
model_m2 = pipeline_m2.fit(train_v1)

In [26]:
%pyspark
# Score the held-out split with the fitted linear-SVM pipeline (adds a "prediction" column).
preduction_m2 = model_m2.transform(test_v1)

#### Model 2: Linear SVM Results

In [27]:
%pyspark
lr_metric_m2 = MulticlassMetrics(preduction_m2['lableIdx','prediction'].rdd)

In [28]:

%pyspark
# Report the linear SVM's accuracy, plus precision/recall/F1 for the class indexed 1.0.
# NOTE(review): which Label string maps to 1.0 depends on the StringIndexer fit
# (strIdx.labels). Confirm it is the intended positive class.
print("Accuracy:",lr_metric_m2.accuracy)
print("Precision:",lr_metric_m2.precision(1.0))
print("Recall:",lr_metric_m2.recall(1.0))
print("F1Score:",lr_metric_m2.fMeasure(1.0))