In [1]:
from multiprocessing import Pool

def square(x):
    """Return ``x`` multiplied by itself."""
    squared = x * x
    return squared

dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
print ('Dataset: ' + str(dataset))

# Run this with a pool of 5 agents having a chunksize of 3 until finished
agents = 5
chunksize = 3
with Pool(processes=agents) as pool:
    # chunksize must be handed to map() explicitly; otherwise the variable is
    # unused and the pool picks its own chunk size. The returned list is in
    # input order either way.
    result = pool.map(square, dataset, chunksize)


# Output the result
print ('Result:  ' + str(result))

Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
Result:  [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]


In [3]:
# New pool of 5 workers; later cells keep using this `pool` object.
pool = Pool(processes=5)
# imap_unordered yields each square as soon as a worker delivers it, so the
# order of `result` can differ from the order of `dataset`. Every yielded
# value is multiplied by itself once more, so the list ends up holding x**4.
result = [squared * squared for squared in pool.imap_unordered(square, dataset)]

print ('Result:  ' + str(result))

Result:  [1, 16, 81, 2401, 1296, 6561, 625, 4096, 256, 10000, 38416, 20736, 28561, 14641]


In [6]:
# One pool.map call per element: each call is given a one-item list and so
# returns a one-item list, which becomes the value stored under that element.
result = {}
for item in dataset:
    result[item] = pool.map(square, [item])
print ('Result:  ' + str(result))

Result:  {1: [1], 2: [4], 3: [9], 4: [16], 5: [25], 6: [36], 7: [49], 8: [64], 9: [81], 10: [100], 11: [121], 12: [144], 13: [169], 14: [196]}


Here we are actually doing several separate mappings, one per element, each over a single-element list; that is why every value in the dict is a one-item list.


In [8]:
# Same one-call-per-element mapping as in the earlier cell, built from
# (key, value) pairs this time.
result = dict((item, pool.map(square, [item])) for item in dataset)
# Shut the pool down: close() stops further submissions, join() waits for the
# worker processes to exit.
pool.close()
pool.join()


print ('Result:  ' + str(result))

Result:  {1: [1], 2: [4], 3: [9], 4: [16], 5: [25], 6: [36], 7: [49], 8: [64], 9: [81], 10: [100], 11: [121], 12: [144], 13: [169], 14: [196]}


In [11]:
# The pool was closed and joined in the previous cell, so submitting new work
# here fails with "ValueError: Pool not running" (see the output below).
result = pool.apply_async(square, [10])    
print (result.get(timeout=1) )
print (pool.map(square, range(10))  )

ValueError: Pool not running

In [13]:
# Build a fresh pool before submitting anything.
pool = Pool(processes=5)
# apply_async hands back a result handle straight away; get() waits up to one
# second for the value.
result = pool.apply_async(square, args=(10,))
print (result.get(timeout=1) )
squares = pool.map(square, range(10))
print (squares)


100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


The apply_async method can be used to launch different tasks that can run in parallel but may take different amounts of time,
and then to resume with the next steps once all of them are done.

something like this:

# Sketch only: Process1/Process2/Process3/FinalProcess and their inputs are
# placeholders that are not defined anywhere in this file.
pool = Pool(processes=3)
# Submit three independent tasks; each call returns a result handle.
parsed = pool.apply_async(Process1, [largefile])
pattern = pool.apply_async(Process2, [bigfile])
calc_res = pool.apply_async(Process3, [integer])

# Stop accepting work and wait for the workers to finish.
pool.close()
pool.join()
# get() retrieves each task's return value for the final step.
final = FinalProcess(parsed.get(), pattern.get(), calc_res.get())

One thing to note is that apply_async returns a result handle, so you have to call get() on it to obtain the value, whereas map returns the results directly.


In [None]:
pool=Pool(processes=5)
# NOTE: kept as a non-working example; the note after this cell flags this
# call as wrong. apply() treats its second argument as the positional
# arguments of ONE call, i.e. square(0, 1, ..., 9), not ten separate calls.
result = pool.apply(square, range(10))    
# NOTE(review): apply() is documented as blocking and returning the function's
# value itself, so there would be no result handle to call .get() on here.
print (result.get(timeout=1) )
print (pool.map(square, range(10))  )



Note that, unlike map, apply does not take a single iterable of inputs to spread over the workers: its second argument is the argument list for one call of the function. So the syntax above is wrong!


In [None]:
pool=Pool(processes=5)
# apply() blocks until the call finishes and returns square's value itself,
# so this comprehension produces a plain list of ints, one call per input.
result = [pool.apply(square, args=[arg1]) for arg1 in range(10)]
# which is the same as:
result2 = pool.map(square, range(10))
# `result` is a list, not an AsyncResult, so it has no .get(); print both
# lists directly to compare them.
print (result)
print (result2)



In [33]:
pool = Pool(processes=5)
# One blocking apply() call per input value; each returns the square itself.
result = []
for value in range(10):
    result.append(pool.apply(square, [value]))

# which is the same as:
result2 = pool.map(square, range(10))
print (result)
print (result2)




[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]


The arguments for apply must also be passed as a sequence (a list or tuple), and it looks like we can't hand over all the inputs at once: each apply call runs the function a single time.

In [34]:
def custom_multiply(x, y):
    """Return ``x`` times ``y`` times ``y``, evaluated left to right."""
    partial = x * y
    return partial * y

def unpack_multiply(args):
    """Call ``custom_multiply`` with the first two items of ``args``.

    Lets a two-argument function be driven by ``Pool.map``, which passes a
    single object per task.
    """
    first = args[0]
    second = args[1]
    return custom_multiply(first, second)

pool = Pool(processes=5)
# apply() takes the argument tuple for one call: custom_multiply(value, 10).
result = []
for value in range(10):
    result.append(pool.apply(custom_multiply, (value, 10)))

# which is the same as:
pairs = [(item, 10) for item in range(10)]
result2 = pool.map(unpack_multiply, pairs)

print (result)
print (result2)


[0, 100, 200, 300, 400, 500, 600, 700, 800, 900]
[0, 100, 200, 300, 400, 500, 600, 700, 800, 900]


Process ForkPoolWorker-118:
Process ForkPoolWorker-117:
Process ForkPoolWorker-119:
Process ForkPoolWorker-116:
Process ForkPoolWorker-120:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*se

An interesting follow-up to the previous note: when we passed a single argument as (arg), Python treated it as a plain integer rather than a tuple (a one-element tuple needs a trailing comma: (arg,)), so we needed []. Here we do not need [], because (arg1, 10) is already a tuple.

Also, for multiple arguments, even when using an unpacking function like the one above, I ran into some issues because
the object given as input was not passed correctly.
(Was this an issue with my unpacking code?)

The other method that can be used to solve this is starmap. Effectively, Pool.starmap() is a version of Pool.map() that unpacks each item of the iterable into the function's positional arguments.






In [35]:
# starmap unpacks every (item, 10) tuple into custom_multiply(item, 10), so
# no separate unpacking wrapper is required.
argument_pairs = [(item, 10) for item in range(10)]
result = pool.starmap(custom_multiply, argument_pairs)
print (result)




[0, 100, 200, 300, 400, 500, 600, 700, 800, 900]


Issues with using the pool on code that is not self-contained.

It looks like a function run by the pool should not depend on anything it cannot resolve by itself.
In that sense, all the reads and writes should be moved out of the function.
Also, if you try to use the pool inside a method of a class,
you should introduce a separate class or function to run it with the pool.
I am not sure at this point whether non-static methods of a class can be run with the pool.
The info here may also be useful: https://gist.github.com/bnyeggen/1086393

In [42]:
import numpy as np

# Draw from an explicitly seeded generator so the dataset is the same on every
# run. Merely constructing RandomState(100) and discarding it does not seed
# the module-level np.random functions.
rng = np.random.RandomState(100)
arr = rng.randint(0, 10, size=[200000, 5])
# Plain nested lists: 200000 rows of 5 ints in the range 0..9.
data = arr.tolist()


Using the apply_async with a callback function

 If you don't provide a callback, you get a list of pool.ApplyResult objects,
 each of which holds the computed output value from one task.

In [46]:

# Module-level accumulator; collect_result appends one entry per finished task.
results = []

def howmany_within_range2(i, row, minimum, maximum):
    """Count the entries of ``row`` lying in ``[minimum, maximum]`` (inclusive).

    ``i`` is passed through unchanged and returned with the count as
    ``(i, count)``.
    """
    count = sum(1 for n in row if minimum <= n <= maximum)
    return (i, count)

def collect_result(result):
    """Append one task result to the module-level ``results`` list."""
    # append() mutates the existing list in place, so the name is never
    # rebound and no ``global`` declaration is required.
    results.append(result)

# Keep every AsyncResult handle so that completion can be awaited below.
pending = [
    pool.apply_async(howmany_within_range2, args=(i, row, 4, 8), callback=collect_result)
    for i, row in enumerate(data)
]

# apply_async returns immediately, so without this wait `results` could be
# read while callbacks are still arriving and would then be incomplete.
for handle in pending:
    handle.wait()

# Callbacks fire in completion order, not submission order, so sort on the
# row index before dropping it. `results` itself is left in arrival order.
results_final = [count for _, count in sorted(results)]

print(results[:20])

[(0, 3), (1, 2), (2, 3), (3, 1), (4, 3), (5, 2), (6, 1), (7, 4), (8, 2), (9, 2), (10, 3), (12, 4), (13, 4), (11, 1), (15, 0), (17, 3), (14, 2), (16, 4), (18, 2), (19, 2)]


In [48]:
# Submit one task per row and hold on to the pool.ApplyResult handles.
result_objects = []
for i, row in enumerate(data):
    result_objects.append(pool.apply_async(howmany_within_range2, args=(i, row, 4, 8)))

# get() blocks until that task is done and returns its (i, count) tuple; only
# the count is kept. The handles are read in submission order.
results = [handle.get()[1] for handle in result_objects]
print(results[:20])



[3, 2, 3, 1, 3, 2, 1, 4, 2, 2, 3, 1, 4, 4, 2, 0, 4, 3, 2, 2]


In [51]:
from time import sleep


def callback(a):
    """Print the value handed over by a finished task."""
    print(a)
    return None


def worker(i, n):
    """Announce entry, sleep for ``n`` seconds, announce exit.

    ``i`` only labels the entry message. Always returns the string
    ``'worker_response'``.
    """
    response = 'worker_response'
    print ('Entering worker ', i)
    sleep(n)
    print ('Exiting worker')
    return response


# Four worker processes for eight 4-second tasks.
pool = Pool(4)
a = []
for task_id in range(8):
    a.append(pool.apply_async(worker, (task_id, 4), callback=callback))
# Block until every submitted task has completed.
for handle in a:
    handle.wait()

Entering worker  3
Entering worker  0
Entering worker  1Entering worker 
 2
Exiting worker
Entering worker  4
Exiting worker
Entering worker  5
Exiting worker
Entering worker Exiting worker 
6
Entering worker  7
worker_response
worker_response
worker_response
worker_response
Exiting worker
Exiting worker
Exiting worker
Exiting worker
worker_response
worker_response
worker_response
worker_response


https://stackoverflow.com/questions/48807196/difference-between-apply-and-apply-async-in-python-multiprocessing-module


Additional reading


https://stackabuse.com/parallel-processing-in-python/

https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers


https://docs.python.org/2/library/multiprocessing.html
https://docs.python.org/3/library/multiprocessing.html


https://helpful.knobs-dials.com/index.php/Python_usage_notes/Multiprocessing_notes


https://www.binpress.com/simple-python-parallelism/