<h2>EXTRACT TRANSFORM LOAD NOTEBOOK</h2>

The code block below connects the notebook to the appropriate *project token* so that it can access the project's resources.

In [1]:
# The code was removed by Watson Studio for sharing.

Let's start by installing PySpark.

In [None]:
#Install PySpark
!pip install pyspark==2.4.5

Collecting pyspark==2.4.5
  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
Collecting py4j==0.10.7 (from pyspark==2.4.5)
  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Stored in directory: /home/dsxuser/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


Now we can import the necessary libraries for ETL.

In [None]:
#Import necessary libraries

import matplotlib.pyplot as plt
import seaborn as sns
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

Apache Spark needs an active session before it can read any data, so let's create one.

In [None]:
#Create Spark instance

sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

spark = SparkSession \
    .builder \
    .getOrCreate()

Now let's download the data from my GitHub repo into the notebook path.

In [None]:
#Download data from GitHub and save it as df.parquet in the notebook path

!wget -O df.parquet "https://github.com/soundarzozm/Iot-Sensor-Data/raw/master/df.parquet?raw=true"

Since the dataset is now present in the notebook path, it is time to read it with Apache Spark into a dataframe.
Let's name the dataframe df and register it as a temporary view called sensor_data so that it can be queried with SQL.

In [None]:
#Create a spark dataframe out of the parquet file
df = spark.read.parquet('df.parquet')

#Register the dataframe as a temporary view named sensor_data
df.createOrReplaceTempView('sensor_data')

#Print the statistical description of the dataframe
df.describe().show()

From the **Data Exploration** notebook we concluded that *Device 3* is most appropriate to work on.<br>
<br>
Hence, we create a new dataframe with only that particular device's values and perform further analysis.<br>
Let's also print the first 20 records of the dataframe to confirm.

In [None]:
#Create a new dataframe df_3 having data only from the third device
df_3 = spark.sql("SELECT * from sensor_data where device = 'b8:27:eb:bf:9d:51'")

#Print first 20 records of the dataframe
df_3.show()

Let's define the correlation matrix function and plot the matrix.

In [None]:
#Import necessary libraries
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

#Create label list
labels = ['co', 'humidity', 'light', 'lpg', 'motion', 'smoke', 'temp']

#Define function that accepts a spark dataframe and returns its correlation matrix
def correlation_matrix(dataframe, labels):
    
    #Create an assembler that packs the selected features into one vector column called corr_features
    assembler = VectorAssembler(inputCols=labels, outputCol="corr_features")
    
    #Apply the assembler and keep only the assembled vector column
    df_vector = assembler.transform(dataframe).select("corr_features")

    #Get correlation matrix
    matrix = Correlation.corr(df_vector, "corr_features").collect()[0][0]
    
    #Convert to a nested Python list
    cor_mat = matrix.toArray().tolist()
    return cor_mat

In [None]:
#Plot the Correlation Matrix using Seaborn
sns.set(rc={'figure.figsize':(11.7,8.27), "axes.titlesize":20})
sns.heatmap(correlation_matrix(df_3, labels), cmap="Blues", xticklabels=labels, yticklabels=labels).set_title('Correlation Matrix\nDevice 3')
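
For reference, `Correlation.corr` uses the Pearson method by default, so each cell of the heatmap is a Pearson coefficient between two columns. The sketch below computes a single coefficient in plain Python on made-up numbers, purely to illustrate what one cell means; `pearson` is a hypothetical helper, not part of the notebook.

```python
from math import sqrt

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations from the means (unnormalised covariance)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations (unnormalised variances)
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

# Made-up readings: the second series is an exact linear function of the first
smoke = [0.010, 0.020, 0.030, 0.040]
lpg = [2 * s + 0.001 for s in smoke]
r = pearson(smoke, lpg)
```

A value of `r` near 1 or -1 means one column is almost a linear function of the other, which is what makes a feature redundant.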

We notice that **co**, **lpg**, and **smoke** are very highly correlated with each other (as seen in the *Data Exploration Notebook*).<br>
<br>
Hence, we can go ahead and **drop** any two of these features, since keeping all three would add redundancy to the data.

In [None]:
#Delete the redundant columns from the dataframe 
df_3 = df_3.drop('co').drop('lpg')
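
One way to make the choice of columns to drop reproducible is to scan the correlation matrix for pairs above a threshold. The sketch below is an illustration under loud assumptions: the 0.95 threshold, the helper name `redundant_columns`, and the small example matrix are all made up and are not the notebook's actual results.

```python
def redundant_columns(matrix, labels, threshold=0.95):
    """Return labels to drop so that no kept pair correlates above threshold.

    Scans in label order and keeps the first column of each highly
    correlated group, dropping the later ones.
    """
    to_drop = []
    for i, first in enumerate(labels):
        if first in to_drop:
            continue
        for j in range(i + 1, len(labels)):
            second = labels[j]
            if second not in to_drop and abs(matrix[i][j]) > threshold:
                to_drop.append(second)
    return to_drop

# Made-up matrix for illustration only; smoke is listed first so it is kept
demo_labels = ["smoke", "co", "lpg", "temp"]
demo_matrix = [
    [1.00, 0.99, 0.99, 0.10],
    [0.99, 1.00, 0.99, 0.12],
    [0.99, 0.99, 1.00, 0.11],
    [0.10, 0.12, 0.11, 1.00],
]
dropped = redundant_columns(demo_matrix, demo_labels)
```

The returned list could then be passed to `DataFrame.drop`, for example `df_3.drop(*dropped)`. Which column of a correlated group survives depends only on the label order, so the order is a design choice rather than a statistical one.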

Let's print the dataframe to confirm that **co** and **lpg** have been dropped.

In [None]:
#Display the first 20 records of the dataframe
df_3.show()

That looks great!<br>
<br>
Now, for the next step, we need to convert the *boolean* entries in **motion** and **light** to *integer* values (0 and 1) because most *machine learning* models expect numerical input.<br>
We do this by defining a function that takes a boolean argument and returns the corresponding integer (0 for False, 1 for True), then applying it to each of the two columns and storing the output in a new column.

In [None]:
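#Define function to convert boolean values to integer values
def bool_to_int(x):
    #Identity checks avoid the non-idiomatic == comparison against booleans
    if x is False:
        return 0
    elif x is True:
        return 1
    else:
        #Anything else (for example a null) is passed through unchanged
        return x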

In [None]:
#Import necessary modules
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

#Wrap the previously defined function as a user defined function that returns integers
bool_to_int_udf = udf(bool_to_int, IntegerType())

In [None]:
#Apply the user defined function to the dataframe's light column and create a new column called light_final having integer values
df = df_3.withColumn("light_final", bool_to_int_udf(df_3.light))

In [None]:
#Display first 20 records of the dataframe
df.show()

In [None]:
#Apply the user defined function to the dataframe's motion column and create a new column called motion_final having integer values
df = df.withColumn("motion_final", bool_to_int_udf(df.motion))

In [None]:
#Display first 20 records of the dataframe
df.show()

Now that we have the required columns, we can **drop** the old ones. Hence we drop *light* and *motion*, along with *ts* and *device*, which the next cell also removes as unnecessary.

In [None]:
#Delete unnecessary columns from the dataframe
df = df.drop('ts').drop('device').drop('light').drop('motion')

Let's print the first 20 records of the final dataframe to confirm the application of the necessary transformations.

In [None]:
#Display first 20 records of the dataframe
df.show()

<h3>Perfect!</h3>
<br>
We observe that the dataframe now holds numerical data throughout and that the values look sensible.<br>
Hence, we can move on to feeding this data to machine learning models.<br>
<br>
Let's run a final analysis of the dataframe using a statistical description and a correlation matrix.


In [None]:
# The code was removed by Watson Studio for sharing.

In [None]:
#Display statistical description of the final dataframe
df.describe().show()

In [None]:
#Define new labels list
labels = ['humidity', 'light_final','motion_final', 'smoke', 'temp']

#Plot the Correlation Matrix using Seaborn
sns.set(rc={'figure.figsize':(11.7,8.27), "axes.titlesize":20})
sns.heatmap(correlation_matrix(df, labels), cmap="Blues", xticklabels=labels, yticklabels=labels).set_title('Correlation Matrix')

We observe that the correlations between all the columns are favourable.<br>
Let's export the dataframe to a parquet file in the notebook path, since we'll use it in the next notebook, which covers model definition, training and deployment.

In [None]:
"""project.save_data(file_name = "df_final.parquet",data = df.write.parquet("df_final.parquet"), overwrite=True, set_project_asset = True)"""