In [1]:
# PySpark in Jupyter: the findspark package is initialised with the Spark
# client directory so that the pyspark imports in the following cells work.
# NOTE(review): the path is specific to an HDP installation - adjust it when
# running on a different cluster.
import findspark
findspark.init("/usr/hdp/current/spark2-client")

In [2]:
# Spark Context Initialization 

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.types import *
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.sql.functions import lit ,row_number,col, monotonically_increasing_id, when
from sklearn.ensemble import IsolationForest
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Reuse the SparkContext that is already running in this kernel, if any.
# A plain SparkContext() raises when this cell is executed a second time,
# because only one active context is allowed per process.
sc = SparkContext.getOrCreate()
sqlc = SQLContext(sc)

In [3]:
# Visualization Tools 
import seaborn as sns
from matplotlib import style
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from matplotlib.gridspec import GridSpec
from plotly.subplots import make_subplots
from plotly.offline import init_notebook_mode
import plotly.graph_objects as go
import plotly.express as px

# Plot set-up: initialise plotly's notebook mode, then apply the seaborn
# defaults followed by matplotlib's 'fivethirtyeight' style sheet. The style
# sheet is applied last, after the seaborn defaults.
init_notebook_mode(connected=True)
sns.set()
style.use('fivethirtyeight')

In [4]:
import numpy as np
import pandas as pd
import datetime


In [5]:
# Define Constants
# Inclusive row-index ranges (the first data row has index 2) that label each
# sample as day/night and dry/rainy. Day and night ranges alternate and tile
# the whole recording (2..9624) without gaps or overlaps; the same holds for
# dry and rainy.
day = [(2,116), (811,1445), (2170,2855), (3634,4410), (5156,5885), (6554,7189), (7833,8476), (9078,9624)]
night = [(117,810), (1446,2169), (2856,3633), (4411,5155), (5886,6553), (7190,7832), (8477,9077)]
dry = [(2,6148), (7462,9624)]
rainy = [(6149,7461)]
# Proportionality constant of the quality index, taken as 1.8615
proConstant = 1.8615
# Standard values (Si) that each reading is weighted against
standardTemp = 23.17
standardPH = 7.0
standardTurb = 198
headers = ['Temperature','pH','Turbidity','Quality']
variables = ['Humidity','Night']
colors = ['lightcoral','deepskyblue',
          'orchid',    'tomato',
          'teal',      'darkcyan',
          'limegreen', 'darkorange']

In [6]:
# Water quality index: sum(wi * value) over the parameters, with wi = I / Si
def cal_water_quality_index(rawData):
    """Return ``rawData`` with a ``Quality`` column appended.

    The three standard values are attached as temporary literal columns and
    passed, together with the measured Temperature, pH and Turbidity, to the
    ``cal_quality`` UDF (declared with a FloatType result). The temporary
    columns and the ``index`` helper column are dropped before returning.
    """
    withStandards = (
        rawData
        .withColumn("sTemp", lit(standardTemp))
        .withColumn("sPH", lit(standardPH))
        .withColumn("sTurb", lit(standardTurb))
    )

    scoreRow = F.udf(cal_quality, FloatType())
    udfInputs = [
        withStandards[name]
        for name in ("Temperature", "pH", "Turbidity", "sTemp", "sPH", "sTurb")
    ]
    scored = withStandards.withColumn('Quality', scoreRow(*udfInputs))

    return scored.drop('index', 'sTemp', 'sPH', 'sTurb')

In [7]:
# Row-based processing: plain Python function that is wrapped as a PySpark UDF
def cal_quality(Temp,PH,Trub,sTemp,sPH,sTurb):
    """Return the quality score of a single row.

    Each reading is multiplied by ``proConstant / <its standard value>`` and
    the three products are summed.

    Args:
        Temp, PH, Trub: measured temperature, pH and turbidity.
        sTemp, sPH, sTurb: the corresponding standard values.

    Returns:
        The summed score, or None when any argument is None. Returning None
        makes a row with a missing reading end up with a null Quality instead
        of raising TypeError while the UDF is being evaluated.
    """
    if any(value is None for value in (Temp, PH, Trub, sTemp, sPH, sTurb)):
        return None
    quality = Temp*(proConstant/sTemp) + PH*(proConstant/sPH) + Trub*(proConstant/sTurb)
    return quality

In [8]:
# Adding Humidity (Dry=0/Rainy=1), Night (Day=0/Night=1) and a row index to raw data
def add_new_columns(rawData):
    """Return ``rawData`` with ``Humidity``, ``Night`` and ``index`` columns added.

    ``Humidity`` and ``Night`` start at 0 and are set to 1 for every row whose
    ``index`` lies inside one of the inclusive ranges listed in the
    module-level ``rainy`` / ``night`` constants. ``index`` numbers the rows
    consecutively, starting at 2.
    """
    # Local import: Window is not part of the notebook's top-level imports.
    from pyspark.sql.window import Window

    rawData = rawData.withColumn("Humidity", lit(0))
    rawData = rawData.withColumn("Night", lit(0))

    # monotonically_increasing_id() is only guaranteed to be unique and
    # increasing, not consecutive: with more than one partition the ids jump,
    # and the hard-coded row ranges would then label the wrong rows. It is
    # therefore used only to keep the row order, while row_number() supplies
    # the consecutive numbering (1, 2, 3, ... -> 2, 3, 4, ...).
    rowOrder = Window.orderBy(monotonically_increasing_id())
    rawData = rawData.withColumn("index", row_number().over(rowOrder) + 1)

    for first, last in night:
        insideRange = rawData["index"].between(int(first), int(last))
        rawData = rawData.withColumn(
            "Night", when(insideRange, lit(1)).otherwise(rawData["Night"]))
    for first, last in rainy:
        insideRange = rawData["index"].between(int(first), int(last))
        rawData = rawData.withColumn(
            "Humidity", when(insideRange, lit(1)).otherwise(rawData["Humidity"]))
    return rawData

In [9]:
# Plot Anomalies
def plot_anomalies(plotData, variable):
    """Draw ``variable`` over time and mark the rows flagged as anomalies.

    Args:
        plotData: pandas DataFrame with a ``Date_Time`` column, a
            ``Prediction`` column (rows equal to -1 are treated as anomalies)
            and a column named ``variable``.
        variable: name of the column to plot. ``'Quality'`` is skipped.

    Returns:
        None. The figure is shown as a side effect.
    """
    # Compare the function's own argument; the guard previously referenced an
    # undefined name, which made every call fail with NameError.
    if variable == 'Quality':
        return
    fig = px.line(plotData, x="Date_Time", y=[variable], title='Water Quality', template = 'plotly_dark')
    anomalyData = plotData.loc[plotData["Prediction"] == -1]
    anomalyData = anomalyData.set_index('Date_Time')
    # Overlay the anomalous points as red markers on top of the line.
    fig.add_trace(go.Scatter(x=anomalyData.index, y=anomalyData[variable], mode = 'markers',
                name = 'Anomaly',
                marker=dict(color='red',size=10)))

    fig.show()

In [10]:
# Data downsampling to buckets of `minutes` minutes
def downsample(rawData,minutes):
    """Average the readings over fixed time buckets and return a pandas frame.

    Args:
        rawData: Spark DataFrame; only its first seven columns are used and
            one of them must be ``Date_Time``.
        minutes: bucket width in minutes.

    Returns:
        pandas DataFrame indexed by ``Date_Time`` holding the per-bucket mean
        of every column. ``Humidity`` and ``Night`` are turned back into
        integer flags: empty buckets become 0 and averaged values are rounded.
    """
    resampledData = rawData.select(rawData.columns[:7]).toPandas()
    resampledData = resampledData.set_index('Date_Time')
    # 'min' is the minute alias accepted by every pandas version; the
    # single-letter 'T' alias is deprecated in recent releases.
    resampledData = resampledData.resample(str(minutes)+'min').mean()
    for flag in ('Humidity', 'Night'):
        resampledData[flag] = resampledData[flag].fillna(0).round().astype(int)
    return resampledData

In [11]:
# Preprocessing Steps
def preprocessing (rawData):
    """Normalise the column names and add the derived columns.

    Columns 0, 1 and 3 are renamed by position to ``Date_Time``,
    ``Temperature`` and ``Turbidity`` (e.g. "Temperature (°C)" becomes
    "Temperature"); column 2 keeps its name. Afterwards the Humidity/Night
    flags and the Quality score are added through ``add_new_columns`` and
    ``cal_water_quality_index``.
    """
    positionalNames = ((0, 'Date_Time'), (1, 'Temperature'), (3, 'Turbidity'))
    for position, cleanName in positionalNames:
        currentName = rawData.columns[position]
        rawData = rawData.withColumnRenamed(currentName, cleanName)

    # Flags first, then the quality score computed from the renamed columns.
    return cal_water_quality_index(add_new_columns(rawData))

In [12]:
# Read the raw sensor file from HDFS.
# Please change dataPath to your own directory path.
dataPath = 'hdfs://node1.sepahtan:8020/data/'
dataPathSens30 = dataPath + "Sensor_data_for_30_cm.csv"
# The first line is used as header and the column types are inferred.
rawDataSens30 = (
    sqlc.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferschema='true')
    .load(dataPathSens30)
)


In [13]:
# Preprocessing: clean the raw frame, downsample it to 60-minute buckets and
# bring the result back into Spark with a unique 'id' column as join key.
rawDataSens30 = preprocessing(rawDataSens30)
downsampleData = sqlc.createDataFrame(downsample(rawDataSens30, 60).reset_index())
downsampleData = downsampleData.withColumn("id", monotonically_increasing_id())
# Only the three sensor readings (plus the key) are used for clustering.
train = downsampleData.select('id', 'Temperature', 'pH', 'Turbidity')
train.show(200)

+----------+------------------+------------------+------------------+
|        id|       Temperature|                pH|         Turbidity|
+----------+------------------+------------------+------------------+
|         0|            20.731|7.7874999999999925|             197.0|
|         1|20.401636363636364| 7.735636363636369|             197.0|
|         2|20.158301886792458|7.7207547169811415|             197.0|
|         3| 19.65824561403509| 7.810877192982455| 196.3684210526316|
|         4|  18.7028813559322|  8.06135593220339|             196.0|
|         5|18.315254237288137| 8.188644067796606|195.77966101694915|
|         6|18.166874999999994| 8.226718750000007|             195.0|
|         7|17.915999999999993| 8.229538461538466|             195.0|
|         8|17.735846153846147| 8.256461538461531|             195.0|
|         9|17.521475409836068| 8.293770491803281|             195.0|
|        10|17.498103448275863| 8.308965517241381|194.17241379310346|
|        11|17.46326

In [14]:
# Vector Assembling: every column of `train` after 'id' goes into 'features'
vecAssembler = VectorAssembler(outputCol="features", inputCols=train.columns[1:])
trainDF = (
    vecAssembler
    .transform(train)
    .select('id', 'features')
)
trainDF.show(200, truncate=False)

+----------+----------------------------------------------------------+
|id        |features                                                  |
+----------+----------------------------------------------------------+
|0         |[20.731,7.7874999999999925,197.0]                         |
|1         |[20.401636363636364,7.735636363636369,197.0]              |
|2         |[20.158301886792458,7.7207547169811415,197.0]             |
|3         |[19.65824561403509,7.810877192982455,196.3684210526316]   |
|4         |[18.7028813559322,8.06135593220339,196.0]                 |
|5         |[18.315254237288137,8.188644067796606,195.77966101694915] |
|6         |[18.166874999999994,8.226718750000007,195.0]              |
|7         |[17.915999999999993,8.229538461538466,195.0]              |
|8         |[17.735846153846147,8.256461538461531,195.0]              |
|9         |[17.521475409836068,8.293770491803281,195.0]              |
|10        |[17.498103448275863,8.308965517241381,194.1724137931

In [15]:
# Train the model: k-means with three clusters and a fixed seed
kmeans = KMeans(k=3, seed=5)
model = kmeans.fit(trainDF)

In [16]:
# Model evaluation: score the cluster assignment of the training data with
# the evaluator's default settings.
evaluator = ClusteringEvaluator()
predictionsEvaluate = model.transform(trainDF)
silhouette = evaluator.evaluate(predictionsEvaluate)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.7889561639745398


In [17]:
# Clustering data: label every row, then attach the labels to the
# downsampled frame through the shared 'id' key.
predictions = (
    model.transform(trainDF)
    .select('id', 'prediction')
)
# The labels are materialised on the driver and turned into a new DataFrame
# before the join.
resultRows = predictions.collect()
predictionDF = sqlc.createDataFrame(resultRows)
print(predictionDF.count())
predictionDF = (
    downsampleData
    .join(predictionDF, 'id')
    .drop('id')
    .sort('Date_Time')
)
predictionDF.show(200, truncate=False)

169
+-------------------+------------------+------------------+------------------+--------+-----+------------------+----------+
|Date_Time          |Temperature       |pH                |Turbidity         |Humidity|Night|Quality           |prediction|
+-------------------+------------------+------------------+------------------+--------+-----+------------------+----------+
|2020-01-15 16:00:00|20.731            |7.7874999999999925|197.0             |0       |0    |5.588565826416016 |2         |
|2020-01-15 17:00:00|20.401636363636364|7.735636363636369 |197.0             |0       |0    |5.548312187194824 |2         |
|2020-01-15 18:00:00|20.158301886792458|7.7207547169811415|197.0             |0       |1    |5.524805068969727 |2         |
|2020-01-15 19:00:00|19.65824561403509 |7.810877192982455 |196.3684210526316 |0       |1    |5.502658367156982 |2         |
|2020-01-15 20:00:00|18.7028813559322  |8.06135593220339  |196.0             |0       |1    |5.489049434661865 |2         |
|202

In [18]:
# Cluster centers: one array per cluster of the fitted model, printed as-is.
# The values follow the order of the assembled feature columns
# (Temperature, pH, Turbidity).
centers = model.clusterCenters()
print(centers)

[array([ 20.02542647,   7.67621439, 213.98159473]), array([ 19.86443129,   7.84666842, 241.25012599]), array([ 19.98756454,   7.76960577, 198.66727167])]


In [19]:
# Save the clustering result to HDFS as CSV (single part file, with header).
# The target is built from dataPath so that changing the data directory in
# one place also redirects the output; with the default dataPath it resolves
# to the same location as before.
predictionDF.coalesce(1).write.mode('overwrite').option('header','true').csv(dataPath + 'results/' + 'Clustering_Result.csv',sep=',')

In [20]:
# Drawing the clusters in Temperature / pH / Turbidity space, coloured by
# the assigned cluster label
fig = px.scatter_3d(
    predictionDF.toPandas(),
    x='Temperature',
    y='pH',
    z='Turbidity',
    color='prediction',
    title='Water Quality'
)
fig.show()