In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DateType, IntegerType, StringType, FloatType
from datetime import datetime
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import sys
                      
# One SparkSession per notebook; getOrCreate() reuses an existing session on re-run.
spark = SparkSession.builder.appName("SupervisedLearning").getOrCreate()

# getExecutorMemoryStatus() has one entry per block manager (the driver plus each
# registered executor), so its size counts JVMs, not CPU cores.
# defaultParallelism is Spark's default task-slot count and is typically a closer proxy
# for the cores available to this application.
n_block_managers = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
cores = spark.sparkContext.defaultParallelism
print("You are working with", cores, "core(s) across", n_block_managers, "JVM(s) (driver + executors)")
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


You are working with 1 core(s)


In [2]:
# Load the raw sensor table (presumably one row per sensor reading) from GCS and show its schema.
sensor = spark.read.load('gs://smarthome-326501/sensor_full.parquet', format='parquet')
sensor.printSchema()

                                                                                

root
 |-- sensor_id: integer (nullable = true)
 |-- value_id: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- value: double (nullable = true)
 |-- node_id: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- name: string (nullable = true)



In [3]:
# Number of partitions Spark created when scanning the parquet input with the
# default spark.sql.files.maxPartitionBytes.
partition_counts = sensor.rdd.getNumPartitions()
partition_counts

34

In [4]:
# Cap each input split at ~6 MB so the parquet re-read below yields more, smaller partitions.
# (An even smaller value, 1000000, was also tried.)
max_partition_bytes = 6000000
spark.sql(f"set spark.sql.files.maxPartitionBytes={max_partition_bytes}").show()

+--------------------+-------+
|                 key|  value|
+--------------------+-------+
|spark.sql.files.m...|6000000|
+--------------------+-------+



In [5]:
# Re-read the input so the lowered maxPartitionBytes is presumably picked up when the new scan is planned.
sensor = spark.read.format('parquet').load('gs://smarthome-326501/sensor_full.parquet')

                                                                                

In [6]:
# sensor = sensor.repartition(1000)

In [7]:
# Shuffle-partition tuning. Each SET echoes the applied key/value as a one-row table.
shuffle_confs = [
    ("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000"),
    ("spark.sql.adaptive.advisoryPartitionSizeInBytes", "209715200"),  # 200 MiB
    # NOTE(review): spark.default.parallelism is a SparkContext setting read when the
    # context starts; setting it through SQL SET here likely has no runtime effect.
    # Set it on SparkSession.builder.config(...) instead if it is needed.
    ("spark.default.parallelism", "1000"),
]
for conf_key, conf_value in shuffle_confs:
    spark.sql(f"set {conf_key}={conf_value}").show()

+--------------------+-----+
|                 key|value|
+--------------------+-----+
|spark.sql.adaptiv...| 1000|
+--------------------+-----+

+--------------------+---------+
|                 key|    value|
+--------------------+---------+
|spark.sql.adaptiv...|209715200|
+--------------------+---------+

+--------------------+-----+
|                 key|value|
+--------------------+-----+
|spark.default.par...| 1000|
+--------------------+-----+



In [8]:
# Re-check the partition count of the re-read input now that
# spark.sql.files.maxPartitionBytes has been lowered.
partition_counts = sensor.rdd.getNumPartitions()
partition_counts

686

In [9]:
# Truncate each reading's timestamp to whole seconds (as a string) so readings from the
# same second land in the same group of the pivot below.
smarthome_df = sensor.withColumn(
    "timestamp",
    date_format(sensor["timestamp"], "yyyy-MM-dd HH:mm:ss"),
)

In [10]:
# Long -> wide: one row per second, one column per sensor name, summing readings that
# fall in the same second. Without an explicit value list, pivot() first runs a job to
# collect the distinct sensor names.
pivotDF = (
    smarthome_df
    .groupBy("timestamp")
    .pivot("name")
    .sum("value")
)

                                                                                

In [11]:
# Parse the per-second strings back into real timestamps and order rows chronologically.
pivotDF = (
    pivotDF
    .withColumn("timestamp", to_timestamp(pivotDF["timestamp"]))
    .sort("timestamp")
)

In [12]:
# Calendar-day key (a 'yyyy-MM-dd' string) used to partition the fill windows that follow.
pivotDF = pivotDF.withColumn("ts_day", date_format(pivotDF["timestamp"], "yyyy-MM-dd"))

In [13]:
from pyspark.sql import Window
from pyspark.sql.functions import last

# Forward-fill 'entrance/door/contact' within each day: every row takes the most recent
# non-null reading at or before it. Window.unboundedPreceding / Window.currentRow are the
# documented frame boundaries (PySpark maps -sys.maxsize to the same frame, but the
# named constants state the intent directly).
window = (
    Window.partitionBy('ts_day')
    .orderBy('timestamp')
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

pivotDF = pivotDF.withColumn(
    'entrance/door/contact',
    last(pivotDF['entrance/door/contact'], ignorenulls=True).over(window),
)


In [14]:
from pyspark.sql import Window
from pyspark.sql.functions import first

# Back-fill any remaining nulls in 'entrance/door/contact' within each day (e.g. rows
# before the day's first reading): each row takes the first non-null reading at or after
# it. The named frame constants replace the opaque (0, sys.maxsize) pair.
window = (
    Window.partitionBy('ts_day')
    .orderBy('timestamp')
    .rowsBetween(Window.currentRow, Window.unboundedFollowing)
)

pivotDF = pivotDF.withColumn(
    'entrance/door/contact',
    first(pivotDF['entrance/door/contact'], ignorenulls=True).over(window),
)


In [15]:
# Motion/contact sensors and appliance '/current' channels (per their names): a second
# with no reading is filled with 0.
# 'entrance/door/contact' is intentionally not listed; presumably this is because it was
# already forward/back-filled per day above.
zero_fill_columns = [
    'balcon/door/contact',
    'bathroom/ambience/motion',
    'bedroom/ambience/motion',
    'bedroom/ambience_under_the_bed/motion',
    'corridor/ambience/motion',
    'kitchen/ambience/motion',
    'kitchen/fridge/contact',
    'livingroom/ambience/motion',
    "kitchen/coffeemaker/current",
    "kitchen/sandwichmaker/current",
    "kitchen/dishwasher/current",
    "kitchen/kettle/current",
    "bathroom/washingmachine/current",
    "kitchen/microwave/current",
    "corridor/ilifeRobot/current",
]
pivotDF = pivotDF.fillna(dict.fromkeys(zero_fill_columns, 0))

In [16]:
# Remaining sensors get per-sensor fallback values instead of 0.
# NOTE(review): 480, 3, 15.2 and 21.13 look like hand-picked "typical idle" readings.
# Document where they come from (e.g. observed medians) or derive them from the data.
SENSOR_FILL_DEFAULTS = {
    "bathroom/ambience/light": 0,
    "bedroom/bed/pressure": 480,
    "livingroom/tv/light": 3,
    "bedroom/weightscale/pressure": 0,
    "bathroom/ambience/humidity": 15.2,
    "bathroom/ambience/temperature": 21.13,
    "kitchen/stove/light": 0,
    "livingroom/couch/pressure": 0,
}
pivotDF = pivotDF.fillna(SENSOR_FILL_DEFAULTS)

In [17]:
# Binarize appliance current channels: a reading above ON_THRESHOLD (unit not shown in the
# data — presumably amps) means "on" (1); anything else, including null, means "off" (0).
# One loop replaces seven copy-pasted withColumn lines; column positions are unchanged
# because withColumn replaces each existing column in place.
APPLIANCE_CURRENT_COLUMNS = [
    'kitchen/coffeemaker/current',
    'kitchen/sandwichmaker/current',
    'kitchen/dishwasher/current',
    'kitchen/kettle/current',
    'bathroom/washingmachine/current',
    'kitchen/microwave/current',
    'corridor/ilifeRobot/current',
]
ON_THRESHOLD = 1

for column_name in APPLIANCE_CURRENT_COLUMNS:
    pivotDF = pivotDF.withColumn(
        column_name,
        when(pivotDF[column_name] > ON_THRESHOLD, 1).otherwise(0),
    )

In [18]:
# pivotDF.limit(5).toPandas()

In [19]:
# Module-level state read and written by getStatus. It lives at module scope because
# getStatus keeps it between calls. Once wrapped as a Spark UDF, each Python worker
# presumably mutates its own copy, so results depend on the order rows reach the UDF.
old_value = 0   # previous 'entrance/door/contact' reading seen
counter = 0     # calls remaining in the hold-off period after a toggle
status = 1      # current occupancy flag (1 is later mapped to "home")

def getStatus(new_value):
    """Return the occupancy flag after observing one door-contact reading.

    When the reading differs from the previous one, ``status`` flips between 1 and 0
    and a hold-off of 30 further calls starts. During the hold-off, changes are
    ignored (presumably to debounce a single open/close door event). The latest
    reading is always remembered.
    """
    global old_value, counter, status

    previous_value, old_value = old_value, new_value

    if counter:
        # Inside the hold-off window: ignore this reading, just count down.
        counter -= 1
    elif previous_value != new_value:
        status = 1 - status
        counter = 30

    return status




# Wrap getStatus as a UDF.
# - The integer return type is declared explicitly: udf() defaults to StringType, which
#   would make 'occupied' hold "0"/"1" strings.
# - The UDF is marked nondeterministic: its result depends on hidden mutable state, so
#   the optimizer must not duplicate, reorder or eliminate its invocations.
udfOccupancy = udf(getStatus, IntegerType()).asNondeterministic()

# NOTE(review): Spark does not guarantee that rows reach a UDF in timestamp order, and
# each Python worker presumably keeps separate state. 'occupied' is therefore only
# trustworthy when the data is processed as one ordered partition — verify.
pivotDF = pivotDF.withColumn('occupied', udfOccupancy(pivotDF["entrance/door/contact"]))

In [20]:
# Preview the first 5 rows of the wide table as a pandas DataFrame; this triggers a Spark job.
pivotDF.limit(5).toPandas()

21/10/19 09:24:27 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Unnamed: 0,timestamp,balcon/door/contact,bathroom/ambience/humidity,bathroom/ambience/light,bathroom/ambience/motion,bathroom/ambience/temperature,bathroom/washingmachine/current,bedroom/ambience/motion,bedroom/ambience_under_the_bed/motion,bedroom/bed/pressure,...,kitchen/fridge/contact,kitchen/kettle/current,kitchen/microwave/current,kitchen/sandwichmaker/current,kitchen/stove/light,livingroom/ambience/motion,livingroom/couch/pressure,livingroom/tv/light,ts_day,occupied
0,2020-03-01 00:00:00,0.0,15.2,0.0,0.0,21.13,0,0.0,0.0,619.0,...,0.0,0,0,0,1024.0,0.0,268.0,1024.0,2020-03-01,1
1,2020-03-01 00:00:01,0.0,15.2,0.0,0.0,21.13,0,0.0,0.0,480.0,...,0.0,0,0,0,1024.0,0.0,268.0,1024.0,2020-03-01,1
2,2020-03-01 00:00:02,0.0,15.2,0.0,0.0,21.13,0,0.0,0.0,619.0,...,0.0,0,0,0,1024.0,0.0,0.0,1024.0,2020-03-01,1
3,2020-03-01 00:00:03,0.0,15.2,0.0,0.0,21.13,0,0.0,0.0,619.0,...,0.0,0,0,0,1024.0,0.0,268.0,1024.0,2020-03-01,1
4,2020-03-01 00:00:04,0.0,15.2,0.0,0.0,21.13,0,0.0,0.0,619.0,...,0.0,0,0,0,1024.0,0.0,268.0,1024.0,2020-03-01,1


In [21]:
# Human-readable class label: occupied == 1 -> "home"; anything else (0 or null) -> "away".
is_home = pivotDF["occupied"] == 1
pivotDF = pivotDF.withColumn('home_or_away', when(is_home, "home").otherwise("away"))

In [22]:
# pivotDF.repartition(1).write.format("csv").option("header", "true").save("gs://smarthome-326501/occupied")

In [23]:
# Helper columns that must not become model features.
cols_to_drop = [
    'ts_day',    # per-day partition key used by the fill windows
    'occupied',  # numeric occupancy flag; its string form 'home_or_away' is the target
]
df = pivotDF.drop(*cols_to_drop)

In [24]:
# Inspect the modeling table's column names and types (all feature columns should be numeric).
df.printSchema()

root
 |-- timestamp: timestamp (nullable = true)
 |-- balcon/door/contact: double (nullable = false)
 |-- bathroom/ambience/humidity: double (nullable = false)
 |-- bathroom/ambience/light: double (nullable = false)
 |-- bathroom/ambience/motion: double (nullable = false)
 |-- bathroom/ambience/temperature: double (nullable = false)
 |-- bathroom/washingmachine/current: integer (nullable = false)
 |-- bedroom/ambience/motion: double (nullable = false)
 |-- bedroom/ambience_under_the_bed/motion: double (nullable = false)
 |-- bedroom/bed/pressure: double (nullable = false)
 |-- bedroom/weightscale/pressure: double (nullable = false)
 |-- corridor/ambience/motion: double (nullable = false)
 |-- corridor/ilifeRobot/current: integer (nullable = false)
 |-- entrance/door/contact: double (nullable = true)
 |-- kitchen/ambience/motion: double (nullable = false)
 |-- kitchen/coffeemaker/current: integer (nullable = false)
 |-- kitchen/dishwasher/current: integer (nullable = false)
 |-- kitchen

In [25]:
# Read in functions we will need
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.types import * 
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import MinMaxScaler

In [26]:
# Target column: the string label derived from 'occupied'.
dependent_var = 'home_or_away'

# Feature columns: every column except the raw timestamp and the target itself.
# Selecting by name (rather than slicing df.columns[1:-1]) stays correct if columns
# are added or reordered upstream.
input_columns = [c for c in df.columns if c not in ('timestamp', dependent_var)]

In [None]:
# Spark ML estimators read their target from a column named 'label' and expect a
# zero-based numeric index. StringIndexer takes a string column and produces that index
# (by default the most frequent class gets 0), so cast the target to string first.
renamed = df.withColumn("label_str", df[dependent_var].cast(StringType()))
indexer = StringIndexer(inputCol="label_str", outputCol="label")
indexer_model = indexer.fit(renamed)
indexed = indexer_model.transform(renamed)

[Stage 43:>                                                         (0 + 8) / 9]

In [None]:
# Index any string-typed input columns so the vector assembler only sees numbers;
# numeric columns are used as-is. Both lists are reused when assembling features.
numeric_inputs = []
string_inputs = []
for column in input_columns:
    # isinstance is stable across Spark versions, whereas str(dataType) is 'StringType'
    # in older releases and 'StringType()' in newer ones (which silently broke the check).
    if isinstance(indexed.schema[column].dataType, StringType):
        new_col_name = column + "_num"
        indexer = StringIndexer(inputCol=column, outputCol=new_col_name)
        indexed = indexer.fit(indexed).transform(indexed)
        string_inputs.append(new_col_name)
    else:
        numeric_inputs.append(column)

In [None]:
import math

# Treat skewed numeric features.
# 1. Floor/cap each column at its approximate 1st/99th percentiles.
# 2. Transform by skew direction:
#      right skew (skewness > 1):  log(x + 1)
#      left skew  (skewness < -1): exp(x)
# The loop variable is `column`, not `col`, so it does not shadow
# pyspark.sql.functions.col (brought in by the star import).

# Relative error for approxQuantile: larger is faster but coarser (0 = exact).
QUANTILE_REL_ERROR = 0.25


def _floor_and_cap(frame, column, low, high):
    """Return a Column equal to `column` of `frame` clamped to [low, high] (nulls stay null)."""
    return (when(frame[column] < low, low)
            .when(frame[column] > high, high)
            .otherwise(frame[column]))


# d maps each numeric column to its [p01, p99] bounds. Both the quantiles and the
# skewness of every column are computed in a single pass each (not one job per column).
d = {}
skew_values = []
if numeric_inputs:
    quantiles = indexed.approxQuantile(numeric_inputs, [0.01, 0.99], QUANTILE_REL_ERROR)
    d = dict(zip(numeric_inputs, quantiles))
    skew_values = list(indexed.agg(*[skewness(indexed[c]) for c in numeric_inputs]).first())

for column, skew in zip(numeric_inputs, skew_values):
    # Constant or all-null columns have undefined skewness (null/NaN) or no quantiles;
    # leave them untouched instead of crashing on `None > 1`.
    if skew is None or math.isnan(skew) or len(d[column]) < 2:
        continue
    low, high = d[column][0], d[column][1]
    if skew > 1:
        indexed = indexed.withColumn(column, log(_floor_and_cap(indexed, column, low, high) + 1))
        print(column + " has been treated for positive (right) skewness. (skew =", skew, ")")
    elif skew < -1:
        indexed = indexed.withColumn(column, exp(_floor_and_cap(indexed, column, low, high)))
        print(column + " has been treated for negative (left) skewness. (skew =", skew, ")")

In [None]:
# Naive Bayes cannot handle negative feature values. Check the numeric inputs of
# `indexed` — the data actually fed to the model, after the skew treatment — rather than
# the untreated `df`. Indexed string columns are never negative, so they are skipped.
# `min` here is pyspark.sql.functions.min (from the star import), not the builtin.
df_minimum = None
if numeric_inputs:
    minimums = indexed.select([min(c).alias(c) for c in numeric_inputs])
    min_array = minimums.select(array(numeric_inputs).alias("mins"))
    # array_min skips nulls; the result is null only if every column minimum is null.
    df_minimum = min_array.select(array_min(min_array.mins)).collect()[0][0]

if df_minimum is None:
    print("Could not determine a minimum (no numeric inputs or only null values).")
elif df_minimum < 0:
    print("WARNING: The Naive Bayes Classifier will not be able to process your dataframe as it contains negative values")
else:
    print("No negative values were found in your dataframe.")

In [None]:
# Pack every model input into a single 'features' vector (numeric columns first, then
# indexed string columns) and keep only features + label. The vector form is what
# MinMaxScaler in the next cell operates on.
features_list = [*numeric_inputs, *string_inputs]
assembler = VectorAssembler(inputCols=features_list, outputCol='features')
assembled = assembler.transform(indexed)
output = assembled.select('features', 'label')

In [None]:
# Rescale every feature into [0, 1000]. Besides equalizing ranges, this makes all values
# non-negative (needed for Naive Bayes). The wide range keeps differences visible in
# show(), which prints few decimals.
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures", min=0, max=1000)
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))

# fit() learns each feature's observed min/max; transform() applies the rescaling.
scalerModel = scaler.fit(output)
scaled_data = scalerModel.transform(output)

# Keep label + scaled vector, renamed back to the default 'features' column name.
final_data = (
    scaled_data
    .select('label', 'scaledFeatures')
    .withColumnRenamed("scaledFeatures", "features")
)
final_data.show()

In [None]:
# 70/30 train/test split. A fixed seed makes the split, and every metric computed from
# it, repeatable across Restart & Run All (given the same input partitioning).
SPLIT_SEED = 42
train, test = final_data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

In [None]:
# Imported here because this cell uses both evaluator classes; MulticlassClassificationEvaluator
# was previously never imported.
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# AUC is computed from the continuous 'rawPrediction' scores (the evaluator's default
# input column). Feeding it the hard 0/1 'prediction' column reduces the ROC curve to a
# single operating point, so the reported "AUC" no longer measures ranking quality.
Bin_evaluator = BinaryClassificationEvaluator()  # labelCol='label', rawPredictionCol='rawPrediction'
MC_evaluator = MulticlassClassificationEvaluator(metricName="accuracy")  # predictionCol='prediction'

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# `predictions` is only created by the model-training cells further down. Guard
# explicitly so Restart & Run All does not stop here with a NameError.
if 'predictions' in globals():
    evaluator = BinaryClassificationEvaluator()
    print('Test Area Under ROC', evaluator.evaluate(predictions))
else:
    print("Skipping ROC evaluation: `predictions` is not defined yet (run the training cells first).")

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# `fitModel` is only created by the model-training cells further down. Guard explicitly
# so Restart & Run All does not stop here with a NameError.
if 'fitModel' in globals():
    predictions = fitModel.transform(test)
    evaluator = BinaryClassificationEvaluator()
    print('Test Area Under ROC', evaluator.evaluate(predictions))
else:
    print("Skipping ROC evaluation: `fitModel` is not defined yet (run the training cells first).")

In [None]:
# LogisticRegression was previously used without being imported anywhere.
from pyspark.ml.classification import LogisticRegression

# Baseline: one Logistic Regression fit with default hyperparameters, no cross-validation.
classifier = LogisticRegression()
fitModel = classifier.fit(train)

# Score the held-out set once and reuse the result for both metrics.
predictions = fitModel.transform(test)
predictionAndLabels = predictions  # same DataFrame, kept under its earlier name

# Binary metric: area under ROC.
auc = Bin_evaluator.evaluate(predictions)
print("AUC:", auc)

# Accuracy (MulticlassClassificationEvaluator also handles the binary case).
accuracy = MC_evaluator.evaluate(predictions) * 100
print("Accuracy: {0:.2f}".format(accuracy), "%")
print(" ")

In [None]:
# These names were previously used without being imported anywhere.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Logistic Regression tuned by k-fold cross-validation over maxIter.
classifier = LogisticRegression()

paramGrid = ParamGridBuilder().addGrid(classifier.maxIter, [10, 15, 20]).build()

# numFolds=2 keeps runtime down; 3+ folds give a more reliable estimate.
# A fixed seed makes the fold assignment reproducible.
crossval = CrossValidator(estimator=classifier,
                          estimatorParamMaps=paramGrid,
                          evaluator=MC_evaluator,
                          numFolds=2,
                          seed=42)

fitModel = crossval.fit(train)

# bestModel is the winning parameter set refit on the whole training set.
# Because features were min-max scaled to a common range, coefficients can be compared
# relative to each other; intercepts can be compared to other models.
BestModel = fitModel.bestModel
print("Intercept: " + str(BestModel.interceptVector))
print("Coefficients: \n" + str(BestModel.coefficientMatrix))

# Keep a handle on the best LR model for the summary cell.
LR_BestModel = BestModel

# CrossValidatorModel.transform() delegates to bestModel, so no explicit BestModel call is needed.
predictions = fitModel.transform(test)

# Test-set accuracy in percent.
accuracy = MC_evaluator.evaluate(predictions) * 100
print(accuracy)

In [None]:
# Pair each feature name with its LR coefficient and show them as a Spark DataFrame.
# Coefficients follow the order of the assembled feature vector, i.e. `features_list`
# (numeric inputs first, then indexed string inputs), which differs from
# `input_columns` whenever string inputs exist.
# Row 0 of coefficientMatrix is used: for a binary label the matrix has a single row.
coeff_array = BestModel.coefficientMatrix.toArray()
coeff_scores = [float(x) for x in coeff_array[0]]

result = spark.createDataFrame(list(zip(features_list, coeff_scores)), schema=['feature', 'coeff'])
result.show(100)

In [None]:
def _print_by_label(title, values):
    """Print a blank separator line, `title`, then one 'label <i>: <value>' line per label."""
    print(" ")
    print(title)
    for i, value in enumerate(values):
        print("label %d: %s" % (i, value))


# Metrics for the best CV model. NOTE: these come from the model's *training* summary,
# i.e. they are measured on `train`, not on the held-out `test` split.
trainingSummary = LR_BestModel.summary

# Descriptive statistics of the training-set predictions.
trainingSummary.predictions.describe().show()

# Objective value per optimizer iteration.
objectiveHistory = trainingSummary.objectiveHistory
print(" ")
print("objectiveHistory: (scaled loss + regularization) at each iteration")
for objective in objectiveHistory:
    print(objective)

# Per-label metrics; label index i is the StringIndexer index of the class.
_print_by_label("False positive rate by label:", trainingSummary.falsePositiveRateByLabel)
_print_by_label("True positive rate by label:", trainingSummary.truePositiveRateByLabel)
_print_by_label("Precision by label:", trainingSummary.precisionByLabel)
_print_by_label("Recall by label:", trainingSummary.recallByLabel)
_print_by_label("F-measure by label:", trainingSummary.fMeasureByLabel())

# Overall accuracy plus label-weighted aggregates (no confusion matrix is produced here).
accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print(" ")
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))