Skip to content
Find file
Fetching contributors…
Cannot retrieve contributors at this time
156 lines (129 sloc) 5.44 KB
"""
Python implementation of the K-Means clustering example with BSP.
According presentation: http://www.slideshare.net/tjungblut/kmeans-with-bsp
"""
from BSP import BSP
from math import sqrt
class KMeansBSP(BSP):
# read the first n clusters
def setup(self, peer):
self.numCenters = int(peer.config.get("kmeans.num.centers"))
self.maxIterations = -1
if not peer.config.get("kmeans.max.iterations"):
self.maxIterations = int(peer.config.get("kmeans.max.iterations"))
self.centers = []
for i in range(self.numCenters):
val = peer.readNext()
self.centers.append(self.toVector(val[1])) # the value (index 1) is the text line
peer.reopenInput()
def bsp(self, peer):
while True:
self.assignCenters(peer)
peer.sync()
converged = self.updateCenters(peer)
peer.reopenInput()
if converged == 0:
break
# cool simplification of maxIterations > 0 && maxIterations < peer.getSuperstepCount()
if 0 < self.maxIterations < peer.getSuperstepCount():
break
self.recalculateAssignmentsAndWrite(peer)
def assignCenters(self, peer):
newCenterArray = [None] * self.numCenters
summationCount = [None] * self.numCenters
while True:
line = peer.readNext()
if not line: break
self.assignCentersInternal(newCenterArray, summationCount, self.toVector(line[1]))
# now send messages about the local updates to each other peer
for i in range(self.numCenters):
if newCenterArray[i] is not None:
for peerName in peer.getAllPeerNames():
# send message: "i_sum_vector" where i is the index in centers,
# sum is the summation count and the rest is the vector, everything is space separated
peer.send(peerName,
"%s %s %s" % (str(i), str(summationCount[i]), " ".join(map(str, newCenterArray[i]))))
def getNearestCenter(self, vector):
lowestIndex = 0
lowest = float("inf")
for i in range(self.numCenters):
dist = self.distance(vector, self.centers[i])
if lowest > dist:
lowest = dist
lowestIndex = i
return lowestIndex
def assignCentersInternal(self, newCenterArray, summationCount, vector):
lowestCenterIndex = self.getNearestCenter(vector)
center = self.centers[lowestCenterIndex]
if newCenterArray[lowestCenterIndex] is None:
newCenterArray[lowestCenterIndex] = center
summationCount[lowestCenterIndex] = 0
else:
newCenterArray[lowestCenterIndex] = self.sum(newCenterArray[lowestCenterIndex], vector)
summationCount[lowestCenterIndex] += 1
def updateCenters(self, peer):
msgCenters = [None] * self.numCenters
incrementSum = [None] * self.numCenters
self.fill(incrementSum, 0)
for msg in peer.getAllMessages():
split = msg.split()
centerIndex = int(split[0])
oldCenter = msgCenters[centerIndex]
incrementSum[centerIndex] += int(split[1])
newCenter = self.toVector(" ".join(split[2:])) # join the vector part back to string
if oldCenter is None:
msgCenters[centerIndex] = newCenter
else:
msgCenters[centerIndex] = self.sum(oldCenter, newCenter)
# update and convergence checks
for i in range(self.numCenters):
if msgCenters[i] is not None:
msgCenters[i] = [x / incrementSum[i] for x in msgCenters[i]] # average the center messages
# finally check for convergence by the absolute difference
convergedCounter = 0;
for i in range(self.numCenters):
oldCenter = self.centers[i]
if msgCenters[i] is not None:
calculateError = self.subtractAbsSum(oldCenter, msgCenters[i])
if calculateError > 0.0:
self.centers[i] = msgCenters[i]
convergedCounter += 1
return convergedCounter
def recalculateAssignmentsAndWrite(self, peer):
# write the centers first
for center in self.centers:
peer.write(center, "")
peer.write("\n", "")
while True:
line = peer.readNext()
if not line: break
lowestDistantCenter = self.getNearestCenter(self.toVector(line[1]))
peer.write(str(lowestDistantCenter), line[1])
pass
def fill(self, incrementSum, x):
for i in range(len(incrementSum)):
incrementSum[i] = x
def sum(self, vector, vector2):
vec = []
for i in range(len(vector)):
vec.append(vector[i] + vector2[i])
return vec
def subtractAbsSum(self, vector, vector2):
diff = 0
for i in range(len(vector)):
diff += abs(vector[i] - vector2[i])
return diff
# distance between two vectors
def distance(self, a, b):
sum = 0
for i in range(len(a)):
diff = b[i] - a[i]
sum += (diff * diff)
return sqrt(sum)
# splits a line by space and puts it into a vector (list)
def toVector(self, line):
vec = []
split = line.split()
for part in split:
vec.append(float(part))
return vec
Jump to Line
Something went wrong with that request. Please try again.