In [1]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml.feature import StopWordsRemover, Word2Vec, RegexTokenizer, Tokenizer
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
from pyspark.streaming.kafka import KafkaUtils
import pyspark.sql.functions as f
import json
import re
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import *
import sys
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import from_json
from pyspark.sql.functions import lit
import pickle
import keras
from numpy import zeros

Using TensorFlow backend.


In [87]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, DecisionTreeClassifier
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.ml.regression import IsotonicRegression
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [3]:
# Echo the SparkContext. `sc` is not assigned in any visible cell, so it is
# presumably injected by the PySpark kernel/shell -- TODO confirm.
sc

In [4]:
# Echo the SparkSession. `spark` is not created in any visible cell, so it is
# presumably provided by the PySpark kernel/shell -- TODO confirm.
spark

In [5]:
# Wrap the existing SparkContext in a SQLContext and echo it.
# NOTE(review): no other visible cell uses `sql_context`; reads and DataFrame
# creation go through `spark` instead.
sql_context = SQLContext(sc)
sql_context

<pyspark.sql.context.SQLContext at 0x7fa868d70588>

In [29]:
# Read testdf.csv from HDFS into a Spark DataFrame. The first row supplies the
# column names and column types are inferred from the data.
loaded_data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("hdfs:///user/spark/datafile/testdf.csv")
)

In [30]:
# Inspect the (column name, type) pairs of loaded_data.
loaded_data.dtypes

[('TIMESTAMP', 'timestamp'),
 ('누적전력량', 'double'),
 ('무효전력평균', 'double'),
 ('상전압평균', 'double'),
 ('선간전압평균', 'double'),
 ('역률평균', 'double'),
 ('온도', 'double'),
 ('유효전력평균', 'double'),
 ('전류고조파평균', 'double'),
 ('전류평균', 'double'),
 ('전압고조파평균', 'double'),
 ('주파수', 'double'),
 ('DEVICE_ID', 'int'),
 ('LABEL_역률평균', 'int'),
 ('LABEL_전압고조파평균', 'int'),
 ('LABEL_전류고조파평균', 'int'),
 ('LABEL_NAME', 'int')]

In [15]:
import pickle
with open("/root/rounded_df", "rb") as file:
    loaded_data = pickle.load(file)

In [16]:
df = spark.createDataFrame(loaded_data)

In [22]:
# Preview the first row of the DataFrame built from the pickled data.
df.show(1)

+----------+-----+---------+--------------+----------+------------+----+------------+--------------+--------+------------+--------+------+
|누적전력량|label|DEVICE_ID|전압고조파평균|상전압평균|선간전압평균|온도|무효전력평균|전류고조파평균|전류평균|유효전력평균|역률평균|주파수|
+----------+-----+---------+--------------+----------+------------+----+------------+--------------+--------+------------+--------+------+
|       0.0|    0|     5252|          6.45|    226.25|      391.75|35.0|     10440.5|         10.19|   19.38|      8100.5|    0.61| 59.92|
+----------+-----+---------+--------------+----------+------------+----+------------+--------------+--------+------------+--------+------+
only showing top 1 row



In [31]:
# Preview the first row of loaded_data; .show() requires loaded_data to be a
# Spark DataFrame at this point.
loaded_data.show(1)

+-------------------+----------+------------+----------+------------+--------+-----+------------+--------------+--------+--------------+---------+---------+--------------+--------------------+--------------------+----------+
|          TIMESTAMP|누적전력량|무효전력평균|상전압평균|선간전압평균|역률평균| 온도|유효전력평균|전류고조파평균|전류평균|전압고조파평균|   주파수|DEVICE_ID|LABEL_역률평균|LABEL_전압고조파평균|LABEL_전류고조파평균|LABEL_NAME|
+-------------------+----------+------------+----------+------------+--------+-----+------------+--------------+--------+--------------+---------+---------+--------------+--------------------+--------------------+----------+
|2021-01-19 00:00:48| 1523.2915|         0.0|132.916672|  230.083328|     0.0|38.75|         0.0|           0.0|     0.0|      3.287761|59.835415|     7303|             2|                   0|                   0|         1|
+-------------------+----------+------------+----------+------------+--------+-----+------------+--------------+--------+--------------+---------+---------+--------------+--

In [24]:
# List the column names of df (the DataFrame built from the pickled data).
df.columns

['누적전력량',
 'label',
 'DEVICE_ID',
 '전압고조파평균',
 '상전압평균',
 '선간전압평균',
 '온도',
 '무효전력평균',
 '전류고조파평균',
 '전류평균',
 '유효전력평균',
 '역률평균',
 '주파수']

In [23]:
# List the column names of loaded_data, to compare against the feature list
# used for model training.
loaded_data.columns

['TIMESTAMP',
 '누적전력량',
 '무효전력평균',
 '상전압평균',
 '선간전압평균',
 '역률평균',
 '온도',
 '유효전력평균',
 '전류고조파평균',
 '전류평균',
 '전압고조파평균',
 '주파수',
 'DEVICE_ID',
 'LABEL_역률평균',
 'LABEL_전압고조파평균',
 'LABEL_전류고조파평균',
 'LABEL_NAME']

In [88]:
# Columns assembled into the model's feature vector, in vector order.
# TIMESTAMP and the LABEL_* columns are not included.
features = [
    '누적전력량',
    'DEVICE_ID',
    '전압고조파평균',
    '상전압평균',
    '선간전압평균',
    '온도',
    '무효전력평균',
    '전류고조파평균',
    '전류평균',
    '유효전력평균',
    '역률평균',
    '주파수',
]

In [89]:
# Assembler that packs the columns named in `features` into a single vector
# column called 'features'.
va = VectorAssembler(
    inputCols=features,
    outputCol='features',
)

In [90]:
# Append the assembled 'features' vector column to loaded_data. loaded_data
# must be a Spark DataFrame containing every column listed in `features`.
va_df = va.transform(loaded_data)

In [91]:
# Echo the assembled DataFrame to check that the 'features' column was added.
va_df

DataFrame[TIMESTAMP: timestamp, 누적전력량: double, 무효전력평균: double, 상전압평균: double, 선간전압평균: double, 역률평균: double, 온도: double, 유효전력평균: double, 전류고조파평균: double, 전류평균: double, 전압고조파평균: double, 주파수: double, DEVICE_ID: int, LABEL_역률평균: int, LABEL_전압고조파평균: int, LABEL_전류고조파평균: int, LABEL_NAME: int, features: vector]

In [92]:
# Fit a StringIndexer on the whole assembled frame (before the train/test
# split) that maps LABEL_NAME to the 'indexedLabel' column.
labelIndexer = (
    StringIndexer(inputCol='LABEL_NAME', outputCol="indexedLabel")
    .fit(va_df)
)

In [98]:
# Fit a VectorIndexer on the whole assembled frame, writing 'indexedFeatures'.
# NOTE(review): maxCategories=23 is an unexplained constant -- confirm which
# feature it is meant to cover.
featureIndexer = (
    VectorIndexer(
        inputCol="features",
        outputCol="indexedFeatures",
        maxCategories=23,
    )
    .fit(va_df)
)

In [99]:
# Randomly split the assembled rows with weights 0.8 (train) / 0.2 (test).
# The fixed seed keeps the split, and every metric computed from it, the same
# across kernel restarts instead of changing on every run.
(train, test) = va_df.randomSplit([0.8, 0.2], seed=42)

In [100]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [101]:
# Gradient-boosted tree classifier reading the 'indexedLabel' and
# 'indexedFeatures' columns, with 10 boosting iterations and maxBins=77.
# NOTE(review): per the Spark ML docs GBTClassifier supports binary labels
# only -- confirm LABEL_NAME has exactly two classes.
gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10, maxBins=77)
# Disabled hyper-parameter grid draft.
# NOTE(review): not valid as written -- addGrid expects a list of values
# (maxBins is given a bare 77), and "entropy"/"gini" are tree impurity names,
# not GBTClassifier lossType values.
# paramGrid = ParamGridBuilder().addGrid(gbt.maxBins,77).addGrid(gbt.maxIter, [4, 6, 8]).addGrid(gbt.lossType, ["entropy", "gini"]).build()

In [102]:
# Chain the stages in order: label indexing, feature indexing, classifier.
pipeline = Pipeline(
    stages=[
        labelIndexer,
        featureIndexer,
        gbt,
    ]
)

In [103]:
# Fit the pipeline on the training split only; the result is a PipelineModel.
model = pipeline.fit(train)

In [104]:
# Run the fitted pipeline over the held-out test split; the preview and
# evaluation cells read the 'prediction' and 'indexedLabel' columns it adds.
predictions = model.transform(test)

In [105]:
# Print up to 100 test rows: predicted value, indexed true label, raw features.
predictions.select(["prediction", "indexedLabel", "features"]).show(n=100)

+-------------------+------------+--------------------+
|         prediction|indexedLabel|            features|
+-------------------+------------+--------------------+
| 0.4270438064724936|         0.0|[19235.63,1385.0,...|
|0.39699430823660115|         0.0|[19235.63,1385.0,...|
| 0.4026359744770577|         0.0|[19235.63,1385.0,...|
|0.41851471035913956|         0.0|[19235.63,1385.0,...|
| 0.4270438064724936|         0.0|[19235.63,1385.0,...|
|0.39699430823660115|         0.0|[19235.63,1385.0,...|
|0.39699430823660115|         0.0|[19235.63,1385.0,...|
|0.39699430823660115|         0.0|[19235.63,1385.0,...|
|0.39699430823660115|         0.0|[19235.63,1385.0,...|
| 0.4026359744770577|         0.0|[19235.63,1385.0,...|
| 0.3912753872513461|         0.0|[19235.63,1385.0,...|
| 0.3912753872513461|         0.0|[19235.63,1385.0,...|
|0.39699430823660115|         0.0|[19235.63,1385.0,...|
|0.39699430823660115|         0.0|[19235.63,1385.0,...|
|0.39699430823660115|         0.0|[19235.63,1385

In [106]:
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

In [107]:
# Compute accuracy on the test predictions and report its complement as the
# test error.
# NOTE(review): the recorded preview shows fractional 'prediction' values
# (e.g. 0.397...), which can never equal an indexed label such as 0.0. That
# would explain a recorded accuracy of 0.0 and suggests the recorded outputs
# came from a regression-style model rather than the classifier defined in
# this notebook -- re-run from a fresh kernel to confirm.
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 1


In [108]:
# Print the raw accuracy value computed by the evaluator.
print(accuracy)

0.0


In [85]:
# Pull the third stage (index 2) out of the fitted pipeline -- the classifier,
# given the three-stage pipeline built in this notebook -- and print its summary.
# NOTE(review): the recorded output shows a DecisionTreeClassificationModel
# from execution count 85, i.e. an earlier run with a different classifier;
# the output is stale relative to the GBTClassifier cell.
gbtModel = model.stages[2]
print(gbtModel)  # summary only

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_26344de776ba) of depth 5 with 41 nodes
