
# Multiprocessing

Demo of using multiprocessing for generating data in one process and
plotting in another.

Written by Robert Cimrman


In [None]:
import multiprocessing as mp
import time

import matplotlib.pyplot as plt
import numpy as np

# Seed NumPy's legacy global RNG so the random points drawn later with
# np.random.random are reproducible from run to run.
np.random.seed(19680801)

## Processing class

This class plots data it receives from a pipe.




In [None]:
class ProcessPlotter:
    """Plot points received over a pipe in a Matplotlib figure.

    Instances are callable: calling one with a pipe end opens a figure,
    installs a timer that polls the pipe, and blocks in ``plt.show()``.
    """

    def __init__(self):
        # Coordinates of every point received so far.
        self.x, self.y = [], []

    def terminate(self):
        """Close all open Matplotlib figures."""
        plt.close('all')

    def call_back(self):
        """Drain pending messages from the pipe and redraw the canvas.

        Each non-``None`` message is indexed as ``message[0]`` (x) and
        ``message[1]`` (y); the accumulated points are plotted as red dots.

        Returns
        -------
        bool
            ``False`` if the ``None`` sentinel was received (figures are
            closed first and no redraw happens), ``True`` otherwise.
        """
        while self.pipe.poll():
            message = self.pipe.recv()
            if message is None:
                # Sentinel from the sender: shut down and report "stop".
                self.terminate()
                return False
            self.x.append(message[0])
            self.y.append(message[1])
            self.ax.plot(self.x, self.y, 'ro')
        self.fig.canvas.draw()
        return True

    def __call__(self, pipe):
        """Open the figure, start polling ``pipe`` on a timer, and show it.

        Parameters
        ----------
        pipe
            Receiving end of the pipe; must provide ``poll()`` and ``recv()``.
        """
        print('starting plotter...')

        self.pipe = pipe
        figure, axes = plt.subplots()
        self.fig, self.ax = figure, axes
        # The local reference keeps the timer alive while plt.show() blocks.
        poll_timer = self.fig.canvas.new_timer(interval=1000)
        poll_timer.add_callback(self.call_back)
        poll_timer.start()

        print('...done')
        plt.show()

## Plotting class

This class uses multiprocessing to spawn a process to run code from the
class above. When initialized, it creates a pipe and an instance of
``ProcessPlotter`` which will be run in a separate process.

When run from the command line, the parent process sends data to the spawned
process, which then plots it via the callback function registered in
``ProcessPlotter.__call__``.




In [None]:
class NBPlot:
    """Parent-side handle that streams random points to a plotter process."""

    def __init__(self):
        """Create a pipe and start ``ProcessPlotter`` in a daemon process."""
        parent_end, child_end = mp.Pipe()
        self.plot_pipe = parent_end
        self.plotter = ProcessPlotter()
        self.plot_process = mp.Process(
            target=self.plotter,
            args=(child_end,),
            daemon=True,
        )
        self.plot_process.start()

    def plot(self, finished=False):
        """Send one message down the pipe.

        Parameters
        ----------
        finished : bool, default False
            If true, send the ``None`` sentinel; otherwise send a fresh
            length-2 array from ``np.random.random``.
        """
        channel = self.plot_pipe
        payload = None if finished else np.random.random(2)
        channel.send(payload)

In [None]:
def main():
    """Feed ten random points to a plotter process, then signal it to stop."""
    plotter_handle = NBPlot()
    for _step in range(10):
        plotter_handle.plot()
        # Pace the producer so points arrive gradually.
        time.sleep(0.5)
    # finished=True sends the stop sentinel.
    plotter_handle.plot(finished=True)

In [None]:
if __name__ == '__main__':
    if plt.get_backend() == "MacOSX":
        # force=True keeps this cell re-runnable: without it, a second call
        # in the same kernel raises RuntimeError because the start method
        # has already been set.
        mp.set_start_method("forkserver", force=True)
    main()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

import multiprocessing
#multiprocessing.freeze_support() # <- may be required on windows

def plot(datax, datay, name):
    """Scatter ``datax`` against ``datay`` squared and show the figure.

    The plotted values are also printed to stdout, and ``name`` is used as
    the legend label of the scatter series.
    """
    squared = datay ** 2
    print(datax, squared)
    plt.scatter(datax, squared, label=name)
    plt.legend()
    plt.show()

def multiP():
    """Run ``plot`` in two child processes and wait for both to exit.

    Child ``idx`` is created with arguments ``(idx, idx, idx)`` and started
    immediately; all children are joined afterwards.
    """
    workers = []
    for idx in range(2):
        worker = multiprocessing.Process(target=plot, args=(idx, idx, idx))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()

if __name__ == "__main__":
    # Entry point: launch the plotting child processes via multiP().
    multiP()


In [None]:
import multiprocessing as mp
from time import sleep

class A(object):
    """Demo class that runs one of its own methods in child processes."""

    def __init__(self, *args, **kwargs):
        # do other stuff
        pass

    def do_something(self, i):
        """Sleep for 0.2 s, then print ``i`` multiplied by itself."""
        sleep(0.2)
        print('%s * %s = %s' % (i, i, i*i))

    def run(self):
        """Run ``do_something(i)`` for ``i`` in ``range(10)``, one process each.

        All processes are started first so they run concurrently, then all
        are joined so the call returns only after every child has finished.
        """
        processes = [
            mp.Process(target=self.do_something, args=(i,))
            for i in range(10)
        ]

        # Plain loops: start()/join() are called for their side effects, so
        # building a throwaway list with a comprehension would be misleading.
        for process in processes:
            process.start()
        # Join every child so none is left unreaped when run() returns.
        for process in processes:
            process.join()


if __name__ == '__main__':
    a = A()
    a.run()
    # Marker printed by the parent process once run() has returned.
    print('21')

In [1]:
from time import time
import multiprocessing as mp
from multiprocessing.pool import ThreadPool
import numpy as np
import pickle

def main(shape=(1024*2, 1024*2, 1024*2)):
    """Time ``np.sum`` on one array via thread pools and a process pool.

    A ``uint8`` array of ones is summed once directly (the reference value,
    which is printed), then once through ``apply`` on thread pools of 4, 2
    and 1 workers and on a 4-worker "spawn" process pool. Each run prints
    its label and elapsed wall-clock seconds.

    Parameters
    ----------
    shape : tuple of int, optional
        Shape of the test array. The default has 2048**3 one-byte elements
        (8 GiB), so pass something smaller on machines without that much
        free memory.

    Raises
    ------
    AssertionError
        If a pool returns a sum different from the reference value.
    """
    arr = np.ones(shape, dtype=np.uint8)
    expected_sum = np.sum(arr)
    print(expected_sum)

    def timed_apply(pool, label):
        # The work runs outside the assert so it still executes (and is
        # still timed) when assertions are disabled with ``python -O``.
        start = time()
        result = pool.apply(np.sum, (arr,))
        elapsed = time() - start
        assert result == expected_sum
        print(label, elapsed)

    for workers in (4, 2, 1):
        with ThreadPool(workers) as threadpool:
            timed_apply(threadpool, "Thread pool%d:" % workers)

    # A process pool must send ``arr`` to the worker process, unlike the
    # thread pools above, which share the parent's memory.
    with mp.get_context("spawn").Pool(4) as processpool:
        timed_apply(processpool, "Process pool:")

if __name__ == "__main__":
    # Run the pool timing comparison defined by main() in this cell.
    main()


8589934592
Thread pool4: 2.652794361114502
Thread pool2: 2.5439236164093018
Thread pool1: 2.569181203842163
Process pool: 131.4846315383911


In [None]:
import numpy as np
from time import time
from multiprocessing.pool import ThreadPool

# Number of sums performed in each variant; also the thread-pool size.
N_RUNS = 10

# float64 ones: 1024**3 elements * 8 bytes = 8 GiB of memory.
arr = np.ones((1024, 1024, 1024))

# Baseline: the sums run one after another in the main thread.
start = time()
for i in range(N_RUNS):
    arr.sum()
print("Sequential:", time() - start)

# Same number of sums, each submitted as a separate task to a thread pool.
expected = arr.sum()
start = time()
with ThreadPool(N_RUNS) as pool:
    result = pool.map(np.sum, [arr] * N_RUNS)
    assert result == [expected] * N_RUNS
# The label is derived from the real pool size (it used to say "4 threads:"
# even though the pool had 10 workers).
print("%d threads:" % N_RUNS, time() - start)


In [1]:
from concurrent.futures import ThreadPoolExecutor
def wait_on_b():
    """Sleep for five seconds, then block on future ``b`` and print its result.

    Returns 5 once ``b.result()`` has returned.
    """
    # Imported locally because earlier cells run ``from time import time``,
    # which rebinds the global name ``time`` to a function; ``time.sleep``
    # would then raise AttributeError inside the worker thread.
    from time import sleep
    sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    """Sleep for five seconds, then block on future ``a`` and print its result.

    Returns 6 once ``a.result()`` has returned.
    """
    # Imported locally because earlier cells run ``from time import time``,
    # which rebinds the global name ``time`` to a function; ``time.sleep``
    # would then raise AttributeError inside the worker thread.
    from time import sleep
    sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


# Intended as a deadlock demonstration: two workers, two tasks, and each
# task is meant to block on the other task's future.
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)


In [4]:
def wait_on_future():
    """Submit ``pow(5, 2)`` to the global ``executor`` and print the result."""
    inner = executor.submit(pow, 5, 2)
    # On a single-worker executor this blocks forever: the only worker
    # thread is busy running this very function.
    outcome = inner.result()
    print(outcome)

# Single worker: the task submitted here occupies it, so the nested submit
# made inside wait_on_future has no free worker to run on.
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)

<Future at 0x7f4e6df1e0d0 state=running>

In [5]:
# Note: this rebinds the global name `executor`, which wait_on_future reads.
with ThreadPoolExecutor(max_workers=1) as executor:
    # NOTE(review): summing range(10**10) in pure Python is long-running.
    future = executor.submit(sum, range(10000000000))
    print(future.result())

49999999995000000000


In [3]:
# `or` yields its first truthy operand, so this expression evaluates to True.
True or ''

True