<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Slice-specified-nodes-in-dimspec" data-toc-modified-id="Slice-specified-nodes-in-dimspec-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Slice specified nodes in dimspec</a></span></li><li><span><a href="#Test-parallelism" data-toc-modified-id="Test-parallelism-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Test parallelism</a></span><ul class="toc-item"><li><ul class="toc-item"><li><span><a href="#Example-task" data-toc-modified-id="Example-task-2.0.1"><span class="toc-item-num">2.0.1&nbsp;&nbsp;</span>Example task</a></span></li></ul></li><li><span><a href="#Simple-invocation" data-toc-modified-id="Simple-invocation-2.1"><span class="toc-item-num">2.1&nbsp;&nbsp;</span>Simple invocation</a></span></li><li><span><a href="#One-var-parallelisation" data-toc-modified-id="One-var-parallelisation-2.2"><span class="toc-item-num">2.2&nbsp;&nbsp;</span>One var parallelisation</a></span></li><li><span><a href="#Two-var-parallelisation" data-toc-modified-id="Two-var-parallelisation-2.3"><span class="toc-item-num">2.3&nbsp;&nbsp;</span>Two var parallelisation</a></span></li><li><span><a href="#Many-var-parallelisation" data-toc-modified-id="Many-var-parallelisation-2.4"><span class="toc-item-num">2.4&nbsp;&nbsp;</span>Many var parallelisation</a></span></li><li><span><a href="#Use-ray" data-toc-modified-id="Use-ray-2.5"><span class="toc-item-num">2.5&nbsp;&nbsp;</span>Use ray</a></span></li><li><span><a href="#Concurrent-assignment" data-toc-modified-id="Concurrent-assignment-2.6"><span class="toc-item-num">2.6&nbsp;&nbsp;</span>Concurrent assignment</a></span></li><li><span><a href="#Use-multiprocessing" data-toc-modified-id="Use-multiprocessing-2.7"><span class="toc-item-num">2.7&nbsp;&nbsp;</span>Use multiprocessing</a></span></li></ul></li></ul></div>

In [1]:
import ray
import pyrofiler as pyrof
import numpy as np
np.random.seed(42)

# Slice specified nodes in dimspec

In [2]:
def _none_slice():
    return slice(None)

def _get_idx(x, idxs, slice_idx, shapes=None):
    if shapes is None:
        shapes = [2]*len(idxs)
    point = np.unravel_index(slice_idx, shapes)
    get_point = {i:p for i,p in zip(idxs, point)}
    if x in idxs:
        p = get_point[x]
        return slice(p,p+1)
    else:
        return _none_slice()

def _slices_for_idxs(idxs, *args, shapes=None, slice_idx=0):
    """Return array of slices along idxs"""
    slices = []
    for indexes in args:
        _slice = [_get_idx(x, idxs, slice_idx, shapes) for x in indexes ]
        slices.append(tuple(_slice))
    return slices
        

In [3]:
dims1 = [1,3,4 ]
dims2 = [2,4,3, 5]
contract = [dims1, dims2]

slice_among = [4, 3]
shapes = [2, 3]

test_slices = [
    _slices_for_idxs(slice_among, *contract, shapes=shapes, slice_idx=i)
    for i in range(4)
    ]
[print(x) for x in test_slices]


[(slice(None, None, None), slice(0, 1, None), slice(0, 1, None)), (slice(None, None, None), slice(0, 1, None), slice(0, 1, None), slice(None, None, None))]
[(slice(None, None, None), slice(1, 2, None), slice(0, 1, None)), (slice(None, None, None), slice(0, 1, None), slice(1, 2, None), slice(None, None, None))]
[(slice(None, None, None), slice(2, 3, None), slice(0, 1, None)), (slice(None, None, None), slice(0, 1, None), slice(2, 3, None), slice(None, None, None))]
[(slice(None, None, None), slice(0, 1, None), slice(1, 2, None)), (slice(None, None, None), slice(1, 2, None), slice(0, 1, None), slice(None, None, None))]


[None, None, None, None]

# Test parallelism
### Example task

In [4]:
def get_example_task():
    A = 11
    B, C = 9, 7
    shape1 = [2]*(A+B)
    shape2 = [2]*(A+C)
    T1 = np.random.randn(*shape1)
    T2 = np.random.randn(*shape2)
    common = list(range(A))
    idxs1 = common + list(range(A, A+B))
    idxs2 = common + list(range(A+B, A+B+C))
    return (T1, idxs1), (T2, idxs2)

x, y = get_example_task()
x[1], y[1]

([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19],
 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 21, 22, 23, 24, 25, 26])

## Simple invocation

In [5]:

#@ray.remote
def contract(A, B):
    a, idxa = A
    b, idxb = B
    contract_idx = set(idxa) & set(idxb)
    result_idx = set(idxa + idxb)
    C = np.einsum(a,idxa, b,idxb, result_idx)
    return C

with pyrof.timing('contract'):
    C = contract(x, y)


contract : 0.6748836040496826


## One var parallelisation

In [6]:
def sliced_contract(x, y, idxs, num):
    slices = _slices_for_idxs(idxs, x[1], y[1], slice_idx=num)
    a = x[0][slices[0]]
    b = y[0][slices[1]]
    with pyrof.timing(f'\tcontract sliced {num}'):
        C = contract((a, x[1]), (b, y[1]))
    return C


def target_slice(result_idx, idxs, num):
    slices = _slices_for_idxs(idxs, result_idx, slice_idx=num)
    return slices

In [7]:
contract_idx = set(x[1]) & set(y[1])
result_idx = set(x[1] + y[1])

with pyrof.timing(f'contract simple'):
    C = contract(x,y)
    
par_vars = [1]
target_shape = C.shape
C0 = sliced_contract(x, y, par_vars, 0)
C1 = sliced_contract(x, y, par_vars, 1)

with pyrof.timing('allocate result'):
    C_par = np.empty(target_shape)
s0 = target_slice(result_idx, par_vars, 0)
s1 = target_slice(result_idx, par_vars, 1)

with pyrof.timing('slice result'):
    _ = C_par[s0[0]]
    
with pyrof.timing('assignment'):
    C_par[s0[0]] = C0
    C_par[s1[0]] = C1

assert np.array_equal(C, C_par)


contract simple : 0.4562342166900635
	contract sliced 0 : 0.236128568649292
	contract sliced 1 : 0.25557637214660645
allocate result : 3.075599670410156e-05
slice result : 4.5299530029296875e-06
assignment : 0.5560660362243652


## Two var parallelisation

In [10]:
contract_idx = set(x[1]) & set(y[1])
result_idx = set(x[1] + y[1])

with pyrof.timing(f'contract simple'):
    C = contract(x,y)
    
par_vars = [1, 17]
target_shape = C.shape
C_patches = [
    sliced_contract(x, y, par_vars, i)
    for i in range(4)
]

with pyrof.timing('allocate result'):
    C_par = np.empty(target_shape)

patch_slces = [
    target_slice(result_idx, par_vars, i)
    for i in range(4)
]

with pyrof.timing('assignment'):
    for s, patch in zip(patch_slces, C_patches):
        C_par[s[0]] = patch

assert np.array_equal(C, C_par)


contract simple : 1.868086338043213
	contract sliced 0 : 0.09775805473327637
	contract sliced 1 : 0.20709609985351562
	contract sliced 2 : 0.10585927963256836
	contract sliced 3 : 0.09561419486999512
allocate result : 3.123283386230469e-05
assignment : 2.9174225330352783


## Many var parallelisation

In [11]:
contract_idx = set(x[1]) & set(y[1])
result_idx = set(x[1] + y[1])

with pyrof.timing(f'contract simple'):
    C = contract(x,y)
    
par_vars = [1, 4, 17, 5]
threads = 2**len(par_vars)
target_shape = C.shape

with pyrof.timing('Sequential patches'):
    C_patches = [
        sliced_contract(x, y, par_vars, i)
        for i in range(threads)
    ]

with pyrof.timing('allocate result'):
    C_par = np.empty(target_shape)

patch_slces = [
    target_slice(result_idx, par_vars, i)
    for i in range(threads)
]

with pyrof.timing('assignment'):
    for s, patch in zip(patch_slces, C_patches):
        C_par[s[0]] = patch

assert np.array_equal(C, C_par)


contract simple : 1.0820136070251465
	contract sliced 0 : 0.033449411392211914
	contract sliced 1 : 0.03836464881896973
	contract sliced 2 : 0.06398296356201172
	contract sliced 3 : 0.0529017448425293
	contract sliced 4 : 0.04923677444458008
	contract sliced 5 : 0.04331612586975098
	contract sliced 6 : 0.050642967224121094
	contract sliced 7 : 0.04017186164855957
	contract sliced 8 : 0.042578935623168945
	contract sliced 9 : 0.047518253326416016
	contract sliced 10 : 0.3524761199951172
	contract sliced 11 : 0.24108147621154785
	contract sliced 12 : 0.07741117477416992
	contract sliced 13 : 0.10958695411682129
	contract sliced 14 : 0.06605362892150879
	contract sliced 15 : 0.06382369995117188
Sequential patches : 1.3844408988952637
allocate result : 0.03616809844970703
assignment : 0.40332555770874023


## Use ray

In [8]:
try: 
    ray.init()
except:
    print('ray already working')
    pass
ray.nodes()

2020-03-09 00:14:43,260	INFO resource_spec.py:212 -- Starting Ray with 2.34 GiB memory available for workers and up to 1.19 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-03-09 00:14:43,697	INFO services.py:1078 -- View the Ray dashboard at [1m[32mlocalhost:8265[39m[22m


[{'NodeID': '33dae5548ed06bb6f95964638f8ef38e93b07f84',
  'Alive': True,
  'NodeManagerAddress': '130.202.136.154',
  'NodeManagerHostname': 'archlinux',
  'NodeManagerPort': 43901,
  'ObjectManagerPort': 39087,
  'ObjectStoreSocketName': '/tmp/ray/session_2020-03-09_00-14-43_241991_55820/sockets/plasma_store',
  'RayletSocketName': '/tmp/ray/session_2020-03-09_00-14-43_241991_55820/sockets/raylet',
  'Resources': {'node:130.202.136.154': 1.0,
   'CPU': 4.0,
   'memory': 48.0,
   'GPU': 1.0,
   'object_store_memory': 16.0},
  'alive': True}]

In [9]:
contract_idx = set(x[1]) & set(y[1])
result_idx = set(x[1] + y[1])


with pyrof.timing(f'contract simple'):
    C = contract(x,y)
    
sliced_contract_ray = ray.remote(sliced_contract)
    
par_vars = [1, 17]
threads = 2**len(par_vars)
target_shape = C.shape

with pyrof.timing('Ray total compute'):
    with pyrof.timing('Ray compute'):
        with pyrof.timing('  Ray submit'):
            C_patches = [
                sliced_contract_ray.remote(x, y, par_vars, i)
                for i in range(threads)
            ]


        patch_slces = [
            target_slice(result_idx, par_vars, i)
            for i in range(threads)
        ]

        with pyrof.timing('  fetch results'):
            patches_fetched = [ray.get(patch) for patch in C_patches]
    with pyrof.timing(' allocate result'):
        C_par = np.empty(target_shape)

    with pyrof.timing(' assignment'):
        for s, patch in zip(patch_slces, patches_fetched):
            C_par[s[0]] = patch

assert np.array_equal(C, C_par)


contract simple : 3.102792978286743
  Ray submit : 0.14916658401489258
[2m[36m(pid=55893)[0m 	contract sliced 0 : 0.5200221538543701
[2m[36m(pid=55895)[0m 	contract sliced 1 : 0.5445835590362549
[2m[36m(pid=55894)[0m 	contract sliced 2 : 0.6233804225921631
[2m[36m(pid=55896)[0m 	contract sliced 3 : 0.6949300765991211
  fetch results : 1.090453863143921
Ray compute : 1.2628028392791748
 allocate result : 4.482269287109375e-05
 assignment : 0.7576515674591064
Ray total compute : 2.0207574367523193


## Concurrent assignment

In [10]:
contract_idx = set(x[1]) & set(y[1])
result_idx = set(x[1] + y[1])


with pyrof.timing(f'contract simple'):
    C = contract(x,y)
    
sliced_contract_ray = ray.remote(sliced_contract)
    
par_vars = [1, 17]
threads = 2**len(par_vars)
target_shape = C.shape

with pyrof.timing('Ray total compute'):
    with pyrof.timing('Ray compute'):
        with pyrof.timing('  Ray submit'):
            C_patches = [
                sliced_contract_ray.remote(x, y, par_vars, i)
                for i in range(threads)
            ]


        patch_slces = [
            target_slice(result_idx, par_vars, i)
            for i in range(threads)
        ]
        
        with pyrof.timing(' allocate result'):
            C_par = np.empty(target_shape)

        idx = 0
        obj_slice_map = {o:s for o, s in zip(C_patches, patch_slces)}
        obj_slice_id = {o:i for o, i in zip(C_patches, range(threads))}
        with pyrof.timing('fetching results'):
            while True:
                ready_ids, working_ids = ray.wait(C_patches)
                if len(C_patches)==0:
                    break
                C_patches = [i for i in C_patches if i not in ready_ids]

                for patch in ready_ids:
                    sl = obj_slice_map[patch]
                    print(obj_slice_id[patch])
                    C_par[sl[0]] = ray.get(patch)

assert np.array_equal(C, C_par)


contract simple : 52.250582695007324
  Ray submit : 0.3260166645050049
 allocate result : 0.20093178749084473
0
[2m[36m(pid=55896)[0m 	contract sliced 0 : 0.733295202255249
[2m[36m(pid=55894)[0m 	contract sliced 1 : 0.8876485824584961
[2m[36m(pid=55893)[0m 	contract sliced 2 : 0.7955994606018066
[2m[36m(pid=55895)[0m 	contract sliced 3 : 0.7930917739868164


RayTaskError: [36mray::IDLE[39m (pid=55896, ip=130.202.136.154)
  File "python/ray/_raylet.pyx", line 458, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 459, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1050, in ray._raylet.CoreWorker.store_task_outputs
  File "python/ray/_raylet.pyx", line 139, in ray._raylet.check_status
ray.exceptions.ObjectStoreFullError: Failed to put object 7e0a4dfc4c87306fffffffff010000c801000000 in object store because it is full. Object size is 268435847 bytes.

## Use multiprocessing

In [11]:
from multiprocessing import Pool, Array
import os
def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

In [None]:
contract_idx = set(x[1]) & set(y[1])
result_idx = set(x[1] + y[1])

with pyrof.timing(f'contract simple'):
    C = contract(x,y)
target_shape = C.shape
    

flat_size = len(C.flatten())
par_vars = [1,3,15]
threads = 2**len(par_vars)

with pyrof.timing('init array'):
    os.global_C = tonumpyarray(Array('d', flat_size))
os.global_C = os.global_C.reshape(target_shape)

def work(i):
    patch = sliced_contract(x, y, par_vars, i)
    sl = target_slice(result_idx, par_vars, i)
    os.global_C[sl[0]] = patch
    print('done work',i)
    
pool = Pool(processes=threads)
print('inited pool')
with pyrof.timing('parallel work'):
    print('started work')
    _ = pool.map(work, range(threads))

assert np.array_equal(C, os.global_C)


In [None]:
del os.global_C