# Mapper Function for KMeans

Optimized computation of distance between points and centroids

In [2]:
import sklearn
import numpy as np
import pandas as pd
import time

## Read data from file

In [3]:
#data_file = "/data/kmeans/dataset_200_2d.in"
data_file = "/data/kmeans/dataset_10M_2d.in"

In [4]:
!head -n 2 $data_file

-4594.04586276
12194.3621136


In [5]:
start = time.time()
data = np.loadtxt(data_file)
print "Loading Time: %.2f sec"%(time.time()-start)

Loading Time: 42.77 sec


In [6]:
num_dimensions = 2
num_clusters = 5000
num_points = len(data)/num_dimensions 

## Reshape read in data to n dimensions

In [7]:
data=data.reshape(num_points, num_dimensions)

In [8]:
print "Data Shape: " + str(data.shape)
print "First point: " + str(data[0])

Data Shape: (10000000, 2)
First point: [ -4594.04586276  12194.3621136 ]


## Extract n random points as initial centroids from data

In [9]:
clusters = data[np.random.choice(data.shape[0], num_clusters, replace=False),:]

In [10]:
print "First Centroid: " + str(clusters[0])

First Centroid: [-17427.9065159    6115.02101404]


In [11]:
print "First Distance: " + str(np.sqrt(sum((data[0] - clusters[0]) ** 2)))

First Distance: 14200.9284017


## Compute Distance between all points and centroids

Sklearn Documentation: http://scikit-learn.org/stable/modules/generated/sklearn.metrics.pairwise.euclidean_distances.html

In [None]:
start = time.time()
distance = sklearn.metrics.pairwise.euclidean_distances(data, clusters)
print "Distance Computation Time (sklearn): %.2f sec"%(time.time()-start)

In [None]:
distance.shape

# Dask Testing

Dask Paper: http://nipy.bic.berkeley.edu:5000/download/24

Documentation: http://dask.pydata.org/en/latest/array-creation.html

In [12]:
import dask.array as da

In [13]:
dask_data = da.from_array(data, chunks=10000)

In [14]:
dask_clusters = da.from_array(clusters, chunks=100)

In [15]:
import multiprocessing
from multiprocessing.pool import ThreadPool
#pool = ThreadPool()

pool = multiprocessing.Pool(12)
da.set_options(pool=pool) 

#start = time.time()
#distance = sklearn.metrics.pairwise.euclidean_distances(dask_data, dask_clusters)
#print "Distance Computation Time (sklearn/dask): %.2f sec"%(time.time()-start)

<dask.context.set_options at 0x7f9a6844dad0>

In [None]:
result = da.sqrt((dask_data[:, :, None] - dask_clusters.T[None, :, :])**2).sum(axis=1)

In [None]:
start = time.time()
dist_np=np.array(result) 
print "Distance Computation Time (sklearn/dask): %.2f sec"%(time.time()-start)

In [None]:
dist_np[1]

## BigJob Implementation

In [200]:
from pilot import PilotComputeService, ComputeDataService, State
COORDINATION_URL = "redis://EiFEvdHRy3mNBZDjsypraXGNQqJcAYKaTnHCZxgqLsykDoKXb@localhost:6379"

pilot_compute_service = PilotComputeService(coordination_url=COORDINATION_URL)

pilot_compute_description = {
                         "service_url": 'fork://localhost',
                         "number_of_processes": 1,                             
                        }

pilotjob = pilot_compute_service.create_pilot(pilot_compute_description=pilot_compute_description)

## Create Pilot Data

In [201]:
from pilot import PilotDataService
pilot_data_service = PilotDataService(coordination_url=COORDINATION_URL)
pilot_data_description={"service_url": "ssh://localhost/tmp/pilot-data/"}
pilot_data = pilot_data_service.create_pilot(pilot_data_description=pilot_data_description)

In [219]:
pilot_data

ssh://localhost/tmp/pilot-data/

## Load Data

In [294]:
iris_data = "/data/kmeans/iris/iris.csv"
data = np.loadtxt(iris_data, delimiter=",", skiprows=1, usecols=(0,1,2,3))

In [295]:
clusters = data[np.random.choice(data.shape[0], 3, replace=False),:]

In [296]:
clusters

array([[ 5.6,  2.7,  4.2,  1.3],
       [ 5. ,  3.5,  1.6,  0.6],
       [ 5.3,  3.7,  1.5,  0.2]])

In [205]:
def mapper(data, centroids):
    # compute distances between all points and centroids
    distance = sklearn.metrics.pairwise.euclidean_distances(data, clusters)
    # compute cluster with min distance
    cluster_id = np.argmin(distance, axis=1)
    # reshape to row vector
    cluster_id = cluster_id[:, np.newaxis]
    # join data and cluster ids
    data=np.column_stack((data, cluster_id))
    return data
    
points_cluster = mapper(data, clusters)


In [206]:
import pandas as pd
def reduce(distances):
    df = pd.DataFrame(distances)
    df[4] =  df[4].astype(int)
    df = df.groupby(4)[0,1,2,3].mean()
    centroids_np = new_centroids.as_matrix()
    return centroids_np
    
reduce(distances)
    

array([[ 5.99512195,  2.81707317,  4.51219512,  1.46097561],
       [ 6.76956522,  3.03695652,  5.6       ,  2.00869565],
       [ 5.06825397,  3.22063492,  1.92380952,  0.43650794]])

In [20]:
np.argmin(distances, axis=1)

array([1, 0, 0, 0, 1, 1, 0, 1, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0,
       1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1, 1, 0, 0, 1, 1, 0,
       1, 0, 1, 1, 2, 2, 2, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 1, 2, 2, 2, 2, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2,
       2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2])

In [297]:
import datetime
class PythonCUExecutor(object):
    
    from sklearn.metrics.pairwise import euclidean_distances
    
    def __init__(self, function=None, arg1=None, arg2=None):
        self.function = function
        self.arg1 = arg1
        self.arg2 = arg2

    def execute(self):
        return self.function(self.arg1, self.arg2)
        


### Serialize Input/output data

In [298]:
pcu = PythonCUExecutor(mapper, data, clusters)
pcu_cp = cloudpickle.dumps(pcu)
d =datetime.datetime.now()
pcu_cp_file = "PCU-" + d.strftime("%Y%m%d-%H%M%S") + ".cloudpickle"
with open(pcu_cp_file, "w") as f:
    f.write(pcu_cp)       

In [299]:
!ls

KMeans Distance Computation.ipynb  PCU-20150908-025028.cloudpickle
PCU-20150907-000118.cloudpickle    PCU-20150908-025449.cloudpickle
PCU-20150907-000335.cloudpickle    PCU-20150908-025847.cloudpickle
PCU-20150908-015323.cloudpickle    PCU-20150908-025941.cloudpickle
PCU-20150908-015349.cloudpickle    PythonCUExecutor.py
PCU-20150908-015412.cloudpickle


In [300]:
pcu_cp_file

'PCU-20150908-025941.cloudpickle'

In [301]:
%run PythonCUExecutor.py $pcu_cp_file

[[5.1, 3.5, 1.4, 0.2, 2.0], [4.9, 3.0, 1.4, 0.2, 1.0], [4.7, 3.2, 1.3, 0.2, 1.0], [4.6, 3.1, 1.5, 0.2, 1.0], [5.0, 3.6, 1.4, 0.2, 2.0], [5.4, 3.9, 1.7, 0.4, 2.0], [4.6, 3.4, 1.4, 0.3, 1.0], [5.0, 3.4, 1.5, 0.2, 2.0], [4.4, 2.9, 1.4, 0.2, 1.0], [4.9, 3.1, 1.5, 0.1, 1.0], [5.4, 3.7, 1.5, 0.2, 2.0], [4.8, 3.4, 1.6, 0.2, 1.0], [4.8, 3.0, 1.4, 0.1, 1.0], [4.3, 3.0, 1.1, 0.1, 1.0], [5.8, 4.0, 1.2, 0.2, 2.0], [5.7, 4.4, 1.5, 0.4, 2.0], [5.4, 3.9, 1.3, 0.4, 2.0], [5.1, 3.5, 1.4, 0.3, 2.0], [5.7, 3.8, 1.7, 0.3, 2.0], [5.1, 3.8, 1.5, 0.3, 2.0], [5.4, 3.4, 1.7, 0.2, 2.0], [5.1, 3.7, 1.5, 0.4, 2.0], [4.6, 3.6, 1.0, 0.2, 1.0], [5.1, 3.3, 1.7, 0.5, 1.0], [4.8, 3.4, 1.9, 0.2, 1.0], [5.0, 3.0, 1.6, 0.2, 1.0], [5.0, 3.4, 1.6, 0.4, 1.0], [5.2, 3.5, 1.5, 0.2, 2.0], [5.2, 3.4, 1.4, 0.2, 2.0], [4.7, 3.2, 1.6, 0.2, 1.0], [4.8, 3.1, 1.6, 0.2, 1.0], [5.4, 3.4, 1.5, 0.4, 2.0], [5.2, 4.1, 1.5, 0.1, 2.0], [5.5, 4.2, 1.4, 0.2, 2.0], [4.9, 3.1, 1.5, 0.1, 1.0], [5.0, 3.2, 1.2, 0.2, 1.0], [5.5, 3.5, 1.3, 0.2, 2.0], 

### Run Distance Computation as CU inside Pilot-Job

In [275]:
data_unit_description = {"file_urls": [os.path.join(os.getcwd(), "PythonCUExecutor.py"),
                                       os.path.join(os.getcwd(), pcu_cp_file)]
                         }    
data_unit = pilot_data.submit_data_unit(data_unit_description)

In [302]:
import uuid
output_filename="stdout-" + str(uuid.uuid1()) +".txt"

compute_unit_description = {
        "executable": os.path.join(os.getcwd(), "PythonCUExecutor.py"),
        "arguments": [os.path.join(os.getcwd(), pcu_cp_file)],
        "number_of_processes": 1,   
        "input_data" : [data_unit.get_url()],
        "output_data": [
                            {
                             data_unit.get_url(): 
                             [output_filename]
                            }
                           ],  
        "output": "stdout.txt",
        "error": "stderr.txt",   
}   
compute_unit = pilotjob.submit_compute_unit(compute_unit_description)
compute_unit.wait()

In [308]:
import urlparse
import ast
    
def get_output(data_unit, output_file):
    full_url = pilot_data.url_for_du(data_unit) + "/stdout.txt"
    print "open " + full_url
    file_path = urlparse.urlparse(path).path
    with open(file_path) as f:
        output_data = f.read()
    result = ast.literal_eval(output_data)
    return result
    
result = get_output(data_unit, output_filename)

open ssh://localhost/tmp/pilot-data//du-6461c982-55d4-11e5-970a-44a842265a41/stdout.txt
[[ 5.1  3.5  1.4  0.2  1. ]
 [ 4.9  3.   1.4  0.2  1. ]
 [ 4.7  3.2  1.3  0.2  1. ]
 [ 4.6  3.1  1.5  0.2  1. ]
 [ 5.   3.6  1.4  0.2  1. ]
 [ 5.4  3.9  1.7  0.4  0. ]
 [ 4.6  3.4  1.4  0.3  1. ]
 [ 5.   3.4  1.5  0.2  1. ]
 [ 4.4  2.9  1.4  0.2  1. ]
 [ 4.9  3.1  1.5  0.1  1. ]
 [ 5.4  3.7  1.5  0.2  0. ]
 [ 4.8  3.4  1.6  0.2  1. ]
 [ 4.8  3.   1.4  0.1  1. ]
 [ 4.3  3.   1.1  0.1  1. ]
 [ 5.8  4.   1.2  0.2  0. ]
 [ 5.7  4.4  1.5  0.4  0. ]
 [ 5.4  3.9  1.3  0.4  0. ]
 [ 5.1  3.5  1.4  0.3  1. ]
 [ 5.7  3.8  1.7  0.3  0. ]
 [ 5.1  3.8  1.5  0.3  1. ]
 [ 5.4  3.4  1.7  0.2  1. ]
 [ 5.1  3.7  1.5  0.4  1. ]
 [ 4.6  3.6  1.   0.2  1. ]
 [ 5.1  3.3  1.7  0.5  1. ]
 [ 4.8  3.4  1.9  0.2  1. ]
 [ 5.   3.   1.6  0.2  1. ]
 [ 5.   3.4  1.6  0.4  1. ]
 [ 5.2  3.5  1.5  0.2  1. ]
 [ 5.2  3.4  1.4  0.2  1. ]
 [ 4.7  3.2  1.6  0.2  1. ]
 [ 4.8  3.1  1.6  0.2  1. ]
 [ 5.4  3.4  1.5  0.4  1. ]
 [ 5.2  4.1  1.5

SyntaxError: invalid syntax (<unknown>, line 1)

### Compute new Centroid Centers

In [310]:
reduce(result)

array([[ 5.99512195,  2.81707317,  4.51219512,  1.46097561],
       [ 6.76956522,  3.03695652,  5.6       ,  2.00869565],
       [ 5.06825397,  3.22063492,  1.92380952,  0.43650794]])

In [265]:
data_unit.id

'du-94701d02-55cc-11e5-970a-44a842265a41'

In [223]:
for i in data_unit.list_files():
    print i

PythonCUExecutor.py
PCU-20150908-015412.cloudpickle
stdout.txt


In [190]:
pilotjob.get_details()

{'bigjob_id': 'bigjob:bj-569535d4-54f1-11e5-970a-44a842265a41:localhost',
 'description': "{'external_queue': 'PilotComputeServiceQueue-pcs-5695015e-54f1-11e5-970a-44a842265a41', 'service_url': 'fork://localhost', 'coordination_host': 'redis://EiFEvdHRy3mNBZDjsypraXGNQqJcAYKaTnHCZxgqLsykDoKXb@localhost:6379', 'number_of_processes': 1, 'pilot_url': 'bigjob:bj-569535d4-54f1-11e5-970a-44a842265a41:localhost'}",
 'end_queue_time': '1441583124.77',
 'last_contact': '1441585987.78',
 'nodes': "['localhost\\n']",
 'start_time': '1441583122.11',
 'state': 'Running',
 'stopped': 'False'}

In [217]:
compute_unit.get_details()

{'Arguments': "['/home/jupyter/supercomputing-2015/supercomputing2015-tutorial/03_kmeans/PCU-20150907-000335.cloudpickle']",
 'Error': 'stderr.txt',
 'Executable': '/home/jupyter/supercomputing-2015/supercomputing2015-tutorial/03_kmeans/PythonCUExecutor.py',
 'InputData': "['redis://localhost/bigdata:du-94701d02-55cc-11e5-970a-44a842265a41']",
 'NumberOfProcesses': '1',
 'Output': 'stdout.txt',
 'OutputData': "[{'redis://localhost/bigdata:du-94701d02-55cc-11e5-970a-44a842265a41': ['stdout.txt']}]",
 'SPMDVariation': 'single',
 'agent_start_time': '1441677192.27',
 'end_queue_time': '1441677288.44',
 'end_time': '1441677294.5',
 'job-id': 'sj-94d8fe08-55cc-11e5-970a-44a842265a41',
 'run_host': 'radical-5',
 'start_staging_time': '1441677287.07',
 'start_time': '1441677286.36',
 'state': 'Done'}