## 1. Load dataset

In [47]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import os

# Reuse the SparkContext that is already running (for example the `sc` that a
# PySpark-enabled kernel injects) or start one. The visible cells never define
# `sc`, so without this line the cell raises NameError on a plain kernel.
sc = SparkContext.getOrCreate()
sqlCtx = SQLContext(sc)

# Both splits sit in the same directory and are read with identical options:
# the first row is treated as the header and column types are inferred.
PROCESSED_DIR = os.path.join('..', 'data', 'processed')
CSV_OPTIONS = dict(format='com.databricks.spark.csv', header='true',
                   inferSchema='true')

df_train = sqlCtx.read.load(os.path.join(PROCESSED_DIR, 'train_balanced.csv'),
                            **CSV_OPTIONS)
df_test = sqlCtx.read.load(os.path.join(PROCESSED_DIR, 'test.csv'),
                           **CSV_OPTIONS)

In [48]:
# Input columns for the model, in the order they are packed into the feature
# vector. The remaining columns of the frames ('UserID', 'UUID', 'Version',
# 'TimeStemp') are deliberately left out, and 'attack' is the label rather
# than a feature.
FEATURES = [
    # Gyroscope
    'GyroscopeStat_x_MEAN', 'GyroscopeStat_z_MEAN',
    'GyroscopeStat_COV_z_x', 'GyroscopeStat_COV_z_y',
    # Magnetic field
    'MagneticField_x_MEAN', 'MagneticField_z_MEAN',
    'MagneticField_COV_z_x', 'MagneticField_COV_z_y',
    # Pressure
    'Pressure_MEAN',
    # Linear acceleration
    'LinearAcceleration_COV_z_x', 'LinearAcceleration_COV_z_y',
    'LinearAcceleration_x_MEAN', 'LinearAcceleration_z_MEAN',
]

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Pack the FEATURES columns into a single vector column named 'features',
# which is the column the classifier is pointed at through featuresCol.
assembler = VectorAssembler(outputCol='features', inputCols=FEATURES)

# Apply the same assembler to both splits so they share one feature layout.
X_train, X_test = (assembler.transform(frame) for frame in (df_train, df_test))

# Display the resulting schema: the original columns (including the 'attack'
# label) are kept and 'features' is appended at the end.
X_train.columns

['UserID',
 'UUID',
 'Version',
 'TimeStemp',
 'GyroscopeStat_x_MEAN',
 'GyroscopeStat_z_MEAN',
 'GyroscopeStat_COV_z_x',
 'GyroscopeStat_COV_z_y',
 'MagneticField_x_MEAN',
 'MagneticField_z_MEAN',
 'MagneticField_COV_z_x',
 'MagneticField_COV_z_y',
 'Pressure_MEAN',
 'LinearAcceleration_COV_z_x',
 'LinearAcceleration_COV_z_y',
 'LinearAcceleration_x_MEAN',
 'LinearAcceleration_z_MEAN',
 'attack',
 'features']

In [49]:
from pyspark.ml.classification import DecisionTreeClassifier
# Decision-tree classifier reading the assembled 'features' vector and
# learning the 'attack' label.
dt = DecisionTreeClassifier(
    featuresCol='features',
    labelCol='attack',
    impurity='gini',          # split criterion
    maxDepth=5,               # cap on tree depth
    minInstancesPerNode=20,   # minimum rows required in each child of a split
)

In [50]:
from pyspark.ml import Pipeline

# Wrap the classifier in a one-stage Pipeline and fit it on the assembled
# training frame; `model` is the fitted pipeline used for scoring afterwards.
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(X_train)

In [51]:
# Score the assembled test frame with the fitted pipeline. The result keeps
# the input columns and carries the 'prediction' column inspected below.
predictions = model.transform(X_test)

In [52]:
# Eyeball the first 10 rows: predicted class next to the true 'attack' label.
predictions.select('prediction', 'attack').show(10)

+----------+------+
|prediction|attack|
+----------+------+
|       1.0|     1|
|       1.0|     1|
|       1.0|     1|
|       0.0|     1|
|       0.0|     1|
|       1.0|     1|
|       1.0|     1|
|       1.0|     1|
|       0.0|     0|
|       0.0|     0|
+----------+------+
only showing top 10 rows



In [53]:
# Persist the (prediction, attack) pairs as CSV with a header row.
# mode='overwrite' replaces the output of an earlier run. Without an explicit
# mode the writer refuses to write to an existing path, so re-running the
# notebook failed on this cell.
predictions.select('prediction', 'attack').write.save(
    path=os.path.join('..', 'output', 'dt_predictions'),
    format='com.databricks.spark.csv', header='true',
    mode='overwrite')

In [54]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Overall accuracy of the 'prediction' column against the 'attack' label.
# NOTE(review): accuracy is dominated by the majority class when the labels
# are imbalanced; read it together with the confusion matrix before drawing
# conclusions about attack detection.
evaluator = MulticlassClassificationEvaluator(
    metricName='accuracy',
    labelCol='attack',
    predictionCol='prediction',
)
accuracy = evaluator.evaluate(predictions)
print('Accuracy:', accuracy)

Accuracy: 0.9006933115823818


In [55]:
# Keep only the two columns needed for the metrics and cast the integer
# 'attack' label to double so both columns have the same numeric type.
# NOTE(review): this rebinds `predictions` to the narrowed frame, so every
# other column of the scored frame is no longer reachable through this name.
predictions = predictions.select(
    'prediction',
    predictions['attack'].cast('double').alias('attack'),
)

# Peek at two rows to confirm both values are now floats.
predictions.rdd.take(2)

[Row(prediction=1.0, attack=1.0), Row(prediction=1.0, attack=1.0)]

In [56]:
from pyspark.mllib.evaluation import MulticlassMetrics

# MulticlassMetrics takes an RDD of (prediction, label) pairs of doubles.
# Select and cast the two columns here instead of relying on `predictions`
# having already been narrowed to exactly these two columns by an earlier
# cell; the select is harmless if that narrowing has already happened.
pred_label_pairs = predictions.select(
    'prediction',
    predictions['attack'].cast('double').alias('attack'),
)
metrics = MulticlassMetrics(pred_label_pairs.rdd.map(tuple))

# confusionMatrix() puts actual labels on rows and predicted labels on
# columns; the transpose shows predicted labels on rows and actual labels on
# columns instead.
metrics.confusionMatrix().toArray().transpose()

array([[4.411e+03, 2.000e+00],
       [4.850e+02, 6.000e+00]])