In [10]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import os

In [11]:
# Deployment switches consulted by the Spark configuration cell.
# hdfs: when True, the default filesystem is pointed at the HDFS namenode.
hdfs = False
# kubernetes: when True, the Kubernetes master and pod settings are applied
# and the sample-data path prefix is cleared.
kubernetes = True

In [12]:
# Build the SparkConf used to start the session.
# The defaults describe an in-process local master; the `kubernetes` and
# `hdfs` flags layer cluster-specific settings on top of them.
sparkConf = SparkConf()
sparkConf.setAppName("spark")
sparkConf.setMaster("local[*]")
sparkConf.set("spark.executor.instances", "7")
sparkConf.set("spark.executor.cores", "2")
sparkConf.set("spark.driver.memory", "512m")
sparkConf.set("spark.executor.memory", "512m")
# Prefix for the sample-data path built when the dataset is loaded.
# NOTE: this is None when SPARK_HOME is not set in the environment.
spark_home = os.getenv('SPARK_HOME')
if kubernetes:
    # Empty prefix: the dataset path then starts with '/data', which is the
    # mount path of the persistent volume claim configured below.
    spark_home = ''
    sparkConf.setMaster("k8s://https://kubernetes.default.svc.cluster.local:443")
    sparkConf.set("spark.kubernetes.pyspark.pythonVersion", "3")
    sparkConf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
    sparkConf.set("spark.kubernetes.authenticate.serviceAccountName", "spark")
    sparkConf.set("spark.kubernetes.container.image", "jgckruger/spark-py:v3.0.1")
    sparkConf.set("spark.kubernetes.namespace", "spark")
    # Fixed driver port and host name.
    # NOTE(review): presumably the Service fronting this notebook pod so that
    # executors can connect back to the driver - confirm against the deployment.
    sparkConf.set("spark.driver.port", "29413")
    sparkConf.set("spark.driver.host", "my-notebook-deployment.spark.svc.cluster.local")
    # Mount the same claim ('pv-claim') at /data on both driver and executors.
    sparkConf.set('spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.options.claimName','pv-claim')
    sparkConf.set('spark.kubernetes.driver.volumes.persistentVolumeClaim.rwxpvc.mount.path','/data')
    sparkConf.set('spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.options.claimName','pv-claim')
    sparkConf.set('spark.kubernetes.executor.volumes.persistentVolumeClaim.rwxpvc.mount.path','/data')
if hdfs:
    # Hadoop properties must carry the "spark.hadoop." prefix to be copied into
    # the Hadoop configuration; a bare "fs.defaultFS" key on SparkConf is not
    # applied to the filesystem layer.
    sparkConf.set("spark.hadoop.fs.defaultFS", "hdfs://hadoop-hadoop-hdfs-nn.spark.svc.cluster.local:9000/")

In [13]:
# Obtain a SparkSession built from sparkConf (an already-running session is
# reused by getOrCreate) and keep a handle on its SparkContext.
spark = (
    SparkSession.builder
    .config(conf=sparkConf)
    .getOrCreate()
)
sc = spark.sparkContext

In [14]:
# Load the K-means sample file in libsvm format.
# spark_home is None when SPARK_HOME is unset outside Kubernetes mode; fail
# with an actionable message instead of an opaque TypeError from None + str.
if spark_home is None:
    raise RuntimeError(
        "SPARK_HOME is not set; cannot locate data/mllib/sample_kmeans_data.txt"
    )
dataset = spark.read.format("libsvm").load(spark_home+'/data/mllib/sample_kmeans_data.txt')

In [15]:
%%time
# Two clusters with a fixed seed so repeated runs start from the same state.
kmeans = KMeans(k=2, seed=1)
# Fit on the loaded dataset.
model = kmeans.fit(dataset)
# Apply the fitted model back to the same dataset.
predictions = model.transform(dataset)

CPU times: user 17.7 ms, sys: 3.75 ms, total: 21.5 ms
Wall time: 10.8 s


In [16]:
# Score the clustering with ClusteringEvaluator's default settings and
# report the resulting Silhouette value.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)

print("Silhouette with squared euclidean distance = ", silhouette, sep="")

Silhouette with squared euclidean distance = 0.9997530305375207


In [17]:
# Print a header line followed by one fitted cluster center per line.
centers = model.clusterCenters()
print("Cluster Centers: ", *centers, sep="\n")

Cluster Centers: 
[9.1 9.1 9.1]
[0.1 0.1 0.1]


In [18]:
# Stop the Spark session now that the analysis is finished.
spark.stop()