In [1]:
from pyspark.sql import SQLContext
sqlContext=SQLContext(sc)

In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from notebooks import utils
%matplotlib inline

In [4]:
df=sqlContext.read.load('file:///home/cloudera/Downloads/big-data-4/minute_weather.csv',format='com.databricks.spark.csv',
                       header='true',inferSchema='true')

In [7]:
import pandas as pd
pd.DataFrame(df.take(3),columns=df.columns)

Unnamed: 0,rowID,hpwren_timestamp,air_pressure,air_temp,avg_wind_direction,avg_wind_speed,max_wind_direction,max_wind_speed,min_wind_direction,min_wind_speed,rain_accumulation,rain_duration,relative_humidity
0,0,2011-09-10 00:00:49,912.3,64.76,97.0,1.2,106.0,1.6,85.0,1.0,,,60.5
1,1,2011-09-10 00:01:49,912.3,63.86,161.0,0.8,215.0,1.5,43.0,0.2,0.0,0.0,39.9
2,2,2011-09-10 00:02:49,912.3,64.22,77.0,0.7,143.0,1.2,324.0,0.3,0.0,0.0,43.0


In [8]:
df.printSchema()

root
 |-- rowID: integer (nullable = true)
 |-- hpwren_timestamp: timestamp (nullable = true)
 |-- air_pressure: double (nullable = true)
 |-- air_temp: double (nullable = true)
 |-- avg_wind_direction: double (nullable = true)
 |-- avg_wind_speed: double (nullable = true)
 |-- max_wind_direction: double (nullable = true)
 |-- max_wind_speed: double (nullable = true)
 |-- min_wind_direction: double (nullable = true)
 |-- min_wind_speed: double (nullable = true)
 |-- rain_accumulation: double (nullable = true)
 |-- rain_duration: double (nullable = true)
 |-- relative_humidity: double (nullable = true)



In [9]:
df.count()

1587257

In [52]:
df1=df.filter(df.rowID%10==0)

In [53]:
df1.count()

158726

In [62]:
pd.DataFrame(df1.take(3),columns=df1.columns)

Unnamed: 0,rowID,hpwren_timestamp,air_pressure,air_temp,avg_wind_direction,avg_wind_speed,max_wind_direction,max_wind_speed,min_wind_direction,min_wind_speed,relative_humidity
0,0,2011-09-10 00:00:49,912.3,64.76,97.0,1.2,106.0,1.6,85.0,1.0,60.5
1,10,2011-09-10 00:10:49,912.3,62.24,144.0,1.2,167.0,1.8,115.0,0.6,38.5
2,20,2011-09-10 00:20:49,912.2,63.32,100.0,2.0,122.0,2.5,91.0,1.5,58.3


In [54]:
df1=df1.drop('rain_accumulation').drop('rain_duration')

df1=df1.na.drop()

# Pipeline

In [129]:
stage=[]
feature_columns=[
 'air_pressure',
 'air_temp',
 'avg_wind_direction',
 'avg_wind_speed',
 'max_wind_direction',
 'max_wind_speed',
 'relative_humidity']

assembler=VectorAssembler(inputCols=feature_columns,outputCol='feature')
stage=stage+[assembler]

scaler=StandardScaler(inputCol='feature',outputCol='feature_scale',withStd=True,withMean=True)
stage=stage+[scaler]



#Add custom transformer 
from pyspark.ml import Transformer

class ColumnKeeper(Transformer):
    def __init__(self, column_keep):
        #super(ColumnKeeper, self).__init__()
        self.column_keep = column_keep
    def _transform(self, df):
        df = df.select(self.column_keep)
        return df
keeper=ColumnKeeper('feature_scale')
stage=stage+[keeper]

modeler=KMeans(k=3,seed=613,featuresCol='feature_scale',predictionCol='cluster')
stage=stage+[modeler]

In [130]:
pipeline=Pipeline(stages=stage)
model=pipeline.fit(df1)

df2=model.transform(df1)

In [131]:
df2.take(3)

[Row(feature_scale=DenseVector([-1.4846, 0.2454, -0.6839, -0.7656, -0.6215, -0.7444, 0.4923]), cluster=1),
 Row(feature_scale=DenseVector([-1.4846, 0.0325, -0.1906, -0.7656, 0.0383, -0.6617, -0.3471]), cluster=0),
 Row(feature_scale=DenseVector([-1.5173, 0.1237, -0.6524, -0.3768, -0.4485, -0.3723, 0.4084]), cluster=2)]

# WSS elbow method

In [123]:
df3=assembler.transform(df1)
fit=scaler.fit(df3)
df3=fit.transform(df3)

In [128]:
df3=df3.select('features')
df3.persist()  #keep data in memory 
cluster=range(3,10,2)
utils.elbow(df3,cluster)

Training for cluster size 3 
......................WSSE = 310428.7431401998 
Training for cluster size 5 
......................WSSE = 274560.65811044944 
Training for cluster size 7 
......................WSSE = 245261.85723243063 
Training for cluster size 9 
......................WSSE = 227025.58599483027 


[310428.7431401998, 274560.65811044944, 245261.85723243063, 227025.58599483027]

# Silhouette method 

In [115]:
# required Spark 2.4 above

from pyspark.ml import evaluation 
evaluator = evaluation.ClusteringEvaluator()
silhouette = evaluator.evaluate(df2)

AttributeError: module 'pyspark.ml.evaluation' has no attribute 'ClusteringEvaluator'