# Cluster Analysis - PySpark Example

## SDSC Summer Institute

## Set up environment

In [None]:
# Import modules

import pyspark
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
import utils
%matplotlib inline


In [None]:
# Start Spark session

from pyspark.sql import SparkSession

# Configure a local Spark master ('local[*]') and name the application.
conf = pyspark.SparkConf().setAll([('spark.master', 'local[*]'),
                                   ('spark.app.name', 'PySpark Cluster Analysis')])
# Reuse an existing session if one is already running, otherwise create one.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Report the version of the running Spark session and of the installed
# PySpark package. Note: `pyspark.version` is a submodule, so printing it
# shows a module repr; the version string lives in `pyspark.__version__`.
print(spark.version)
print(pyspark.__version__)

## Read in data

In [None]:
# Load minute weather data

# Exercise blank: set inputfile to the location of the minute weather CSV file.
inputfile = <<FILL-IN>>
# Read the CSV, taking column names from the header line and letting Spark
# infer column types; .cache() marks the resulting DataFrame for caching.
df = spark.read.csv (inputfile, inferSchema=True, header=True).cache()

## Explore data

In [None]:
# Examine schema

<<FILL-IN>>

In [None]:
# Count rows

<<FILL-IN>>

In [None]:
# Filter rows 

<<FILL-IN>>

In [None]:
# Show summary statistics

<<FILL-IN>>

## Prepare data

In [None]:
# Drop null data

<<FILL-IN>>

In [None]:
# Create feature vector

# Columns that are combined, in this order, into a single vector column.
featuresUsed = [
    'air_pressure',
    'air_temp',
    'avg_wind_direction',
    'avg_wind_speed',
    'max_wind_direction',
    'max_wind_speed',
    'relative_humidity',
]

# The assembled vector goes into "features_unscaled"; scaling happens later.
assembler = VectorAssembler(outputCol="features_unscaled", inputCols=featuresUsed)

# NOTE(review): workingDF is not defined in any cell shown above; presumably
# one of the earlier fill-in cells (filter / drop nulls) creates it -- confirm.
assembled = assembler.transform(workingDF)

In [None]:
# Print the first row to check the newly added "features_unscaled" column.
assembled.show(1)

In [None]:
# Scale data

# Standardize the assembled vector: center on the mean (withMean) and scale
# to unit standard deviation (withStd). The result goes into "features".
scaler = StandardScaler(
    withMean=True,
    withStd=True,
    inputCol="features_unscaled",
    outputCol="features",
)

# Fit computes the per-feature statistics; transform applies them.
scalerModel = scaler.fit(assembled)
scaledData = scalerModel.transform(assembled)

In [None]:
# Print the first row to check the newly added scaled "features" column.
scaledData.show(1)

## Perform cluster analysis

In [None]:
# Use one-third of the data for the elbow plot

# From here on only the scaled vector and the row identifier are needed.
scaledData = scaledData.select("features", "rowID")

# Keep rows whose rowID is divisible by 3, retain just the feature vector,
# and mark the subset for persistence.
elbowset = (
    scaledData
    .where((scaledData["rowID"] % 3) == 0)
    .select("features")
    .persist()
)

# Last expression in the cell: the notebook displays the subset's row count.
elbowset.count()

In [None]:
# Generate clusters for elbow plot

# Candidate cluster counts: k = 5, 6, ..., 14 (range excludes the upper bound).
clusters = range(5,15)
# NOTE(review): utils.elbow is a project helper not shown here; presumably it
# runs k-means on elbowset for each k and returns one WSSE value per k -- confirm.
wsseList = utils.elbow(elbowset, clusters)

In [None]:
# Show elbow plot

# Plot the values collected in wsseList against the candidate cluster counts
# (plotting is done by the project helper utils.elbow_plot).
utils.elbow_plot(wsseList, clusters)

In [None]:
# Run KMeans for k = 9

# Cluster on the scaled feature vector only, and mark it for persistence
# before fitting.
scaledDataFeat = scaledData.select("features")
scaledDataFeat.persist()

# KMeans estimator configured for 9 clusters with a fixed random seed.
kmeans = KMeans(k=9, seed=1)
model = <<FILL-IN>>(scaledDataFeat)            # Fit model to features

In [None]:
# Extract cluster centers

# Cluster centers of the fitted model. The model is fit on the scaled
# "features" column, so the centers are presumably in scaled units -- confirm.
centers = model.clusterCenters()
# Last expression in the cell: the notebook displays the centers.
centers

## Generate cluster profile plots

In [None]:
# NOTE(review): utils.pd_centers is a project helper not shown here; presumably
# it builds a pandas DataFrame of the centers labelled with the feature names
# -- confirm. The code below only relies on .columns, .shape and .index.
centersNamed = utils.pd_centers(featuresUsed,centers)
# Sanity check: print the column labels and the table dimensions.
print(centersNamed.columns.values)
print(centersNamed.shape)

### Profiles for all clusters

In [None]:
# Number of clusters = number of rows in the named-centers table.
numClusters = len(centersNamed.index)
# Plot every cluster center; the helper's return value is kept in colors_used
# and passed back via colors= in the later per-subset plot.
colors_used = utils.parallel_plot(centersNamed, numClusters)

### Clusters Capturing Dry Days

In [None]:
# Plot only the cluster centers whose relative_humidity value is below -0.5.
# NOTE(review): the centers are presumably in standardized units, so -0.5 would
# mean half a standard deviation below the mean -- confirm.
# The trailing semicolon keeps the notebook from echoing the return value.
utils.parallel_plot(centersNamed[centersNamed['relative_humidity'] < -0.5], 
                   numClusters, colors=colors_used);

### Clusters Capturing Humid Days

In [None]:
<<FILL-IN>>

### Clusters Capturing Hot Days

In [None]:
<<FILL-IN>>

### Clusters Capturing Windy Days

In [None]:
<<FILL-IN>>

## Stop Spark session

In [None]:
<<FILL-IN>>