# Week 5: Regression, Cluster Analysis, and Association Analysis


In [47]:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from notebook import utils
%matplotlib inline

In [11]:
sc = SparkContext.getOrCreate()

In [12]:
sqlContext = SQLContext(sc)
df = sqlContext.read.load('C:/Users/leicheng/Downloads/minute_weather.csv',
                          format='com.databricks.spark.csv',
                          header='true', inferSchema='true')

Step 3. Subset and remove unused data. Let's count the number of rows in the DataFrame:

In [13]:
df.count()

1587257

There are over 1.5 million rows in the DataFrame. Clustering this data on your computer in the Cloudera VM can take a long time, so let's use only one-tenth of the data. We can subset by calling filter() on the rowID column:

In [14]:
filterDF = df.filter((df.rowID % 10) == 0)
filterDF.count()

158726
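
As a quick sanity check on that count, the number of row IDs divisible by 10 can be computed directly. This sketch assumes rowID is a contiguous, 0-based row number (an assumption about the dataset, not something the notebook states):

```python
# Assumption: rowID takes every integer value from 0 to total_rows - 1.
total_rows = 1587257  # value reported by df.count()

# The filter (rowID % 10 == 0) keeps row IDs 0, 10, 20, ...
kept = len(range(0, total_rows, 10))
print(kept)
```

Under that assumption the result is 158726, which agrees with filterDF.count().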

Let's compute the summary statistics using describe():

In [15]:
filterDF.describe().toPandas().transpose()

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| rowID | 158726 | 793625.0 | 458203.9375103623 | 0 | 1587250 |
| hpwren_timestamp | 158726 | | | 2011-09-10 00:00:49 | 2014-09-10 23:53:29 |
| air_pressure | 158726 | 916.8301614102414 | 3.0517165528314516 | 905.0 | 929.5 |
| air_temp | 158726 | 61.8515891536364 | 11.833569210641642 | 31.64 | 99.5 |
| avg_wind_direction | 158680 | 162.15610032770354 | 95.27820101905921 | 0.0 | 359.0 |
| avg_wind_speed | 158680 | 2.7752148979077447 | 2.0576239697426435 | 0.0 | 31.9 |
| max_wind_direction | 158680 | 163.46214393748426 | 92.45213853838722 | 0.0 | 359.0 |
| max_wind_speed | 158680 | 3.4005577262415194 | 2.418801620809888 | 0.1 | 36.0 |
| min_wind_direction | 158680 | 166.77401688933702 | 97.44110914784571 | 0.0 | 359.0 |


The weather measurements in this dataset were collected during a drought in San Diego. We can count how many values of rain accumulation and rain duration are 0:

In [16]:
filterDF.filter(filterDF.rain_accumulation == 0.0).count()

157812

In [17]:
filterDF.filter(filterDF.rain_duration == 0.0).count()

157237

Since over 99% of the values in these columns are 0, let's drop them from the DataFrame to speed up our analyses. We can also drop the hpwren_timestamp column since we do not use it.

In [18]:
workingDF = filterDF.drop("rain_accumulation").drop("rain_duration").drop("hpwren_timestamp")

In [19]:
before = workingDF.count()
workingDF = workingDF.na.drop()
after = workingDF.count()
before - after

46

Step 4. Scale the data. Since the features are on different scales (e.g., air pressure values are in the 900s, while relative humidities range from 0 to 100), they need to be scaled. We will scale them so that each feature has a mean of 0 and a standard deviation of 1.

First, we will combine the columns into a single vector column. Let's look at the columns in the DataFrame:



In [21]:
workingDF.columns

['rowID',
 'air_pressure',
 'air_temp',
 'avg_wind_direction',
 'avg_wind_speed',
 'max_wind_direction',
 'max_wind_speed',
 'min_wind_direction',
 'min_wind_speed',
 'relative_humidity']

We do not want to include rowID since it is just the row number. The minimum wind measurements are highly correlated with the average wind measurements, so we will not include them either. Let's create an array of the columns we want to combine, and use VectorAssembler to create the vector column:


In [22]:
featureUsed = ['air_pressure',
 'air_temp',
 'avg_wind_direction',
 'avg_wind_speed',
 'max_wind_direction',
 'max_wind_speed',
 'relative_humidity' ]
assembler = VectorAssembler(inputCols=featureUsed, outputCol="feature_unscaled")
assembled = assembler.transform(workingDF) 

In [24]:
scaler = StandardScaler(inputCol="feature_unscaled", outputCol="features", withStd = True, withMean = True)
scalerModel = scaler.fit(assembled)
scalerData = scalerModel.transform(assembled)

The withMean argument tells the scaler to center each feature by subtracting its mean before scaling, and withStd tells it to scale each feature to unit standard deviation.
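
The effect of centering and scaling can be illustrated on a single feature in plain Python. This is a minimal sketch rather than Spark's implementation; the function name `standardize` and the three readings are made up, and the sample (n - 1) standard deviation is used on the understanding that this matches what StandardScaler computes:

```python
import statistics

def standardize(values):
    """Center values on their mean and scale them to unit standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation (divides by n - 1)
    return [(v - mean) / std for v in values]

# Hypothetical air-pressure readings, not taken from the dataset.
pressures = [900.0, 910.0, 920.0]
scaled = standardize(pressures)
print(scaled)
```

The three readings map to -1, 0, and +1: the middle reading sits at the mean, and the other two are one standard deviation below and above it.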



Step 5. Create elbow plot. The k-means algorithm requires the value of k, the number of clusters, to be specified. To determine a good value for k, we will use the “elbow” method. This method involves applying k-means with different values of k and calculating the within-cluster sum of squared errors (WSSE) for each. Since this means applying k-means multiple times, the process can be very compute-intensive. To speed it up, we will use only a subset of the dataset, taking every third sample:



In [25]:
scalerData = scalerData.select("features", "rowID")

elbowset = scalerData.filter((scalerData.rowID % 3) == 0).select("features")
elbowset.persist()

DataFrame[features: vector]

The last line calls the persist() method to tell Spark to keep the data in memory (if possible), which will speed up the computations.

Let's compute the k-means clusters for k = 2 to 30 to create an elbow plot:

In [34]:
clusters = range(2, 31)
wsseList = utils.elbow(elbowset, clusters)

AttributeError: module 'notebook.utils' has no attribute 'elbow'
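
The AttributeError shows that `utils` resolved to Jupyter's own `notebook.utils` package, which has no `elbow` function; the course helper presumably lives in a separate utils.py whose contents are not shown here. As a stand-in, the quantity the elbow method tracks can be sketched in plain Python. The function name `wsse`, the toy points, and the hand-picked centers are all illustrative assumptions, not the helper's implementation:

```python
def wsse(points, centers):
    """Sum, over all points, of the squared distance to the nearest center."""
    total = 0.0
    for p in points:
        total += min(
            sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers
        )
    return total

# Toy data: two tight groups of two points each (illustrative only).
points = [(0.0, 0.0), (0.0, 2.0), (10.0, 0.0), (10.0, 2.0)]

# Hand-picked centers for k = 1, 2, 4; a real run would obtain them from k-means.
centers_by_k = {
    1: [(5.0, 1.0)],
    2: [(0.0, 1.0), (10.0, 1.0)],
    4: points,
}

wsse_by_k = {k: wsse(points, c) for k, c in centers_by_k.items()}
print(wsse_by_k)
```

Here the WSSE falls from 104 at k = 1 to 4 at k = 2, and then only to 0 at k = 4. The sharp drop followed by a much smaller gain is the "elbow" that suggests k = 2 for this toy data.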

Step 6. Cluster using selected k. Let's select the data we want to cluster:

In [35]:
scalerDataFeat = scalerData.select("features")
scalerDataFeat.persist()

DataFrame[features: vector]

In [36]:
kmeans = KMeans(k=12, seed=1)
model = kmeans.fit(scalerDataFeat)
transformed = model.transform(scalerDataFeat)

The first line creates a new KMeans instance with 12 clusters and a specific seed value. (As in previous hands-on activities, we use a specific seed value for reproducible results.) The second line fits the model to the data, and the third applies the fitted model to the data.
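
Conceptually, applying a fitted k-means model assigns each row the index of its nearest cluster center; in Spark the result appears in a new column, named `prediction` by default. A minimal plain-Python sketch of that assignment rule, using a hypothetical `nearest_center` helper and made-up two-dimensional centers:

```python
def nearest_center(point, centers):
    """Return the index of the center closest to point (squared Euclidean distance)."""
    distances = [
        sum((a - b) ** 2 for a, b in zip(point, c)) for c in centers
    ]
    return distances.index(min(distances))

# Made-up centers in scaled feature space (not the model's actual centers).
centers = [(-1.0, -1.0), (0.0, 0.0), (1.5, 1.5)]

sample_points = [(-0.9, -1.2), (0.2, -0.1), (2.0, 1.0)]
labels = [nearest_center(p, centers) for p in sample_points]
print(labels)
```

Each of the three sample points lands in a different cluster, so the labels come out as 0, 1, and 2.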

Once the model is created, we can determine the center measurement of each cluster:

In [37]:
centers = model.clusterCenters()
centers

[array([ 0.23862654,  0.31134856,  1.88818402, -0.65263889, -1.5507189 ,
        -0.57731762, -0.27628725]),
 array([-0.77189796, -0.10325633,  0.44012451,  1.53154542,  0.52564327,
         1.46055857,  0.22969611]),
 array([ 1.189134  , -0.25128657, -1.15481277,  2.07893983, -1.05267645,
         2.1952909 , -1.13101972]),
 array([-1.55854953, -1.40903273,  0.39115013,  2.16691259,  0.50010737,
         2.13527566,  1.50575845]),
 array([ 0.53986028, -0.96950459,  0.83538105, -0.55276297,  1.04974319,
        -0.52675456,  0.95661038]),
 array([-0.71820835, -0.00888325,  0.0991225 , -0.65685135,  0.28292777,
        -0.66241855,  0.34981034]),
 array([-0.18319451,  0.85775867, -1.28472876, -0.59754225, -1.13652643,
        -0.61274245, -0.62565969]),
 array([-0.03959381,  0.72903715,  0.39829629,  0.37118846,  0.51223037,
         0.32973963, -0.30378681]),
 array([ 0.13219485, -0.83781548, -1.22239113, -0.5531094 , -1.07278337,
        -0.56782597,  0.89822913]),
 array([ 0.18177807

It is difficult to compare the cluster centers by just looking at these numbers. So we will use plots in the next step to visualize them. 



## Create parallel plots of clusters and analysis.

Step 7. Create parallel plots of clusters and analysis. A parallel coordinates plot is a great way to visualize multi-dimensional data. Each line plots the centroid of a cluster, and all of the features are plotted together. Recall that the feature values were scaled to have mean = 0 and standard deviation = 1. So the values on the y-axis of these parallel coordinates plots show the number of standard deviations from the mean.  For example, +1 means one standard deviation higher than the mean of all samples, and -1 means one standard deviation lower than the mean of all samples.
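
To make the scale concrete, a standardized center value z can be mapped back to original units as mean + z × stddev. The sketch below does this for the air_pressure coordinate of the first cluster center, using the mean and standard deviation from the describe() output. Those statistics were computed before the rows with missing values were dropped, so the result only approximates what the fitted scaler would give:

```python
# Mean and standard deviation of air_pressure from the describe() output
# (computed before na.drop(), so only approximately what the scaler used).
mean_pressure = 916.8301614102414
std_pressure = 3.0517165528314516

# First coordinate of the first cluster center (standardized air_pressure).
z = 0.23862654

approx_pressure = mean_pressure + z * std_pressure
print(round(approx_pressure, 2))
```

The first cluster's center therefore corresponds to an air pressure of roughly 917.56, in the same units as the air_pressure column, which is slightly above the overall mean.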

We'll create the plots with matplotlib using a Pandas DataFrame in which each row contains the coordinates of one cluster center and its cluster label. (Matplotlib can plot Pandas DataFrames, but not Spark DataFrames.) Let's use the pd_centers() function in the utils.py library to create the Pandas DataFrame:

In [38]:
P = utils.pd_centers(featureUsed, centers)

AttributeError: module 'notebook.utils' has no attribute 'pd_centers'
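
As with elbow(), the error indicates that `utils` resolved to Jupyter's `notebook.utils` rather than the course's utils.py, so the helper's actual code is not available here. The following is a guess at what such a function might do, based only on the description in the text: one row per cluster center, one column per feature, plus a label column. The column name `prediction` and the function body are assumptions:

```python
import pandas as pd

def pd_centers(feature_names, centers):
    """Build a pandas DataFrame with one row per cluster center plus its label."""
    frame = pd.DataFrame([list(c) for c in centers], columns=list(feature_names))
    frame["prediction"] = list(range(len(frame)))  # cluster index, 0 .. k-1
    return frame

# Two made-up centers over two features, for illustration only.
P = pd_centers(["air_pressure", "air_temp"], [[0.2, 0.3], [-0.8, -0.1]])
print(P)
```

With the notebook's own variables, the equivalent call would be pd_centers(featureUsed, centers), giving 12 rows and one column per feature in featureUsed plus the label column.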