# Multiprocessing Relay

This notebook explores different relay configurations.
We test pipelines with 1-1, 1-n and n-1 connections, as well as the
multi-stage chains 1-n-1 and 1-n-m-1 (the latter contains an n-m connection).


In [2]:
%load_ext autoreload
%autoreload 2

# Set the logger name manually: inside a notebook __name__ is '__main__',
# so it cannot be used to derive a meaningful logger name.
LOG = 'ktz.ipynb'

In [3]:
import yaml
import logging
import logging.config

from pprint import pprint

def setup_logging(path='../conf/logging.yaml'):
    """Configure logging from a YAML file and return the notebook logger.

    The configuration is loaded from ``path``, adjusted for notebook use and
    applied with ``logging.config.dictConfig``.

    Adjustments made to the loaded configuration:

    * the ``console`` handler is switched to the ``plain`` formatter,
    * the ``ktz`` logger entry is replaced by one that only uses the
      ``console`` handler,
    * the level of the ``root`` entry under ``loggers`` is set to ``DEBUG``.

    Args:
        path: Location of the YAML logging configuration. Defaults to the
            path that was previously hard-coded.

    Returns:
        The ``logging.Logger`` named by the module-level ``LOG`` constant.

    Raises:
        OSError: If ``path`` cannot be opened.
        KeyError: If the configuration lacks the ``handlers.console`` or
            ``loggers.root`` entries that are modified here.
    """
    # YAML streams are Unicode; decode explicitly instead of relying on the
    # platform's default encoding, which is not UTF-8 everywhere.
    with open(path, mode='r', encoding='utf-8') as fd:
        conf = yaml.safe_load(fd)

    conf['handlers']['console']['formatter'] = 'plain'
    # Overwrites any existing 'ktz' logger entry from the file.
    conf['loggers']['ktz'] = {'handlers': ['console']}
    # NOTE(review): this edits a logger entry literally named 'root' under
    # 'loggers', not dictConfig's top-level 'root' key -- confirm that this
    # matches the layout of logging.yaml.
    conf['loggers']['root']['level'] = 'DEBUG'

    logging.config.dictConfig(conf)
    return logging.getLogger(LOG)


# Apply the logging configuration once and emit a test message to confirm
# that records from this notebook's logger reach the console handler.
log = setup_logging()
log.info('hello!')

[12:55:48] [19715] ktz.ipynb - hello!


In [4]:
import time

from ktz.multiprocessing import Actor
from ktz.multiprocessing import Relay

class Producer(Actor):
    """Source actor for the demo pipelines: emits a fixed sequence of integers."""

    def loop(self):
        """Pass the integers 0, 1 and 2 to ``self.send``, then return.

        ``send`` is inherited from ``Actor``; presumably it forwards the item
        to the next connected group -- TODO confirm against ktz.multiprocessing.
        """
        for x in range(3):
            self.send(x)


class Consumer(Actor):
    """Sink actor for the demo pipelines: receives items and forwards nothing."""

    def recv(self, x):
        """Handle one received item ``x`` by sleeping for one second.

        The item itself is ignored; the sleep stands in for real work.
        """
        time.sleep(1)


class Worker(Actor):
    """Intermediate actor: transforms each received item and passes it on."""

    def recv(self, x):
        """Sleep for one second, then hand ``x + 10`` to ``self.send``."""
        # The one-second pause stands in for real work.
        time.sleep(1)
        self.send(x + 10)

In [5]:
# 1 - 1: a single Producer connected to a single Consumer.
# The relay logs under the notebook's logger name (LOG).
relay = Relay(log=LOG)
relay.connect(Producer(), Consumer())
relay.start()

[12:55:48] [19715] ktz.multiprocessing - relay: maintaining 2 groups


[12:55:48] [19715] ktz.multiprocessing - relay: starting processes


[12:55:48] [19715] ktz.multiprocessing - relay: starting logthread


[12:55:48] [19715] ktz.multiprocessing - relay: waiting for 2 processes to finish


[12:55:48] [19715] ktz.ipynb - [group-0] (Producer-1) starting up


[12:55:48] [19715] ktz.multiprocessing - relay: waiting for Producer-1


[12:55:48] [19715] ktz.ipynb - [group-0] (Producer-1) running loop


[12:55:48] [19715] ktz.multiprocessing - relay: waiting for Consumer-2


[12:55:48] [19715] ktz.ipynb - [group-0] (Producer-1) leaving loop


[12:55:48] [19715] ktz.ipynb - [group-0] (Producer-1) shutting down


[12:55:48] [19715] ktz.ipynb - [group-0] (Producer-1) shut down complete


[12:55:48] [19715] ktz.ipynb - [group-1] (Consumer-2) starting up


[12:55:48] [19715] ktz.ipynb - [group-1] (Consumer-2) running loop


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-2) received 1/1 poison pills


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-2) leaving loop


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-2) shutting down


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-2) shut down complete


[12:55:51] [19715] ktz.multiprocessing - relay: waiting for log thread


[12:55:51] [19715] ktz.multiprocessing - relay: finished, exiting


In [6]:
# 1 - n: a single Producer connected to a group of five Consumers.
# The Producer only emits three items, so there are more Consumers than items.

relay = Relay(log=LOG)
relay.connect(Producer(), [Consumer() for _ in range(5)])
relay.start()

[12:55:51] [19715] ktz.multiprocessing - relay: maintaining 2 groups


[12:55:51] [19715] ktz.multiprocessing - relay: starting processes


[12:55:51] [19715] ktz.multiprocessing - relay: starting logthread


[12:55:51] [19715] ktz.ipynb - [group-0] (Producer-3) starting up


[12:55:51] [19715] ktz.multiprocessing - relay: waiting for 6 processes to finish


[12:55:51] [19715] ktz.ipynb - [group-0] (Producer-3) running loop


[12:55:51] [19715] ktz.multiprocessing - relay: waiting for Producer-3


[12:55:51] [19715] ktz.ipynb - [group-0] (Producer-3) leaving loop


[12:55:51] [19715] ktz.multiprocessing - relay: waiting for Consumer-4


[12:55:51] [19715] ktz.ipynb - [group-0] (Producer-3) shutting down


[12:55:51] [19715] ktz.ipynb - [group-0] (Producer-3) shut down complete


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-4) starting up


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-4) running loop


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-5) starting up


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-5) running loop


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-6) starting up


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-6) running loop


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-7) starting up


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-7) running loop


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-7) received 1/1 poison pills


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-7) leaving loop


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-7) shutting down


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-7) shut down complete


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-8) starting up


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-8) running loop


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-8) leaving loop


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-8) shutting down


[12:55:51] [19715] ktz.ipynb - [group-1] (Consumer-8) shut down complete


[12:55:52] [19715] ktz.ipynb - [group-1] (Consumer-4) leaving loop


[12:55:52] [19715] ktz.ipynb - [group-1] (Consumer-4) shutting down


[12:55:52] [19715] ktz.ipynb - [group-1] (Consumer-4) shut down complete


[12:55:52] [19715] ktz.multiprocessing - relay: waiting for Consumer-5


[12:55:52] [19715] ktz.ipynb - [group-1] (Consumer-5) leaving loop


[12:55:52] [19715] ktz.multiprocessing - relay: waiting for Consumer-6


[12:55:52] [19715] ktz.ipynb - [group-1] (Consumer-5) shutting down


[12:55:52] [19715] ktz.multiprocessing - relay: waiting for Consumer-7


[12:55:52] [19715] ktz.ipynb - [group-1] (Consumer-5) shut down complete


[12:55:52] [19715] ktz.multiprocessing - relay: waiting for Consumer-8


[12:55:52] [19715] ktz.ipynb - [group-1] (Consumer-6) leaving loop


[12:55:52] [19715] ktz.multiprocessing - relay: waiting for log thread


[12:55:52] [19715] ktz.ipynb - [group-1] (Consumer-6) shutting down


[12:55:52] [19715] ktz.ipynb - [group-1] (Consumer-6) shut down complete


[12:55:52] [19715] ktz.multiprocessing - relay: finished, exiting


In [7]:
# n - 1: a group of three Producers connected to a single Consumer.
# Each Producer emits three items, so the Consumer handles nine in total.
relay = Relay(log=LOG)
relay.connect([Producer() for _ in range(3)], Consumer())
relay.start()


[12:55:52] [19715] ktz.multiprocessing - relay: maintaining 2 groups


[12:55:52] [19715] ktz.multiprocessing - relay: starting processes


[12:55:52] [19715] ktz.multiprocessing - relay: starting logthread


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-9) starting up


[12:55:52] [19715] ktz.multiprocessing - relay: waiting for 4 processes to finish


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-9) running loop


[12:55:52] [19715] ktz.multiprocessing - relay: waiting for Producer-9


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-9) leaving loop


[12:55:52] [19715] ktz.multiprocessing - relay: waiting for Producer-10


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-9) shutting down


[12:55:52] [19715] ktz.multiprocessing - relay: waiting for Producer-11


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-9) shut down complete


[12:55:52] [19715] ktz.multiprocessing - relay: waiting for Consumer-12


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-10) starting up


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-10) running loop


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-10) leaving loop


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-10) shutting down


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-10) shut down complete


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-11) starting up


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-11) running loop


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-11) leaving loop


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-11) shutting down


[12:55:52] [19715] ktz.ipynb - [group-0] (Producer-11) shut down complete


[12:55:52] [19715] ktz.ipynb - [group-1] (Consumer-12) starting up


[12:55:52] [19715] ktz.ipynb - [group-1] (Consumer-12) running loop


[12:55:55] [19715] ktz.ipynb - [group-1] (Consumer-12) received 1/3 poison pills


[12:55:58] [19715] ktz.ipynb - [group-1] (Consumer-12) received 2/3 poison pills


[12:56:01] [19715] ktz.ipynb - [group-1] (Consumer-12) received 3/3 poison pills


[12:56:01] [19715] ktz.ipynb - [group-1] (Consumer-12) leaving loop


[12:56:01] [19715] ktz.ipynb - [group-1] (Consumer-12) shutting down


[12:56:01] [19715] ktz.ipynb - [group-1] (Consumer-12) shut down complete


[12:56:01] [19715] ktz.multiprocessing - relay: waiting for log thread


[12:56:01] [19715] ktz.multiprocessing - relay: finished, exiting


In [8]:
# 1 - n - 1: one Producer, a group of two Workers, one Consumer.
# NOTE(review): maxsize=2 presumably bounds the queues between groups --
# confirm against the Relay implementation.

relay = Relay(maxsize=2, log=LOG)
relay.connect(
    Producer(),
    [Worker() for _ in range(2)],
    Consumer(),
)

relay.start()

[12:56:01] [19715] ktz.multiprocessing - relay: maintaining 3 groups


[12:56:01] [19715] ktz.multiprocessing - relay: starting processes


[12:56:01] [19715] ktz.multiprocessing - relay: starting logthread


[12:56:01] [19715] ktz.ipynb - [group-0] (Producer-13) starting up


[12:56:01] [19715] ktz.multiprocessing - relay: waiting for 4 processes to finish


[12:56:01] [19715] ktz.ipynb - [group-0] (Producer-13) running loop


[12:56:01] [19715] ktz.multiprocessing - relay: waiting for Producer-13


[12:56:01] [19715] ktz.ipynb - [group-1] (Worker-14) starting up


[12:56:01] [19715] ktz.multiprocessing - relay: waiting for Worker-14


[12:56:01] [19715] ktz.ipynb - [group-1] (Worker-14) running loop


[12:56:01] [19715] ktz.ipynb - [group-0] (Producer-13) leaving loop


[12:56:01] [19715] ktz.ipynb - [group-1] (Worker-15) starting up


[12:56:01] [19715] ktz.ipynb - [group-1] (Worker-15) running loop


[12:56:01] [19715] ktz.ipynb - [group-0] (Producer-13) shutting down


[12:56:01] [19715] ktz.ipynb - [group-0] (Producer-13) shut down complete


[12:56:01] [19715] ktz.ipynb - [group-2] (Consumer-16) starting up


[12:56:01] [19715] ktz.ipynb - [group-2] (Consumer-16) running loop


[12:56:02] [19715] ktz.ipynb - [group-1] (Worker-15) received 1/1 poison pills


[12:56:02] [19715] ktz.ipynb - [group-1] (Worker-15) leaving loop


[12:56:02] [19715] ktz.ipynb - [group-1] (Worker-15) shutting down


[12:56:02] [19715] ktz.ipynb - [group-1] (Worker-15) shut down complete


[12:56:03] [19715] ktz.ipynb - [group-1] (Worker-14) leaving loop


[12:56:04] [19715] ktz.ipynb - [group-2] (Consumer-16) received 1/2 poison pills


[12:56:04] [19715] ktz.ipynb - [group-1] (Worker-14) shutting down


[12:56:04] [19715] ktz.ipynb - [group-1] (Worker-14) shut down complete


[12:56:04] [19715] ktz.multiprocessing - relay: waiting for Worker-15


[12:56:04] [19715] ktz.multiprocessing - relay: waiting for Consumer-16


[12:56:05] [19715] ktz.ipynb - [group-2] (Consumer-16) received 2/2 poison pills


[12:56:05] [19715] ktz.ipynb - [group-2] (Consumer-16) leaving loop


[12:56:05] [19715] ktz.ipynb - [group-2] (Consumer-16) shutting down


[12:56:05] [19715] ktz.ipynb - [group-2] (Consumer-16) shut down complete


[12:56:05] [19715] ktz.multiprocessing - relay: waiting for log thread


[12:56:05] [19715] ktz.multiprocessing - relay: finished, exiting


In [9]:
# 1 - n - m - 1: one Producer, a group of two Workers, a second group of
# three Workers, one Consumer. The two Worker groups form the n - m connection.
# NOTE(review): maxsize=2 presumably bounds the queues between groups --
# confirm against the Relay implementation.

relay = Relay(maxsize=2, log=LOG)
relay.connect(
    Producer(),
    [Worker() for _ in range(2)],
    [Worker() for _ in range(3)],
    Consumer(),
)

relay.start()

[12:56:05] [19715] ktz.multiprocessing - relay: maintaining 4 groups


[12:56:05] [19715] ktz.multiprocessing - relay: starting processes


[12:56:05] [19715] ktz.multiprocessing - relay: starting logthread


[12:56:05] [19715] ktz.ipynb - [group-0] (Producer-17) starting up


[12:56:05] [19715] ktz.multiprocessing - relay: waiting for 7 processes to finish


[12:56:05] [19715] ktz.ipynb - [group-0] (Producer-17) running loop


[12:56:05] [19715] ktz.multiprocessing - relay: waiting for Producer-17


[12:56:05] [19715] ktz.ipynb - [group-1] (Worker-18) starting up


[12:56:05] [19715] ktz.multiprocessing - relay: waiting for Worker-18


[12:56:05] [19715] ktz.ipynb - [group-1] (Worker-18) running loop


[12:56:05] [19715] ktz.ipynb - [group-0] (Producer-17) leaving loop


[12:56:05] [19715] ktz.ipynb - [group-1] (Worker-19) starting up


[12:56:05] [19715] ktz.ipynb - [group-1] (Worker-19) running loop


[12:56:05] [19715] ktz.ipynb - [group-0] (Producer-17) shutting down


[12:56:05] [19715] ktz.ipynb - [group-0] (Producer-17) shut down complete


[12:56:05] [19715] ktz.ipynb - [group-2] (Worker-20) starting up


[12:56:05] [19715] ktz.ipynb - [group-2] (Worker-20) running loop


[12:56:05] [19715] ktz.ipynb - [group-2] (Worker-21) starting up


[12:56:05] [19715] ktz.ipynb - [group-2] (Worker-21) running loop


[12:56:05] [19715] ktz.ipynb - [group-2] (Worker-22) starting up


[12:56:05] [19715] ktz.ipynb - [group-2] (Worker-22) running loop


[12:56:05] [19715] ktz.ipynb - [group-3] (Consumer-23) starting up


[12:56:05] [19715] ktz.ipynb - [group-3] (Consumer-23) running loop


[12:56:06] [19715] ktz.ipynb - [group-1] (Worker-19) received 1/1 poison pills


[12:56:06] [19715] ktz.ipynb - [group-1] (Worker-19) leaving loop


[12:56:06] [19715] ktz.ipynb - [group-1] (Worker-19) shutting down


[12:56:06] [19715] ktz.ipynb - [group-1] (Worker-19) shut down complete


[12:56:06] [19715] ktz.ipynb - [group-2] (Worker-22) received 1/2 poison pills


[12:56:07] [19715] ktz.ipynb - [group-1] (Worker-18) leaving loop


[12:56:07] [19715] ktz.ipynb - [group-1] (Worker-18) shutting down


[12:56:07] [19715] ktz.ipynb - [group-1] (Worker-18) shut down complete


[12:56:07] [19715] ktz.multiprocessing - relay: waiting for Worker-19


[12:56:07] [19715] ktz.ipynb - [group-2] (Worker-20) received 2/2 poison pills


[12:56:07] [19715] ktz.multiprocessing - relay: waiting for Worker-20


[12:56:07] [19715] ktz.ipynb - [group-2] (Worker-20) leaving loop


[12:56:07] [19715] ktz.ipynb - [group-2] (Worker-21) leaving loop


[12:56:07] [19715] ktz.ipynb - [group-2] (Worker-21) shutting down


[12:56:07] [19715] ktz.ipynb - [group-2] (Worker-21) shut down complete


[12:56:08] [19715] ktz.ipynb - [group-2] (Worker-20) shutting down


[12:56:08] [19715] ktz.ipynb - [group-2] (Worker-20) shut down complete


[12:56:08] [19715] ktz.multiprocessing - relay: waiting for Worker-21


[12:56:08] [19715] ktz.multiprocessing - relay: waiting for Worker-22


[12:56:09] [19715] ktz.ipynb - [group-3] (Consumer-23) received 1/3 poison pills


[12:56:09] [19715] ktz.ipynb - [group-3] (Consumer-23) received 2/3 poison pills


[12:56:09] [19715] ktz.ipynb - [group-2] (Worker-22) leaving loop


[12:56:09] [19715] ktz.ipynb - [group-2] (Worker-22) shutting down


[12:56:09] [19715] ktz.ipynb - [group-2] (Worker-22) shut down complete


[12:56:09] [19715] ktz.multiprocessing - relay: waiting for Consumer-23


[12:56:10] [19715] ktz.ipynb - [group-3] (Consumer-23) received 3/3 poison pills


[12:56:10] [19715] ktz.ipynb - [group-3] (Consumer-23) leaving loop


[12:56:10] [19715] ktz.ipynb - [group-3] (Consumer-23) shutting down


[12:56:10] [19715] ktz.ipynb - [group-3] (Consumer-23) shut down complete


[12:56:10] [19715] ktz.multiprocessing - relay: waiting for log thread


[12:56:10] [19715] ktz.multiprocessing - relay: finished, exiting
