Parallel Processing in Python
====

You will almost always start from the question, "How can I break up this problem into smaller pieces that can run concurrently?"

Once you have an answer to that question, there are a few Python tools that can help implement that answer.

Multiprocessing
----

In [1]:
import multiprocessing
import time
import numpy.random as rand

data = [(a,rand.uniform(0,1)) for a in 'abcdefghijklmnop']

def mp_worker(data):
    inputs, the_time = data
    print(" Processs %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(float(the_time))
    print(" Process %s\tDONE" % inputs)
    return inputs.upper()


In [2]:
data

[('a', 0.6272539437149072),
 ('b', 0.8717696777262646),
 ('c', 0.004825398481090426),
 ('d', 0.194154897686166),
 ('e', 0.6205979576664783),
 ('f', 0.025414608022270024),
 ('g', 0.771179986873319),
 ('h', 0.33821362908093056),
 ('i', 0.7762389117777398),
 ('j', 0.778520222706149),
 ('k', 0.6397020436509635),
 ('l', 0.8360742019107122),
 ('m', 0.22197773161559575),
 ('n', 0.24335924697744327),
 ('o', 0.7442893913319537),
 ('p', 0.7000924320502633)]

In [3]:
[ mp_worker(d) for d in data ]

 Processs a	Waiting 0.6272539437149072 seconds
 Process a	DONE
 Processs b	Waiting 0.8717696777262646 seconds
 Process b	DONE
 Processs c	Waiting 0.004825398481090426 seconds
 Process c	DONE
 Processs d	Waiting 0.194154897686166 seconds
 Process d	DONE
 Processs e	Waiting 0.6205979576664783 seconds
 Process e	DONE
 Processs f	Waiting 0.025414608022270024 seconds
 Process f	DONE
 Processs g	Waiting 0.771179986873319 seconds
 Process g	DONE
 Processs h	Waiting 0.33821362908093056 seconds
 Process h	DONE
 Processs i	Waiting 0.7762389117777398 seconds
 Process i	DONE
 Processs j	Waiting 0.778520222706149 seconds
 Process j	DONE
 Processs k	Waiting 0.6397020436509635 seconds
 Process k	DONE
 Processs l	Waiting 0.8360742019107122 seconds
 Process l	DONE
 Processs m	Waiting 0.22197773161559575 seconds
 Process m	DONE
 Processs n	Waiting 0.24335924697744327 seconds
 Process n	DONE
 Processs o	Waiting 0.7442893913319537 seconds
 Process o	DONE
 Processs p	Waiting 0.7000924320502633 seconds
 Process p	DONE

['A',
 'B',
 'C',
 'D',
 'E',
 'F',
 'G',
 'H',
 'I',
 'J',
 'K',
 'L',
 'M',
 'N',
 'O',
 'P']

In [4]:
# Using a multiprocess pool

p = multiprocessing.Pool(3)
p.map(mp_worker, data)

 Processs a	Waiting 0.6272539437149072 seconds
 Processs e	Waiting 0.6205979576664783 seconds
 Processs c	Waiting 0.004825398481090426 seconds
 Process c	DONE
 Processs d	Waiting 0.194154897686166 seconds
 Process d	DONE
 Processs g	Waiting 0.771179986873319 seconds
 Process e	DONE
 Processs f	Waiting 0.025414608022270024 seconds
 Process a	DONE
 Processs b	Waiting 0.8717696777262646 seconds
 Process f	DONE
 Processs i	Waiting 0.7762389117777398 seconds
 Process g	DONE
 Processs h	Waiting 0.33821362908093056 seconds
 Process h	DONE
 Processs k	Waiting 0.6397020436509635 seconds
 Process i	DONE
 Processs j	Waiting 0.778520222706149 seconds
 Process b	DONE
 Processs m	Waiting 0.22197773161559575 seconds
 Process m	DONE
 Processs n	Waiting 0.24335924697744327 seconds
 Process k	DONE
 Processs l	Waiting 0.8360742019107122 seconds
 Process n	DONE
 Processs o	Waiting 0.7442893913319537 seconds
 Process j	DONE
 Process o	DONE
 Processs p	Waiting 0.7000924320502633 seconds
 Process l	DONE
 Process p	DONE

['A',
 'B',
 'C',
 'D',
 'E',
 'F',
 'G',
 'H',
 'I',
 'J',
 'K',
 'L',
 'M',
 'N',
 'O',
 'P']
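
The pool above is never explicitly closed. A minimal sketch of the same pattern with cleanup, assuming a POSIX system where the `fork` start method is available; `sleepy_upper` and `timed_map` are hypothetical names standing in for `mp_worker`, and the delays are shortened for illustration:

```python
import multiprocessing
import time

def sleepy_upper(item):
    """Sleep for the given delay, then return the label upper-cased."""
    label, delay = item
    time.sleep(delay)
    return label.upper()

def timed_map(items, processes=3):
    """Run sleepy_upper over items in a process pool; return (results, seconds)."""
    # Assumption: a POSIX system where the "fork" start method exists.
    ctx = multiprocessing.get_context("fork")
    start = time.perf_counter()
    # The with-block shuts the worker processes down once the map is done.
    with ctx.Pool(processes) as pool:
        results = pool.map(sleepy_upper, items)
    return results, time.perf_counter() - start

items = [(letter, 0.05) for letter in "abcdef"]
results, elapsed = timed_map(items)
print(results, "in %.2f s" % elapsed)
```

`map` returns results in input order even though the workers finish out of order, which is why the list above is still `['A', ..., 'P']`.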

In [5]:
import numpy as np

def bigpower(power):
    d = np.random.randn(100000000)**power
    print("Raising random array to the {0}th power".format(power))
    return d.mean()

data = [1, 2, 12, 15]

In [7]:
p = multiprocessing.Pool(3)
p.map(bigpower, data)

Raising random array to the 1th power
Raising random array to the 2th power
Raising random array to the 12th power
Raising random array to the 15th power


[0.00012718819504248534,
 0.9997271917541751,
 10366.223031436715,
 -5354.697182884281]
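
One caveat when workers draw random numbers: with the `fork` start method each worker begins with a copy of the parent's global NumPy random state, so different workers can produce identical streams. A hedged sketch of one way around this, giving every task its own generator via `numpy.random.SeedSequence.spawn`; `seeded_power_mean`, the seed `12345`, and the smaller array size are illustrative assumptions, not part of the original example:

```python
import numpy as np

def seeded_power_mean(task):
    """Return the mean of a random normal sample raised to a power."""
    power, seed_seq = task
    # Each task builds its own generator from its own child seed.
    rng = np.random.default_rng(seed_seq)
    sample = rng.standard_normal(100_000)
    return (sample ** power).mean()

powers = [1, 2, 12, 15]
# spawn() derives one independent child seed per task from the root seed.
child_seeds = np.random.SeedSequence(12345).spawn(len(powers))
tasks = list(zip(powers, child_seeds))

# Run serially here to keep the sketch simple.
means = [seeded_power_mean(task) for task in tasks]
print(means)
```

The intent is that `seeded_power_mean` and `tasks` can be passed to `p.map` in place of `bigpower` and `data`; results are then reproducible for a given root seed regardless of which worker handles which task.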

Threading
----

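Threads are lighter-weight than processes: they run inside a single Python interpreter and share its memory, so data can be passed between them without pickling. But mind the GIL (global interpreter lock), which lets only one thread execute Python bytecode at a time. Threads therefore help most when the work releases the GIL, as blocking I/O and many NumPy operations do.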
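
As a rough illustration of when threads pay off despite the GIL: blocking calls such as `time.sleep` (standing in here for I/O) release the GIL, so several waits can overlap. A minimal sketch using the standard library's `concurrent.futures.ThreadPoolExecutor`, which is not used elsewhere in this notebook; `wait_and_square` and the delays are hypothetical:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def wait_and_square(x):
    """Pretend to wait on I/O, then return x squared."""
    time.sleep(0.1)  # sleeping releases the GIL, like most blocking I/O
    return x * x

inputs = [1, 2, 3, 4]

start = time.perf_counter()
# Four worker threads, so the four waits can overlap instead of running back to back.
with ThreadPoolExecutor(max_workers=4) as pool:
    squares = list(pool.map(wait_and_square, inputs))
elapsed = time.perf_counter() - start

print(squares, "in %.2f s" % elapsed)
```

A pure-Python CPU-bound loop would not speed up this way in standard CPython, because only one thread executes bytecode at a time.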

In [8]:
import threading
import queue

In [9]:
q = queue.Queue()

In [10]:
q.put('foo')

In [11]:
q.put(5)

In [12]:
q.put('even more')

In [13]:
q.get(block=False)

'foo'

In [14]:
def work():
    q.put(np.random.randn(1000))

In [15]:
t = threading.Thread(target=work)

In [16]:
t

<Thread(Thread-10, initial)>

In [17]:
t.start()

In [18]:
q.get(block=False)

5
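
The non-blocking `get` calls above succeed because the queue already holds items; on an empty queue they raise `queue.Empty` instead of waiting for the worker. A small sketch of a pattern that waits properly, assuming a hypothetical `producer` function and a `None` sentinel to mark the end of the data:

```python
import queue
import threading

def producer(out_q, n_items):
    """Put n_items squares on the queue, then a None sentinel."""
    for i in range(n_items):
        out_q.put(i * i)
    out_q.put(None)  # sentinel: tells the consumer there is nothing more

results_q = queue.Queue()
worker = threading.Thread(target=producer, args=(results_q, 5))
worker.start()

collected = []
while True:
    item = results_q.get()  # blocks until the producer has put something
    if item is None:
        break
    collected.append(item)

worker.join()  # wait for the thread to finish before moving on
print(collected)
```

`queue.Queue` does its own locking, so the producer and consumer need no extra synchronization here.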

In [19]:
multiprocessing.pool.ThreadPool?

Init signature:
multiprocessing.pool.ThreadPool(
    processes=None,
    initializer=None,
    initargs=(),
)
Docstring:      Class which supports an async version of applying functions to arguments.
File:           /common/software/install/migrated/anaconda/python3-2020.07-mamba/lib/python3.8/multiprocessing/pool.py
Type:           type
Subclasses:     


In [20]:
p = multiprocessing.pool.ThreadPool(3)
p.map(bigpower, data)

Raising random array to the 1th power
Raising random array to the 2th power
Raising random array to the 12th power
Raising random array to the 15th power


[0.00012698720232404945,
 0.9999282098733709,
 10335.03917034152,
 11702.444529421553]

Dask
----

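Higher-level abstractions are available! Dask arrays mimic the NumPy interface but split the data into chunks that can be processed in parallel.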

In [21]:
import numpy as np
import dask.array as da
import memory_profiler

In [22]:
Y = da.random.normal(size=(1000, 1000),
                     chunks=(100, 100))

Y

        Array          Chunk
Bytes   8.00 MB        80.00 kB
Shape   (1000, 1000)   (100, 100)
Count   100 Tasks      100 Chunks
Type    float64        numpy.ndarray


In [23]:
mu = Y.mean(axis=0)
mu

        Array       Chunk
Bytes   8.00 kB     800 B
Shape   (1000,)     (100,)
Count   240 Tasks   10 Chunks
Type    float64     numpy.ndarray


In [27]:
mu.sum()

        Array       Chunk
Bytes   8 B         8 B
Shape   ()          ()
Count   254 Tasks   1 Chunks
Type    float64     numpy.ndarray


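Note that the computation hasn't actually happened yet: each expression so far only adds tasks to a graph, and nothing is evaluated until `compute()` is called.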

In [28]:
mu[0].compute()

0.018836592200720988
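
To make the lazy evaluation concrete, here is a small sketch that uses `da.ones` instead of random data so the answer is known in advance (a substitution made purely so the result can be checked); `ones` and `lazy_total` are illustrative names:

```python
import dask.array as da

# A 1000 x 1000 array of ones, split into 100 chunks of 100 x 100.
ones = da.ones((1000, 1000), chunks=(100, 100))

# Building the expression only records tasks; no arithmetic runs here.
lazy_total = ones.mean(axis=0).sum()
print(type(lazy_total))  # still a dask array, not a number

# compute() executes the task graph and returns a concrete value.
total = lazy_total.compute()
print(total)
```

Each column mean is 1, so the sum over the 1000 columns should come out as 1000.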

In [29]:
from dask.diagnostics import ProgressBar

with ProgressBar():
    mu = Y.mean().sum().compute()

[########################################] | 100% Completed |  0.2s


In [30]:
mu

0.000827160965970755

Another great feature is that `dask` evaluates the resulting task graph in parallel automatically, using a pool of threads or processes.
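
The scheduler can also be chosen explicitly. A hedged sketch, using `da.ones` as a stand-in so the result is predictable; the `scheduler` keyword of `compute()` accepts, among others, `"threads"` and `"synchronous"` (the single-threaded option):

```python
import dask.array as da

x = da.ones((400, 400), chunks=(100, 100))
total = x.sum()

# Thread pool: the usual default for dask arrays.
threaded = total.compute(scheduler="threads")

# Single-threaded execution in the calling thread, handy with debuggers.
serial = total.compute(scheduler="synchronous")

print(threaded, serial)
```

Both calls evaluate the same graph, so they should agree; only the way the tasks are executed differs.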

To scale up further, including across multiple compute nodes, the Dask project supplies additional tools. In an environment like ours, use `dask.distributed` to manage execution and `dask_jobqueue` to interface with the Slurm scheduler. The Dask documentation has a useful walkthrough:

https://docs.dask.org/en/stable/deploying-hpc.html