In [1]:
import sys
sys.path.append('../../ml_utils')

import findspark
findspark.init()

import timeit
import data_utils
import spark_utils as su
import sentiment_classifier

import pandas as pd

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import LongType, DoubleType, IntegerType, StringType, BooleanType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics


Using TensorFlow backend.
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
import platform

if platform.system() == 'Darwin':
    spark_master = "spark://spark.home.net:7077"
    base_dir = '/Users/administrator/'
else:
    spark_master = "spark://lasvegas:7077"
    base_dir = '/home/administrator/'

In [3]:
conf = SparkConf().setMaster(spark_master).setAppName("Logistic Regression")

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
#sc.getConf().getAll()

In [4]:
# read the log statements based on a given schema
log_entries_df = sqlContext.read.format('com.databricks.spark.csv') \
    .schema(su.feature_schema) \
    .options(header = 'false', inferschema = 'false', delimiter = '\t') \
    .load('./../../shared/data/swissid_authorize_logs_april_to_sept_2019.csv')

#log_entries_df.show(2)

In [5]:
# filter 'bad' statements and select a subset of all features
filtered_df = su.clean_log_entries(log_entries_df, True, False, False)

#filtered_df.show(2)

In [6]:
filtered_df.printSchema()

root
 |-- label: string (nullable = true)
 |-- label_nr: double (nullable = true)
 |-- date_weekday: integer (nullable = true)
 |-- date_hour: integer (nullable = true)
 |-- src_software_name: string (nullable = true)
 |-- src_operating_system_name: string (nullable = true)
 |-- src_software_type: string (nullable = true)
 |-- src_hardware_type: string (nullable = true)
 |-- http_method: string (nullable = true)
 |-- response_status: string (nullable = true)
 |-- oidc_response_type: string (nullable = true)
 |-- oidc_acr_values: string (nullable = true)
 |-- oidc_client_id: string (nullable = true)
 |-- client_type: string (nullable = true)
 |-- ido_id: string (nullable = true)
 |-- oidc_scopes: string (nullable = true)
 |-- oidc_ui_locales: string (nullable = true)
 |-- loc_country: string (nullable = true)
 |-- label: string (nullable = true)



In [7]:
categorical_columns = filtered_df.schema.names
categorical_columns.remove('label')
categorical_columns.remove('label_nr')

In [8]:
pipeline_stages = []

for col in categorical_columns:
    string_indexer = StringIndexer(inputCol = col, outputCol = col + '_idx')
    encoder = OneHotEncoderEstimator(inputCols = [string_indexer.getOutputCol()], outputCols = [col + "_ohe"])
    pipeline_stages += [string_indexer, encoder]

In [9]:
#label_string_indexer = StringIndexer(inputCol = 'label', outputCol = 'label_idx')
#pipeline_stages += [label_string_indexer]

vector_assembler_inputs = [c + "_ohe" for c in categorical_columns]
vector_assembler = VectorAssembler(inputCols = vector_assembler_inputs, outputCol = "features")
pipeline_stages += [vector_assembler]

In [10]:
pipeline = Pipeline(stages = pipeline_stages)

pipeline_model = pipeline.fit(filtered_df)

#label_column = 'label_idx'
start = timeit.default_timer()
filtered_features_df = pipeline_model.transform(filtered_df).select(['features', 'label_nr', 'label'])
stop = timeit.default_timer()

print("Time: {} seconds\n".format(stop - start))

Time: 4.875693701000046 seconds



In [11]:
anomaly_sample = filtered_features_df.filter("label = 'anomaly'")
anomaly_sample_count = anomaly_sample.count()
#anomaly_sample.show(10)

In [12]:
normal_df = filtered_features_df.filter("label = 'normal'")
normal_count = normal_df.count()
normal_sample = normal_df.sample(False, anomaly_sample_count/normal_count)
#normal_sample.show(10)

In [13]:
sample_df = normal_sample.union(anomaly_sample)
sample_df.count()

45347

In [14]:
min_anomaly_sample = anomaly_sample.sample(False, 0.25)
min_normal_sample = normal_sample.sample(False, 0.25)
min_sample_df = min_normal_sample.union(min_anomaly_sample)
min_sample_df.count()

11268

In [15]:
train_df, test_df = min_sample_df.randomSplit([0.8, 0.2], seed=12345)
train_df.count(), test_df.count()

(9005, 2263)

In [16]:
from pyspark.ml.classification import LogisticRegression

logistic_regression = LogisticRegression(featuresCol = 'features', labelCol = 'label_nr')

start = timeit.default_timer()
logistic_regression_model = logistic_regression.fit(train_df)
#logistic_regression_model = logistic_regression.train(train_df)
stop = timeit.default_timer()

run_time = stop - start

print("Time: {} seconds\n".format(run_time))
print("Time: {} minutes\n".format(run_time/60))

Time: 1391.4850676359997 seconds

Time: 23.19141779393333 minutes



In [17]:
test_logistic_regression_predictions_df = logistic_regression_model.transform(test_df)

test_logistic_regression_predictions_df.select("probability","rawPrediction", "prediction", "label_nr", "features").show()

+--------------------+--------------------+----------+--------+--------------------+
|         probability|       rawPrediction|prediction|label_nr|            features|
+--------------------+--------------------+----------+--------+--------------------+
|[0.99999999276373...|[18.7441609031566...|       0.0|     0.0|(705567,[0,7,32,1...|
|[0.99999999815000...|[20.1080853517952...|       0.0|     0.0|(705567,[0,9,30,1...|
|[0.99999999728095...|[19.7229862597264...|       0.0|     0.0|(705567,[0,9,33,1...|
|[0.99999999157593...|[18.5921736132090...|       0.0|     0.0|(705567,[0,11,33,...|
|[0.99999977519002...|[15.3080101412597...|       0.0|     0.0|(705567,[0,12,31,...|
|[0.99999999992509...|[23.3148255108057...|       0.0|     0.0|(705567,[0,12,32,...|
|[0.99999998857606...|[18.2875553934923...|       0.0|     0.0|(705567,[0,20,31,...|
|[0.99999999461817...|[19.0402389856457...|       0.0|     0.0|(705567,[1,6,32,1...|
|[0.99999998242313...|[17.8566823076974...|       0.0|     0.0|(7

In [18]:
test_summary = logistic_regression_model.evaluate(test_df)

roc = test_summary.roc.toPandas()
roc

Unnamed: 0,FPR,TPR
0,0.000000,0.000000
1,0.000000,0.050347
2,0.000000,0.114583
3,0.000000,0.214410
4,0.000000,0.276910
...,...,...
113,0.966697,1.000000
114,0.978398,1.000000
115,0.990999,1.000000
116,1.000000,1.000000


In [19]:
evaluator_roc_area = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction", 
                                                   labelCol = 'label_nr',
                                                   metricName = "areaUnderROC")

print("Area Under ROC Curve on Test Data = %g" % evaluator_roc_area.evaluate(test_logistic_regression_predictions_df))

Area Under ROC Curve on Test Data = 1


In [20]:
test_logistic_regression_predictions_df.select("probability","rawPrediction", "prediction", "label_nr", "features").show()

+--------------------+--------------------+----------+--------+--------------------+
|         probability|       rawPrediction|prediction|label_nr|            features|
+--------------------+--------------------+----------+--------+--------------------+
|[0.99999999276373...|[18.7441609031566...|       0.0|     0.0|(705567,[0,7,32,1...|
|[0.99999999815000...|[20.1080853517952...|       0.0|     0.0|(705567,[0,9,30,1...|
|[0.99999999728095...|[19.7229862597264...|       0.0|     0.0|(705567,[0,9,33,1...|
|[0.99999999157593...|[18.5921736132090...|       0.0|     0.0|(705567,[0,11,33,...|
|[0.99999977519002...|[15.3080101412597...|       0.0|     0.0|(705567,[0,12,31,...|
|[0.99999999992509...|[23.3148255108057...|       0.0|     0.0|(705567,[0,12,32,...|
|[0.99999998857606...|[18.2875553934923...|       0.0|     0.0|(705567,[0,20,31,...|
|[0.99999999461817...|[19.0402389856457...|       0.0|     0.0|(705567,[1,6,32,1...|
|[0.99999998242313...|[17.8566823076974...|       0.0|     0.0|(7

In [21]:
predictions_and_label_rdd = test_logistic_regression_predictions_df.select("prediction", "label_nr").rdd

metrics = MulticlassMetrics(predictions_and_label_rdd)

print(metrics.confusionMatrix())

DenseMatrix([[1111.,    0.],
             [   0., 1152.]])


In [22]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [23]:
rows = metrics.confusionMatrix().toArray().tolist()
cm_df = spark.createDataFrame(rows, ['TP','FP'])
cm_pdf = cm_df.toPandas()
cm_pdf

Unnamed: 0,TP,FP
0,1111.0,0.0
1,0.0,1152.0


In [24]:
# Print the intercept for logistic regression
print("Intercept: " + str(logistic_regression_model.intercept))

Intercept: 0.3765047064196512


In [25]:
# Print the coefficients for logistic regression
#print("Coefficients: " + str(logistic_regression_model.coefficients))

In [26]:
training_summary = logistic_regression_model.summary

In [27]:
# Obtain the objective per iteration
objectiveHistory = training_summary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

objectiveHistory:
0.693146681114956
0.12987922812990033
0.0807684729363865
0.06860689523050088
0.029863472250343574
0.01376323839005431
0.006115840715742139
0.00309332518632462
0.0015341108150373937
0.0008165979899110044
0.00030901997485115906
0.00019316961581520428
0.00010355024685040142
3.339964099308997e-05
2.255809259750256e-05
1.0421109021290287e-05
5.6523930444413356e-06
2.917628493767206e-06
2.3913362758669004e-06
1.1312344590865928e-06
6.731059113930506e-07
2.7122776151139885e-07
1.5411388921239668e-07
7.831104962653362e-08
4.025636293694728e-08
1.997064595309959e-08
1.7064572105254703e-08
8.880672756181992e-09
4.605132817119638e-09
2.3279540440762627e-09
1.1846634474685746e-09


In [28]:
# Obtain the receiver-operating characteristic as a dataframe and areaUnderROC.
training_summary.roc.show()
print("areaUnderROC: " + str(training_summary.areaUnderROC))

+---+-------------------+
|FPR|                TPR|
+---+-------------------+
|0.0|                0.0|
|0.0|0.12868870645662303|
|0.0|0.27579321056134903|
|0.0| 0.4018193920568005|
|0.0| 0.4859108054138008|
|0.0| 0.6035056578655424|
|0.0| 0.6538717550477036|
|0.0| 0.7299755935211892|
|0.0| 0.7399600621255824|
|0.0| 0.7495007765697803|
|0.0| 0.7619258930552474|
|0.0| 0.7810073219436432|
|0.0| 0.7949855779897936|
|0.0| 0.8105169735966274|
|0.0|  0.816729531839361|
|0.0|  0.827379631684047|
|0.0| 0.8338140670068782|
|0.0| 0.8446860439316619|
|0.0| 0.8504548480142001|
|0.0| 0.8619924561792767|
+---+-------------------+
only showing top 20 rows

areaUnderROC: 0.9999966456999896


In [29]:
# Set the model threshold to maximize F-Measure
fMeasure = training_summary.fMeasureByThreshold
maxFMeasure = fMeasure.groupBy().max('F-Measure').select('max(F-Measure)').head()
bestThreshold = fMeasure.where(fMeasure['F-Measure'] == maxFMeasure['max(F-Measure)']).select('threshold').head()['threshold']

fMeasure.show()
#logistic_regression.setThreshold(bestThreshold)

+------------------+-------------------+
|         threshold|          F-Measure|
+------------------+-------------------+
|0.9999999999999873|0.22803223904069195|
|0.9999999999999778|0.43234782608695654|
|0.9999999999999589| 0.5732826843937955|
|0.9999999999999509| 0.6540241899357921|
|0.9999999999999343| 0.7527328075273281|
|0.9999999999999245| 0.7907163938824792|
|0.9999999999998912| 0.8439143260228293|
|0.9999999999998446| 0.8505483295077786|
|0.9999999999997502| 0.8568167406467977|
|0.9999999999996954|   0.86487847878101|
|0.9999999999996054| 0.8770399900336365|
|0.9999999999995162| 0.8857849196538937|
| 0.999999999999371| 0.8953431372549019|
|0.9999999999992826| 0.8991206643869077|
|0.9999999999991058| 0.9055366682855756|
|0.9999999999989218| 0.9093768905021175|
|0.9999999999984943| 0.9158046668270388|
|0.9999999999980054| 0.9191846522781774|
|0.9999999999954687| 0.9258817921830315|
|0.9999999999909115| 0.9307236061684461|
+------------------+-------------------+
only showing top

In [30]:
maxFMeasure, bestThreshold

(Row(max(F-Measure)=0.9995560488346282), 0.9999999299699358)

In [31]:
# (16) Stop the Spark Context
sc.stop()