In [1]:
!pip install pyspark 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [3]:
#creating a spark session 
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [4]:
import pyspark.sql.functions as pyf
import pyspark.sql.types as pyt

Imporing the data

In [5]:
path = 'gdrive/My Drive/MMD/DATA/room_occupancy.csv'
df = spark.read.csv(path, inferSchema=True, header=True)

Data processing and analysis 

In [6]:
df.printSchema()

root
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Light: double (nullable = true)
 |-- CO2: double (nullable = true)
 |-- HumidityRatio: double (nullable = true)
 |-- Occupancy: integer (nullable = true)



In [7]:
df.show()

+------------------+--------+----------------+----------------+--------------------+---------+
|       Temperature|Humidity|           Light|             CO2|       HumidityRatio|Occupancy|
+------------------+--------+----------------+----------------+--------------------+---------+
|              23.7|  26.272|           585.2|           749.2|0.004764163024164...|        1|
|23.718000000000004|   26.29|           578.4|           760.4| 0.00477266099212519|        1|
|             23.73|   26.23|572.666666666667|769.666666666667| 0.00476515255246541|        1|
|           23.7225|  26.125|          493.75|          774.75| 0.00474377335599685|        1|
|            23.754|    26.2|           488.6|           779.0| 0.00476659399998615|        1|
|             23.76|   26.26|568.666666666667|           790.0| 0.00477933243163454|        1|
|             23.73|   26.29|536.333333333333|           798.0|0.004776136332748921|        1|
|            23.754|   26.29|           509.0|    

In [8]:
df.columns #features

['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio', 'Occupancy']

In [9]:
len(df.columns) #number of features

6

In [10]:
df.count() #number of instances

2665

In [11]:
df.summary().show()

+-------+------------------+------------------+------------------+-----------------+--------------------+-------------------+
|summary|       Temperature|          Humidity|             Light|              CO2|       HumidityRatio|          Occupancy|
+-------+------------------+------------------+------------------+-----------------+--------------------+-------------------+
|  count|              2665|              2665|              2665|             2665|                2665|               2665|
|   mean| 21.43387628875156|25.353936799785547|193.22755561511644|717.9064701152506|0.004027010287333552| 0.3647279549718574|
| stddev|1.0280241758910023| 2.436842325144458|250.21090577180237|292.6817183937758|6.105726883437671E-4|0.48144412849428336|
|    min|              20.2|              22.1|               0.0|            427.5| 0.00330331447223472|                  0|
|    25%|             20.65|             23.26|               0.0|            466.0|0.003529481592297...|             

In [12]:
df1 = df.drop('HumidityRatio')

In [13]:
df1.printSchema()

root
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Light: double (nullable = true)
 |-- CO2: double (nullable = true)
 |-- Occupancy: integer (nullable = true)



In [14]:
df1.select("CO2", "Occupancy").show()

+----------------+---------+
|             CO2|Occupancy|
+----------------+---------+
|           749.2|        1|
|           760.4|        1|
|769.666666666667|        1|
|          774.75|        1|
|           779.0|        1|
|           790.0|        1|
|           798.0|        1|
|           797.0|        1|
|           803.2|        1|
|           809.0|        1|
|          815.25|        1|
|           824.0|        1|
|           832.0|        1|
|845.333333333333|        1|
|           852.4|        1|
|           861.0|        1|
|           880.0|        1|
|           891.0|        1|
|           897.6|        1|
|           900.5|        1|
+----------------+---------+
only showing top 20 rows



In [15]:
df1.filter(df['Occupancy'].isNull()).show()

+-----------+--------+-----+---+---------+
|Temperature|Humidity|Light|CO2|Occupancy|
+-----------+--------+-----+---+---------+
+-----------+--------+-----+---+---------+



In [16]:
df1.filter(df['CO2'].isNull()).show()

+-----------+--------+-----+---+---------+
|Temperature|Humidity|Light|CO2|Occupancy|
+-----------+--------+-----+---+---------+
+-----------+--------+-----+---+---------+



In [17]:
df1.filter(df['Occupancy'] == 0 ).count()

1693

In [18]:
df1.filter(df['Occupancy'] == 0 ).show(10)

+-----------+----------------+----------------+----------------+---------+
|Temperature|        Humidity|           Light|             CO2|Occupancy|
+-----------+----------------+----------------+----------------+---------+
|       22.6|25.0666666666667|428.333333333333|849.333333333333|        0|
|       22.6|            25.2|           422.5|           853.0|        0|
|       22.6|            25.2|423.666666666667|           853.0|        0|
|      22.54|           25.16|           424.6|           852.0|        0|
|      22.54|           25.16|           427.4|           853.6|        0|
|      22.54|           25.14|           421.8|           845.6|        0|
|     22.525|            25.1|           426.0|          834.25|        0|
|       22.5|           25.04|           421.8|           826.4|        0|
|       22.5|            25.0|           419.0|           819.6|        0|
|       22.5|            25.0|           419.0|           829.6|        0|
+-----------+------------

In [19]:
df_new = df1.select('CO2','Occupancy').\
                  withColumn('CO2_Levels', \
                             pyf.when(df['CO2']<450,'Low_CO2')\
                             .when(df['CO2']<1000,'Average_CO2')\
                             .otherwise('High_CO2'))

In [20]:
df_new = df_new.withColumn('CO2',df_new['CO2'].cast("float"))

In [21]:
df_new.sort(df_new.CO2.asc()).show()

+---------+---------+----------+
|      CO2|Occupancy|CO2_Levels|
+---------+---------+----------+
|    427.5|        0|   Low_CO2|
|    427.6|        0|   Low_CO2|
|   428.25|        0|   Low_CO2|
|    429.2|        0|   Low_CO2|
|429.33334|        0|   Low_CO2|
|    429.5|        0|   Low_CO2|
|    429.6|        0|   Low_CO2|
|   429.75|        0|   Low_CO2|
|    429.8|        0|   Low_CO2|
|    430.0|        0|   Low_CO2|
|    430.0|        0|   Low_CO2|
|    430.0|        0|   Low_CO2|
|430.33334|        0|   Low_CO2|
|   430.75|        0|   Low_CO2|
|    430.8|        0|   Low_CO2|
|    431.0|        0|   Low_CO2|
|    431.0|        0|   Low_CO2|
|    431.0|        0|   Low_CO2|
|431.16666|        0|   Low_CO2|
|    431.2|        0|   Low_CO2|
+---------+---------+----------+
only showing top 20 rows



In [22]:
df_new.sort(df_new.CO2.desc()).show()

+---------+---------+----------+
|      CO2|Occupancy|CO2_Levels|
+---------+---------+----------+
|  1402.25|        1|  High_CO2|
|1401.1666|        1|  High_CO2|
|   1401.0|        1|  High_CO2|
|   1400.5|        1|  High_CO2|
|  1398.25|        1|  High_CO2|
|   1398.0|        1|  High_CO2|
|   1397.5|        1|  High_CO2|
|  1396.25|        1|  High_CO2|
|1395.3334|        1|  High_CO2|
|   1394.6|        1|  High_CO2|
|   1394.2|        1|  High_CO2|
|   1394.0|        1|  High_CO2|
|  1393.75|        1|  High_CO2|
|1393.4286|        1|  High_CO2|
|1393.1666|        1|  High_CO2|
|   1393.0|        1|  High_CO2|
|   1392.6|        1|  High_CO2|
|   1392.5|        1|  High_CO2|
|   1390.5|        1|  High_CO2|
|1389.6666|        1|  High_CO2|
+---------+---------+----------+
only showing top 20 rows



In [23]:
df_new.show()

+--------+---------+-----------+
|     CO2|Occupancy| CO2_Levels|
+--------+---------+-----------+
|   749.2|        1|Average_CO2|
|   760.4|        1|Average_CO2|
|769.6667|        1|Average_CO2|
|  774.75|        1|Average_CO2|
|   779.0|        1|Average_CO2|
|   790.0|        1|Average_CO2|
|   798.0|        1|Average_CO2|
|   797.0|        1|Average_CO2|
|   803.2|        1|Average_CO2|
|   809.0|        1|Average_CO2|
|  815.25|        1|Average_CO2|
|   824.0|        1|Average_CO2|
|   832.0|        1|Average_CO2|
|845.3333|        1|Average_CO2|
|   852.4|        1|Average_CO2|
|   861.0|        1|Average_CO2|
|   880.0|        1|Average_CO2|
|   891.0|        1|Average_CO2|
|   897.6|        1|Average_CO2|
|   900.5|        1|Average_CO2|
+--------+---------+-----------+
only showing top 20 rows



In [24]:
import pyspark.pandas as ps
pdf = ps.DataFrame(df_new)



In [25]:
plot = pdf.plot.scatter(x='CO2', y='Occupancy', c='DarkBlue')
plot

Linear Regression

In [26]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [27]:
df

DataFrame[Temperature: double, Humidity: double, Light: double, CO2: double, HumidityRatio: double, Occupancy: int]

In [28]:
assembler = VectorAssembler(inputCols=['Temperature', 'Humidity','Light', 'CO2','HumidityRatio'],\
                                   outputCol='features')
df = assembler.transform(df)

In [29]:
df.printSchema()

root
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Light: double (nullable = true)
 |-- CO2: double (nullable = true)
 |-- HumidityRatio: double (nullable = true)
 |-- Occupancy: integer (nullable = true)
 |-- features: vector (nullable = true)



In [30]:
df.select(['features','Occupancy']).show(10,False)

+-------------------------------------------------------------------+---------+
|features                                                           |Occupancy|
+-------------------------------------------------------------------+---------+
|[23.7,26.272,585.2,749.2,0.0047641630241641395]                    |1        |
|[23.718000000000004,26.29,578.4,760.4,0.00477266099212519]         |1        |
|[23.73,26.23,572.666666666667,769.666666666667,0.00476515255246541]|1        |
|[23.7225,26.125,493.75,774.75,0.00474377335599685]                 |1        |
|[23.754,26.2,488.6,779.0,0.00476659399998615]                      |1        |
|[23.76,26.26,568.666666666667,790.0,0.00477933243163454]           |1        |
|[23.73,26.29,536.333333333333,798.0,0.004776136332748921]          |1        |
|[23.754,26.29,509.0,797.0,0.00478309370839038]                     |1        |
|[23.754,26.35,476.0,803.2,0.0047940939966204105]                   |1        |
|[23.736,26.39,510.0,809.0,0.00479618871

In [31]:
logistic_regression_model = df.select(['features','Occupancy'])

In [32]:
#split 
train, test = logistic_regression_model.randomSplit([0.7,0.3])

In [33]:
print(train.count(),test.count())


1854 811


In [34]:
train.groupBy('Occupancy').count().show()

+---------+-----+
|Occupancy|count|
+---------+-----+
|        1|  676|
|        0| 1178|
+---------+-----+



In [35]:
test.groupBy('Occupancy').count().show()

+---------+-----+
|Occupancy|count|
+---------+-----+
|        1|  296|
|        0|  515|
+---------+-----+



In [36]:
log_regression = LogisticRegression(labelCol='Occupancy', maxIter=10)

In [37]:
model = log_regression.fit(train)
predict_train = model.transform(train)
predict_test = model.transform(test)

In [38]:
#evaluation
evaluator = BinaryClassificationEvaluator(labelCol="Occupancy")

In [39]:
predict_test.select('Occupancy','prediction').show(10)

+---------+----------+
|Occupancy|prediction|
+---------+----------+
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
+---------+----------+
only showing top 10 rows



In [40]:
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test)))

The area under ROC for train set is 0.993120925045961
The area under ROC for test set is 0.9952309105221726


In [41]:
results = predict_test.select('Occupancy', 'prediction')

In [42]:
true_postives = results[(results.Occupancy == 1) & (results.prediction== 1)].count()
true_negatives = results[(results.Occupancy == 0) & (results.prediction== 0)].count()
false_positives = results[(results.Occupancy == 0) & (results.prediction== 1)].count()
false_negatives = results[(results.Occupancy == 1) & (results.prediction== 0)].count()

In [43]:
accuracy = float((true_postives+true_negatives) /(results.count()))
print(accuracy)

0.9852034525277436


In [44]:
recall = float(true_postives)/(true_postives + false_negatives)
print(recall)

0.9966216216216216


In [45]:
precision = float((true_postives) / (true_postives + false_positives))
print(precision)

0.9640522875816994


In [46]:
f1 = 2*(precision * recall) / (precision + recall)
f1

0.9800664451827242

Hyperparameter Optimization for Logistic Regression

In [47]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [48]:
print(log_regression.explainParams())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label, current: Occupancy)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The bounds vector 

In [49]:
paramGridLogistic = ParamGridBuilder()\
    .addGrid(log_regression.maxIter,[10, 100, 1000])\
    .addGrid(log_regression.fitIntercept,[False, True])\
    .addGrid(log_regression.elasticNetParam,[0.0, 0.5, 1.0]).addGrid(log_regression.regParam,[0.01, 0.5, 2.0]).build()

In [50]:
cvLog = CrossValidator(estimator=log_regression, estimatorParamMaps=paramGridLogistic, evaluator=evaluator, numFolds=5)

In [51]:
cvModelLog = cvLog.fit(train)

In [52]:
predict_train=cvModelLog.transform(train)
predict_test=cvModelLog.transform(test)
print("The area under ROC for train set after CV  is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set after CV  is {}".format(evaluator.evaluate(predict_test)))

The area under ROC for train set after CV  is 0.9921037562411468
The area under ROC for test set after CV  is 0.9948766727892941


In [53]:
bestModelLog = cvModelLog.bestModel
bestModelLog 

LogisticRegressionModel: uid=LogisticRegression_c9ddaa3f8f84, numClasses=2, numFeatures=5

Decision Tree

In [54]:
from pyspark.ml.classification import DecisionTreeClassifier

In [55]:
dt_classifier = DecisionTreeClassifier(featuresCol='features', labelCol='Occupancy')
model2 = dt_classifier.fit(train)

In [56]:
predict_train2 = model2.transform(train)
predict_test2 = model2.transform(test)


In [57]:
predict_test2.show(10, False)

+-----------------------------------------------------------------+---------+-------------+-----------+----------+
|features                                                         |Occupancy|rawPrediction|probability|prediction|
+-----------------------------------------------------------------+---------+-------------+-----------+----------+
|[20.2,22.79,0.0,434.5,0.0033303186824580697]                     |0        |[1133.0,0.0] |[1.0,0.0]  |0.0       |
|[20.2,22.79,0.0,439.0,0.0033303186824580697]                     |0        |[1133.0,0.0] |[1.0,0.0]  |0.0       |
|[20.2,22.89,0.0,433.75,0.00334501033919613]                      |0        |[1133.0,0.0] |[1.0,0.0]  |0.0       |
|[20.2,22.89,0.0,439.25,0.00334501033919613]                      |0        |[1133.0,0.0] |[1.0,0.0]  |0.0       |
|[20.218,22.81,0.0,431.4,0.00333698951696175]                     |0        |[1133.0,0.0] |[1.0,0.0]  |0.0       |
|[20.236,22.83,0.0,431.4,0.00334367068060774]                     |0        |[11

In [58]:
predict_test2.groupBy('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|       0.0|  514|
|       1.0|  297|
+----------+-----+



In [59]:
evaluator2 = BinaryClassificationEvaluator(labelCol="Occupancy")

In [60]:
predict_test2.select('Occupancy','prediction').show(10)

+---------+----------+
|Occupancy|prediction|
+---------+----------+
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
|        0|       0.0|
+---------+----------+
only showing top 10 rows



In [61]:
print("The area under ROC for train set is {}".format(evaluator2.evaluate(predict_train2)))
print("The area under ROC for test set is {}".format(evaluator2.evaluate(predict_test2)))

The area under ROC for train set is 0.9948425774304055
The area under ROC for test set is 0.9927151666229337


In [62]:
results2 = predict_test2.select('Occupancy', 'prediction')

In [63]:
true_postives2 = results2[(results2.Occupancy == 1) & (results2.prediction== 1)].count()
true_negatives2 = results2[(results2.Occupancy == 0) & (results2.prediction== 0)].count()
false_positives2 = results2[(results2.Occupancy == 0) & (results2.prediction== 1)].count()
false_negatives2 = results2[(results2.Occupancy == 1) & (results2.prediction== 0)].count()

In [64]:
accuracy2 = float((true_postives2+true_negatives2) /(results2.count()))
print(accuracy2)

0.9864364981504316


In [65]:
recall2 = float(true_postives2)/(true_postives2 + false_negatives2)
print(recall2)

0.9831081081081081


In [66]:
precision2 = float((true_postives2) / (true_postives2 + false_positives2))
print(precision2)

0.9797979797979798


In [67]:
f1_2 = 2*(precision2 * recall2) / (precision2 + recall2)
f1_2

0.981450252951096

In [68]:
model2.featureImportances

SparseVector(5, {0: 0.0244, 1: 0.01, 2: 0.9657})

In [69]:
print(model2.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_ce13fde527a4, depth=5, numNodes=15, numClasses=2, numFeatures=5
  If (feature 2 <= 207.1833333333335)
   Predict: 0.0
  Else (feature 2 > 207.1833333333335)
   If (feature 1 <= 25.59)
    If (feature 2 <= 432.475)
     If (feature 0 <= 22.403750000000002)
      If (feature 0 <= 20.380000000000003)
       Predict: 0.0
      Else (feature 0 > 20.380000000000003)
       Predict: 1.0
     Else (feature 0 > 22.403750000000002)
      If (feature 0 <= 22.59)
       Predict: 0.0
      Else (feature 0 > 22.59)
       Predict: 1.0
    Else (feature 2 > 432.475)
     Predict: 1.0
   Else (feature 1 > 25.59)
    If (feature 2 <= 367.8666666666665)
     Predict: 0.0
    Else (feature 2 > 367.8666666666665)
     Predict: 1.0



Hyperparameter Optimization for Decision Tree

In [70]:
print(dt_classifier.explainParams())

cacheNodeIds: If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval. (default: False)
checkpointInterval: set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext. (default: 10)
featuresCol: features column name. (default: features, current: features)
impurity: Criterion used for information gain calculation (case-insensitive). Supported options: entropy, gini (default: gini)
labelCol: label column name. (default: label, current: Occupancy)
leafCol: Leaf indices column name. Predicted leaf index of each instance in each tree by preorder. (default: )
maxBins: Max number of bins for disc

In [71]:
paramGrid = ParamGridBuilder()\
    .addGrid(dt_classifier.impurity,['gini', 'entropy'])\
    .addGrid(dt_classifier.maxDepth,[2, 3, 4, 5, 6])\
    .build()

In [72]:
cv = CrossValidator(estimator=dt_classifier, estimatorParamMaps=paramGrid, evaluator=evaluator2, numFolds=3)

In [73]:
cvModel = cv.fit(train)

In [74]:
predict_train2=cvModel.transform(train)
predict_test2=cvModel.transform(test)
print("The area under ROC for train set after CV  is {}".format(evaluator2.evaluate(predict_train2)))
print("The area under ROC for test set after CV  is {}".format(evaluator2.evaluate(predict_test2)))

The area under ROC for train set after CV  is 0.9955118996192523
The area under ROC for test set after CV  is 0.9918951718709


In [75]:
bestModel = cvModel.bestModel
bestModel

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_ce13fde527a4, depth=5, numNodes=11, numClasses=2, numFeatures=5

In [76]:
accuracy 

0.9852034525277436

In [77]:
accuracy2

0.9864364981504316

In [78]:
precision

0.9640522875816994

In [79]:
precision2

0.9797979797979798

In [80]:
recall

0.9966216216216216

In [81]:
recall2

0.9831081081081081

In [82]:
f1

0.9800664451827242

In [83]:
f1_2

0.981450252951096

Conclusion and Findings 

Both Decision Tree and Logistic Regression are able to give good predicitons. However there are slight differences among them:

**Precison** is slighly higher with  *Decision* *Tree* 

**Accuracy** is higher with  *Decision* *Tree* 

**F1 Score** is higher with  *Decision* *Tree* 

**Recall**  is higher with *Logistic* *Regression*
