In [14]:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
The K-means algorithm written from scratch against PySpark. In practice,
one may prefer to use the KMeans algorithm in ML, as shown in
examples/src/main/python/ml/kmeans_example.py.

This example requires NumPy (http://www.numpy.org/).
"""
from __future__ import print_function

import sys

import numpy as np
from datetime import datetime
from pyspark.sql import SparkSession

In [3]:
def parseVector(line):
    """Turn one comma-separated line of numbers into a 1-D NumPy array.

    Each comma-delimited field is converted with ``float``; a field that
    ``float`` cannot parse (for example an empty one) raises ``ValueError``.
    """
    fields = line.split(',')
    return np.array(list(map(float, fields)))


def closestPoint(p, centers):
    """Return the index of the center with the smallest squared distance to ``p``.

    Ties are resolved in favour of the earliest center because only a strictly
    smaller distance replaces the current best. If ``centers`` is empty the
    result is 0.
    """
    nearest_idx, nearest_sq = 0, float("+inf")
    for idx, center in enumerate(centers):
        offset = p - center
        sq_dist = np.sum(offset ** 2)
        if sq_dist < nearest_sq:
            nearest_idx, nearest_sq = idx, sq_dist
    return nearest_idx

In [24]:
spark = SparkSession\
    .builder\
    .appName("PythonKMeans")\
    .getOrCreate()

# Run the whole job under try/finally so the Spark session is always stopped,
# even when reading the input or an iteration raises.
try:
    # Take the first field of every row read from the text file and parse it
    # into a NumPy vector. The parsed RDD is cached because every iteration of
    # the loop below scans it again.
    lines = spark.read.text('smallpointdata2018.txt').rdd.map(lambda r: r[0])
    data = lines.map(parseVector).cache()
    K = 4                # number of centers to sample
    convergeDist = 0.01  # stop once the summed squared center movement is <= this

    # The sampling seed comes from the wall clock, so the initial centers (and
    # therefore possibly the final ones) differ from run to run.
    kPoints = data.takeSample(False, K, int(datetime.timestamp(datetime.now())))
    print(kPoints)
    tempDist = 1.0

    while tempDist > convergeDist:
        # Key every point by the index of its nearest current center, then sum
        # the vectors and the counts per center.
        closest = data.map(lambda p: (closestPoint(p, kPoints), (p, 1)))
        pointStats = closest.reduceByKey(lambda p1_c1, p2_c2:
                                         (p1_c1[0] + p2_c2[0], p1_c1[1] + p2_c2[1]))

        # Collect once and reuse the result: calling collect() separately for
        # the progress print and for the new centers would run the map and
        # shuffle twice per iteration.
        clusterStats = pointStats.collect()
        if clusterStats:  # nothing to report when the input has no points
            print(clusterStats[0])

        # New center = per-cluster vector sum divided by the cluster's point count.
        newPoints = [(iK, total / count) for (iK, (total, count)) in clusterStats]

        # Total squared movement of the centers that received points.
        tempDist = sum(np.sum((kPoints[iK] - p) ** 2) for (iK, p) in newPoints)

        # Centers that received no points are absent from newPoints and keep
        # their previous position.
        for (iK, p) in newPoints:
            kPoints[iK] = p

    print("Final centers: " + str(kPoints))
finally:
    spark.stop()

[array([101.6641916 , 101.61331392, 104.24230268, 104.81470621,
       101.85639535]), array([63.28653276, 62.85648073, 61.4991053 , 60.38797759, 62.36608266]), array([62.34458697, 64.05572803, 61.89781377, 62.00375749, 64.69842699]), array([2.67201326, 4.74148347, 5.17001341, 3.57540109, 2.24477869])]
(3, (array([4317.11870508, 4317.31509828, 4358.99042373, 4363.63928187,
       4328.48210604]), 334))
(3, (array([4317.11870508, 4317.31509828, 4358.99042373, 4363.63928187,
       4328.48210604]), 334))
(3, (array([4317.11870508, 4317.31509828, 4358.99042373, 4363.63928187,
       4328.48210604]), 334))
(3, (array([4317.11870508, 4317.31509828, 4358.99042373, 4363.63928187,
       4328.48210604]), 334))
Final centers: [array([102.57345288, 102.68358449, 102.61150623, 102.19693767,
       102.54354406]), array([52.61451094, 52.45972538, 52.50091443, 52.34306416, 52.47587122]), array([82.36600114, 82.48996889, 82.56931268, 82.69644794, 82.46102416]), array([12.92550511, 12.92609311, 13.05