# PySpark KMeans

This notebook demonstrates KMeans, a clustering algorithm, in PySpark.

Several [Spark examples](/tree/examples/spark) are included with TAP.

More examples are available on the Spark website: http://spark.apache.org/examples.html

PySpark API documentation: http://spark.apache.org/docs/latest/api/python/

In [1]:
# Import the needed libraries
import pyspark
from pyspark.ml.clustering import KMeans

# The DataFrame-based estimators in pyspark.ml (such as KMeans above) expect
# vectors from pyspark.ml.linalg on Spark 2.0 and later. That module does not
# exist on Spark 1.x, so fall back to the mllib vectors there.
try:
    from pyspark.ml.linalg import Vectors
except ImportError:
    from pyspark.mllib.linalg import Vectors

# Create a SparkContext in local mode
sc = pyspark.SparkContext("local")

# Create a SqlContext from the SparkContext
sqlContext = pyspark.SQLContext(sc)

In [2]:
# Sample coordinates for a handful of West Coast and East Coast cities.
# NOTE(review): the longitudes are entered as positive numbers -- presumably
# degrees West rather than signed longitude; confirm before reusing this data.
data = list(
    (city, Vectors.dense(lat, lon))
    for city, lat, lon in (
        # City, Latitude, Longitude
        ('San Francisco,CA', 37.62, 122.38),
        ('San Jose,CA', 37.37, 121.92),
        ('Portland,OR', 45.60, 122.60),
        ('Seattle,WA', 47.45, 122.30),
        ('New York,NY', 40.77, 73.98),
        ('Atlantic City,NJ', 39.45, 74.57),
        ('Philadelphia,PA', 39.88, 75.25),
        ('Boston,MA', 42.37, 71.03),
        ('Santa Rosa,CA', 38.52, 122.82),
    )
)

# Build a two-column Spark DataFrame: the city label and its coordinate vector
df = sqlContext.createDataFrame(data, schema=['city', 'features'])

# Show it as a Pandas DataFrame, which renders as a table in the notebook
df.toPandas()

Unnamed: 0,city,features
0,"San Francisco,CA","[37.62, 122.38]"
1,"San Jose,CA","[37.37, 121.92]"
2,"Portland,OR","[45.6, 122.6]"
3,"Seattle,WA","[47.45, 122.3]"
4,"New York,NY","[40.77, 73.98]"
5,"Atlantic City,NJ","[39.45, 74.57]"
6,"Philadelphia,PA","[39.88, 75.25]"
7,"Boston,MA","[42.37, 71.03]"
8,"Santa Rosa,CA","[38.52, 122.82]"


## With k=2, can KMeans separate the East Coast from the West Coast?

In [3]:
# Setup KMeans, where k is the number of cluster centers we want.
# The seed is fixed so repeated runs start from the same initialization.
kmeans = KMeans(k=2, seed=1)

# Train the model on the 'features' column of the city DataFrame
model = kmeans.fit(df)

# Print the cluster centers. print is called with a single parenthesized
# argument so this line is valid on both Python 2 and Python 3.
print("cluster centers: " + str(model.clusterCenters()))

# Use the model to cluster the original frame; keep the city, its coordinates,
# and the cluster index assigned in the 'prediction' column
results = model.transform(df).select("city", "features", "prediction")

# Convert results to Pandas DataFrame for easy display
results.toPandas()

cluster centers: [array([ 40.6175,  73.7075]), array([  41.312,  122.404])]


Unnamed: 0,city,features,prediction
0,"San Francisco,CA","[37.62, 122.38]",1
1,"San Jose,CA","[37.37, 121.92]",1
2,"Portland,OR","[45.6, 122.6]",1
3,"Seattle,WA","[47.45, 122.3]",1
4,"New York,NY","[40.77, 73.98]",0
5,"Atlantic City,NJ","[39.45, 74.57]",0
6,"Philadelphia,PA","[39.88, 75.25]",0
7,"Boston,MA","[42.37, 71.03]",0
8,"Santa Rosa,CA","[38.52, 122.82]",1


## With k=3, we'd expect to see the Bay Area, the Pacific Northwest (PNW), and the Northeast clustered separately

In [4]:
# Build a three-cluster KMeans estimator with a fixed seed
kmeans = KMeans(seed=1, k=3)

# Fit the estimator to the city coordinates
model = kmeans.fit(df)

# Assign every city to a cluster, then keep only the columns worth showing
results = model.transform(df)
results = results.select("city", "features", "prediction")

# Render through Pandas for a readable table
results.toPandas()

Unnamed: 0,city,features,prediction
0,"San Francisco,CA","[37.62, 122.38]",1
1,"San Jose,CA","[37.37, 121.92]",1
2,"Portland,OR","[45.6, 122.6]",2
3,"Seattle,WA","[47.45, 122.3]",2
4,"New York,NY","[40.77, 73.98]",0
5,"Atlantic City,NJ","[39.45, 74.57]",0
6,"Philadelphia,PA","[39.88, 75.25]",0
7,"Boston,MA","[42.37, 71.03]",0
8,"Santa Rosa,CA","[38.52, 122.82]",1


## Stop the Spark Context

In [5]:
# Stop the context when you are done with it. Once the SparkContext is stopped,
# its resources are released and no further operations can be performed within it.
sc.stop()