In [7]:
import zarr
import numpy as np
import anndata
import scipy as sp
import numpy as np
import matplotlib.pyplot as plt
import dask
from pyseq import image_analysis as ia
import warnings
warnings.filterwarnings('ignore')
from dask.distributed import Client
import numba
from os import makedirs, getcwd
import joblib
from dask_jobqueue import SLURMCluster
import skimage
import time
from os.path import exists, join
from joblib import Parallel, delayed
from joblib import parallel_backend
from dask.distributed import progress
#%run /gpfs/commons/home/jsingh/util_pyseq.ipynb

In [9]:
#Loading Data
im = ia.get_HiSeqImages(image_path = '/gpfs/commons/home/jsingh/zarrs/m387ntga2.zarr')
labels = skimage.io.imread('/gpfs/commons/groups/nygcfaculty/PySeq/20210428_mouse_genotype_2/segmented_sections/m387ntga2_labels.tiff')
labels = dask.array.asarray(labels)

ImageAnalysis::Opened m387ntga2 


In [10]:
#format = one_z_plane_obj_step_channel_cycle
array_object_list = []
name_object_list = []
for i in im.im['channel'].values:
    for j in im.im['cycle'].values:
        #for k #in im.im['obj_step'].values[1]:
            k = 8028
            name = "one_x_plane_"+str(i)+"_"+str(j)+"_"+str(k)
            name_object_list.append(name)
            nme = im.im.sel(obj_step = k, cycle=j, channel = i)
            array_object_list.append(nme)
        

In [11]:
############ INITIATING CLUSTER ####################

In [13]:
def get_cluster(queue_name = 'pe2', log_dir=None):
    """ Make dask cluster w/ workers = 2 cores, 32 G mem, and 1 hr wall time.

        return cluster, client
    """
    if log_dir is None:
        log_dir = join(getcwd(),'dask_logs')
        makedirs(log_dir, exist_ok=True)

    cluster = SLURMCluster(
                queue = queue_name, 
                cores = 12 ,
                memory = '128G',
                walltime='1:00:00',
                log_directory=log_dir)
                #extra=["--lifetime", "55m", "--lifetime-stagger", "4m"])
    client = Client(cluster, timeout="50s")

    return cluster, client

cluster, client = get_cluster()

In [14]:
def scale_cluster(count): 
    cluster.scale(count)
    return cluster.dashboard_link
scale_cluster(5)

'http://10.4.200.50:46832/status'

In [15]:
#Or specify cores or memory directly

In [16]:
plane = array_object_list[0] #sample plane 

In [None]:
plane.values[labels == 0]

In [10]:
#Way 1: Using Data As Is (No Persit) with Dask Futures
plane = array_object_list[0]
def get_pixels(lab):
    m = plane.values[labels == lab+1].mean()
    return m

futures = client.submit(get_pixels, range(5))
progress(futures)

result_list = np.zeros(20)
for i in range(len(futures)):
    result_list[i] = futures[i].result()
    
    

TypeError: object of type 'Future' has no len()

In [15]:
#Way 2: Using with Dask Persist with Dask Futures
plane_persisted = array_object_list[0].persist()
def get_pixels(lab):
    m = plane_persisted.values[labels == lab+1].mean()
    return m
from dask.distributed import progress
futures = client.map(get_pixels, range(100))
#progress(futures)


result_list = np.zeros(100)
for i in range(len(futures)):
    result_list[i] = futures[i].result()


KeyboardInterrupt



In [None]:
%%time
#Way 3: Computing the same using dask compute without persisting Data
def get_pixels(lab):
    m = plane.values[labels == lab+1].mean()
    return m

with parallel_backend('dask',scheduler_host=cluster.scheduler._address):
    results = Parallel(n_jobs=-1)(delayed(get_pixels)(lab) for lab in range(100))

In [None]:
%%time
#Way 4: Computing way 3 with persist
def get_pixels(lab):
    m = plane_persisted.values[labels == lab+1].mean()
    return m

with parallel_backend('dask',scheduler_host=cluster.scheduler._address):
    results = Parallel(n_jobs=-1)(delayed(get_pixels)(lab) for lab in range(100))

In [None]:
dk = dask.array.asarray(plane.values)

In [None]:
#Way 5: Dask Masked Array
dk = dask.array.asarray(plane.values)
def get_pixels(lab):
    m = dask.array.ma.masked_array(dk,np.squeeze([labels == lab]))
    mv = dask.array.mean(m)
    return mv

futures = client.map(get_pixels, range(5))

result_list = np.zeros(20)
for i in range(len(futures)):
    result_list[i] = futures[i].result()
    
    


In [None]:
#Way 6: Dask Masked Array With Persist 
dk = dask.array.asarray(persisted_plane.values)
def get_pixels(lab):
    m = dask.array.ma.masked_array(dk,np.squeeze([labels == lab]))
    mv = dask.array.mean(m)
    return mv

futures = client.map(get_pixels, range(5))

result_list = np.zeros(20)
for i in range(len(futures)):
    result_list[i] = futures[i].result()
    



In [None]:
#Way 5 using joblib 
def get_pixels(lab):
    m = dask.array.ma.masked_array(dk,np.squeeze([labels == lab]))
    mv = dask.array.mean(m)
    return mv

with parallel_backend('dask',scheduler_host=cluster.scheduler._address):
    results = Parallel(n_jobs=-1)(delayed(get_pixels)(lab) for lab in range(100))







In [None]:
#Way x : Dask image functions
import dask_image
import dask_image.ndmeasure

index = list(range(11000))
del index[0]

ar = dask_image.ndmeasure.labeled_comprehension(image = plane, 
                                                label_image = labels, index = index, func = dask_image.ndmeasure.mean,
                                                out_dtype = float, default = None, pass_positions = False)
v = ar.compute()

In [21]:
v

array([ 137.62089726,  114.19090909,  115.02439024,  125.22727273,
       1151.01442716,  380.        ,  385.        ,  136.9754902 ,
        240.        ,  134.63862928,  146.06842338,  137.85227273,
        134.875     ,  331.425     ,  136.73469388,  132.19198312,
        410.8       ,  350.85861183,  371.57823129,  404.5       ,
        336.        ,  437.83830673,  294.06445312,  218.54237288,
        323.56      ,  316.2173913 ,  306.51856594,  313.83333333,
        310.53061224,  293.20774648,  345.        ,  360.5       ,
        307.        ,  555.1       ,  299.07142857,  490.27358491,
        343.01433121,  280.18867925,  279.95454545,  317.25294118,
        408.75      ,  277.8       ,  341.34324943,  261.75      ,
        388.5       ,  410.02564103,  343.25      ,  404.66666667,
        321.33692308,  438.42753623,  279.38111888,  480.75      ,
        391.2       ,  338.        ,  535.81031866,  385.17741935,
        431.5       ,  409.28205128,  395.25      ,  396.44759

In [None]:
#ADJUSTING TIMEOUT

In [None]:
client.close()
cluster.close()