In [1]:
from pyspark.sql import SparkSession, functions as F
import pandas as pd
import warnings
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline
from sklearn.model_selection import train_test_split
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import rand
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.classification import (LogisticRegression, DecisionTreeClassifier, 
                                        GBTClassifier, NaiveBayes)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import RandomForestClassifier

In [2]:
warnings.simplefilter(action='ignore')

# display
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

## Spark Session

In [3]:
spark = SparkSession.builder \
    .appName("sensors_realtime_prediction") \
    .master("local[2]") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sparkContext.setLogLevel('ERROR')

In [4]:
pandasDF = pd.read_csv("../test_df/sensor-data.csv")

In [5]:
pandasDF["label"].value_counts()

0    126289
1      9097
Name: label, dtype: int64

In [6]:
df_0 = pandasDF.loc[pandasDF["label"] == 0].sample(50000)
df_1 = pandasDF.loc[pandasDF["label"] == 1].sample(9097)
df_sampled = pd.concat([df_0, df_1], axis=0)

In [7]:
df_sampled = df_sampled.sort_values(by=['time'])

In [8]:
df_sampled["label"].value_counts()

0    50000
1     9097
Name: label, dtype: int64

In [9]:
df_sampled.head(3)

Unnamed: 0,co2_value,temp_value,light_value,humidity_value,time,room,label
2,465.0,22.8,165.0,52.4,2013-08-23 23:04:57,644,0
1,579.0,24.37,176.0,49.9,2013-08-23 23:04:57,656A,1
4,434.0,24.08,11.0,49.94,2013-08-23 23:05:01,564,1


In [10]:
# df_sampled.to_csv("test-data-sampled.csv",index=False)

In [11]:


train, test = train_test_split(df_sampled, test_size=0.2)

In [12]:
print(train["label"].value_counts())
print(test["label"].value_counts())

0    39988
1     7289
Name: label, dtype: int64
0    10012
1     1808
Name: label, dtype: int64


In [13]:
test_df = spark.createDataFrame(test)
train_df = spark.createDataFrame(train)

<h3>Data Preparing for ML Prediction</h3>

In [14]:
label_col = ["pir_value"]

# StringIndexer
string_indexer_objs = StringIndexer(inputCol="room",
                                    outputCol="roomIdx",
                                    handleInvalid='error')

In [15]:
# One Hot Encoder
encoder = OneHotEncoder(inputCols=["roomIdx"],
                        outputCols=["ohe_col"],
                        handleInvalid='error')

In [16]:
# Vector Assembler
# Vector assembler should not have a target.
assembler = VectorAssembler(inputCols=['co2_value', "temp_value", 
                                       "light_value", "humidity_value", 'ohe_col'],
                            outputCol='features',
                            handleInvalid='skip')

<h3>Create Model</h3>

In [17]:
# Define classification models
logistic_regression = LogisticRegression(labelCol="label", featuresCol="features")
decision_tree = DecisionTreeClassifier(labelCol="label", featuresCol="features")
naive_bayes = NaiveBayes(labelCol="label", featuresCol="features")
random_forest = RandomForestClassifier(labelCol="label", featuresCol="features")

In [18]:
# Pipelines
lr_pipeline = Pipeline().setStages([string_indexer_objs, encoder, assembler, logistic_regression])
dt_pipeline = Pipeline().setStages([string_indexer_objs, encoder, assembler, decision_tree])
nb_pipeline = Pipeline().setStages([string_indexer_objs, encoder, assembler, naive_bayes])
rf_pipeline = Pipeline().setStages([string_indexer_objs, encoder, assembler, random_forest])

In [19]:
lr_param_grid = ParamGridBuilder().addGrid(logistic_regression.regParam, [0.01, 0.1]).build()
dt_param_grid = ParamGridBuilder().addGrid(decision_tree.maxDepth, [5, 10]).build()
rf_param_grid = ParamGridBuilder().addGrid(random_forest.maxDepth, [5, 10]).build()
nb_param_grid = ParamGridBuilder().addGrid(naive_bayes.smoothing, [0.1, 1.0]).build()

<h3>Performance Evaluation</h3>

In [20]:
cv = CrossValidator(estimatorParamMaps=[lr_param_grid, dt_param_grid, rf_param_grid, nb_param_grid],
                          estimator=[lr_pipeline, dt_pipeline, rf_pipeline, nb_pipeline], 
                          evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
                          numFolds=2)

In [21]:
for i in cv.getEstimator():
    print(i.getStages()[-1])
    break

LogisticRegression_f5d7a6857e8f


In [22]:
accuracies_and_metrics={}
cv_models = []

In [23]:
evaluator = BinaryClassificationEvaluator(labelCol="label")

In [24]:
# cv = CrossValidator(
#     estimator=[lr_pipeline, dt_pipeline, pipeline_obj, nb_pipeline],
#     estimatorParamMaps=param_grid,
#     evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
#     numFolds=5
# )
for cv in cv.getEstimator():
    cv_model = cv.fit(train_df)
    cv_models.append(cv_model)
    predictions = cv_model.transform(test_df)
    accuracy = evaluator.evaluate(predictions)
    print(accuracy)
    accuracies_and_metrics[f"accuracy-{cv.getStages()[-1]}"] = accuracy
    accuracies_and_metrics[f"metric-{cv.getStages()[-1]}"] = evaluator.getMetricName()

0.9617059639052632
0.9011042666941264
0.9574387946853157
0.1670487118997026


In [25]:
accuracies_and_metrics

{'accuracy-LogisticRegression_f5d7a6857e8f': 0.9617059639052632,
 'metric-LogisticRegression_f5d7a6857e8f': 'areaUnderROC',
 'accuracy-DecisionTreeClassifier_63b8bdc6a349': 0.9011042666941264,
 'metric-DecisionTreeClassifier_63b8bdc6a349': 'areaUnderROC',
 'accuracy-RandomForestClassifier_99862de98d21': 0.9574387946853157,
 'metric-RandomForestClassifier_99862de98d21': 'areaUnderROC',
 'accuracy-NaiveBayes_a70f15c4d390': 0.1670487118997026,
 'metric-NaiveBayes_a70f15c4d390': 'areaUnderROC'}

In [26]:
for cv in cv_models:
    best_parameters = {param.name: value for param, value in zip(cv.stages[-1].extractParamMap().keys(), cv.stages[-1].extractParamMap().values())}
    print(best_parameters)

{'aggregationDepth': 2, 'elasticNetParam': 0.0, 'family': 'auto', 'featuresCol': 'features', 'fitIntercept': True, 'labelCol': 'label', 'maxBlockSizeInMB': 0.0, 'maxIter': 100, 'predictionCol': 'prediction', 'probabilityCol': 'probability', 'rawPredictionCol': 'rawPrediction', 'regParam': 0.0, 'standardization': True, 'threshold': 0.5, 'tol': 1e-06}
{'cacheNodeIds': False, 'checkpointInterval': 10, 'featuresCol': 'features', 'impurity': 'gini', 'labelCol': 'label', 'leafCol': '', 'maxBins': 32, 'maxDepth': 5, 'maxMemoryInMB': 256, 'minInfoGain': 0.0, 'minInstancesPerNode': 1, 'minWeightFractionPerNode': 0.0, 'predictionCol': 'prediction', 'probabilityCol': 'probability', 'rawPredictionCol': 'rawPrediction', 'seed': -5388977191008616012}
{'bootstrap': True, 'cacheNodeIds': False, 'checkpointInterval': 10, 'featureSubsetStrategy': 'auto', 'featuresCol': 'features', 'impurity': 'gini', 'labelCol': 'label', 'leafCol': '', 'maxBins': 32, 'maxDepth': 5, 'maxMemoryInMB': 256, 'minInfoGain': 0

In [27]:
logistic_regression_final = LogisticRegression()
lr_final_pipeline = Pipeline().setStages([string_indexer_objs, encoder, assembler, logistic_regression_final])
lr_final_param_grid = ParamGridBuilder().addGrid(logistic_regression_final.regParam, [0.01, 0.1, 1]).build()
cv_final = CrossValidator(estimatorParamMaps=lr_final_param_grid, 
                    estimator=lr_final_pipeline, 
                    evaluator=BinaryClassificationEvaluator(metricName="areaUnderROC"),
                    numFolds=5)

In [28]:
cv_final_model = cv_final.fit(train_df)
final_predictions = cv_final_model.transform(test_df)

In [29]:
evaluator = BinaryClassificationEvaluator(labelCol="label")

accuracy = evaluator.evaluate(final_predictions)
print(accuracy)
evaluator.getMetricName()

0.9572868752187648


'areaUnderROC'

<h3>Saving The Model to Disk</h3>

In [31]:
cv_model.write().overwrite().save('/home/selcuk/bitirme/cv_model/pipeline_model')

In [32]:
pipeline_model_loaded = PipelineModel.load(
    "/home/selcuk/bitirme/cv_model/pipeline_model")

In [33]:
best_parameters = {param.name: value for param, value in zip(pipeline_model_loaded.stages[-1].extractParamMap().keys(), pipeline_model_loaded.stages[-1].extractParamMap().values())}
best_parameters

{'featuresCol': 'features',
 'labelCol': 'label',
 'modelType': 'multinomial',
 'predictionCol': 'prediction',
 'probabilityCol': 'probability',
 'rawPredictionCol': 'rawPrediction',
 'smoothing': 1.0}

In [None]:
# # ##### 
# pipeline_model.write().overwrite().save(
#     "/home/selcuk/bitirme/saved_model2/pipeline_model")


# pipeline_model_loaded = PipelineModel.load(
#     "/home/selcuk/bitirme/saved_model2/pipeline_model")



In [None]:
# test_df.coalesce(1).write \
#     .format("csv") \
#     .mode("overwrite") \
#     .option("header", "true") \
#     .save("..../test_df")