In [3]:
import sys
sys.path.append('/nrs/flyem/bergs/tmp/fish')

In [4]:
import numpy as np
import os, sys, time
from collections import Iterable

import dask.array as da
import dask_jobqueue
from dask_jobqueue import LSFCluster
from fish.util.distributed import get_jobqueue_cluster
from dask.distributed import Client
#import B.general_admin as ga

In [5]:
def load_data():
    ''' hardcoded function to load data (≈6GB) ''' 
    folder = '/nrs/ahrens/Virginia_nrs/25_LS_wG/190703_HuCH2B_gCaMP7F_8dpf_forG/exp0/sca/'
    fname = folder + 'data_ntk_demeaned.npy'
    return np.load(fname)

# np.save('/nrs/flyem/bergs/tmp/data_ntk_demeaned-c-order.npy', np.asarray(load_data(), order='C'))
def load_data_c_order():
    data = np.load('/nrs/flyem/bergs/tmp/data_ntk_demeaned-c-order.npy')
    return data

def construct_adiff(j, data_ntk=None):
    """
    Process a single neuron, as specified by 'j'.
    """
    if data_ntk is None:
        data_ntk = load_data_c_order()

    n, t, k = data_ntk.shape # n = 31305, t = 50, k ≈ 800 
    jtk = data_ntk[j] # load data from neuron j
    mat_nt2 = np.zeros([n, t**2], np.float32) # prepare result matrix
    for i in range(n): # loop over all neurons...
        if np.mod(i, 5000) == 0:
            print('n: {0}'.format(i))
        itk = data_ntk[i]
        ai = itk.dot(jtk.T)/k # construct TxT covariance matrix      
        ai_diff = (ai - ai.T).reshape([1, t**2]) # construct small cdiff and reshape into 1x(T^2)
        mat_nt2[i] = ai_diff
    return np.expand_dims(mat_nt2.dot(mat_nt2.T),0)

In [6]:
# This version of construct_adiff() is a little simpler, but not (much) faster.
def construct_adiff_simpler(j, data_ntk=None):
    """
    Process a single neuron, as specified by 'j'.
    """
    print(f"Processing neuron {j}")
    if data_ntk is None:
        data_ntk = load_data_c_order()

    n, t, k = data_ntk.shape # n = 31305, t = 50, k ≈ 800 
    jtk = data_ntk[j] # load data from neuron j

    # Do all the multiplies in one step.
    ai = np.matmul(data_ntk, jtk.T) / k
    assert ai.shape == (n,t,t)

    mat_nt2 = (ai - ai.transpose(0,2,1)).reshape([n, t**2])
    result = mat_nt2.dot(mat_nt2.T)
    return result[None]

In [7]:
def construct_adiff_group_sum(group_start, group_size):
    """
    Process a group of neurons, starting with j=group_start.
    """
    if isinstance(group_start, Iterable):
        # You can call this function with an integer or an
        # integer wrapped in a list/array, when calling via map_blocks()
        assert len(group_start) == 1
        group_start = group_start[0]

    data_ntk = load_data_c_order()
    n, t, k = data_ntk.shape # n = 31305, t = 50, k ≈ 800 
    group_end = min(n, group_start+group_size)

    result = np.zeros((1,n,n), np.float32)
    for j in range(group_start, group_end):
        result += construct_adiff_simpler(j, data_ntk)
    return result

In [8]:
## Compare the two versions of construct_adiff()
#data_ntk = load_data_c_order()
#%time r1 = construct_adiff(0, data_ntk)
#%time r2 = construct_adiff_simpler(0, data_ntk)
#assert r1.shape == r2.shape
#assert np.allclose(r1, r2)

### Create Dask cluster

In [9]:
TOTAL_ACTIVE_CORES = 1000

# On each worker, you can reserve more cores than you plan to actively use,
# in case each active CPU needs more than its "fair share" of RAM.
# The inactive CPUs sit idle, but it's better than running out of RAM.
# Even if RAM isn't an issue, it's often a good idea to leave at least one CPU idle,
# just to allow background work to run (like garbage collection).

# As shown here, I used a 2-1 ratio of 'reserved' cores to 'active' CPUs,
# but I think you could get away with 32-to-24 or even 32-to-31.
# In the end, the computation didn't need as much RAM as I'd feared.
reserved_cores_per_worker = 32
active_cores_per_worker = 16

# Most of numpy's linear algebra functions such as dot()
# and matmul() can use more than one core, if you let them.
# If you specify a 2-1 ratio (or more),
# we may as well squeeze some value from the 'idle' cores
# if we let numpy use them when possible.
omp_threads = reserved_cores_per_worker // active_cores_per_worker
env_extra = [
    f"export NUM_MKL_THREADS={omp_threads}",
    f"export OPENBLAS_NUM_THREADS={omp_threads}",
    f"export OPENMP_NUM_THREADS={omp_threads}",
    f"export OMP_NUM_THREADS={omp_threads}",
]

cluster = get_jobqueue_cluster( walltime="12:00",
                                ncpus=reserved_cores_per_worker,
                                cores=active_cores_per_worker,
                                memory=f"{32*15}GB",
                                log_directory='worker-logs',
                                env_extra=env_extra )

client = Client(cluster)

workers_needed = int(np.ceil(TOTAL_ACTIVE_CORES / active_cores_per_worker))
client.cluster.scale(workers_needed)

# Adjust for how many cores we actually requested
TOTAL_ACTIVE_CORES = workers_needed * active_cores_per_worker

[2019-08-04 13:33:12,184] INFO mem specification for LSF not set, initializing it to 480000000000 bytes


In [10]:
client

0,1
Client  Scheduler: tcp://10.36.111.13:42011  Dashboard: http://10.36.111.13:8787/status,Cluster  Workers: 63  Cores: 1008  Memory: 30.24 TB


In [11]:
print(client.cluster.dashboard_link)

http://10.36.111.13:8787/status


In [12]:
#client.cluster.close()
#client.close()

### Execute with dask
Issue all groups in one big computation

In [21]:
%%time

n = 31305 # number of neurons (to determine the size of the result (nn x n x n) which we sum over the first dimension)

nn = n
#nn = 2000 # subset of neurons to test the code over

# Each worker processes one 'group' at a time,
# and the data needs to be loaded just once per group.
# Therefore bigger groups incur less overhead to read the data.
# Furthermore, one (large) intermediate sum needs to be stored per group,
# so (in theory) fewer large groups is more RAM efficient than many small groups.
# (In fact, if the groups are too small, they use too much RAM and the job fails.)
#
# However, every worker just gets one task (large groups), we don't get much value out of
# dask's progress monitoring.  (It will appear as if there's no progress for a long time,
# and then all of the workers start finishing at once.)
# For reassurance that *something* is happening, we'll use medium-sized groups.
# Even at this setting, you may need to wait for 5-10 minutes before the dask
# dashboard shows much activity.
GROUP_SIZE = 10 # should be ~3 tasks per worker

# Adjust if necessary for test scenarios (when nn is small).
GROUP_SIZE = min(nn // TOTAL_ACTIVE_CORES, GROUP_SIZE)
GROUP_SIZE = max(1, GROUP_SIZE)

num_groups = int(np.ceil(nn / GROUP_SIZE))
print(f"Processing {nn} neurons in {num_groups} groups of {GROUP_SIZE} across {TOTAL_ACTIVE_CORES} cores")

group_starts = da.arange(0, nn, GROUP_SIZE, chunks=1)
group_sums = group_starts.map_blocks( construct_adiff_group_sum,
                                      group_size=GROUP_SIZE,
                                      dtype='float32',
                                      new_axis=[1,2],
                                      chunks=(1,n,n) ) # bugfix: I changed this from (nn,n,n)
final_sum = group_sums.sum(0).compute()

Processing 31305 neurons in 3131 groups of 10 across 1008 cores
CPU times: user 4min 13s, sys: 19.2 s, total: 4min 32s
Wall time: 33min 26s


In [22]:
np.save('/nrs/flyem/bergs/tmp/SCA/final-sum.npy', final_sum)

### Alternative execution flow
Don't process all neurons in one computation.
Use smaller groups, and process the groups in batches.<br>
(I thought this might be necessary, or at least make it easier to monitor the job's progress, but now I think it's not better than the single-shot solution above.)

In [None]:
%%time

n = 31305 # number of neurons (to determine the size of the result (nn x n x n) which we sum over the first dimension)

nn = n
#nn = 10 # subset of neurons to test the code over

# Each worker processes one 'group' at a time,
# and the data needs to be loaded just once per group.
# Therefore bigger groups incur less overhead to read the data.
# But smaller group sizes make it easier to monitor progress in the dask dashboard.
GROUP_SIZE = 2

if nn > TOTAL_ACTIVE_CORES * GROUP_SIZE:
    BATCH_SIZE = TOTAL_ACTIVE_CORES * GROUP_SIZE
else:
    BATCH_SIZE = nn

#
# The computation requires too much RAM to perform all at once.
# Therefore, process the data in smaller batches than the whole set.
#
final_sum = np.zeros((n,n), np.float32)
for batch_number, batch_start in enumerate(range(0, nn, BATCH_SIZE)):
    print(f"Starting batch {batch_number}")
    
    group_starts = da.arange(batch_start, batch_start+BATCH_SIZE, GROUP_SIZE, chunks=1)
    group_sums = group_starts.map_blocks( construct_adiff_group_sum,
                                          group_size=GROUP_SIZE,
                                          dtype='float32',
                                          new_axis=[1,2],
                                          chunks=(1,n,n) ) # bugfix (I think): I changed this from (nn,n,n)
    batch_sum = group_sums.sum(0).compute()
    final_sum += batch_sum

In [None]:
np.save('/nrs/flyem/bergs/tmp/SCA/final-sum.npy', final_sum)

### Kill the cluster

In [23]:
client.cluster.stop_all_jobs()
client.cluster.close()
client.close()