In [1]:
import pyspark

In [2]:
from pyspark.sql import SQLContext

# Standalone Spark master this notebook submits work to.
SPARK_MASTER_URL = "spark://10.0.0.3:6060"
sc = pyspark.SparkContext(master=SPARK_MASTER_URL)
# sqlContext is not referenced directly later; creating it is presumably what makes
# RDD.toDF (used inside fit) available in PySpark — confirm before removing.
sqlContext = SQLContext(sc)

http://10.0.0.3:4040/

In [3]:
from pyspark.sql import functions as F
import pyspark
import numpy as np
import sys

def seedKernel(data, dataIdValue, centroids, k, metric):
    """Distance from one point to its nearest already-chosen seed.

    Parameters
    ----------
    data : indexable of point vectors (a numpy array in this notebook);
        ``data[c]`` must return the vector for every index ``c`` in ``centroids``.
    dataIdValue : ``(id, vector)`` pair (e.g. a Spark Row); only the vector at
        position 1 is used.
    centroids : sequence of indices into ``data`` for the seeds picked so far.
    k : target number of seeds; unused here, kept for interface compatibility.
    metric : callable ``metric(point, seedVector) -> scalar distance``.

    Returns
    -------
    float
        The smallest distance from the point to any seed, capped at
        ``sys.maxsize`` (which is also returned when ``centroids`` is empty).
    """
    point = dataIdValue[1]
    nearest = sys.maxsize
    for c in centroids:
        nearest = min(nearest, metric(point, data[c]))
    # Returned as float rather than truncated to int so fractional distances still
    # rank correctly when the caller takes the argmax to pick the next seed.
    return float(nearest)

def seedClusters(data, dataFrame, k, metric):
    """Pick ``k`` initial medoid indices with a farthest-point heuristic.

    The first seed is drawn at random; every further seed is the row whose distance
    to its nearest existing seed (computed by ``seedKernel`` on the executors) is largest.

    Parameters
    ----------
    data : numpy array whose rows are the points.
    dataFrame : Spark DataFrame of ``(id, vector)`` rows. The argmax over the collected
        distances is used as an index into ``data``, so the RDD order is assumed to match
        the row order of ``data`` (true for the zipWithIndex frame built in ``fit``).
    k : number of seeds wanted; a value below 1 still yields a single seed.
    metric : callable ``metric(point, seedVector) -> distance``.

    Returns
    -------
    list
        Indices into ``data`` (numpy integers), first seed first.
    """
    centroids = list(np.random.choice(data.shape[0], 1, replace=False))
    for i in range(k - 1):
        print("clusterSeed", i)
        # Ship only the vectors of the seeds chosen so far instead of capturing the
        # whole ``data`` array in the closure (Spark serialises a closure with every
        # task, on every round). seedKernel addresses this small array by local position.
        seedVectors = data[centroids]
        seedPositions = list(range(len(centroids)))
        nearestSeedDistances = np.array(
            dataFrame.rdd.map(
                lambda row: seedKernel(seedVectors, row, seedPositions, k, metric)
            ).collect()
        )
        centroids.append(np.argmax(nearestSeedDistances))
    return centroids

def nearestCenteroidKernel(dataIdValue, centeroidIdValues, metric):
    """Return the position of the centroid closest to one point.

    Parameters
    ----------
    dataIdValue : ``(id, vector)`` pair; only the vector is used.
    centeroidIdValues : sequence of ``(centroidId, centroidVector)`` pairs.
    metric : callable ``metric(pointArray, centroidArray) -> distance``.

    Returns
    -------
    int
        Position (not the centroid id) in ``centeroidIdValues`` of the smallest
        distance; the first one wins on ties.
    """
    _, pointValue = dataIdValue
    pointArr = np.asarray(pointValue)
    candidateDistances = np.asarray(
        [metric(pointArr, np.asarray(vector)) for _, vector in centeroidIdValues]
    )
    return int(np.argmin(candidateDistances))

def optimiseClusterMembershipSpark(data, dataFrame, n, metric, intitalClusterIndices=None):
    """Assign every non-medoid point to its nearest medoid.

    Parameters
    ----------
    data : numpy array of points, indexed by point id.
    dataFrame : Spark DataFrame with ``id`` and ``vector`` columns.
    n : number of medoids to draw at random when none are supplied.
    metric : point metric passed to ``nearestCenteroidKernel``.
    intitalClusterIndices : optional medoid ids to use instead of a random draw.

    Returns
    -------
    (index, clusters)
        ``index`` - the medoid ids used (``intitalClusterIndices`` unchanged when given).
        ``clusters`` - Spark DataFrame with ``bestC`` (position of the medoid in
        ``index``) and ``cluster`` (list of member ids). Medoids are excluded from the
        member lists, and a medoid that attracts no points has no row.
    """
    if intitalClusterIndices is None:
        index = np.random.choice(data.shape[0], n, replace=False)
    else:
        index = intitalClusterIndices
    # frozenset gives O(1) membership in the per-row filter (a list was O(n) per row).
    medoidIds = frozenset(int(i) for i in index)
    # Position i in this list is the cluster label nearestCenteroidKernel returns.
    centeroidIdValues = [(i, data[index[i]]) for i in range(len(index))]
    nonMedoids = dataFrame.rdd.filter(lambda row: int(row["id"]) not in medoidIds)
    associatedClusterPoints = nonMedoids.map(
        lambda row: (row["id"], nearestCenteroidKernel(row, centeroidIdValues, metric))
    )
    clusters = (
        associatedClusterPoints.toDF(["id", "bestC"])
        .groupBy("bestC")
        .agg(F.collect_list("id").alias("cluster"))
    )
    return index, clusters

def costKernel(data, testCenteroid, clusterData, metric):
    """Total distance from a candidate medoid to every member of a cluster.

    Parameters
    ----------
    data : 2-D numpy array whose rows are points.
    testCenteroid : row index (int-convertible) of the candidate medoid in ``data``.
    clusterData : sequence of row indices of the cluster members.
    metric : vectorised distance called as ``metric(members, candidateRows)`` on two
        ``(len(clusterData), n_features)`` arrays of ``data.dtype``; expected to return
        one distance per row (e.g. ``hammingVector`` / ``euclideanVector``).

    Returns
    -------
    float
        Sum of the per-row distances (0.0 for an empty cluster).
    """
    # intp keeps an empty member list usable as an index array.
    memberIds = np.asarray(clusterData, dtype=np.intp)
    # Gather all member rows in one fancy-indexing step instead of a Python copy loop.
    members = data[memberIds]
    # Repeat the candidate row so the metric receives two arrays of identical shape and dtype.
    candidate = np.tile(data[int(testCenteroid)], (len(memberIds), 1))
    return float(np.sum(metric(members, candidate)))

def optimiseCentroidSelectionSpark(data, dataFrame, centeroids, clustersFrames, metric):
    """Re-pick each cluster's medoid as the member with the lowest total distance to the others.

    Parameters
    ----------
    data : numpy array of points, indexed by point id.
    dataFrame : Spark DataFrame of the points; unused, kept for interface compatibility.
    centeroids : current medoid ids, one per cluster label ``0..len(centeroids)-1``.
    clustersFrames : Spark DataFrame with ``bestC`` and ``cluster`` columns, as returned
        by ``optimiseClusterMembershipSpark``.
    metric : vectorised distance passed to ``costKernel``.

    Returns
    -------
    (newCenteroidIds, totalCost)
        ``newCenteroidIds`` - one medoid id per label; the old medoid is kept when a label
        has no members. ``totalCost`` - sum over clusters of the chosen medoid's cost.
    """
    sparkContext = clustersFrames.rdd.context
    newCenteroidIds = []
    totalCost = 0
    for clusterIdx, oldCenteroid in enumerate(centeroids):
        print("clusterOpIdx", clusterIdx)
        memberRows = (
            clustersFrames.filter(clustersFrames.bestC == clusterIdx)
            .select(F.explode(clustersFrames.cluster))
            .collect()
        )
        memberIds = [row.col for row in memberRows]
        if not memberIds:
            # No points were assigned to this medoid: keep it; it adds nothing to the cost.
            newCenteroidIds.append(oldCenteroid)
            continue
        # Ship only this cluster's vectors (not the whole dataset) and address them by
        # local position 0..m-1.
        memberVectors = data[np.asarray(memberIds)]
        localIds = list(range(len(memberIds)))
        # Evaluate the O(m^2) candidate costs once and reduce locally, rather than running
        # the RDD twice (once for a sum, once for a full sort).
        candidateCosts = (
            sparkContext.parallelize(localIds)
            .map(lambda j: costKernel(memberVectors, j, localIds, metric))
            .collect()
        )
        best = int(np.argmin(candidateCosts))
        newCenteroidIds.append(memberIds[best])
        # k-medoids objective: only the selected medoid's cost counts toward the total.
        totalCost = totalCost + candidateCosts[best]
    return (newCenteroidIds, totalCost)

#vector metrics
def hammingVector(stack1, stack2):
    """Row-wise Hamming distance: count of positions where two equally shaped arrays differ (per row, axis 1)."""
    differs = stack1 != stack2
    return np.count_nonzero(differs, axis=1)
def euclideanVector(stack1, stack2):
    """Row-wise sum of absolute differences between two equally shaped arrays (axis 1).

    NOTE: despite the name this is the L1 (Manhattan) distance, while ``euclideanPoint``
    is the squared Euclidean distance; ``fit``'s "euclidean" option pairs the two.

    Inputs are promoted to at least int64 before subtracting so unsigned arrays (e.g.
    uint8 descriptors) do not wrap around where ``stack2 < stack1``.
    """
    a = np.asarray(stack1)
    b = np.asarray(stack2)
    work = np.result_type(a, b, np.int64)
    return np.absolute(b.astype(work) - a.astype(work)).sum(axis=1)
# point metrics
def euclideanPoint(p1, p2):
    """Squared Euclidean distance between two equal-length points.

    Both inputs are promoted to at least int64 first, so unsigned inputs (e.g. uint8)
    neither wrap on subtraction nor overflow when squared.
    """
    a = np.asarray(p1)
    b = np.asarray(p2)
    work = np.result_type(a, b, np.int64)
    return np.sum((a.astype(work) - b.astype(work)) ** 2)
def hammingPoint(p1, p2):
    """Number of positions at which two points differ."""
    mismatches = p1 != p2
    return np.sum(mismatches)

def fit(sc, data, nRegions = 2, metric = "euclidean", seeding = "heuristic"):
    """Cluster ``data`` into ``nRegions`` groups with a Spark-backed k-medoids loop.

    Alternates medoid re-selection (``optimiseCentroidSelectionSpark``) and membership
    assignment (``optimiseClusterMembershipSpark``) until the cost stops decreasing.

    Parameters
    ----------
    sc : SparkContext used to distribute ``data``.
    data : sequence of equal-length feature vectors.
    nRegions : number of clusters / medoids.
    metric : ``"euclidean"`` or ``"hamming"``.
    seeding : ``"heuristic"`` for farthest-point seeding (``seedClusters``); any other
        value draws the initial medoids at random.

    Returns
    -------
    (medoids, clusters)
        ``medoids`` - medoid row indices of the best iteration; ``clusters[i]`` - ids of
        the points assigned to ``medoids[i]`` (medoids excluded, ``[]`` when empty).

    Raises
    ------
    ValueError
        If ``metric`` is not supported.
    """
    metrics = {
        "euclidean": (euclideanPoint, euclideanVector),
        "hamming": (hammingPoint, hammingVector),
    }
    if metric not in metrics:
        raise ValueError("unsupported metric: %r" % (metric,))
    pointMetric, vectorMetric = metrics[metric]

    dataN = np.asarray(data)
    seeds = None
    # Row id == position in ``data``, so ids index ``dataN`` directly.
    dataFrame = sc.parallelize(data).zipWithIndex().map(lambda xy: (xy[1], xy[0])).toDF(["id", "vector"]).cache()
    if seeding == "heuristic":
        seeds = list(seedClusters(dataN, dataFrame, nRegions, pointMetric))
    lastCenteroids, lastClusters = optimiseClusterMembershipSpark(dataN, dataFrame, nRegions, pointMetric, seeds)
    lastCost = float('inf')
    iteration = 0
    escape = False
    while not escape:
        iteration = iteration + 1
        currentCenteroids, currentCost = optimiseCentroidSelectionSpark(dataN, dataFrame, lastCenteroids, lastClusters, vectorMetric)
        currentCenteroids, currentClusters = optimiseClusterMembershipSpark(dataN, dataFrame, nRegions, pointMetric, currentCenteroids)
        print((currentCost<lastCost, currentCost, lastCost, currentCost - lastCost))
        if (currentCost<lastCost):
            print(("iteration",iteration,"cost improving...", currentCost, lastCost))
            lastCost = currentCost
            lastCenteroids = currentCenteroids
            lastClusters = currentClusters
        else:
            print(("iteration",iteration,"cost got worse or did not improve", currentCost, lastCost))
            escape = True
    # Collect the clusters of the best iteration before releasing the cached input frame.
    bc = lastClusters.collect()
    dataFrame.unpersist()
    # groupBy guarantees no row order and omits empty clusters, so key rows by label to
    # line each member list up with its medoid.
    membersByLabel = {row.bestC: row.cluster for row in bc}
    unpackedClusters = [membersByLabel.get(label, []) for label in range(len(lastCenteroids))]
    return (lastCenteroids, unpackedClusters)

In [4]:
import numpy as np #maths
visualFeatureVocabulary = None
visualFeatureVocabularyList = None
with open("data/ORBvoc.txt", "r") as fin:
    # Line 0 is a header; on every other line, tokens [2:-2] are the descriptor values.
    extractedFeatures = [line.split(" ")[2:-2] for line in fin.readlines()[1:]]
    # dict.fromkeys de-duplicates while keeping first-seen order. A set of strings would
    # iterate in hash order, which changes between interpreter sessions (string hash
    # randomisation), making row indices - and hence seeded medoids - non-reproducible.
    uniqueFeatures = dict.fromkeys(
        tuple(int(token) for token in feature) for feature in extractedFeatures
    )
    finalFeatures = [list(feature) for feature in uniqueFeatures]
    visualFeatureVocabulary = np.asarray(finalFeatures, dtype=np.uint8)
    visualFeatureVocabularyList = list(finalFeatures)
print(visualFeatureVocabulary.shape)

(1062686, 32)


In [5]:
%%time
#ret = fit(sc, visualFeatureVocabularyList, 4, "hamming")
#ret = KMedoids.fit(sc, visualFeatureVocabularyList, 4, "hamming")

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 4.05 µs


In [6]:
#ret[1].show()
#ret[0]

In [7]:
from pyclustering.cluster import cluster_visualizer
from pyclustering.utils import read_sample
from pyclustering.samples.definitions import FCPS_SAMPLES
from pyclustering.samples.definitions import SIMPLE_SAMPLES
# FCPS "golf ball" sample from pyclustering; only referenced by commented-out
# visualizer code later in the notebook.
golfBallSamplePath = FCPS_SAMPLES.SAMPLE_GOLF_BALL
sample = read_sample(golfBallSamplePath)


In [None]:
%%time
#visualizer = cluster_visualizer()
# Cluster the de-duplicated descriptors into 100 groups with the default ("euclidean")
# metric; pass metric="hamming" to count differing positions instead.
fitResult = fit(sc, visualFeatureVocabularyList, nRegions=100)
bestCentroids, bestClusters = fitResult

clusterSeed 0
clusterSeed 1
clusterSeed 2
clusterSeed 3
clusterSeed 4
clusterSeed 5
clusterSeed 6


In [None]:
#visualizer.append_clusters(bestClusters, sample)
#visualizer.show()
#print(bestClustersData)