# Introduction

This notebook will be divided in big parts four parts:

1. Load Data - Load data from google drive, unzip and read what's necessary in a pyspark dataframe. 

2. Data Preparation - Create columns, group by, pivot, clean and split data.

3. Build PySpark Pipeline - Put all the pre-processing steps with the algorithm in a pipeline

4. Predict - predict Test Data all the pipeline. 




# Imports and Install

## Installs

In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 64kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 42.3MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=758dddf1eafaf07c4150832984ebd8caafddaaf0664b44423fa189bccf75e446
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


## Imports

In [2]:
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types  import * 
import os

# Pyspark Configurations

In [3]:
spark = SparkSession.builder.appName('app1').getOrCreate()

# 1 - Load Data

## Unzip

In [4]:
!mkdir '/content/Data'
!unzip '/content/drive/MyDrive/Specialization/Projects/Big Data/AirQualityData.zip' -d '/content/Data'

Archive:  /content/drive/MyDrive/Specialization/Projects/Big Data/AirQualityData.zip
  inflating: /content/Data/2017-07_bme280sof.csv  
  inflating: /content/Data/2017-07_sds011sof.csv  
  inflating: /content/Data/2017-08_bme280sof.csv  
  inflating: /content/Data/2017-08_sds011sof.csv  
  inflating: /content/Data/2017-09_bme280sof.csv  
  inflating: /content/Data/2017-09_sds011sof.csv  
  inflating: /content/Data/2017-10_bme280sof.csv  
  inflating: /content/Data/2017-10_sds011sof.csv  
  inflating: /content/Data/2017-11_bme280sof.csv  
  inflating: /content/Data/2017-11_sds011sof.csv  
  inflating: /content/Data/2017-12_bme280sof.csv  
  inflating: /content/Data/2017-12_sds011sof.csv  
  inflating: /content/Data/2018-01_bme280sof.csv  
  inflating: /content/Data/2018-01_sds011sof.csv  
  inflating: /content/Data/2018-02_bme280sof.csv  
  inflating: /content/Data/2018-02_sds011sof.csv  
  inflating: /content/Data/2018-03_bme280sof.csv  
  inflating: /content/Data/2018-03_sds011sof.csv

## Reading the usefull data in a PySpark DataFrame

In [5]:
!ls '/content/Data'

2017-07_bme280sof.csv  2018-03_sds011sof.csv  2018-12_bme280sof.csv
2017-07_sds011sof.csv  2018-04_bme280sof.csv  2018-12_sds011sof.csv
2017-08_bme280sof.csv  2018-04_sds011sof.csv  2019-01_bme280sof.csv
2017-08_sds011sof.csv  2018-05_bme280sof.csv  2019-01_sds011sof.csv
2017-09_bme280sof.csv  2018-05_sds011sof.csv  2019-02_bme280sof.csv
2017-09_sds011sof.csv  2018-06_bme280sof.csv  2019-02_sds011sof.csv
2017-10_bme280sof.csv  2018-06_sds011sof.csv  2019-03_bme280sof.csv
2017-10_sds011sof.csv  2018-07_bme280sof.csv  2019-03_sds011sof.csv
2017-11_bme280sof.csv  2018-07_sds011sof.csv  2019-04_bme280sof.csv
2017-11_sds011sof.csv  2018-08_bme280sof.csv  2019-04_sds011sof.csv
2017-12_bme280sof.csv  2018-08_sds011sof.csv  2019-05_bme280sof.csv
2017-12_sds011sof.csv  2018-09_bme280sof.csv  2019-05_sds011sof.csv
2018-01_bme280sof.csv  2018-09_sds011sof.csv  2019-06_bme280sof.csv
2018-01_sds011sof.csv  2018-10_bme280sof.csv  2019-06_sds011sof.csv
2018-02_bme280sof.csv  2018-10_sds011sof.csv  20

In [6]:
schema = StructType() \
      .add("_c0",IntegerType(),True) \
      .add("sensor_id",IntegerType(),True) \
      .add("location",StringType(),True) \
      .add("lat",DoubleType(),True) \
      .add("lon",DoubleType(),True) \
      .add("timestamp",TimestampType(),True) \
      .add("pressure",DoubleType(),True) \
      .add("temperature",DoubleType(),True) \
      .add("humidity",DoubleType(),True)

In [7]:
path = '/content/Data/'
ref = 0
for file_ in os.listdir(path):
  if 'bme' in file_:
    if ref == 0: 
      df = spark.read.format('csv').option('header',True).load(path+file_, schema = schema)
      ref = 1
    else:
      df = df.union(spark.read.format('csv').option('header',True).load(path+file_, schema = schema))
  else:
    pass 
df = df.drop('_c0')

In [8]:
df.show()

+---------+--------+------------------+------------------+-------------------+--------------+-----------+--------+
|sensor_id|location|               lat|               lon|          timestamp|      pressure|temperature|humidity|
+---------+--------+------------------+------------------+-------------------+--------------+-----------+--------+
|    10954|    5529|            42.676|            23.266|2019-06-01 00:00:00|       94926.8|      12.59|   85.09|
|    19750|   10033|            42.662|             23.27|2019-06-01 00:00:01|      94509.04|      15.14|   72.72|
|     8280|    8335|            42.646|             23.38|2019-06-01 00:00:01|      94759.52|      13.52|   93.57|
|    11971|    6045|42.681999999999995|            23.294|2019-06-01 00:00:01|      94892.28|      13.83|   82.66|
|    22930|   11630|             42.66|23.291999999999998|2019-06-01 00:00:01|       94914.3|       13.5|   79.06|
|     8059|    4072|            42.668|            23.396|2019-06-01 00:00:01|  

# 2 - Data Preparation

The main idea is to calculate the average by location and month for the humidity, temperature and pressure.

In summary the result will going to be three PySpark DataFrames, with each row as a location and each column as a month of the year.

The Locations that is no information for a month will be discarded, of course they are much better ways to do this, and it could be implemented in the future improvements.


## Creating Month column

In [9]:
df = df.withColumn('month',month(df.timestamp))
df.show()

+---------+--------+------------------+------------------+-------------------+--------------+-----------+--------+-----+
|sensor_id|location|               lat|               lon|          timestamp|      pressure|temperature|humidity|month|
+---------+--------+------------------+------------------+-------------------+--------------+-----------+--------+-----+
|    10954|    5529|            42.676|            23.266|2019-06-01 00:00:00|       94926.8|      12.59|   85.09|    6|
|    19750|   10033|            42.662|             23.27|2019-06-01 00:00:01|      94509.04|      15.14|   72.72|    6|
|     8280|    8335|            42.646|             23.38|2019-06-01 00:00:01|      94759.52|      13.52|   93.57|    6|
|    11971|    6045|42.681999999999995|            23.294|2019-06-01 00:00:01|      94892.28|      13.83|   82.66|    6|
|    22930|   11630|             42.66|23.291999999999998|2019-06-01 00:00:01|       94914.3|       13.5|   79.06|    6|
|     8059|    4072|            

## Flattening pressure of each location throught time

In [10]:
%%time
df_pressure = df.groupBy('location').pivot("month").mean('pressure')
print('number of locations before clean',df_pressure.count())
df_pressure = df_pressure.na.drop(how="any")
print('number of locations after clean',df_pressure.count())
df_pressure.show()

number of locations before clean 485
number of locations after clean 282
+--------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|location|                1|                2|                3|                4|                5|                6|                7|                8|                9|               10|               11|               12|
+--------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|    3249|94413.14942097697|94659.26990831942|94261.59885675635|94600.09524029732|94390.92453766098|94553.01256445797|94435.12259930206|94770.50252015439|95002.42913810138|94900.29099327704| 94892.3114372398|94773.34127841219|
|    3266|  94725.2

## Flattening the temperature of each location throught time

In [11]:
%%time
df_temp = df.groupBy('location').pivot("month").mean('temperature')
print('number of locations before clean',df_temp.count())
df_temp = df_temp.na.drop(how="any")
print('number of locations after clean',df_temp.count())
df_temp.show()

number of locations before clean 485
number of locations after clean 282
+--------+--------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|location|                   1|                  2|                  3|                 4|                  5|                 6|                 7|                 8|                 9|                10|                11|                 12|
+--------+--------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+
|    3249|  1.6123813218390783|  3.400035928885588|  8.311802094722905|14.367954106421575| 17.654054313189132|21.331531653784886|23.011794796351552| 22.78055099894845|18.224454402607233| 12.93

## Flattening the humidity of each location throught time

In [12]:
%%time
df_humidity = df.groupBy('location').pivot('month').mean('humidity')
print('number of locations before clean',df_humidity.count())
df_humidity = df_humidity.na.drop(how="any")
print('number of locations after clean',df_humidity.count())
df_humidity.show()

number of locations before clean 485
number of locations after clean 282
+--------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+
|location|                 1|                 2|                 3|                 4|                 5|                 6|                 7|                 8|                 9|                10|                11|               12|
+--------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+
|    3249| 72.17810114942529| 68.92432385554118|57.795672767034944|57.382669320376905| 61.13510589097887| 68.06843442090981| 61.99838468328367| 59.66972169645976|56.991662689867894| 59.86257481347878|  71.12796593

## Split Data  

In [13]:
df_pressure_train,df_pressure_test = df_pressure.randomSplit([0.8,0.2], seed = 7)

train_locations = list(df_pressure_train.select('location').distinct())
test_locations = list(df_pressure_test.select('location').distinct())



df_temp_train = df_temp.filter(col('location').isin(train_locations))
df_temp_test = df_temp.filter(col('location').isin(test_locations))


df_humidity_train = df_humidity.filter(col('location').isin(train_locations))
df_humidity_test = df_humidity.filter(col('location').isin(test_locations))


# 3 - Build PySpark Pipeline

For now, just the DataFrame related to the temperature will be used

In [14]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

## Vector Assembler

In [15]:
assemblers = VectorAssembler(inputCols=['1','2','3','4','5','6','7','8','9','10','11','12'], outputCol="x_vec")

## Min Max Scaler

In [16]:
scaler = MinMaxScaler(inputCol="x_vec", outputCol="features")

## KMeans

### Finding the correct number of Cluster

In [17]:
%%time
df_temp_train_assembler = assemblers.transform(df_temp_train)
scaler_model = scaler.fit(df_temp_train_assembler)
df_temp_train_scaled = scaler_model.transform(df_temp_train_assembler)

CPU times: user 63.5 ms, sys: 16.2 ms, total: 79.7 ms
Wall time: 5min 48s


In [None]:
%%time
silhouette  = [] 
evaluator = ClusteringEvaluator()
for k in range(2,10):
  kmeans = KMeans().setK(k).setSeed(7)
  model = kmeans.fit(df_temp_train_scaled)
  transformed = model.transform(df_temp_train_scaled)
  silhouette.append(evaluator.evaluate(transformed))
  del kmeans, model, transformed

In [20]:
silhouette

[0.23527306717568228,
 0.5413584870388323,
 0.5268146078119481,
 0.40910699064379696,
 0.8760153954973869,
 0.5778267406876275,
 0.42318617095477123,
 0.3065542223982795]

The highest number of the silhouette score is the optimal number of cluster.

https://medium.com/@cmukesh8688/silhouette-analysis-in-k-means-clustering-cefa9a7ad111#:~:text=The%20value%20of%20the%20silhouette,near%200%20denote%20overlapping%20clusters.

-------
For this application the optimal number of cluster is k = 6, give by the calculated metric in the silhouette list.

This mean that instead of have to buil 282 forecasting algorithm will be possible to build only 6.



## Pipeline

After knowing the optimal number of cluster, it's possible do build the pipeline

In [48]:
del assemblers, scaler, kmeans

In [49]:
assemblers = VectorAssembler(inputCols=['1','2','3','4','5','6','7','8','9','10','11','12'], outputCol="x")
scaler = MinMaxScaler(inputCol="x", outputCol="output")
kmeans = KMeans(featuresCol='output').setK(6).setSeed(7)

In [50]:
pipeline = Pipeline(stages=[assemblers, scaler, kmeans])

In [51]:
%%time
pipeline_model  = pipeline.fit(df_temp_train_scaled)
transformed = pipeline_model.transform(df_temp_train_scaled)
transformed.show()

+--------+--------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------+
|location|                   1|                  2|                  3|                 4|                  5|                 6|                 7|                 8|                 9|                10|                11|                 12|               x_vec|            features|                   x|              output|prediction|
+--------+--------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------

In [52]:
transformed.select('prediction').distinct().show()

+----------+
|prediction|
+----------+
|         1|
|         3|
|         5|
|         4|
|         2|
|         0|
+----------+



# 4 - Predict

Predict the test data with the pipeline. 

In [53]:
transformed = pipeline_model.transform(df_temp_train_scaled)
transformed.show()

+--------+--------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+--------------------+--------------------+--------------------+----------+
|location|                   1|                  2|                  3|                 4|                  5|                 6|                 7|                 8|                 9|                10|                11|                 12|               x_vec|            features|                   x|              output|prediction|
+--------+--------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+--------------------+--------------------+--------------------+-------------

In [54]:
transformed.select('prediction').distinct().show()

+----------+
|prediction|
+----------+
|         1|
|         3|
|         5|
|         4|
|         2|
|         0|
+----------+

