In [1]:
import numpy as np
import findspark
findspark.init()
findspark.find()
import pyspark
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import DenseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.sql.types import StringType, IntegerType, LongType
from pyspark.sql import SparkSession
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
import time

In [2]:
conf = pyspark.SparkConf()
pyspark.SparkContext.setSystemProperty('spark.executor.memory', '6g')
sc = pyspark.SparkContext('spark://192.168.2.84:7077','Run supervised learning - decision trees')
spark = SparkSession.builder.getOrCreate()

In [3]:
columns = ['labels']
for i in range(1, 1025):
    columns.append("f" + str(i))
df = spark.read.format('csv').options(header='true').load('/MLInput_u/MLInput_u.csv') 

df.printSchema()
df.show()

root
 |-- labels: string (nullable = true)
 |-- f1: string (nullable = true)
 |-- f2: string (nullable = true)
 |-- f3: string (nullable = true)
 |-- f4: string (nullable = true)
 |-- f5: string (nullable = true)
 |-- f6: string (nullable = true)
 |-- f7: string (nullable = true)
 |-- f8: string (nullable = true)
 |-- f9: string (nullable = true)
 |-- f10: string (nullable = true)
 |-- f11: string (nullable = true)
 |-- f12: string (nullable = true)
 |-- f13: string (nullable = true)
 |-- f14: string (nullable = true)
 |-- f15: string (nullable = true)
 |-- f16: string (nullable = true)
 |-- f17: string (nullable = true)
 |-- f18: string (nullable = true)
 |-- f19: string (nullable = true)
 |-- f20: string (nullable = true)
 |-- f21: string (nullable = true)
 |-- f22: string (nullable = true)
 |-- f23: string (nullable = true)
 |-- f24: string (nullable = true)
 |-- f25: string (nullable = true)
 |-- f26: string (nullable = true)
 |-- f27: string (nullable = true)
 |-- f28: string (nul

+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+-

In [4]:
def convertColumn(df, names, newType):
    for name in names: 
        df = df.withColumn(name, df[name].cast(newType))
    return df 

df = convertColumn(df, columns, LongType())

In [9]:
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
df_new = spark.createDataFrame(input_data, ["label", "features"])

standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
scaler = standardScaler.fit(df_new)
scaled_df = scaler.transform(df_new)

scaled_df.drop('features')

scaled_df.take(2)
scaled_df.printSchema()
scaled_df.show()

# #Not Scaling
# scaled_df = df_new
# scaled_df.printSchema()
# scaled_df.show()

root
 |-- label: long (nullable = true)
 |-- features: vector (nullable = true)
 |-- features_scaled: vector (nullable = true)

+-----+--------------------+--------------------+
|label|            features|     features_scaled|
+-----+--------------------+--------------------+
|    0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    1|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    1|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,0.0,...|[0.0,0.0,0.0,0.0,...|
|    0|[0.0,0.0,0.0,1.0,...|[0.0,0.0,0.0,7.43...|
|    0|[0.0,0.0,0.0,0.

In [6]:
splits = scaled_df.randomSplit([0.7, 0.3], 1234)
trainData = splits[0]
testData = splits[1]

rddTrain = trainData.rdd.map(lambda row: LabeledPoint(row['label'], row['features_scaled'].toArray()))
rddTest = testData.rdd.map(lambda row: LabeledPoint(row['label'], row['features_scaled'].toArray()))

maxDepths = [16, 18, 20]
maxBins = [32]

bestMD = 0
bestMB = 0
lowestError = 1

for mD in maxDepths:
    for mB in maxBins:
        print('mD = ' + str(mD))
        print('mB = ' + str(mB))
        start_time = time.time()
        model = DecisionTree.trainClassifier(rddTrain, numClasses=2, categoricalFeaturesInfo={},
                                             impurity='gini', maxDepth=mD, maxBins=mB)
        # Evaluate model on test instances and compute test error

#         predictions = model.predict(rddTest.map(lambda x: x.features))
#         labelsAndPredictions = rddTest.map(lambda lp: lp.label).zip(predictions)
#         testErr = labelsAndPredictions.filter(
#             lambda lp: lp[0] != lp[1]).count() / float(testData.count())
#         print('Test Error = ' + str(testErr))

        predictions = model.predict(rddTest.map(lambda x: x.features))
        labelsAndPredictions = rddTest.map(lambda lp: lp.label).zip(predictions)
        metrics = MulticlassMetrics(labelsAndPredictions.rdd.map(tuple))
        print(metrics.confusionMatrix().toArray())
        #testErr = labelsAndPredictions.filter(
        #    lambda lp: lp[0] != lp[1]).count() / float(testData.count())
        #print('Test Error = ' + str(testErr))
        
        time_consumed = (time.time() - start_time)/ 60
        print('Time elapsed = ' + str(time_consumed) + ' min')
        if testErr < lowestError:
            bestMD = mD
            bestMB = mB
            lowestError = testErr



mD = 16
mB = 32


NameError: name 'MulticlassMetrics' is not defined

In [7]:
predictions = model.predict(rddTest.map(lambda x: x.features))
labelsAndPredictions = rddTest.map(lambda lp: lp.label).zip(predictions)
metrics = MulticlassMetrics(labelsAndPredictions.map(tuple))
print(metrics.confusionMatrix().toArray())

[[132548.  27095.]
 [  2493.   7468.]]


In [9]:
print(metrics.accuracy)
print(metrics.precision())
print(metrics.recall())
print(metrics.fMeasure())

0.8267493691186528
0.8267493691186528
0.8267493691186528
0.8267493691186528


In [11]:
print(metrics.weightedPrecision)
print(metrics.weightedRecall)
print(metrics.weightedFMeasure())

0.9376193107850334
0.8267493691186528
0.8674670931379368


In [12]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics


splits = scaled_df.randomSplit([0.7, 0.3], 1234)
trainData = splits[0]
testData = splits[1]


maxDepths = [16, 18, 20]
maxBins = [32]

start_time = time.time()

decision_tree = DecisionTreeClassifier(labelCol="label", featuresCol="features")

dt_param_grid = (ParamGridBuilder()
             #.addGrid(decision_tree.maxDepth, maxDepths)
             #.addGrid(decision_tree.maxBins, maxBins)
             .build())

dtevaluator = MulticlassClassificationEvaluator(metricName="accuracy")

dtcv = CrossValidator(estimator = decision_tree,
                      estimatorParamMaps = dt_param_grid,
                      evaluator = dtevaluator,
                      numFolds = 2)

decision_tree_model = dtcv.fit(trainData)


time_consumed = (time.time() - start_time)/ 60
print('Time elapsed = ' + str(time_consumed) + ' min')
print(decision_tree_model)

Time elapsed = 10.529191140333811 min
CrossValidatorModel_fdfbc5e5c644


In [13]:
predictions = decision_tree_model.transform(testData)
print('Accuracy:', dtevaluator.evaluate(predictions))

Accuracy: 0.8061130633711469


In [14]:
predictions_rdd = predictions.withColumn("label", predictions["label"].cast("double")).rdd.map(lambda r: (r.prediction, r.label))
metrics = MulticlassMetrics(predictions_rdd)

[(0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0), (0.0, 0.0)]


In [17]:
print(metrics.precision(1))
print(metrics.recall(1))

0.6777231777231777
0.09560947429231657


In [13]:
print("Summary Stats")
print("Precision = %s" % metrics.precision())
print("Recall = %s" % metrics.recall())
print("F1 Score = %s" % metrics.fMeasure())
print("Weighted recall = %s" % metrics.weightedRecall)
print("Weighted precision = %s" % metrics.weightedPrecision)
print("Weighted F(1) Score = %s" % metrics.weightedFMeasure())
print("Weighted F(0.5) Score = %s" % metrics.weightedFMeasure(beta=0.5))
print("Weighted false positive rate = %s" % metrics.weightedFalsePositiveRate)

Summary Stats
Precision = 0.8289486097026013
Recall = 0.8289486097026013
F1 Score = 0.8289486097026013
Weighted recall = 0.8289486097026014
Weighted precision = 0.8191250891203395
Weighted F(1) Score = 0.7905829682329455
Weighted F(0.5) Score = 0.7912704930613581
Weighted false positive rate = 0.6130257862500119


In [14]:
print(metrics.confusionMatrix().toArray())

[[132474.   2510.]
 [ 26501.   8119.]]


In [None]:
sc.stop()