# Unit Tests

In [79]:
%%file tests/KMeansParallel_func.py

#!/usr/bin/python
from __future__ import division
import os
import sys
import glob
import random
import sklearn
import sklearn.cluster
import numpy as np
from CostFunction_func import CostFunction
from SamplingProbability_func import SamplingProbability


def KMeansParallel(data, k, l):
    N = data.__len__()
    if k <= 0 or not(isinstance(k,int)) or l <= 0:
        sys.exit("illegal input")    
    # Then we start to Implement the algorithm
    # 1. Sample one point uniformly at random from X
    c = np.array(data[np.random.choice(range(N),1),])
    # 2. To Cost function
    phi = CostFunction(c, data)
    # 3. Looping
    for i in range(np.ceil(np.log(phi)).astype(int)):
        cPrime = data[SamplingProbability(c,data,l) > np.random.uniform(size = N),]
        c = np.concatenate((c, cPrime))
    # End looping
    # 7. For x in C, set w_x to be the number of pts closest to X
    cMini = [np.argmin(np.sum((c-pts)**2,axis=1)) for pts in data];
    closerPts = [cMini.count(i) for i in range(len(c))]
    weight = closerPts/np.sum(closerPts)
    # 8. Recluster the weighted points in C into k clusters
    allC = data[np.random.choice(range(len(c)),size=1,p=weight),]
    data_final = c
    for i in range(k-1):
        Probability = SamplingProbability(allC,data_final,l) * weight
        # choose next centroid
        cPrimeFin = data[np.random.choice(range(len(c)), size=1, p=Probability/np.sum(Probability)),]
        allC = np.concatenate((allC,cPrimeFin))
    KMeansPP = sklearn.cluster.KMeans(n_clusters=k, n_init=1, init=allC, max_iter=500, tol=0.0001)
    KMeansPP.fit(data);
    return KMeansPP

Overwriting tests/KMeansParallel_func.py


In [80]:
%%file tests/SamplingProbability_func.py

#!/usr/bin/python
import numpy as np
from CostFunction_func import CostFunction

#sample probability function
def SamplingProbability(c, data, l):
    cost = CostFunction(c, data)
    return np.array([(min(np.sum((c-pts)**2,axis=1))) * l / cost for pts in data])

Overwriting tests/SamplingProbability_func.py


In [81]:
%%file tests/CostFunction_func.py

#!/usr/bin/python
import numpy as np

##cost function
def CostFunction(c,data):
        return np.sum([min(np.sum((c-pts)**2,axis=1)) for pts in data]) 

Overwriting tests/CostFunction_func.py


In [82]:
%%file tests/test_KMeansParallel.py

import numpy as np
import sys
from numpy.testing import assert_almost_equal
from KMeansParallel_func import KMeansParallel


def test_feasible_k_zero():
    for i in range(20):
        data = np.random.normal(size=(20,2))
        k = 0
        assert sys.exit

def test_feasible1_k_float():
    for i in range(20):
        data = np.random.normal(size=(20,2))
        k = 1.5
        assert sys.exit
        
def test_feasible1_k_float():
    for i in range(20):
        data = np.random.normal(size=(20,2))
        l = -1
        assert sys.exit

def test_level_label():
    for i in range(20):
        data = np.random.normal(size=(20,2))
        k = 3
        assert len(set(KMeansParallel(data = data, k = k, l = 2*k).labels_)) == k

def test_num_label():
    for i in range(20):
        data = np.random.normal(size=(20,2))
        k = 3
        assert len(KMeansParallel(data = data, k = k, l = 2*k).labels_) == len(data)

Overwriting tests/test_KMeansParallel.py


In [83]:
%%file tests/test_SamplingProbability.py

import numpy as np
from numpy.testing import assert_almost_equal
from SamplingProbability_func import SamplingProbability
from CostFunction_func import CostFunction

def test_positive_probability():
    l = 2
    for i in range(20):
        data = np.random.normal(size=(20,2))
        c = data[np.random.choice(range(20),1),]
        assert np.alltrue(SamplingProbability(c,data,l) >= 0)

def test_sum_to_l():
    for i in range(20):
        l = i + 1
        data = np.random.normal(size=(20,2))
        c = data[np.random.choice(range(20),1),]
        assert (np.sum(SamplingProbability(c,data,l)) - l) <= 1e-6

def test_zero_probability():
    l = 2
    for i in range(20):
        data = np.random.normal(size=(20,2))
        choice = np.random.choice(range(20),1)
        c = data[choice,]
        Probability = SamplingProbability(c,data,l)
        assert np.alltrue(Probability[choice,] == 0)

Overwriting tests/test_SamplingProbability.py


In [84]:
%%file tests/test_CostFunction.py

import numpy as np
from numpy.testing import assert_almost_equal
from CostFunction_func import CostFunction

def test_postive_cost():
    for i in range(20):
        data = np.random.normal(size=(20,2))
        c = data[np.random.choice(range(20),1),]
        assert CostFunction(c, data) >= 0

def test_c_equals_data():
    for i in range(20):
        data = np.random.normal(size=(20,2))
        c = data
        assert CostFunction(c, data) == 0

def test_larger_c_smaller_cost():
     for i in range(20):
        data = np.random.normal(size=(20,2))
        larger_c = data[np.random.choice(range(20),5,replace=False),]
        c = larger_c[:4,]
        assert CostFunction(larger_c, data) <= CostFunction(c, data)

Overwriting tests/test_CostFunction.py


In [78]:
! cd tests
! py.test

platform darwin -- Python 3.4.3 -- py-1.4.26 -- pytest-2.6.4
collected 10 items 
[0m
tests/test_CostFunction.py ...
tests/test_KMeansParallel.py ....
tests/test_SamplingProbability.py ...

