In [19]:
# Find the k-means cluster centers of Loudacre device status locations.
#
# Input data: file(s) of comma-delimited device status records, where the
# 4th field is the device latitude and the 5th field is its longitude.
# A (lat, lon) of (0, 0) indicates an unknown location.

# Nearest-center lookup used to assign each data point to a cluster.
def closestPoint(p, points):
    """Return the index of the element of ``points`` nearest to ``p``.

    Nearness is measured with ``distanceSquared``. When several points are
    equally near, the lowest index wins; an empty ``points`` yields 0.
    """
    nearest_idx = 0
    nearest_dist = float("+inf")
    for idx, candidate in enumerate(points):
        d2 = distanceSquared(p, candidate)
        # strict '<' keeps the earliest index on ties
        if d2 < nearest_dist:
            nearest_idx, nearest_dist = idx, d2
    return nearest_idx
    
# The squared distances between two points
def distanceSquared(p1,p2):  
    return (p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2

# The sum of two points
def addPoints(p1,p2):
    return [p1[0] + p2[0], p1[1] + p2[1]]

# Only show Spark log messages at WARN level or above so the iteration output stays readable.
sc.setLogLevel("WARN")

# The files with device status data (the trailing * matches every file in the directory)
filename = "/loudacre/devicestatus_etl/*"
    
# K is the number of means (center points of clusters) to find
K = 5

# convergeDist -- convergence threshold for the k-means loop. The loop stops once the
# sum, over the recomputed centers, of the SQUARED distance each center moved in one
# iteration is no larger than this value (it is not a plain distance).
convergeDist = .1


In [20]:
# Split each record on ',' and parse it into [latitude, longitude]
# (the 4th and 5th fields). Then drop records whose location is unknown,
# which the source marks as lat/lon of exactly 0/0.
# Both coordinates are tested individually: filtering on sum(point) != 0
# would also drop real locations whose coordinates cancel out (e.g. 45, -45).
data = sc.textFile(filename)\
    .map(lambda line: line.split(','))\
    .map(lambda fields: [float(fields[3]), float(fields[4])])\
    .filter(lambda point: not (point[0] == 0 and point[1] == 0))

# Cache the parsed points: the k-means loop below scans them once per iteration.
data.persist()

PythonRDD[63] at RDD at PythonRDD.scala:43

In [21]:
# start with K randomly selected points from the dataset
kPoints = data.takeSample(False, K, 42)

print kPoints

[[38.9948504687, -121.143310222], [34.4547031924, -119.401974175], [34.5422423413, -117.220946311], [34.4663292098, -117.356371941], [37.9377630394, -120.049375775]]


In [22]:
# k-means iteration: assign every point to its nearest center, move each center to the
# mean of its assigned points, and repeat. The loop ends once the sum of the squared
# distances the centers moved in one iteration is no larger than convergeDist.
tempDist = float("+inf")
while tempDist > convergeDist:
    # for each point, find the index of the closest kpoint.  map to (index, (point,1))
    # NOTE(review): the lambda reads the driver-side kPoints list, which is updated in
    # place at the end of each iteration; presumably Spark serializes its current
    # contents when collect() runs below -- confirm.
    closest = data.map(lambda point: (closestPoint(point, kPoints), (point, 1)))
    
    # For each key (k-point index), reduce by adding the coordinates and number of points
    # (uses Python 2 tuple-parameter lambda syntax).
    pointStats = closest.reduceByKey(lambda (p1, v1),(p2,v2): (addPoints(p1,p2), v1+v2))
    
    # For each key (k-point index), the new center is the mean of its assigned points:
    # the coordinate sums divided by the point count. collect() returns the
    # (index, (lat, lon)) pairs to the driver.
    newPoints = pointStats.map(lambda (i, (p,v)): (i, (p[0]/v, p[1]/v))).collect()
    
    print "New Points:",newPoints
    
    # Total the SQUARED distance between each current center and its replacement;
    # this sum is what the while condition compares against convergeDist.
    tempDist = 0
    for (i, point) in newPoints:
        tempDist += distanceSquared(kPoints[i], point)
    
    print "Distance between iterations:",tempDist
    
    # Copy the new points to the kPoints array for the next iteration.
    # A center that was assigned no points has no entry in newPoints, so it keeps its old value.
    for (i, point) in newPoints:
        kPoints[i] = point
    
# Display the final points
print "Final center points: ",str(kPoints)

New Points: [(0, (40.1999574315597, -121.67296832795374)), (1, (35.07898372067377, -119.34511903963728)), (2, (34.70668834236743, -113.61942449461408)), (3, (34.11289318985722, -117.7607070177809)), (4, (37.976502391686815, -119.53172623300165))]
Distance between iterations: 15.681646727
New Points: [(0, (40.86120053140143, -121.70250534673397)), (1, (35.47676583827796, -119.71682875447316)), (2, (35.03499273350572, -112.66709702333176)), (3, (34.15114904969298, -117.62285483461865)), (4, (37.99114145914194, -120.5420507156219))]
Distance between iterations: 2.79066158125
New Points: [(0, (43.22938065827063, -121.38277267611755)), (1, (35.736757194335446, -119.77273043738784)), (2, (35.084610542101686, -112.57140921184987)), (3, (34.174397667475574, -117.60476094852889)), (4, (38.18001684669112, -121.29558025491399))]
Distance between iterations: 6.39719324708
New Points: [(0, (43.84979744826286, -121.29020013479062)), (1, (36.00358819723915, -119.83855791374319)), (2, (35.084610542101