<a href="https://colab.research.google.com/github/steezy18/AIIP_Summative_Assessment/blob/master/BD_Summative_Spark_(1).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Google Colab setup: install Java 8, download Spark 2.4.3 and install findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.3-bin-hadoop2.7"

In [0]:
from google.colab import files
uploaded = files.upload() # Upload BDSensors_Temp.csv

Saving BDSensors_Temp.csv to BDSensors_Temp.csv


We will explore fitting a regression model and k-means clustering in Spark ML. The data for Devices 3 and 10, which have erroneous Temp3 readings, will be removed from the imported CSV data before training.

### Start the Spark app

In [0]:
import findspark
findspark.init()

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Spark_ml") \
    .getOrCreate()

### Load and clean data

In [0]:
data = spark.read.csv("BDSensors_Temp.csv",header=True)

In [0]:
data.head()

Row(Device_ID='7', Moving Avg='51.90649336463123', SD='0.0', Temp1='51.90649336463123', Temp2='67.09714270109436', Temp3='25.953246682315616', Temp_Ambient='21.81445597653822', Time='0')

In [0]:

# drop rows that contain missing (null) values
data = data.dropna()

In [0]:
data.printSchema()

root
 |-- Device_ID: string (nullable = true)
 |-- Moving Avg: string (nullable = true)
 |-- SD: string (nullable = true)
 |-- Temp1: string (nullable = true)
 |-- Temp2: string (nullable = true)
 |-- Temp3: string (nullable = true)
 |-- Temp_Ambient: string (nullable = true)
 |-- Time: string (nullable = true)



In [0]:
# convert all columns to numeric types
from pyspark.sql.types import FloatType, IntegerType

# cast every column from string to float first
for col_name in data.columns:
    data = data.withColumn(col_name, data[col_name].cast(FloatType()))

# Device_ID and Time are whole numbers, so cast them on to integer
data = data.withColumn("Device_ID", data["Device_ID"].cast(IntegerType()))
data = data.withColumn("Time", data["Time"].cast(IntegerType()))


In [0]:
data.printSchema()

root
 |-- Device_ID: integer (nullable = true)
 |-- Moving Avg: float (nullable = true)
 |-- SD: float (nullable = true)
 |-- Temp1: float (nullable = true)
 |-- Temp2: float (nullable = true)
 |-- Temp3: float (nullable = true)
 |-- Temp_Ambient: float (nullable = true)
 |-- Time: integer (nullable = true)
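The `dropna()` call above runs while every column is still a string, so it only removes rows that were already missing values. Spark 2.x's default (non-ANSI) `cast` does not raise on a string it cannot parse; it returns null instead, so such values would survive as nulls after the casts. The plain-Python sketch below mimics that behaviour under this assumption; the helpers `cast_or_none` and `clean_rows` and the `"n/a"` value are hypothetical. In the notebook itself, calling `data.dropna()` once more after the casts would be the equivalent step.

```python
from typing import Callable, Dict, List, Optional


# Hypothetical helper: mimic Spark's non-ANSI cast, which yields null
# (None here) instead of raising when a string cannot be parsed.
def cast_or_none(value: Optional[str], to_type: Callable[[str], float]) -> Optional[float]:
    if value is None:
        return None
    try:
        return to_type(value)
    except ValueError:
        return None


# Hypothetical helper: cast every column, then drop any row containing a null,
# i.e. dropna() applied *after* the casts rather than before them.
def clean_rows(rows: List[Dict[str, Optional[str]]]) -> List[Dict[str, Optional[float]]]:
    cleaned = []
    for row in rows:
        cast_row = {name: cast_or_none(val, float) for name, val in row.items()}
        if all(v is not None for v in cast_row.values()):
            cleaned.append(cast_row)
    return cleaned


rows = [
    {"Device_ID": "7", "Temp1": "51.90649336463123"},  # parses cleanly
    {"Device_ID": "6", "Temp1": "n/a"},                 # hypothetical unparsable value
    {"Device_ID": None, "Temp1": "63.5"},               # missing value
]
print(clean_rows(rows))
```

Only the first row survives: the second is dropped because its cast produced a null, which a `dropna()` run before casting would not have caught.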



**Filter out the data for Devices 3 and 10, which have erroneous readings for Temperature 3 (Temp3)**

In [0]:
data1=data.filter((data.Device_ID !=3) & (data.Device_ID !=10))

In [0]:
data1.show(10)


+---------+----------+---------+---------+---------+---------+------------+----+
|Device_ID|Moving Avg|       SD|    Temp1|    Temp2|    Temp3|Temp_Ambient|Time|
+---------+----------+---------+---------+---------+---------+------------+----+
|        7| 51.906494|      0.0|51.906494|67.097145|25.953247|   21.814455|   0|
|        6|  57.74293|5.8364387|63.579372| 79.93731|31.789686|   22.253613|   0|
|        9|  57.38179| 4.792723|  56.6595| 72.32545| 28.32975|   22.088745|   0|
|        5| 49.695377|13.945262| 26.63615|39.299767|13.318075|   21.830154|   1|
|        5| 45.801144|14.704983|30.224207|43.246628|15.112103|    22.37136|   1|
|        7| 45.397617|14.350903| 57.76054|  73.5366| 28.88027|   21.898958|   1|
|        1| 51.736897|21.482782| 96.11184|115.72302| 48.05592|   21.937454|   0|
|        0| 54.444283| 21.65342| 76.10337| 93.71371|38.051685|   21.981083|   0|
|        4| 53.429123|20.766764|44.292683| 58.72195|22.146341|   21.917131|   0|
|        6|  53.58149|20.760

In [0]:
# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data1.randomSplit([0.7, 0.3])
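`randomSplit` does not produce an exact 70/30 partition. Roughly, Spark normalizes the weights into cumulative boundaries ([0, 0.7) and [0.7, 1.0) here) and assigns each row to the split whose range contains a uniform random draw, so the split sizes vary slightly, and change from run to run unless a `seed` is passed (e.g. `data1.randomSplit([0.7, 0.3], seed=42)`). The plain-Python sketch below illustrates that idea; it is a simplification rather than Spark's implementation, and `weighted_split` is a hypothetical name.

```python
import random
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def weighted_split(rows: Sequence[T], weights: Sequence[float], seed: int) -> List[List[T]]:
    """Hypothetical sketch: assign each row to a split with one uniform draw."""
    total = sum(weights)
    # cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    splits: List[List[T]] = [[] for _ in weights]
    for row in rows:
        draw = rng.random()  # uniform in [0, 1)
        for i, upper in enumerate(bounds):
            # the last split also catches draws lost to floating-point rounding
            if draw < upper or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = weighted_split(range(10_000), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, 7000 and 3000
```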

### Prepare data for model

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
# assemble the predictor columns into a single "features" vector column
assembler = VectorAssembler(
    inputCols=["Temp1", "Temp2", "Temp_Ambient", "Moving Avg"],
    outputCol="features")

# define the estimator: a decision tree regressor that predicts Temp3
dtrmodel = DecisionTreeRegressor(labelCol="Temp3", featuresCol="features")

# chain the assembler and the decision tree in a Pipeline
pipeline = Pipeline(stages=[assembler, dtrmodel])
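`VectorAssembler` concatenates the listed input columns, in the order given, into one vector column named `features`, and the decision tree then reads that vector instead of the individual columns. The plain-Python sketch below shows the idea on the first row printed by `data1.show(10)`; the `assemble` helper is hypothetical, and Spark actually produces a `DenseVector` or `SparseVector` rather than a Python list. The label `Temp3` is deliberately not among the inputs, otherwise the model would be handed the answer.

```python
from typing import Dict, List, Sequence

# same column order as the assembler in the notebook
INPUT_COLS = ["Temp1", "Temp2", "Temp_Ambient", "Moving Avg"]


def assemble(row: Dict[str, float], input_cols: Sequence[str]) -> List[float]:
    """Hypothetical stand-in for VectorAssembler: pick the input columns in order."""
    return [float(row[c]) for c in input_cols]


# first row of data1.show(10), with values as displayed
row = {
    "Device_ID": 7, "Moving Avg": 51.906494, "SD": 0.0, "Temp1": 51.906494,
    "Temp2": 67.097145, "Temp3": 25.953247, "Temp_Ambient": 21.814455, "Time": 0,
}
features = assemble(row, INPUT_COLS)
label = row["Temp3"]  # kept separate: the tree learns features -> label
print(features, label)
```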

### Fit pipeline and transform data

In [0]:
#fit the pipeline
PipelineModel = pipeline.fit(trainingData)

# transform using the pipeline
predictions = PipelineModel.transform(testData)

# evaluate model fit on the test set using root mean squared error (RMSE)
evaluator = RegressionEvaluator(
    labelCol="Temp3", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

In [0]:
predictions.show()

+---------+----------+----------+---------+---------+---------+------------+----+--------------------+------------------+
|Device_ID|Moving Avg|        SD|    Temp1|    Temp2|    Temp3|Temp_Ambient|Time|            features|        prediction|
+---------+----------+----------+---------+---------+---------+------------+----+--------------------+------------------+
|        0|  35.65844| 15.693521|  66.4446| 83.08906|  33.2223|   26.305725| 220|[66.4446029663086...| 33.29723166888139|
|        0| 40.368557| 19.175915|62.551487| 78.80663|31.275743|   25.521416|  79|[62.5514869689941...|30.819608137458157|
|        0| 40.570316| 14.332231| 66.53834| 83.19218| 33.26917|   26.811638| 183|[66.5383377075195...| 33.29723166888139|
|        0| 40.763687| 17.851013| 60.10045| 76.11049|30.050224|   27.053366| 157|[60.1004486083984...|29.931933704878436|
|        0|   41.5573| 11.620642| 62.28035|78.508385|31.140175|   25.592505|  86|[62.2803497314453...|30.819608137458157|
|        0| 42.050045| 1
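A regression tree predicts the mean label of the training rows that fall in the same leaf, which is why identical predictions such as 33.29723166888139 and 30.819608137458157 repeat across different test rows above. Spark's `DecisionTreeRegressor` chooses its splits by variance impurity, the only impurity it supports for regression. The plain-Python sketch below finds the single best threshold on one feature by minimizing the summed squared error of the two sides, i.e. a depth-1 tree; it ignores Spark's feature binning (`maxBins`, default 32) and deeper levels (`maxDepth`, default 5), and `best_split` and `predict` are hypothetical names. The (Temp1, Temp3) pairs are copied from the `data1.show(10)` output.

```python
from typing import List, Tuple


def sse(values: List[float]) -> float:
    """Sum of squared errors around the mean (n times the variance)."""
    if not values:
        return 0.0
    mean = sum(values) / len(values)
    return sum((v - mean) ** 2 for v in values)


def best_split(xs: List[float], ys: List[float]) -> Tuple[float, float, float]:
    """Hypothetical depth-1 regression tree on one feature.

    Returns (threshold, left_mean, right_mean); rows with x <= threshold go left.
    """
    if len(xs) < 2:
        raise ValueError("need at least two rows to split")
    pairs = sorted(zip(xs, ys))
    best = None
    for i in range(1, len(pairs)):
        left = [y for _, y in pairs[:i]]
        right = [y for _, y in pairs[i:]]
        cost = sse(left) + sse(right)  # lower cost = larger variance reduction
        # candidate threshold halfway between neighbouring feature values
        threshold = (pairs[i - 1][0] + pairs[i][0]) / 2
        if best is None or cost < best[0]:
            best = (cost, threshold, sum(left) / len(left), sum(right) / len(right))
    _, threshold, left_mean, right_mean = best
    return threshold, left_mean, right_mean


# (Temp1, Temp3) pairs from the data1.show(10) output
temp1 = [51.906494, 63.579372, 56.6595, 26.63615, 30.224207, 57.76054, 96.11184, 76.10337, 44.292683]
temp3 = [25.953247, 31.789686, 28.32975, 13.318075, 15.112103, 28.88027, 48.05592, 38.051685, 22.146341]

threshold, left_mean, right_mean = best_split(temp1, temp3)


def predict(x: float) -> float:
    # every row on the same side of the split gets the same (mean) prediction
    return left_mean if x <= threshold else right_mean


print(threshold, predict(min(temp1)), predict(max(temp1)))
```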

In [0]:
# root mean squared error (RMSE) of the Temp3 predictions on the test set
print(rmse)

0.46258201749394345
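`RegressionEvaluator` with `metricName="rmse"` reports the root mean squared error, sqrt(mean((Temp3 - prediction)^2)). It is in the same units as Temp3, so a value of about 0.46 means the predictions are typically within roughly 0.5 of the true Temp3 value. The plain-Python sketch below applies the formula to the five test rows printed by `predictions.show()`; because it uses only those five rows, its result differs from the 0.4626 reported for the whole test set. `root_mean_squared_error` is a hypothetical name, chosen so it does not overwrite the notebook's `rmse` variable.

```python
import math
from typing import Sequence


def root_mean_squared_error(labels: Sequence[float], predictions: Sequence[float]) -> float:
    """RMSE: square root of the mean squared difference between label and prediction."""
    if len(labels) != len(predictions) or not labels:
        raise ValueError("need two non-empty sequences of equal length")
    squared = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared) / len(squared))


# Temp3 and prediction for the five test rows printed by predictions.show()
temp3 = [33.2223, 31.275743, 33.26917, 30.050224, 31.140175]
pred = [33.29723166888139, 30.819608137458157, 33.29723166888139,
        29.931933704878436, 30.819608137458157]
print(root_mean_squared_error(temp3, pred))
```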


In [0]:
# save the fitted pipeline for later use (overwrite() avoids an error if the directory already exists)
PipelineModel.write().overwrite().save("my1_pipeline")

**Keep only the data for Devices 3 and 10, then use the model developed above to predict corrected Temperature 3 (Temp3) readings for these two devices**

In [0]:
# keep only the rows for Devices 3 and 10
data2 = data.filter(data.Device_ID.isin(3, 10))

In [0]:
data2.show()

+---------+----------+---------+---------+---------+---------+------------+----+
|Device_ID|Moving Avg|       SD|    Temp1|    Temp2|    Temp3|Temp_Ambient|Time|
+---------+----------+---------+---------+---------+---------+------------+----+
|       10|  43.33713|14.510475|31.017063| 44.11877|215.50853|   22.137428|   0|
|       10|   64.0277| 21.13513| 24.95669| 37.45236|212.47835|   21.877329|   0|
|       10| 61.671913|22.938387|34.202652|47.622917|217.10132|   22.051142|   1|
|        3| 52.479004|20.162497|47.760574|62.536633| 223.8803|   21.992851|   0|
|        3|   53.3086| 19.97843| 52.58869|67.847565|226.29434|   21.748182|   0|
|       10| 45.815186|15.556214|23.719845| 36.09183|211.85992|    22.45917|   1|
|       10| 53.346912|17.507368|29.668783|42.635662| 214.8344|   21.816574|   1|
|       10|  55.50349|21.735847|20.564281| 32.62071|210.28214|    21.84566|   1|
|       10|    51.855|23.703283| 23.18535|35.503887|211.59268|   22.388458|   2|
|       10| 48.004562|20.595

In [0]:
# get new Temp3 predictions for device 3 and 10 using the pipeline model developed above
predictions_3_10 = PipelineModel.transform(data2)


In [0]:
predictions_3_10.show()

+---------+----------+---------+---------+---------+---------+------------+----+--------------------+------------------+
|Device_ID|Moving Avg|       SD|    Temp1|    Temp2|    Temp3|Temp_Ambient|Time|            features|        prediction|
+---------+----------+---------+---------+---------+---------+------------+----+--------------------+------------------+
|       10|  43.33713|14.510475|31.017063| 44.11877|215.50853|   22.137428|   0|[31.0170631408691...|16.109317574449765|
|       10|   64.0277| 21.13513| 24.95669| 37.45236|212.47835|   21.877329|   0|[24.9566898345947...|11.988466242381504|
|       10| 61.671913|22.938387|34.202652|47.622917|217.10132|   22.051142|   1|[34.2026519775390...| 18.08628306374953|
|        3| 52.479004|20.162497|47.760574|62.536633| 223.8803|   21.992851|   0|[47.7605743408203...| 23.89463337714022|
|        3|   53.3086| 19.97843| 52.58869|67.847565|226.29434|   21.748182|   0|[52.5886917114257...|26.668721967159595|
|       10| 45.815186|15.556214|

The predicted Temp3 values are reasonable and can be used to replace the erroneous readings for Devices 3 and 10. In the function used to generate the values of Temp1, Temp2 and Temp3, Temp3 is half of Temp1, and the predicted Temp3 values for Devices 3 and 10 are indeed approximately half of Temp1 (for example, Temp1 = 47.760574 gives a prediction of 23.89, against Temp1 / 2 = 23.88).
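The half-of-Temp1 relationship can be checked directly against the rows printed above. The plain-Python sketch below uses values copied from the `data1.show(10)` and `predictions_3_10.show()` outputs (the `healthy` and `faulty` lists are illustrative names). It shows that Temp3 equals Temp1 / 2 for healthy devices, that the tree's predictions for Devices 3 and 10 land within about 1 of Temp1 / 2, and that in these rows the erroneous Temp3 readings sit about 200 above Temp1 / 2.

```python
# (Temp1, Temp3) for healthy devices, from data1.show(10)
healthy = [(51.906494, 25.953247), (63.579372, 31.789686), (96.11184, 48.05592)]

# (Temp1, erroneous Temp3, predicted Temp3) for Devices 3 and 10,
# from predictions_3_10.show()
faulty = [
    (31.017063, 215.50853, 16.109317574449765),
    (24.95669, 212.47835, 11.988466242381504),
    (34.202652, 217.10132, 18.08628306374953),
    (47.760574, 223.8803, 23.89463337714022),
    (52.58869, 226.29434, 26.668721967159595),
]

for temp1, temp3 in healthy:
    # healthy rows: Temp3 - Temp1/2 should be (almost) zero
    print(f"healthy: Temp3 - Temp1/2 = {temp3 - temp1 / 2:+.4f}")

for temp1, bad, predicted in faulty:
    half = temp1 / 2
    # raw offset is the error in the sensor reading; prediction offset is the model error
    print(f"faulty: raw offset {bad - half:+.4f}, prediction offset {predicted - half:+.4f}")
```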

In [0]:
Sensor3_10data=predictions_3_10

In [0]:
Sensor3_10data.columns

['Device_ID',
 'Moving Avg',
 'SD',
 'Temp1',
 'Temp2',
 'Temp3',
 'Temp_Ambient',
 'Time',
 'features',
 'prediction']

In [0]:
# Spark DataFrames are immutable: the pandas-style assignment to `.values` does not
# modify the DataFrame, so build a new one with Temp3 replaced by the prediction
Sensor3_10data = Sensor3_10data.withColumn("Temp3", Sensor3_10data["prediction"])

In [0]:
Sensor3_10data.show()

### END

In [0]:
spark.stop()