In [1]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


In [2]:
!pip install pyspark==3.0.0

Collecting pyspark==3.0.0
  Downloading pyspark-3.0.0.tar.gz (204.7 MB)
[K     |████████████████████████████████| 204.7 MB 4.7 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 51.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044182 sha256=9cf3b56bdb6908fcddbad53bab492b021fd0add6ab85b3ee479f79d49be5e864
  Stored in directory: /root/.cache/pip/wheels/4e/c5/36/aef1bb711963a619063119cc032176106827a129c0be20e301
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0


In [1]:
import sys
import numpy as np
from math import sqrt

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans


S3_DATA_SOURCE_PATH = "/content/adultdata.txt"
s3_DATA_OUTPUT_PATH = "/content/data-output"
if __name__ == "__main__":
    sc = SparkContext(appName="KMeansExample")

    data = sc.textFile(S3_DATA_SOURCE_PATH)
    parsedData = data.map(lambda line: np.array([x for x in line.split(', ')])[np.array([0,2,12])].astype(float))
    
    clusters = KMeans.train(parsedData, 2, maxIterations=20, initializationMode="random")
    cluster_center=clusters.centers
    print("Centers:",clusters.centers,file=sys.stdout)
    
    results = sc.parallelize(cluster_center)
    def error(point):
        center = clusters.centers[clusters.predict(point)]
        return sqrt(sum([x**2 for x in (point - center)]))

    WSSSE = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y)
    print("Within Set Sum of Squared Error = " + str(WSSSE),file=sys.stdout)

    sc.stop()


Centers: [array([3.69546542e+01, 3.27532491e+05, 4.02616655e+01]), array([3.91434946e+01, 1.42207860e+05, 4.04981614e+01])]
Within Set Sum of Squared Error = 1698914475.0772185
