# Co-clustering with Dask

## Dask

In [1]:
from dask.distributed import Client, LocalCluster

In [2]:
cluster = LocalCluster(threads_per_worker=1,
                       n_workers=2,
                       processes=True)
client = Client(cluster)
client

0,1
Client  Scheduler: tcp://127.0.0.1:63642  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 2  Cores: 2  Memory: 17.18 GB


## Co-Clustering

In [3]:
import time
import numpy as np
import dask.array as da
from dask.distributed import as_completed, get_client

In [4]:
def idivdist2(Z, X, Y, epsilon):
    """ Cost function """
    Y = Y + epsilon
    d = da.dot(X, Y) - da.dot(Z, da.log(Y))
    return d

In [5]:
def initialize_clusters(n_el, n_clusters, chuncks=None):
    """ Initialize cluster occupation matrix """
    cluster_idx = np.mod(np.arange(n_el), n_clusters)
    cluster_idx = np.random.permutation(cluster_idx)
    eye = da.eye(n_clusters, dtype=np.int)  # specify chunck here?
    return eye[cluster_idx]

In [6]:
def coclustering(Z, k, l, errobj, niters, epsilon):
    """ 
    Run the co-clustering 
    
    :param Z: m x n data matrix
    :param k: num row clusters
    :param l: num col clusters
    :param errobj: precision of obj fun for convergence
    :param niters: max iterations
    :param epsilon: precision of matrix elements
    :return: final row clustering, final column clustering
    """
    client = get_client()

    [m, n] = Z.shape

    R = initialize_clusters(m, k)
    C = initialize_clusters(n, l)
    # the following lines return the original initialization for the toy model
#     R = da.eye(k, dtype=np.int)
#     R = R[[1, 1, 1, 0, 0, 0]]
#     C = da.eye(l, dtype=np.int)
#     C = C[[0, 1, 0, 2, 0, 0, 2, 2]]

    e = 2 * errobj
    old_e = 0
    s = 1

    while (abs(e - old_e) > errobj) & (s <= niters):

        CoCavg = (da.dot(da.dot(R.T, Z), C) + Z.mean() * epsilon) / (da.dot(da.dot(R.T, da.ones((m, n))), C) + epsilon)

        d_row = idivdist2(Z, da.ones((m, n)), da.dot(C, CoCavg.T), epsilon)
        idx_row = da.argmin(d_row, axis=1)
        R = da.eye(k, dtype=np.int)[idx_row]

        d_col = idivdist2(Z.T, da.ones((n, m)), da.dot(R, CoCavg), epsilon)
        idx_col = da.argmin(d_col, axis=1)
        C = da.eye(l, dtype=np.int)[idx_col]

        minvals_da = da.min(d_col, axis=1)

        old_e = e
        e = da.sum(da.power(minvals_da, 1))  # power 1 divergence, power 2 euclidean
        R, C, e = client.persist([R, C, e])

        e = e.compute()
        # the following lines are a workaround to e.compute(), required if:
        # - the function is submitted to the worker
        # - multi-threaded workers are employed
        # this is because of a bug in Dask: https://github.com/dask/distributed/issues/3827
#         e = client.compute(e)
#         secede()
#         e = e.result()
#         rejoin()

        s = s + 1
    return R, C, e

In [7]:
def multiple_runs_memory(Z, k, l, errobj, niters, epsilon, nruns):
    """ Memory efficient loop over runs """
    R_min, C_min, e_min = None, None, 0.
    for r in range(nruns):
        R, C, e = coclustering(Z, k, l, errobj, niters, epsilon)
        if e < e_min:
            R_min, C_min, e_min = R, C, e
    return R_min, C_min, e_min

In [8]:
def multiple_runs_performance(Z, k, l, errobj, niters, epsilon, nruns):
    """ Faster implementation of loop over runs """
    # we need to specify pure=False
    futures = [client.submit(coclustering, Z, k, l, errobj, niters, epsilon, pure=False)  
               for r in range(nruns)]
    R_min, C_min, e_min = None, None, 0.
    for future, result in as_completed(futures,
                                       with_results=True,
                                       raise_errors=False):
        R, C, e = result
        if e < e_min:
            R_min, C_min, e_min = R, C, e
    return R_min, C_min, e_min

## Toy examples

In [9]:
k = 2  # num clusters in rows
l = 3  # num clusters in columns
errobj, niters, nruns, epsilon = 0.00001, 100, 15, 10e-8
Z = np.array([[24.,  44.,   5.,   6.,  26., 100.,  20.,  95.],
       [38.,  14.,  10.,  36.,  56.,  75.,   0.,  26.],
       [0.,  54.,  59.,  45.,  87.,  65.,  28.,  36.],
       [34.,  45.,  95.,  54.,  78.,  56.,  82.,  63.],
       [4.,  63.,  82.,  56.,  36.,  96.,  45.,  34.],
       [24.,  25.,   9.,   2.,  28.,  57.,  76.,  23.]])
Z = da.from_array(Z, chunks=(2, 4))

In [10]:
# original implementation
start_time = time.time()
R, C, e = multiple_runs_performance(Z, k, l, errobj, niters, epsilon, nruns)
print("total elapsed", time.time() - start_time)
print(e)
print(R.compute())
print(C.compute())

total elapsed 6.7966179847717285
-6110.440775184063
[[0 1]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [0 1]]
[[1 0 0]
 [0 0 1]
 [0 0 1]
 [0 0 1]
 [0 0 1]
 [0 1 0]
 [0 0 1]
 [0 1 0]]


In [11]:
# memory-efficient implementation
start_time = time.time()
R, C, e = multiple_runs_memory(Z, k, l, errobj, niters, epsilon, nruns)
print("total elapsed", time.time() - start_time)
print(e)
print(R.compute())
print(C.compute())

total elapsed 17.543510913848877
-6110.440775184063
[[0 1]
 [0 1]
 [1 0]
 [1 0]
 [1 0]
 [0 1]]
[[0 1 0]
 [0 0 1]
 [0 0 1]
 [0 0 1]
 [0 0 1]
 [1 0 0]
 [0 0 1]
 [1 0 0]]
