# Multiprocessing 

In [1]:
from multiprocessing import Pool, Process

def f(x):
    return x*x

if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))

[1, 4, 9]


In [2]:
def fn(x):
    return x*x

if __name__ == '__main__':
    p = Process(target=fn, args=([2]))
    p.start()
    p.join()

In [3]:
def fn(name):
    print ('hello', name)

if __name__ == '__main__':
    p = Process(target=fn, args=('bob'))
    p.start()
    p.join()

Process Process-7:
Traceback (most recent call last):
  File "/Users/sameera/mc/envs/py3k/lib/python3.4/multiprocessing/process.py", line 254, in _bootstrap
    self.run()
  File "/Users/sameera/mc/envs/py3k/lib/python3.4/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
TypeError: fn() takes 1 positional argument but 3 were given


In [4]:
import os

def info(title):
    print (title)
    print ('module name:', __name__)
    if hasattr(os, 'getppid'):  # only available on Unix
        print ('parent process:', os.getppid())
    print ('process id:', os.getpid())

def f(name):
    info('function f')
    print ('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

main line
module name: __main__
parent process: 5893
process id: 37633
function f
module name: __main__
parent process: 37633
process id: 37676
hello bob


In [5]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print (q.get())    # prints "[42, None, 'hello']"
    p.join()

[42, None, 'hello']


In [6]:
import numpy as np
np.linspace(0, 100, 11)

array([   0.,   10.,   20.,   30.,   40.,   50.,   60.,   70.,   80.,
         90.,  100.])

In [7]:
np.zeros(9)

array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.])

In [8]:
from multiprocessing import Pool
import time


def f(x):
    return x*x

if __name__ == "__main__":
    pool = Pool(processes=4)
    
    result = pool.apply_async(f, (10, ))
    print (result.get(timeout=1))
    
    print (pool.map(f, range(10)))
    
    it = pool.imap(f, range(10))
    print (it.next())
    print (it.next())
    print (it.next(timeout=1))
    
    result = pool.apply_async(time.sleep, (10,))
    print (result.get(timeout=1))

100
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4


TimeoutError: 

In [9]:
import multiprocessing

def worker():
    #worker function
    print ('Worker')
    x = 0
    while x < 100:
        print(x)
        x += 1
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

In [10]:
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print('worker ', i)
    x = 0
    while x < 10:
        print(x)
        x += 1
    l.release()

if __name__ == '__main__': 
    lock = Lock()
    for num in range(50):
        Process(target=f, args=(lock, num)).start()

Worker
Worker
Worker
Worker
Worker
0
0
0
0
0
1
1
1
1
1
2
2
2
2
2
3
3
3
3
3
4
4
4
4
4
5
5
5
5
5
6
6
6
6
6
7
7
7
7
7
8
8
8
8
8
9
9
9
9
9
10
10
10
10
10
11
11
11
11
11
12
12
12
12
12
13
13
13
13
13
14
14
14
14
14
15
15
15
15
15
16
16
16
16
16
17
17
17
17
17
18
18
18
18
18
19
19
19
19
19
20
20
20
20
20
21
21
21
21
21
22
22
22
22
22
23
23
23
23
23
24
24
24
24
24
25
25
25
25
25
26
26
26
26
26
27
27
27
27
27
28
28
28
28
28
29
29
29
29
29
30
30
30
30
30
31
31
31
31
31
32
32
32
32
32
33
33
33
33
33
34
34
34
34
34
35
35
35
35
35
36
36
36
36
36
37
37
37
37
37
38
38
38
38
38
39
39
39
39
39
40
40
40
40
40
41
41
41
41
41
42
42
42
42
42
43
43
43
43
43
44
44
44
44
44
45
45
45
45
45
46
46
46
46
46
47
47
47
47
47
48
48
48
48
48
49
49
49
49
49
50
50
50
50
50
51
51
51
51
51
52
52
52
52
52
53
53
53
53
53
54
54
54
54
54
55
55
55
55
55
56
56
56
56
56
57
57
57
57
57
58
58
58
58
58
59
59
59
59
59
60
60
60
60
60
61
61
61
61
61
62
62
62
62
62
63
63
63
63
63
64
64
64
64
64
65
65
65
65
65
66
66
66
66
66
67
67
67
6

In [14]:
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)  # start 4 worker processes
    
    # print "[0, 1, 4,..., 81]" 
    print (pool.map(f, range(10)))
    # start 4 worker processes
    
    # print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print (i)
        
    # evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,)) # runs in *only* one process
    print (res.get(timeout=1)) # prints "400"
    
    # evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) # runs in *only* one process
    print (res.get(timeout=1)) # prints the PID of that process
    
    # launching multiple evaluations asynchronously *may* use more processes
    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print ([res.get(timeout=1) for res in multiple_results])
    
    # make a single worker sleep for 10 secs
    res = pool.apply_async(time.sleep, (10,))
    try:
        print (res.get(timeout=1))
    except TimeoutError:
    print ("We lacked patience and got a multiprocessing.TimeoutError")
    


231
231
231
231
231
231
231
231
231
231
231
231
231
231
231
231
231
220
217
215
212
212
212
212
212
212
212
212
212
3
99
95
97
5
90
90
90
90
90
90
90
90
91
90
90
1
9
90
90
7
90
816
800
815
795
795
797
791
792
806
803
791
791
790
791
809
799
794
732
729
729
729
725
704
693
676
668
669
642
642
632
634
600
599
592
589
595
571
569
569
563
558
527
525
525
525
525
517
501
494
494
445
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
232
221
218
216
213
213
213
213
213
213
213
213
213
4
96
98
6
91
91
91
91
91
91
91
91
92
91
91
2
91
91
8
91
817
801
816
796
796
798
792
793
807
804
792
792
791
792
810
800
795
733
730
730
730
726
705
694
677
669
670
643
643
633
635
601
600
593
590
596
572
570
570
564
559
528
526
526
526
526
518
502
495
495
446
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
233


IndentationError: expected an indented block (<ipython-input-14-de7980735f5a>, line 36)

233
233
233
233
233
233
233
233
233
233
233
233
233
233
233
222
219
217
214
214
214
214
214
214
214
214
214
5
92
97
99
7
92
92
92
92
92
92
92
92
93
92
92
3
9
92
92
818
802
817
797
797
799
793
794
808
805
793
793
792
793
811
801
796
734
731
731
731
727
706
695
678
670
671
644
644
634
636
602
601
594
591
597
573
571
571
565
560
529
527
527
527
527
519
503
496
496
447
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
234
223
220
218
215
215
215
215
215
215
215
215
215
6
93
98
8
93
93
93
93
93
93
93
93
94
93
93
4
93
93
819
803
818
798
798
800
794
795
809
806
794
794
793
794
812
802
797
735
732
732
732
728
707
696
679
671
672
645
645
635
637
603
602
595
592
598
574
572
572
566
561
530
528
528
528
528
520
504
497
497
448
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
235
2

In [11]:
 #
 # A test of `multiprocessing.Pool` class
 #
 # Copyright (c) 2006-2008, R Oudkerk
 # All rights reserved.
 #
 import multiprocessing
 import time
 import random
 import sys
 #
 # Functions used by test code
 #
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
         multiprocessing.current_process().name,
         func.__name__, args, result
         )
def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

def f(x):
    return 1.0 / (x-5.0)

def pow3(x):
    return x**3

def noop(x):
    pass
#
# Test code #
def test():
    print ('cpu_count() = %d\n' % multiprocessing.cpu_count())
    
#
# Create pool
#
PROCESSES = 4
print ('Creating pool with %d processes\n' % PROCESSES)
pool = multiprocessing.Pool(PROCESSES)
print ('pool = %s' % pool)
#print
#
# Tests  

TASKS = [(mul, (i, 7)) for i in range(10)] + \ [(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS] imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

print ('Ordered results using pool.apply_async():')
for r in results:
    print ('\t', r.get())
print

print ('Ordered results using pool.imap():')
for x in imap_it:
    print ('\t', x)
print
print ('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
    print ('\t', x)
print

print ('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
    print ('\t', x)
print

#
# Simple benchmarks
#
N = 100000
print ('def pow3(x): return x**3')

t = time.time()
A = map(pow3, xrange(N))
print ('\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % (N, time.time() - t))

t = time.time()
B = pool.map(pow3, xrange(N))

print ('\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % (N, time.time() - t))

t = time.time()
C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
print ('\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s seconds' % (N, N//8, time.time() - t))

assert A == B == C, (len(A), len(B), len(C))
print

L = [None] * 1000000
print ('def noop(x): pass')
print ('L = [None] * 1000000')

t = time.time()
A = map(noop, L)
print ('\tmap(noop, L):\n\t\t%s seconds' % (time.time() - t))

t = time.time()
B = pool.map(noop, L)
print ('\tpool.map(noop, L):\n\t\t%s seconds' % (time.time() - t))

t = time.time()
C = list(pool.imap(noop, L, chunksize=len(L)//8))
print ('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % (len(L)//8, time.time() - t))

assert A == B == C, (len(A), len(B), len(C))
print
del A, B, C, L

#
# Test error handling
#
print ('Testing error handling:')

try:
    print (pool.apply(f, (5,)))
except ZeroDivisionError:
    print ('\tGot ZeroDivisionError as expected from pool.apply()')
else:
    raise AssertionError('expected ZeroDivisionError')
    
try:
    print (pool.map(f, range(10)))
except ZeroDivisionError:
    print ('\tGot ZeroDivisionError as expected from pool.map()')
else:
    raise AssertionError('expected ZeroDivisionError')
    
try:
    print (list(pool.imap(f, range(10))))
except ZeroDivisionError:
    print ('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
    raise AssertionError('expected ZeroDivisionError')
    
it = pool.imap(f, range(10))
for i in range(10):
    try:
        x = it.next()
    except ZeroDivisionError: 
        if i == 5:
            pass
    except StopIteration:
        break
    else:
        if i == 5:
            raise AssertionError('expected ZeroDivisionError')
            
assert i == 9
print ()'\tGot ZeroDivisionError as expected from IMapIterator.next()')
print

#
# Testing timeouts
#

print ('Testing ApplyResult.get() with timeout:'),
res = pool.apply_async(calculate, TASKS[0]) 
while 1:
    sys.stdout.flush()
    try:
        sys.stdout.write('\n\t%s' % res.get(0.02))
        break
    except multiprocessing.TimeoutError:
        sys.stdout.write('.')
print
print

print ('Testing IMapIterator.next() with timeout:'),
it = pool.imap(calculatestar, TASKS)
while 1:
    sys.stdout.flush()
    try:
        sys.stdout.write('\n\t%s' % it.next(0.02))
    except StopIteration:
        break
    except multiprocessing.TimeoutError:
        sys.stdout.write('.')
print
print

#
# Testing callback
#
print ('Testing callback:')

A = []
B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
r = pool.apply_async(mul, (7, 8), callback=A.append)
r.wait()

r = pool.map_async(pow3, range(10), callback=A.extend)
r.wait()
if A == B:
    print ('\tcallbacks succeeded\n')
else:
    print ('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))
    
#
# Check there are no outstanding tasks
#

assert not pool._cache, 'cache = %r' % pool._cache

#
# Check close() methods
#

print ('Testing close():')

for worker in pool._pool:
    assert worker.is_alive()
    
result = pool.apply_async(time.sleep, [0.5])
pool.close()
pool.join()

assert result.get() is None

for worker in pool._pool:
    assert not worker.is_alive() 
    
print ('\tclose() succeeded\n')

#
# Check terminate() method
#
print ('Testing terminate():')

pool = multiprocessing.Pool(2)
DELTA = 0.1
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)] pool.terminate()
pool.join()
for worker in pool._pool:
    assert not worker.is_alive()
    
print ('\tterminate() succeeded\n')

#
     # Check garbage collection
     #
print ('Testing garbage collection:')

pool = multiprocessing.Pool(2)
DELTA = 0.1
processes = pool._pool
ignore = pool.apply(pow3, [2])
results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
     results = pool = None
     time.sleep(DELTA * 2)
for worker in processes:
    assert not worker.is_alive()
    
print ('\tgarbage collection succeeded\n')

if __name__ == '__main__':
     multiprocessing.freeze_support()
    assert len(sys.argv) in (1, 2)
    if len(sys.argv) == 1 or sys.argv[1] == 'processes': 
        print (' Using processes '.center(79, '-'))
    elif sys.argv[1] == 'threads':
        print (' Using threads '.center(79, '-'))
        import multiprocessing.dummy as multiprocessing
    else:
        print ('Usage:\n\t%s [processes | threads]' % sys.argv[0])
        raise SystemExit(2)
    test()


worker  11
worker  12
worker  13
worker  14
worker  15
worker  16
worker  17
worker  18
worker  19
worker  20
worker  21
worker  22
worker  23
worker  24
worker  25
worker  26
worker  27
worker  28
worker  29
worker  30
worker  31
worker  32
worker  33
worker  34
worker  35
worker  36
worker  37
worker  38
worker  39
worker  40
worker  41
worker  42
worker  43
worker  44
worker  45
worker  46
worker  47
worker  48
worker  49
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
2
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
3
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
4
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5


IndentationError: expected an indented block (<ipython-input-11-946242807ce9>, line 15)

5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
5
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
6
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
7
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
8
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
9
