Closed
Labels: documentation (Improvements or additions to documentation)
Description
When using many-to-one dependencies in the processing of future objects, it can be helpful to group completed future objects into batches. This can be achieved with the following example:
```python
from time import sleep

from executorlib import SingleNodeExecutor


def wait_function(i):
    sleep(i)
    return i


def get_batches(future_lst, batch_size):
    if len(future_lst) % batch_size != 0:
        raise ValueError("Batch size does not evenly divide the number of futures")
    count = 0
    div = len(future_lst) // batch_size
    batch_lst = []
    while count < div:
        # collect the futures that are not done yet, so only those are
        # polled again on the next pass
        tmp_lst = []
        for f in future_lst:
            if f.done():
                batch_lst.append(f)
                if len(batch_lst) == batch_size:
                    yield batch_lst
                    batch_lst = []
                    count += 1
            else:
                tmp_lst.append(f)
        future_lst = tmp_lst


with SingleNodeExecutor(max_workers=2) as exe:
    future_lst = []
    for i in range(10):
        f = exe.submit(wait_function, i=i)
        future_lst.append(f)
    for batch in get_batches(future_lst=future_lst, batch_size=2):
        print([f.result() for f in batch])
```
```
>>> [0, 1]
>>> [2, 3]
>>> [4, 5]
>>> [6, 7]
>>> [8, 9]
```

It is important to mention that when the batches are submitted to the SingleNodeExecutor for additional processing, by default they are only evaluated once all the future objects from the previous step are completed. Consequently, the execution order does not change. To address this challenge, it is important to increase the queuing system priority for the submitted batches of future objects, as explained in #693.
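To illustrate the many-to-one follow-up step described above, here is a minimal sketch in which each completed batch is submitted back to the executor for a reduction. To keep the sketch runnable without executorlib installed, it uses the standard library's `ThreadPoolExecutor`, which exposes the same `submit()`/`Future` interface as `SingleNodeExecutor`; `merge_function` is a hypothetical reduction, not part of any library.

```python
# Sketch: batch completed futures, then submit each batch back to the
# executor for a many-to-one reduction (merge_function is hypothetical).
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def wait_function(i):
    sleep(i * 0.01)  # scaled down so the example finishes quickly
    return i


def merge_function(lst):
    # hypothetical many-to-one step: reduce one batch of results
    return sum(lst)


def get_batches(future_lst, batch_size):
    if len(future_lst) % batch_size != 0:
        raise ValueError("Batch size does not evenly divide the number of futures")
    count = 0
    div = len(future_lst) // batch_size
    batch_lst = []
    while count < div:
        tmp_lst = []  # futures that are not done yet
        for f in future_lst:
            if f.done():
                batch_lst.append(f)
                if len(batch_lst) == batch_size:
                    yield batch_lst
                    batch_lst = []
                    count += 1
            else:
                tmp_lst.append(f)
        future_lst = tmp_lst


with ThreadPoolExecutor(max_workers=2) as exe:
    future_lst = [exe.submit(wait_function, i) for i in range(10)]
    merged_lst = [
        exe.submit(merge_function, [f.result() for f in batch])
        for batch in get_batches(future_lst=future_lst, batch_size=2)
    ]
    results = [f.result() for f in merged_lst]
print(results)
```

Note that the main thread, not a pool worker, waits for each batch, so the reduction tasks cannot deadlock the pool; with the default execution order, though, the reductions only start once workers become free, which is exactly why the priority adjustment mentioned above matters on a real queuing system.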
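As an aside, the polling loop in `get_batches` can be avoided entirely with the standard library's `concurrent.futures.as_completed`, which blocks until the next future finishes and yields futures in completion order. This sketch assumes the futures are standard `concurrent.futures.Future` objects (which `as_completed` requires) and that batches may group results by completion order rather than submission order:

```python
# Alternative batching via as_completed: blocks instead of busy-polling
# and yields futures in the order they complete.
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


def wait_function(i):
    sleep(i * 0.01)  # scaled down so the example finishes quickly
    return i


def get_batches_as_completed(future_lst, batch_size):
    if len(future_lst) % batch_size != 0:
        raise ValueError("Batch size does not evenly divide the number of futures")
    batch = []
    for f in as_completed(future_lst):
        batch.append(f)
        if len(batch) == batch_size:
            yield batch
            batch = []


with ThreadPoolExecutor(max_workers=2) as exe:
    future_lst = [exe.submit(wait_function, i) for i in range(10)]
    batches = [
        [f.result() for f in batch]
        for batch in get_batches_as_completed(future_lst=future_lst, batch_size=2)
    ]
print(batches)
```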