In [1]:
import numpy as np
import timeit
from scipy.sparse import csr_matrix
from numpy.random import default_rng
import time
import concurrent.futures
import multiprocessing
from multiprocessing import Pool
from itertools import repeat

In [2]:
#--- intersect function 1
def permutation_fset_intersect(permutation_array, functionset_array):
    """Count shared elements between every permutation row and every function-set row.

    Every row of both inputs is turned into a Python ``set`` (so a value that
    repeats inside a row, including any padding value, counts once), and the
    size of each pairwise set intersection is stored.

    Parameters
    ----------
    permutation_array : 2-D array
        One permutation per row.
    functionset_array : 2-D array
        One function set per row.

    Returns
    -------
    numpy.ndarray
        ``int32`` array of shape ``(permutation rows, function-set rows)``
        where entry ``[i, j]`` is the number of distinct values common to
        permutation row ``i`` and function-set row ``j``.
    """
    n_perms = permutation_array.shape[0]
    n_fsets = functionset_array.shape[0]
    counts = np.full([n_perms, n_fsets], -1, dtype=np.int32)
    # The function sets are reused for every permutation row, so build them once.
    fsets = [set(row) for row in functionset_array]
    for row_idx, perm_row in enumerate(permutation_array):
        members = set(perm_row)
        counts[row_idx, :] = [len(members & fset) for fset in fsets]
    return counts

In [3]:
#--- intersect function 2
def permutation_fset_np_intersect(permutation_array, functionset_array):
    """Count shared elements between every permutation row and every function-set row.

    NumPy-based counterpart of the set-based intersect function: each pair of
    rows is intersected with ``np.intersect1d`` and the number of common
    distinct values is recorded.

    Parameters
    ----------
    permutation_array : 2-D array
        One permutation per row.
    functionset_array : 2-D array
        One function set per row. Rows may contain repeated values (for
        example zero padding); repeats are counted once.

    Returns
    -------
    numpy.ndarray
        ``int32`` array of shape ``(permutation rows, function-set rows)``
        where entry ``[i, j]`` is the size of the intersection of
        permutation row ``i`` and function-set row ``j``.
    """
    result = np.full([permutation_array.shape[0], functionset_array.shape[0]], -1, dtype=np.int32)
    # Iterate over ROWS. np.ndenumerate would walk individual scalar elements
    # and hand back (row, col) index tuples, which turns result[i, j] into
    # fancy indexing on scalar-vs-scalar comparisons.
    for i, perm_row in enumerate(permutation_array):
        for j, fset_row in enumerate(functionset_array):
            # assume_unique is left at its default (False): padded rows repeat
            # the padding value, and intersect1d documents incorrect results
            # when assume_unique=True is used on non-unique input.
            result[i, j] = len(np.intersect1d(perm_row, fset_row))
    return result

In [5]:
def listnp_to_padded_nparray(listnp):
    """Stack variable-length 1-D arrays into one 2-D array, right-padded with zeros.

    Parameters
    ----------
    listnp : sequence of 1-D arrays
        Rows of possibly different lengths.

    Returns
    -------
    numpy.ndarray
        One row per input array; every row is extended on the right with
        zeros up to the length of the longest input.
    """
    lengths = [np.size(row) for row in listnp]
    widest = np.max(lengths)
    padded_rows = []
    for row, length in zip(listnp, lengths):
        # Pad only on the right: (0, k) means 0 values before, k values after.
        padded_rows.append(
            np.pad(row, (0, widest - length), mode='constant', constant_values=(0, 0))
        )
    return np.asarray(padded_rows)

In [21]:
#--- generate random data
# A fixed seed and a single Generator make the benchmark inputs reproducible
# under Restart & Run All (no mixing with the legacy global np.random state).
SEED = 0
rng = default_rng(SEED)

# Candidate element ids 1..14999. The list is kept under its original name;
# the array copy is built once so rng.choice does not re-convert the list on
# every iteration.
int_array = list(range(1, 15000))
int_pool = np.asarray(int_array)

# 10000 permutation rows, each holding 800 distinct ids (sampling without replacement).
perm_array = np.asarray(
    [rng.choice(int_pool, size=800, replace=False) for _ in range(10000)]
)

# 15000 function sets: draw between 10 and 299 ids in 1..14999, de-duplicate
# them with np.unique, then zero-pad all sets to a common width.
fset_array = []
for _ in range(15000):
    set_size = rng.integers(10, 300)
    fset_array.append(np.unique(rng.integers(1, 15000, size=set_size)))
fset_array = listnp_to_padded_nparray(fset_array)

In [22]:
# Module-level alias of fset_array. permutation_fset_Mintersect_global reads
# this name as a global instead of receiving the function sets as an argument.
function_array_g=fset_array

In [23]:
#--- intersect function 3
def csr_sparse(A, z):
    """Encode the rows of an integer id array as a sparse membership matrix.

    Parameters
    ----------
    A : 2-D integer array of shape (m, n)
        Each value is used directly as a column index, so values must lie
        in ``[0, z)``.
    z : int
        Number of columns of the result.

    Returns
    -------
    scipy.sparse.csr_matrix
        Shape ``(m, z)`` with ``uint16`` ones. Row ``i`` has one stored entry
        per value in ``A[i]``; a value repeated within a row (e.g. zero
        padding) is stored repeatedly and therefore contributes with its
        multiplicity.
    """
    m, n = A.shape
    # Every row stores exactly n entries, so the row pointers are 0, n, 2n, ...
    indptr = np.arange(0, m*n+1, n)
    data = np.ones(m*n, dtype=np.uint16)
    return csr_matrix((data, A.ravel(), indptr), shape=(m, z))


def _timed_intersection_counts(permutation_array, function_array):
    """Shared implementation of the three public ``permutation_fset_Mintersect*`` entry points.

    Builds a sparse membership matrix for each input and multiplies them, so
    entry ``[i, j]`` of the product counts the ids shared by permutation row
    ``i`` and function row ``j`` (this equals the set-intersection size when
    ids are unique within each row). Prints the elapsed time, then returns
    the dense counts.

    NOTE(review): the result goes through ``np.squeeze``, so a single-row
    input on either side yields an array with that axis dropped.
    """
    start = time.perf_counter()
    # Column count must cover the largest id present in either input.
    max_z = max(permutation_array.max(), function_array.max()) + 1
    counts = csr_sparse(permutation_array, max_z) @ csr_sparse(function_array, max_z).T
    counts = counts.toarray()
    finish = time.perf_counter()
    print(f'This process took {round(finish-start, 2)} second(s)')
    return np.squeeze(counts)


def permutation_fset_Mintersect(args):
    """Intersection counts for ``args = (permutation_array, function_array)``.

    The single-tuple signature lets the function be used with ``map``-style
    APIs that pass exactly one argument per task.

    Returns the squeezed dense array of pairwise shared-id counts and prints
    the elapsed time.
    """
    return _timed_intersection_counts(args[0], args[1])


def permutation_fset_Mintersect_mp(permutation_array, function_array):
    """Intersection counts with both arrays passed as separate arguments.

    Returns the squeezed dense array of pairwise shared-id counts and prints
    the elapsed time.
    """
    return _timed_intersection_counts(permutation_array, function_array)


def permutation_fset_Mintersect_global(permutation_array):
    """Intersection counts against the module-level ``function_array_g``.

    Only the permutation rows are passed in; the function sets are read from
    the global ``function_array_g`` at call time.

    Returns the squeezed dense array of pairwise shared-id counts and prints
    the elapsed time.
    """
    return _timed_intersection_counts(permutation_array, function_array_g)

In [9]:
# Worker-process count. NOTE(review): the timing cells further down pass
# literal counts (1 and 4) to the multicore_* functions rather than this name.
n_cores=4

In [31]:
# Stack ten copies of the permutation rows along axis 0 to enlarge the benchmark input.
# NOTE: this rebinds perm_array from itself, so every re-run of the cell
# multiplies the row count by 10 again.
perm_array = np.concatenate([perm_array] * 10)

In [32]:
# Display the (rows, columns) of the replicated permutation array.
np.shape(perm_array)

(1000000, 800)

In [26]:
def multicore_intersect_mp(permutation_array, functionalset_array,n_cores):
    """Compute intersection counts in parallel with ``multiprocessing.Pool``.

    The permutation rows are split into ``n_cores`` chunks with
    ``np.array_split``; each chunk is paired with the full
    ``functionalset_array`` and handed to ``permutation_fset_Mintersect_mp``.

    Parameters
    ----------
    permutation_array : 2-D array
        Rows to distribute across workers.
    functionalset_array : 2-D array
        Function sets every chunk is compared against.
    n_cores : int
        Number of chunks and of worker processes.

    Returns
    -------
    list
        One result per chunk, in chunk order.
    """
    split_permutation_array = np.array_split(permutation_array, n_cores)
    with multiprocessing.Pool(processes=n_cores) as pool:
        # starmap unpacks each (chunk, function sets) pair into two positional
        # arguments, so the worker must be the two-argument variant.
        results = pool.starmap(
            permutation_fset_Mintersect_mp,
            zip(split_permutation_array, repeat(functionalset_array)),
        )
    # starmap blocks until every task has finished and the with-block tears
    # the pool down, so no explicit close()/join() is needed.
    return(results)

In [27]:
def multicore_intersect_global(permutation_array,n_cores):
    """Compute intersection counts in parallel against the global function sets.

    The permutation rows are split into ``n_cores`` chunks with
    ``np.array_split`` and each chunk is sent to
    ``permutation_fset_Mintersect_global`` in a ``ProcessPoolExecutor``.

    NOTE(review): the worker reads its function sets from a module-level
    global, which presumably reaches the workers through process forking —
    confirm on platforms whose default start method is not fork.

    Parameters
    ----------
    permutation_array : 2-D array
        Rows to distribute across workers.
    n_cores : int
        Number of chunks and maximum number of worker processes.

    Returns
    -------
    list
        One result per chunk, in chunk order.
    """
    split_permutation_array = np.array_split(permutation_array, n_cores)
    with concurrent.futures.ProcessPoolExecutor(max_workers=n_cores) as executor:
        # executor.map returns a one-shot lazy iterator; materialise it so the
        # caller gets a reusable list, like the other multicore_* helpers.
        results = list(executor.map(permutation_fset_Mintersect_global, split_permutation_array))
    # Leaving the with-block already shuts the executor down.
    return(results)

In [28]:
def multicore_intersect(permutation_array, functionalset_array,n_cores):
    """Compute intersection counts in parallel with a ``ProcessPoolExecutor``.

    The permutation rows are split into ``n_cores`` chunks with
    ``np.array_split``; each task is a ``(chunk, functionalset_array)`` tuple
    passed to ``permutation_fset_Mintersect``.

    Parameters
    ----------
    permutation_array : 2-D array
        Rows to distribute across workers.
    functionalset_array : 2-D array
        Function sets every chunk is compared against.
    n_cores : int
        Number of chunks and maximum number of worker processes.

    Returns
    -------
    list
        One result per chunk, in chunk order.
    """
    split_permutation_array = np.array_split(permutation_array, n_cores)
    with concurrent.futures.ProcessPoolExecutor(max_workers=n_cores) as executor:
        # Pair every chunk with the function sets given by the caller (not a
        # notebook-level global) so the parameter is actually honoured.
        jobs = executor.map(
            permutation_fset_Mintersect,
            zip(split_permutation_array, repeat(functionalset_array)),
        )
        results = list(jobs)
    # Leaving the with-block already shuts the executor down.
    return(results)

In [15]:
# Single-process baseline on the full arrays; the function also prints its
# own elapsed time on every call. The recorded run below ended with a KeyboardInterrupt.
%timeit permutation_fset_Mintersect((perm_array,fset_array))

This process took 25.09 second(s)


KeyboardInterrupt: 

In [34]:
# Wall-clock timing of multicore_intersect with n_cores=1 (one chunk, one worker).
# NOTE(review): the recorded output below shows several per-process timing
# lines and a KeyboardInterrupt, so it may come from a different invocation.
start = time.perf_counter()
results=multicore_intersect(perm_array, fset_array, 1)
finish = time.perf_counter()
print(f'Finished in {round(finish-start, 2)} second(s)')

This process took 248.86 second(s)
This process took 772.85 second(s)
This process took 822.6 second(s)


Process ForkProcess-51:
Process ForkProcess-56:
Process ForkProcess-52:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/

KeyboardInterrupt: 

  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/multiprocessing/queues.py", line 93, in get
    with self._rlock:
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/multiprocessing/queues.py", line 93, in get
    with self._rlock:
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/concurrent/futures/process.py", line 233, in _process_worker
    call_item = call_queue.get(block=True)
KeyboardInterrupt
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
  File "/Users/joshuaschmidt/anaconda3/lib/python3.7/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
KeyboardInterrupt
  File "/

In [35]:
# Wall-clock timing of multicore_intersect_global with four chunks / workers.
start = time.perf_counter()
results=multicore_intersect_global(perm_array, 4)
finish = time.perf_counter()
print(f'Finished in {round(finish-start, 2)} second(s)')

This process took 1349.79 second(s)
This process took 1943.51 second(s)
This process took 1934.15 second(s)
This process took 1965.88 second(s)
Finished in 2016.38 second(s)


In [39]:
# NOTE(review): `jobs` is not assigned at notebook level in any visible cell
# (it only appears as a local inside multicore_intersect), so this cell
# depends on leftover kernel state.
[j for j in jobs]

[array([[ 5,  2, 10, ...,  8,  2, 10],
        [ 8,  0, 16, ...,  4,  7,  8],
        [ 8,  0, 11, ...,  7,  6,  9],
        ...,
        [ 5,  2, 14, ..., 10,  6, 15],
        [ 2,  0, 13, ...,  6,  6,  6],
        [ 4,  2,  6, ...,  2,  6, 13]], dtype=uint32),
 array([[ 6,  1, 11, ...,  4,  5, 17],
        [ 6,  1, 15, ...,  4,  8,  9],
        [ 3,  2, 13, ...,  5,  5, 20],
        ...,
        [ 6,  2, 19, ...,  2,  1, 17],
        [ 2,  2,  9, ...,  2,  3, 16],
        [ 7,  0, 17, ...,  3,  3, 17]], dtype=uint32),
 array([[ 8,  2, 12, ...,  4,  2, 17],
        [ 7,  0, 13, ...,  5,  8, 15],
        [ 3,  1, 10, ...,  6,  3, 15],
        ...,
        [ 7,  2, 18, ...,  5,  2, 12],
        [ 8,  1, 13, ...,  3,  3, 13],
        [ 5,  2,  9, ...,  8,  2,  7]], dtype=uint32),
 array([[ 3,  4,  9, ...,  5,  6, 11],
        [ 6,  1,  9, ...,  3,  7, 17],
        [ 5,  2, 14, ...,  4,  4, 12],
        ...,
        [ 7,  1,  5, ...,  4,  3, 10],
        [ 8,  1, 10, ...,  5,  6, 16],
   

In [138]:
# NOTE(review): `result_sm` is not defined in any visible cell; this cell
# depends on leftover kernel state.
result_sm

array([[ 3,  5, 17, ...,  5,  4,  4],
       [ 3, 10, 14, ...,  0,  2,  2],
       [ 1,  5, 16, ...,  2,  3,  4],
       ...,
       [ 3,  5, 17, ...,  3,  3,  6],
       [ 3,  6, 15, ...,  1,  2,  2],
       [ 0,  5, 13, ...,  1,  1,  4]], dtype=uint32)

True