In [1]:
import os

# Read S3 credentials from the environment instead of hardcoding them in the
# notebook, where they would leak through version control and shared copies.
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")

# `sc` is the SparkContext provided by the notebook environment.
# Only override the Hadoop s3n credentials when both values are present, so an
# empty pair does not replace credentials supplied some other way.
# NOTE(review): confirm how this cluster authenticates to S3 (e.g. instance role).
if AWS_ACCESS_KEY and AWS_SECRET_KEY:
    hadoop_conf = sc._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3n.awsAccessKeyId", AWS_ACCESS_KEY)
    hadoop_conf.set("fs.s3n.awsSecretAccessKey", AWS_SECRET_KEY)

In [2]:
import time
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, \
                                      NaiveBayes, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import regexp_replace
import sys
import logging
import mlflow
import mlflow.mleap
import pandas as pd
import numpy as np

In [3]:
# Load the raw network-traffic data from S3 with a header row and inferred
# column types. `spark` is the SparkSession provided by the notebook
# environment; to run locally, first create one with
#   SparkSession.builder.appName("Network Attacks Classifier").master("local").getOrCreate()
# and optionally call spark.sparkContext.setLogLevel("ERROR").
start = time.time()  # start timestamp; not read by any later cell shown here
DATA_PATH = "s3://sparkml/networktrafficdata.csv"
dataset = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
# Re-applying the frame's own column names yields an equivalent DataFrame.
dataset = dataset.toDF(*dataset.columns)

In [4]:
# Remove literal "." characters from the label values
# (presumably a trailing period on each class name -- TODO confirm against the CSV).
# Raw string: the pattern must reach Spark's regex engine as the escaped dot
# `\.`, and "\." in a normal literal is an invalid Python escape sequence.
dataset = dataset.withColumn("label", regexp_replace("label", r"\.", ""))
# The "label" column is the target, not a feature, so exclude it from the count.
print("Dataset sizes: {row} samples, {cols} features".format(row=dataset.count(), cols=len(dataset.columns) - 1))

In [5]:
# The source header names this column " flag" (leading space); normalize it.
dataset = dataset.withColumnRenamed(existing=" flag", new="flag")

In [6]:
# Data transformation pipeline: one StringIndexer per categorical column, then
# one for the label, each writing a numeric "<name>_num" column.
categorical_features = ["protocol_type", "service", "flag"]
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_num")
    for name in categorical_features + ["label"]
]
pipeline = Pipeline(stages=indexers)
# NOTE(review): not idempotent -- re-running this cell alone presumably fails
# because the *_num columns already exist; re-run from the load cell instead.
indexer_model = pipeline.fit(dataset)
dataset = indexer_model.transform(dataset)

# Model inputs: every column except the raw categoricals and the label (raw and
# indexed). The indexed categoricals (protocol_type_num, ...) remain as inputs.
exclude_list = categorical_features + ["label", "label_num"]
numerical_cols = list(filter(lambda name: name not in exclude_list, dataset.columns))


In [7]:
#Feature Engineering
#df_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="features")
#dataset = df_assembler.transform(dataset)
# dataset.printSchema()

#dataset = dataset.select(["features","label_num"])
# dataset.printSchema()

#train_set, test_set = dataset.randomSplit([0.75, 0.25], seed=2019)
#print("Training set Count: " + str(train_set.count()))
#print("Test set Count: " + str(test_set.count()))

In [8]:
# Model selection candidates. Every model reads the assembled "features" vector
# and predicts the indexed "label_num" target.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="label_num",
    family="multinomial",
    maxIter=20,
    regParam=0.3,
    elasticNetParam=0.8,
)
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label_num", maxBins=70)
rf = RandomForestClassifier(featuresCol="features", labelCol="label_num",
                            numTrees=20, maxBins=70)

# Display name -> estimator, kept in this order.
classifiers = dict([
    ("Logistic Regression", lr),
    ("Decision Tree", dt),
    ("Random Forest", rf),
])

# Metric names passed to MulticlassClassificationEvaluator.setMetricName.
metrics = "accuracy weightedPrecision weightedRecall f1".split()

In [9]:
# print("\nModels Evaluation:")
# print("{:-<30}".format(""))
# for c in classifiers:
# 	print(c)
# 	# fit the model
# 	model = classifiers[c].fit(train_set)
	
# 	# make predictions
# 	predictions = model.transform(test_set)
# 	predictions.cache()
	
# 	# evaluate performance
# 	evaluator = MulticlassClassificationEvaluator(labelCol="label_num", predictionCol="prediction")
	
# 	for m in metrics:
# 		evaluator.setMetricName(m)
# 		metric = evaluator.evaluate(predictions)
# 		print("{name} = {value:.2f}".format(name=m, value=metric))
	
# 	print("{:-<30}".format(""))

In [10]:
#Running the best model using mlflow
# Fixed seed so the 80/20 train/test split is the same on every run.
traindf, testdf = dataset.randomSplit([0.80, 0.20], seed=2020)


def trainmodel(num_trees=30, max_depth=10, max_bins=70):
    """Train a VectorAssembler + RandomForest pipeline and log it to MLflow.

    Fits on the module-level ``traindf``, evaluates on ``testdf`` with every
    metric name in ``metrics`` (printing and logging each one), logs the
    hyperparameters, and logs the fitted pipeline as an MLeap model under the
    artifact path ``spark_network_traffic_model``.

    Run ownership: if an MLflow run is already active (e.g. the caller wrapped
    this in ``with mlflow.start_run():``), everything is logged to that run and
    it is left open for the caller to close. Otherwise this function starts a
    run and ends it before returning.

    Args:
        num_trees: Number of trees in the random forest.
        max_depth: Maximum depth of each tree.
        max_bins: Maximum number of bins used to discretize features.
            NOTE(review): 70 presumably covers the category count of the
            indexed categorical columns -- confirm before lowering it.

    Returns:
        The fitted pipeline model (assembler + random forest).
    """
    import contextlib

    with contextlib.ExitStack() as stack:
        # Only open (and therefore close) a run of our own when none is active,
        # so a caller's `with mlflow.start_run()` block is not ended early.
        if mlflow.active_run() is None:
            stack.enter_context(mlflow.start_run())

        df_assembler = VectorAssembler(inputCols=numerical_cols, outputCol="features")
        rfnew = RandomForestClassifier(labelCol="label_num", featuresCol="features",
                                       numTrees=num_trees, maxDepth=max_depth,
                                       maxBins=max_bins)
        model = Pipeline(stages=[df_assembler, rfnew]).fit(traindf)

        # Each evaluate() below launches its own Spark job; caching the
        # predictions avoids re-running the forest on testdf for every metric.
        prediction = model.transform(testdf).cache()
        try:
            evaluator = MulticlassClassificationEvaluator(labelCol="label_num",
                                                          predictionCol="prediction")
            for m in metrics:
                evaluator.setMetricName(m)
                metric = evaluator.evaluate(prediction)
                mlflow.log_metric(m, metric)
                print("{name} = {value:.2f}".format(name=m, value=metric))
        finally:
            prediction.unpersist()

        # Log the exact values used to build the model so the recorded
        # parameters cannot drift from the trained configuration.
        for key, value in [("featuresCol", "features"),
                           ("labelCol", "label_num"),
                           ("numTrees", num_trees),
                           ("maxDepth", max_depth),
                           ("maxBins", max_bins)]:
            mlflow.log_param(key, value)
        mlflow.mleap.log_model(spark_model=model, sample_input=testdf,
                               artifact_path="spark_network_traffic_model")
        return model
  

In [11]:
# Train inside one MLflow run. The context manager ends the run when the block
# exits, and `run` stays bound afterwards as a handle to it (run.info).
with mlflow.start_run() as run:
    trainmodel()

In [12]:
#import mlflow.sagemaker as mfs
#mlflow.get_artifact_uri('spark_network_traffic_model')
#mlflow.tracking.get_tracking_uri() 
#mfs.DEFAULT_IMAGE_NAME

In [13]:
# Fetch the run logged by the training cell. Using the `run` handle instead of a
# hardcoded ID keeps this cell valid after Restart & Run All; to inspect an
# earlier run, replace run.info.run_id with that run's ID string.
mlflow.get_run(run.info.run_id)