In [None]:
'''
Author: Raquel Dias
The goal of this notebook is to benchmark multiple parallel processing libraries
The target task consists of editing values in chunks of a shared numpy array.
A certain proportion of the values in the array will be set to zero after picking their indexes randomly.
'''

In [2]:
import numpy as np
import random
import timeit
import time

In [3]:
#parallel processing libraries
import joblib
import multiprocessing
import threading, queue
import ray

In [4]:
#global variables
alt_signal_only=False
do_numpy_masking=True
par_mask_proc=8
s=6000 #simulating a batch of s observations (samples)
v=6500 #number of variables (variants)
a=2 #values per variable (alleles)
n_batches=100
my_input = np.ones((s,v*a))
my_input = my_input.reshape((s,int(v),a))

In [5]:
def measure_runtime(f,*args,nreps=10):
    #print(args)
    my_avg=0
    for i in range(nreps):
        
        start = timeit.default_timer()

        result = f(*args)
    
        stop = timeit.default_timer()
        
        time.sleep(1)
        print("rep",i,":",(stop - start))

        my_avg += (stop - start) / nreps
        
    print('Average time: ', my_avg)

In [6]:
def mask_data_per_sample_parallel(mydata, par_mask_method, mask_rate=0.9, categorical="False"):

    nmask = int(round(len(mydata[0])*mask_rate))
    my_mask=[0,0]

    if(categorical=="True"):
        my_mask=-1
    elif(alt_signal_only==True):
        my_mask=0

    def chunks(l, n):
        for i in range(0, len(l), n):
            yield l[i:i + n]

    if(do_numpy_masking==True):
        m=len(mydata[0])
        s=len(mydata)
        arr=np.arange(m)
        # random matrix of indexes
        if(par_mask_method=="joblib"):
            
            def mask_worker(data,nmask,replace=False):
                inds=np.random.choice(arr,size=nmask, replace=False)
                data[np.arange(len(data))[:, None], inds] = [0,0]
                return data
            
            result=joblib.Parallel(n_jobs=par_mask_proc)(joblib.delayed(mask_worker)(i,nmask) for i in chunks(mydata,par_mask_proc))
            mydata=np.array(result)
                       
        elif(par_mask_method=="thread"):
         
            proc = []
            q = queue.Queue()
            
            def mask_worker(data,q):
                inds = np.random.choice(arr,size=nmask, replace=False)
                data[np.arange(len(data))[:, None], inds] = [0,0]
                q.put(data)
            
            for i in chunks(mydata,par_mask_proc):
                p = threading.Thread(target=mask_worker, args=(i,q))
                p.Daemon = True
                proc.append(p)
                               
            for i in chunks(proc,par_mask_proc):
                for j in i:
                    j.start()
                for j in i:
                    j.join()
            
            mydata = [q.get() for i in proc]
            
            mydata = np.array(mydata)
            return mydata

           
        elif(par_mask_method=="threadpool" or par_mask_method=="pool"):
                       
            def mask_worker(data):
                inds=np.random.choice(arr,size=nmask, replace=False)
                data[np.arange(len(data))[:, None], inds] = [0,0]
                return data
            
            pool = multiprocessing.pool.ThreadPool(par_mask_proc)
            result = pool.map(mask_worker,chunks(mydata,par_mask_proc))
            pool.close()
            pool.join()    
            result = [val for sublist in result for val in sublist]
            mydata = np.array(result)
            
        elif(par_mask_method=="ray"):
            #a little slower if you restart ray every time
            #ray.shutdown()
            #ray.init()
            @ray.remote
            def mask_worker(data):
                inds=np.random.choice(arr,size=nmask, replace=False)
                #fails if we try to overwrite, solution bellow
                data.setflags(write=1)
                data[np.arange(len(data))[:, None], inds] = [0,0]

            futures = [mask_worker.remote(i) for i in chunks(mydata,par_mask_proc)]
            result = ray.get(futures)
            result = np.array(result)
            mydata = result
            #ray.shutdown()
            
        else:
            inds=[np.random.choice(arr,size=nmask,replace=False) for i in range(s)]
            #slower
            #inds=np.stack([np.random.choice(np.arange(m),size=nmask,replace=False) for i in range(s)])
            mydata[np.arange(s)[:, None], inds] = my_mask
    else:
        #REAAAAAAALLY SLOOOOOOW, not using this anymore
        j = 0
        while j < len(mydata):
            #redefines which variants will be masked for every new sample
            maskindex = random.sample(range(0, len(mydata[0]-1)), nmask) 
            for i in maskindex:
                mydata[j][i]=my_mask
            j=j+1

    return mydata

In [7]:
measure_runtime(mask_data_per_sample_parallel,np.copy(my_input),"serial",0.95,nreps=3)

rep 0 : 2.8467207923531532
rep 1 : 2.446762025821954
rep 2 : 2.5559678939171135
Average time:  2.616483570697407


In [8]:
measure_runtime(mask_data_per_sample_parallel,np.copy(my_input),"pool",0.95,nreps=3)

rep 0 : 0.8282273770309985
rep 1 : 0.9477627142332494
rep 2 : 0.9965687529183924
Average time:  0.9241862813942134


In [9]:
measure_runtime(mask_data_per_sample_parallel,np.copy(my_input),"joblib",0.95,nreps=3)

rep 0 : 5.727488715667278
rep 1 : 3.9238162506371737
rep 2 : 3.524286929052323
Average time:  4.391863965118925


In [10]:
measure_runtime(mask_data_per_sample_parallel,np.copy(my_input),"thread",0.95,nreps=3)

rep 0 : 1.147389508318156
rep 1 : 1.1911329659633338
rep 2 : 1.2736856141127646
Average time:  1.204069362798085


In [11]:
ray.shutdown()
ray.init()
measure_runtime(mask_data_per_sample_parallel,my_input,"ray",0.95,nreps=3)
ray.shutdown()

2019-12-06 10:17:02,525	INFO resource_spec.py:205 -- Starting Ray with 12.84 GiB memory available for workers and up to 6.44 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


rep 0 : 5.236430027987808
rep 1 : 1.3128844080492854
rep 2 : 1.183456470258534
Average time:  2.577590302098542


In [12]:
s=12000 #simulating a larger batch of s observations (samples)
v=6500 #number of variables (variants)
a=2 #values per variable (alleles)
n_batches=100
my_input = np.ones((s,v*a))
my_input = my_input.reshape((s,int(v),a))

for i in [2,4,8]:
    global par_mask_proc
    par_mask_proc = i
    for j in ["serial","pool","thread","joblib","ray"]:
        print("Number of cores:", i, "method:", j)
        if(j=="ray"):
            ray.init()
            measure_runtime(mask_data_per_sample_parallel,my_input,j,0.95,nreps=3)
            ray.shutdown()
        else:
            measure_runtime(mask_data_per_sample_parallel,my_input,j,0.95,nreps=3)

Number of cores: 2 method: serial
rep 0 : 5.582591149024665
rep 1 : 5.368513234890997
rep 2 : 5.133274368941784
Average time:  5.361459584285815
Number of cores: 2 method: pool
rep 0 : 4.183285878971219
rep 1 : 4.341285431757569
rep 2 : 4.191703453660011
Average time:  4.2387582547962666
Number of cores: 2 method: thread
rep 0 : 6.244843455962837
rep 1 : 6.572435938287526
rep 2 : 6.495041779708117
Average time:  6.437440391319494
Number of cores: 2 method: joblib
rep 0 : 7.351932437624782




rep 1 : 8.524115597829223


2019-12-06 10:18:42,245	INFO resource_spec.py:205 -- Starting Ray with 15.28 GiB memory available for workers and up to 7.65 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


rep 2 : 8.000370273832232
Average time:  7.958806103095412
Number of cores: 2 method: ray
rep 0 : 9.343328353017569
rep 1 : 4.904000014066696
rep 2 : 5.023518930654973
Average time:  6.423615765913079
Number of cores: 4 method: serial
rep 0 : 5.877057794947177
rep 1 : 5.304797041229904
rep 2 : 5.4754237732850015
Average time:  5.5524262031540275
Number of cores: 4 method: pool
rep 0 : 2.3429697430692613
rep 1 : 2.6157713211141527
rep 2 : 2.6968920500949025
Average time:  2.5518777047594385
Number of cores: 4 method: thread
rep 0 : 3.769723561126739
rep 1 : 3.8617144702002406
rep 2 : 3.9173536538146436
Average time:  3.8495972283805404
Number of cores: 4 method: joblib
rep 0 : 7.125276986975223
rep 1 : 6.331410346087068


2019-12-06 10:20:18,244	INFO resource_spec.py:205 -- Starting Ray with 15.28 GiB memory available for workers and up to 7.66 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


rep 2 : 6.952824603766203
Average time:  6.803170645609498
Number of cores: 4 method: ray
rep 0 : 6.995519456919283
rep 1 : 3.3189884601160884
rep 2 : 3.2063463921658695
Average time:  4.5069514364004135
Number of cores: 8 method: serial
rep 0 : 5.607817193958908
rep 1 : 5.2389279701747
rep 2 : 5.16688754176721
Average time:  5.337877568633607
Number of cores: 8 method: pool
rep 0 : 1.7400501789525151
rep 1 : 1.8826302350498736
rep 2 : 1.8865611739456654
Average time:  1.8364138626493514
Number of cores: 8 method: thread
rep 0 : 2.3902946012094617
rep 1 : 2.6317544551566243
rep 2 : 2.384450905956328
Average time:  2.468833320774138
Number of cores: 8 method: joblib
rep 0 : 6.312227194663137
rep 1 : 6.5751835480332375


2019-12-06 10:21:39,450	INFO resource_spec.py:205 -- Starting Ray with 13.82 GiB memory available for workers and up to 6.91 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).


rep 2 : 5.233146513812244
Average time:  6.040185752169539
Number of cores: 8 method: ray
rep 0 : 6.158026240766048
rep 1 : 2.614896550308913
rep 2 : 2.567860417999327
Average time:  3.7802610696914294
