# Lecture 12: Parallelism and Python (continued)

## Python multiprocessing

* Because of the GIL in the standard interpreter (CPython), Python code that has to run in parallel usually relies on the multiprocessing module.
* The module provides the tools needed to run code in several processes, pass data between them, and synchronize them.
* In all the examples below, stdout output from different processes may be interleaved unless we synchronize it explicitly.

#### A bit about fork

* On Unix systems, fork is the operation that clones the current process.
* A call to fork() creates a separate process with a copy of the caller's address space.
* The new process starts executing at the instruction immediately after the fork() call.
* In Python, the os module provides the fork() function.
* The child process gets its own PID (process identifier) as a result of the call.
* The return value tells us where we ended up:
  * 0: we are in the child.
  * \> 0: we are in the parent (the value is the child's PID).
  * < 0: an error in the C API; Python's os.fork() raises OSError instead of returning a negative value.
* https://docs.python.org/3/library/os.html#os.fork
* http://www.python-course.eu/forking.php
* https://mildopinions.wordpress.com/2008/05/31/python-and-parallel-programming/

In [1]:
import os

def child():
    print('\nA new child ',  os.getpid())
    os._exit(0)

def parent():
    while True:
        newpid = os.fork()

        if newpid == 0:
            child()
        else:
            pids = (os.getpid(), newpid)
            print("parent: %d, child: %d\n" % pids)
        reply = input("q for quit / c for new fork")

        if reply == 'c':
            continue
        else:
            break

parent()

parent: 10905, child: 10920


A new child  10920
q for quit / c for new forkc
parent: 10905, child: 10926


A new child  10926
q for quit / c for new forkc
parent: 10905, child: 10931


A new child  10931
q for quit / c for new forkc
parent: 10905, child: 10936


A new child  10936
q for quit / c for new forkq
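
The example above never waits for its children, so the parent never learns how they finished. A minimal sketch of forking, exiting the child with a chosen code, and reading that code back in the parent. The helper name fork_and_wait is hypothetical, and the code assumes a Unix-like system where os.fork() is available:

```python
import os

def fork_and_wait(exit_code):
    """Fork a child that exits at once; return (child_pid, child_exit_code)."""
    pid = os.fork()  # raises OSError if the fork fails
    if pid == 0:
        # Child branch: leave immediately, skipping the parent's cleanup code.
        os._exit(exit_code)
    # Parent branch: block until this particular child has finished.
    _, status = os.waitpid(pid, 0)
    return pid, os.WEXITSTATUS(status)

if __name__ == '__main__':
    child_pid, code = fork_and_wait(7)
    print('child %d exited with code %d' % (child_pid, code))
```

Calling os.waitpid() in the parent also lets the operating system release the bookkeeping it keeps for a finished child.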


##### sys.exit vs os._exit

* Both functions are specific to the Python world.
* sys.exit: a cross-platform exit call; its argument is the exit code.
* sys.exit does extra work on the way out: it flushes the stdio buffers, runs the handlers registered with atexit, and so on.
* os._exit: a low-level exit call; its argument is the exit code, and it does the bare minimum in userspace (no cleanup handlers, no flushing of stdio buffers).
* os._exit should normally be used only in child processes created by fork.
* https://docs.python.org/3/library/os.html#os._exit
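
The difference can be observed from the outside. The sketch below is an illustration, not part of the lecture: it runs two tiny scripts in separate interpreters, each registering an atexit handler and then leaving through a different exit call. CHILD_TEMPLATE and run_child are hypothetical names.

```python
import subprocess
import sys

# Script executed in a fresh interpreter; {exit_call} is filled in per run.
CHILD_TEMPLATE = """
import atexit, os, sys
atexit.register(lambda: print('atexit handler ran'))
{exit_call}
"""

def run_child(exit_call):
    """Run a tiny script ending in exit_call; return (exit code, captured stdout)."""
    script = CHILD_TEMPLATE.format(exit_call=exit_call)
    done = subprocess.run([sys.executable, '-c', script],
                          capture_output=True, text=True)
    return done.returncode, done.stdout.strip()

if __name__ == '__main__':
    print(run_child('sys.exit(3)'))
    print(run_child('os._exit(3)'))
```

Both children report exit code 3, but only the sys.exit run is expected to contain the handler's line in its captured output.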

#### A simple example

Screen output shows artifacts in many of the examples below: the separate processes do not synchronize their writes, and IPython only makes the interleaving worse.

In [2]:
import multiprocessing

def worker():
    """worker function"""
    print('Worker')
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

Worker
Worker
Worker
Worker
Worker


#### Passing arguments

* To be passed to another process, arguments have to be serialized with pickle.
* The object must support serialization through this module; the conversion itself happens automatically.
* Some data types come with plenty of pitfalls.

In [3]:
def worker(num):
    """thread worker function"""
    print('Worker:', num)
    return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

Worker: 0
Worker: 2
Worker: 3
Worker: 1
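
On the pickle pitfalls mentioned above: a quick way to find out whether a value can cross a process boundary is simply to try pickling it. A minimal sketch; is_picklable is a hypothetical helper, and the set of caught exceptions is an assumption meant to cover the usual failure modes:

```python
import pickle
import threading

def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True

if __name__ == '__main__':
    candidates = {
        'tuple of plain values': (1, 'a', [2.0]),
        'lambda': lambda x: x,
        'threading.Lock': threading.Lock(),
        'generator': (i for i in range(3)),
    }
    for label, obj in candidates.items():
        print('%-22s picklable: %s' % (label, is_picklable(obj)))
```

Plain containers of numbers and strings pass; lambdas, thread locks, and generators are typical examples of objects that do not.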


#### multiprocessing and importability

* For multiprocessing to work, the current module (the one containing the target function) must be importable.
* That is why the main code must always be protected by the if __name__ == '__main__' check discussed earlier.
* Alternative: define the target function in a different module (not the one containing main).

#### Identifying the current process

In [4]:
import multiprocessing
import time

def worker():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(2)
    print(name, 'Exiting')

def my_service():
    name = multiprocessing.current_process().name
    print(name, 'Starting')
    time.sleep(3)
    print(name, 'Exiting')

if __name__ == '__main__':
    service = multiprocessing.Process(name='my_service', target=my_service)
    worker_1 = multiprocessing.Process(name='worker 1', target=worker)
    worker_2 = multiprocessing.Process(target=worker) # use default name

    worker_1.start()
    worker_2.start()
    service.start()

worker 1 Starting
Process-13 Starting


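#### Daemon processes

* By default, the main program does not exit until all the processes it spawned have exited.
* Processes can also be made daemons: a daemon process does not keep the main program from exiting and is terminated when the main program exits.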

In [5]:
import multiprocessing
import time
import sys

def daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()

def non_daemon():
    p = multiprocessing.current_process()
    print('Starting:', p.name, p.pid)
    sys.stdout.flush()
    print('Exiting :', p.name, p.pid)
    sys.stdout.flush()

if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

Starting: daemon 10982


#### Waiting for processes with join

Again, all of this closely resembles threading; the interfaces were deliberately designed that way.

In [6]:
import multiprocessing
import time
import sys

def daemon():
    print('Starting:', multiprocessing.current_process().name)
    time.sleep(2)
    print('Exiting :', multiprocessing.current_process().name)

def non_daemon():
    print('Starting:', multiprocessing.current_process().name)
    print('Exiting :', multiprocessing.current_process().name)

if __name__ == '__main__':
    d = multiprocessing.Process(name='daemon', target=daemon)
    d.daemon = True

    n = multiprocessing.Process(name='non-daemon', target=non_daemon)
    n.daemon = False

    d.start()
    n.start()

    d.join(1)
    print('d.is_alive()', d.is_alive())
    n.join()

Starting: daemon
Starting: non-daemon
Exiting : non-daemon
d.is_alive() True


#### Terminating processes

* Preferably, send the process a message asking it to exit (the poison pill method).
* If that is not an option (the process hangs, for example), it can be stopped forcibly with terminate().

In [7]:
import multiprocessing
import time

def slow_worker():
    print('Starting worker')
    time.sleep(0.1)
    print('Finished worker')

if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('BEFORE:', p, p.is_alive())
    
    p.start()
    print('DURING:', p, p.is_alive())
    
    p.terminate()
    print('TERMINATED:', p, p.is_alive())

    p.join()
    print('JOINED:', p, p.is_alive())

BEFORE: <Process(Process-18, initial)> False
DURING: <Process(Process-18, started)> True
TERMINATED: <Process(Process-18, started)> True
JOINED: <Process(Process-18, stopped[SIGTERM])> False


It is important to call join() after terminate() so that the Process object has time to update its state to reflect the termination.
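
The poison pill approach mentioned above might look roughly like this. It is a sketch under assumptions: STOP, worker, and run_demo are hypothetical names, None is chosen as the sentinel, and the fork start method is requested explicitly, which assumes a Unix-like system. terminate() is kept only as a fallback for a worker that does not react in time.

```python
import multiprocessing

STOP = None  # sentinel ("poison pill") agreed on by both sides

def worker(inbox, outbox):
    """Square numbers taken from inbox until the poison pill arrives."""
    while True:
        item = inbox.get()
        if item is STOP:
            break
        outbox.put(item * item)

def run_demo(values):
    ctx = multiprocessing.get_context('fork')  # assumption: Unix-like OS
    inbox, outbox = ctx.Queue(), ctx.Queue()
    p = ctx.Process(target=worker, args=(inbox, outbox))
    p.start()
    for v in values:
        inbox.put(v)
    inbox.put(STOP)                  # polite request to shut down
    # Drain the results before joining so the worker can flush its queue.
    results = [outbox.get(timeout=5) for _ in values]
    p.join(timeout=5)
    if p.is_alive():                 # fallback for a stuck worker
        p.terminate()
        p.join()
    return results, p.exitcode

if __name__ == '__main__':
    print(run_demo([1, 2, 3]))
```

A worker that exits through the poison pill ends with exit code 0, whereas one that had to be terminated would show a negative exit code.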

#### Handling exit codes

* Like any other processes, those created through multiprocessing have exit codes (the code with which execution ended).
* Possible values:
  * exitcode == 0: no error
  * exitcode > 0: there was an error, and the process returned this code
  * exitcode < 0: the process was killed by the signal with number -1 * exitcode
* Processes that end because of an unhandled exception automatically get an exitcode of 1.

In [8]:
import multiprocessing
import sys
import time

def exit_error():
    sys.exit(1)

def exit_ok():
    return

def return_value():
    return 1

def raises():
    raise RuntimeError('There was an error!')

def terminated():
    time.sleep(3)

if __name__ == '__main__':
    jobs = []
    for f in [exit_error, exit_ok, return_value, raises, terminated]:
        print('Starting process for', f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()
        
    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print('%s.exitcode = %s' % (j.name, j.exitcode))

Starting process for exit_error
Starting process for exit_ok
Starting process for return_value
Starting process for raises
Starting process for terminated


Process raises:


exit_error.exitcode = 1
exit_ok.exitcode = 0
return_value.exitcode = 0


Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 249, in _bootstrap
    self.run()
  File "/usr/lib/python3.5/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-8-b699d4f1b0af>", line 15, in raises
    raise RuntimeError('There was an error!')
RuntimeError: There was an error!


raises.exitcode = 1
terminated.exitcode = -15
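
The sign convention can be turned into a readable message. A small sketch; describe_exitcode is a hypothetical helper, and it relies on signal.Signals to map a signal number to its name (for example, the -15 in the output above corresponds to SIGTERM):

```python
import signal

def describe_exitcode(exitcode):
    """Translate a Process.exitcode value into a human-readable string."""
    if exitcode is None:
        # exitcode stays None while the process has not finished yet
        return 'not finished yet'
    if exitcode == 0:
        return 'finished without error'
    if exitcode > 0:
        return 'exited with error code %d' % exitcode
    # Negative values encode the number of the signal that killed the process.
    return 'killed by signal %s' % signal.Signals(-exitcode).name

if __name__ == '__main__':
    for code in (None, 0, 1, -15):
        print(code, '->', describe_exitcode(code))
```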


#### Logging in multiprocessing

* The log_to_stderr() function sets up the module's logger through logging and attaches a handler that writes messages to stderr.
* By default the logger's level is NOTSET.

In [9]:
import multiprocessing
import logging
import sys

def worker():
    print('Doing some work')
    sys.stdout.flush()

if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

[INFO/Process-24] child process calling self.run()


Doing some work


[INFO/Process-24] process shutting down
[DEBUG/Process-24] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-24] running the remaining "atexit" finalizers
[INFO/Process-24] process exiting with exitcode 0
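
The DEBUG level used above is quite chatty. A minimal sketch of dialing it down: log_to_stderr() returns the module's logger, so its level can be adjusted with the usual logging API. The messages logged here are made up for illustration.

```python
import logging
import multiprocessing

# Attach the stderr handler; without an argument the level is left untouched.
logger = multiprocessing.log_to_stderr()

# INFO keeps the process lifecycle messages and hides the DEBUG records.
logger.setLevel(logging.INFO)

logger.info('visible at INFO level')
logger.debug('hidden at INFO level')
```

The same logger object is also available later through multiprocessing.get_logger().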


#### Subclassing Process

In [10]:
import multiprocessing

class Worker(multiprocessing.Process):

    def run(self):
        print('In %s' % self.name)
        return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

[INFO/Worker-25] child process calling self.run()
[INFO/Worker-27] child process calling self.run()
[INFO/Worker-28] child process calling self.run()


In Worker-25


[INFO/Worker-26] child process calling self.run()


In Worker-28


[INFO/Worker-25] process shutting down
[INFO/Worker-29] child process calling self.run()


In Worker-29
In Worker-27
In Worker-26


[DEBUG/Worker-25] running all "atexit" finalizers with priority >= 0
[INFO/Worker-26] process shutting down
[DEBUG/Worker-26] running all "atexit" finalizers with priority >= 0
[INFO/Worker-28] process shutting down
[DEBUG/Worker-28] running all "atexit" finalizers with priority >= 0
[DEBUG/Worker-25] running the remaining "atexit" finalizers
[INFO/Worker-27] process shutting down
[DEBUG/Worker-26] running the remaining "atexit" finalizers
[INFO/Worker-26] process exiting with exitcode 0
[INFO/Worker-29] process shutting down
[DEBUG/Worker-29] running all "atexit" finalizers with priority >= 0
[DEBUG/Worker-27] running all "atexit" finalizers with priority >= 0
[DEBUG/Worker-28] running the remaining "atexit" finalizers
[INFO/Worker-25] process exiting with exitcode 0
[INFO/Worker-28] process exiting with exitcode 0
[DEBUG/Worker-29] running the remaining "atexit" finalizers
[DEBUG/Worker-27] running the remaining "atexit" finalizers
[INFO/Worker-29] process exiting with exitcode 0
[IN

#### Exchanging messages between processes

* multiprocessing.Queue is a queue for exchanging messages between processes (with the necessary synchronization built in).
* An object passed through a Queue must be serializable and deserializable with pickle (pickle-able).

A simple example:

In [11]:
import multiprocessing

class MyFancyClass:
    
    def __init__(self, name):
        self.name = name
    
    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Doing something fancy in %s for %s!' % (proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    
    queue.put(MyFancyClass('Fancy Dan'))
    
    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

[DEBUG/MainProcess] created semlock with handle 140091336916992
[DEBUG/MainProcess] created semlock with handle 140091336912896
[DEBUG/MainProcess] created semlock with handle 140091336908800
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/Process-30] Queue._after_fork()
[DEBUG/MainProcess] doing self._thread.start()
[INFO/Process-30] child process calling self.run()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
[DEBUG/MainProcess] telling queue thread to quit
[DEBUG/MainProcess] Queue.join_thread()
[DEBUG/MainProcess] feeder thread got sentinel -- exiting


Doing something fancy in Process-30 for Fancy Dan!


[INFO/Process-30] process shutting down
[DEBUG/Process-30] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-30] running the remaining "atexit" finalizers
[INFO/Process-30] process exiting with exitcode 0
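
One consequence of the pickle requirement is that the receiver gets a copy of the object, not the object itself. A minimal single-process sketch that makes this visible; round_trip is a hypothetical helper:

```python
import multiprocessing

def round_trip(obj):
    """Send obj through a multiprocessing.Queue and return what comes out."""
    q = multiprocessing.Queue()
    q.put(obj)                   # serialized with pickle behind the scenes
    received = q.get(timeout=5)  # deserialized on the receiving side
    q.close()
    q.join_thread()
    return received

if __name__ == '__main__':
    original = {'name': 'Fancy Dan', 'tags': ['a', 'b']}
    copy = round_trip(original)
    print('equal:', copy == original, '| same object:', copy is original)
```

Changes made to the received object therefore do not propagate back to the sender; results have to be sent back explicitly, for example through a second queue.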


Now consider a more complex example with tasks and the consumer processes that execute them:

In [12]:
import multiprocessing
import time

class Consumer(multiprocessing.Process):
    
    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print('%s: Exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('%s: %s' % (proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)


if __name__ == '__main__':
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    
    # Start consumers
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating %d consumers' % num_consumers)
    consumers = [ Consumer(tasks, results)
                  for i in range(num_consumers) ]
    for w in consumers:
        w.start()
    
    # Enqueue jobs
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))
    
    # Add a poison pill for each consumer
    for i in range(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()
    
    # Start printing results
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1

[DEBUG/MainProcess] created semlock with handle 140091336904704
[DEBUG/MainProcess] created semlock with handle 140091336900608
[DEBUG/MainProcess] created semlock with handle 140091336896512
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] created semlock with handle 140091336892416
[DEBUG/MainProcess] created semlock with handle 140091336888320
[DEBUG/MainProcess] created semlock with handle 140091336884224
[DEBUG/MainProcess] created semlock with handle 140091336880128
[DEBUG/MainProcess] created semlock with handle 140091336876032
[DEBUG/MainProcess] created semlock with handle 140091336871936
[DEBUG/MainProcess] created semlock with handle 140091336867840
[DEBUG/MainProcess] created semlock with handle 140091336863744
[DEBUG/MainProcess] Queue._after_fork()


Creating 8 consumers


[DEBUG/Consumer-31] Queue._after_fork()
[DEBUG/Consumer-31] Queue._after_fork()
[DEBUG/Consumer-31] Queue._after_fork()
[DEBUG/Consumer-32] Queue._after_fork()
[INFO/Consumer-31] child process calling self.run()
[DEBUG/Consumer-32] Queue._after_fork()
[DEBUG/Consumer-32] Queue._after_fork()
[INFO/Consumer-32] child process calling self.run()
[DEBUG/Consumer-34] Queue._after_fork()
[DEBUG/Consumer-33] Queue._after_fork()
[DEBUG/Consumer-34] Queue._after_fork()
[DEBUG/Consumer-35] Queue._after_fork()
[DEBUG/Consumer-35] Queue._after_fork()
[DEBUG/Consumer-33] Queue._after_fork()
[DEBUG/Consumer-34] Queue._after_fork()
[DEBUG/Consumer-35] Queue._after_fork()
[DEBUG/Consumer-33] Queue._after_fork()
[INFO/Consumer-35] child process calling self.run()
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/Consumer-36] Queue._after_fork()
[INFO/Consumer-33] child process calling self.run()
[DEBUG/MainProcess] doing self._thread.start()
[INFO/Consumer-34] child process calling self.run()
[DEBUG/Cons

Consumer-33: 2 * 2
Consumer-32: 3 * 3
Consumer-35: 4 * 4
Consumer-37: 7 * 7
Consumer-36: 6 * 6
Consumer-34: 5 * 5
Consumer-38: 1 * 1
Consumer-31: 0 * 0


[DEBUG/Consumer-32] Queue._start_thread()
[DEBUG/Consumer-32] doing self._thread.start()
[DEBUG/Consumer-32] starting thread to feed data to pipe
[DEBUG/Consumer-33] Queue._start_thread()
[DEBUG/Consumer-38] Queue._start_thread()
[DEBUG/Consumer-34] Queue._start_thread()
[DEBUG/Consumer-35] Queue._start_thread()
[DEBUG/Consumer-33] doing self._thread.start()
[DEBUG/Consumer-36] Queue._start_thread()
[DEBUG/Consumer-37] Queue._start_thread()
[DEBUG/Consumer-38] doing self._thread.start()
[DEBUG/Consumer-36] doing self._thread.start()
[DEBUG/Consumer-37] doing self._thread.start()
[DEBUG/Consumer-37] starting thread to feed data to pipe
[DEBUG/Consumer-38] starting thread to feed data to pipe
[DEBUG/Consumer-34] doing self._thread.start()
[DEBUG/Consumer-35] doing self._thread.start()
[DEBUG/Consumer-34] starting thread to feed data to pipe
[DEBUG/Consumer-36] starting thread to feed data to pipe
[DEBUG/Consumer-38] ... done self._thread.start()
[DEBUG/Consumer-37] ... done self._thread.

Consumer-38: 8 * 8


[DEBUG/Consumer-32] ... done self._thread.start()
[DEBUG/Consumer-33] starting thread to feed data to pipe


Consumer-37: 9 * 9


[DEBUG/Consumer-35] starting thread to feed data to pipe


Consumer-32: Exiting


[DEBUG/Consumer-33] ... done self._thread.start()
[DEBUG/Consumer-31] doing self._thread.start()
[DEBUG/Consumer-34] ... done self._thread.start()
[DEBUG/Consumer-36] ... done self._thread.start()
[DEBUG/Consumer-35] ... done self._thread.start()


Consumer-36: Exiting


[INFO/Consumer-32] process shutting down


Consumer-34: Exiting
Consumer-35: Exiting
Consumer-33: Exiting


[INFO/Consumer-36] process shutting down
[INFO/Consumer-34] process shutting down
[DEBUG/Consumer-31] starting thread to feed data to pipe
[INFO/Consumer-35] process shutting down
[DEBUG/Consumer-36] running all "atexit" finalizers with priority >= 0
[DEBUG/Consumer-32] running all "atexit" finalizers with priority >= 0
[INFO/Consumer-33] process shutting down
[DEBUG/Consumer-34] running all "atexit" finalizers with priority >= 0
[DEBUG/Consumer-33] running all "atexit" finalizers with priority >= 0
[DEBUG/Consumer-34] telling queue thread to quit
[DEBUG/Consumer-31] ... done self._thread.start()
[DEBUG/Consumer-33] telling queue thread to quit
[DEBUG/Consumer-32] telling queue thread to quit


Consumer-31: Exiting


[DEBUG/Consumer-33] running the remaining "atexit" finalizers
[DEBUG/Consumer-34] running the remaining "atexit" finalizers
[DEBUG/Consumer-33] ... queue thread joined
[DEBUG/Consumer-33] joining queue thread
[DEBUG/Consumer-33] feeder thread got sentinel -- exiting
[DEBUG/Consumer-36] telling queue thread to quit
[DEBUG/Consumer-35] running all "atexit" finalizers with priority >= 0
[INFO/Consumer-31] process shutting down
[DEBUG/Consumer-31] running all "atexit" finalizers with priority >= 0
[INFO/Consumer-33] process exiting with exitcode 0
[DEBUG/Consumer-32] running the remaining "atexit" finalizers
[DEBUG/Consumer-36] running the remaining "atexit" finalizers
[DEBUG/Consumer-35] telling queue thread to quit
[DEBUG/Consumer-36] joining queue thread
[DEBUG/Consumer-35] running the remaining "atexit" finalizers
[DEBUG/Consumer-36] feeder thread got sentinel -- exiting
[DEBUG/Consumer-35] joining queue thread
[DEBUG/Consumer-35] feeder thread got sentinel -- exiting
[DEBUG/Consumer-3

Consumer-38: Exiting


[INFO/Consumer-38] process shutting down


Consumer-37: Exiting


[DEBUG/Consumer-38] running all "atexit" finalizers with priority >= 0
[DEBUG/Consumer-38] telling queue thread to quit
[DEBUG/Consumer-38] running the remaining "atexit" finalizers
[DEBUG/Consumer-38] joining queue thread
[DEBUG/Consumer-38] feeder thread got sentinel -- exiting


Result: 1 * 1 = 1
Result: 7 * 7 = 49
Result: 3 * 3 = 9
Result: 2 * 2 = 4
Result: 5 * 5 = 25
Result: 6 * 6 = 36
Result: 4 * 4 = 16
Result: 0 * 0 = 0
Result: 8 * 8 = 64
Result: 9 * 9 = 81


[DEBUG/Consumer-38] ... queue thread joined
[INFO/Consumer-37] process shutting down
[INFO/Consumer-38] process exiting with exitcode 0
[DEBUG/Consumer-37] running all "atexit" finalizers with priority >= 0


#### Signaling between processes

The Event class lets processes share the state of a binary flag.

In [13]:
import multiprocessing
import time

def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    print('wait_for_event: starting')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(name='block', 
                                 target=wait_for_event,
                                 args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='non-block', 
                                 target=wait_for_event_timeout, 
                                 args=(e, 2))
    w2.start()

    print('main: waiting before calling Event.set()')
    time.sleep(3)
    e.set()
    print('main: event is set')

[DEBUG/MainProcess] created semlock with handle 140091336859648
[DEBUG/MainProcess] created semlock with handle 140091336855552
[DEBUG/MainProcess] created semlock with handle 140091336851456
[DEBUG/MainProcess] created semlock with handle 140091336847360
[DEBUG/MainProcess] created semlock with handle 140091336843264
[DEBUG/block] Queue._after_fork()
[DEBUG/block] Queue._after_fork()
[DEBUG/non-block] Queue._after_fork()
[DEBUG/block] Queue._after_fork()
[INFO/block] child process calling self.run()


wait_for_event: starting


[DEBUG/non-block] Queue._after_fork()
[DEBUG/non-block] Queue._after_fork()
[INFO/non-block] child process calling self.run()


wait_for_event_timeout: starting
main: waiting before calling Event.set()
wait_for_event_timeout: e.is_set()-> False


[INFO/non-block] process shutting down
[DEBUG/non-block] running all "atexit" finalizers with priority >= 0
[DEBUG/non-block] running the remaining "atexit" finalizers
[INFO/non-block] process exiting with exitcode 0


main: event is set
wait_for_event: e.is_set()-> True


[INFO/block] process shutting down
[DEBUG/block] running all "atexit" finalizers with priority >= 0
[DEBUG/block] running the remaining "atexit" finalizers
[INFO/block] process exiting with exitcode 0


A wait() that ends by timeout does not raise an error; the caller has to check is_set() itself.
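
Besides calling is_set(), the caller can also look at the value returned by wait(). A minimal single-process sketch; the variable names are made up for illustration:

```python
import multiprocessing

e = multiprocessing.Event()

# The flag is not set yet, so wait() gives up after the timeout.
timed_out_result = e.wait(0.05)

e.set()

# The flag is set now, so wait() returns without waiting.
after_set_result = e.wait(0.05)

print(timed_out_result, after_set_result)
```

The return value mirrors the state of the flag at the moment wait() returns, which makes a timeout easy to tell apart from a real signal.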

#### Controlling access to resources

A Lock can be used to set up a critical section of code to which access is exclusive.

In [14]:
import multiprocessing
import sys

def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquired via with\n')
        
def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquired directly\n')
    finally:
        lock.release()

if __name__ == '__main__':
    lock = multiprocessing.Lock()
    w = multiprocessing.Process(target=worker_with, args=(lock, sys.stdout))
    nw = multiprocessing.Process(target=worker_no_with, args=(lock, sys.stdout))

    w.start()
    nw.start()

    w.join()
    nw.join()

[DEBUG/MainProcess] created semlock with handle 140091336839168
[DEBUG/Process-41] Queue._after_fork()
[DEBUG/Process-41] Queue._after_fork()
[DEBUG/Process-41] Queue._after_fork()
[INFO/Process-41] child process calling self.run()


Lock acquired via with


[INFO/Process-41] process shutting down
[DEBUG/Process-42] Queue._after_fork()
[DEBUG/Process-41] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-42] Queue._after_fork()
[DEBUG/Process-41] running the remaining "atexit" finalizers
[DEBUG/Process-42] Queue._after_fork()
[INFO/Process-41] process exiting with exitcode 0
[INFO/Process-42] child process calling self.run()


Lock acquired directly


[INFO/Process-42] process shutting down
[DEBUG/Process-42] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-42] running the remaining "atexit" finalizers
[INFO/Process-42] process exiting with exitcode 0
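
A blocking acquire() can hang forever if the holder never releases the lock. A short single-process sketch of the two ways to avoid that, a non-blocking attempt and an attempt with a timeout; the variable names are made up for illustration:

```python
import multiprocessing

lock = multiprocessing.Lock()

first = lock.acquire()               # lock is free, so this succeeds
second = lock.acquire(block=False)   # already held: fails right away
third = lock.acquire(timeout=0.05)   # already held: fails after the timeout
lock.release()

fourth = lock.acquire(block=False)   # free again, so this succeeds
lock.release()

print(first, second, third, fourth)
```

Note that multiprocessing.Lock is not re-entrant: even the process that holds it cannot acquire it a second time.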


#### Synchronizing operations

Condition provides a condition variable that can be shared between processes.

In [15]:
import multiprocessing
import time

def stage_1(cond):
    """perform first stage of work, then notify stage_2 to continue"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        print('%s done and ready for stage 2' % name)
        cond.notify_all()

def stage_2(cond):
    """wait for the condition telling us stage_1 is done"""
    name = multiprocessing.current_process().name
    print('Starting', name)
    with cond:
        cond.wait()
        print('%s running' % name)

if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1', target=stage_1, args=(condition,))
    s2_clients = [
        multiprocessing.Process(name='stage_2[%d]' % i, target=stage_2, args=(condition,))
        for i in range(1, 3)
        ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()

[DEBUG/MainProcess] created semlock with handle 140091336835072
[DEBUG/MainProcess] created semlock with handle 140091336830976
[DEBUG/MainProcess] created semlock with handle 140091336826880
[DEBUG/MainProcess] created semlock with handle 140091336822784
[DEBUG/stage_2[1]] Queue._after_fork()
[DEBUG/stage_2[1]] Queue._after_fork()
[DEBUG/stage_2[1]] Queue._after_fork()
[INFO/stage_2[1]] child process calling self.run()


Starting stage_2[1]


[DEBUG/stage_2[2]] Queue._after_fork()
[DEBUG/stage_2[2]] Queue._after_fork()
[DEBUG/stage_2[2]] Queue._after_fork()
[INFO/stage_2[2]] child process calling self.run()


Starting stage_2[2]


[DEBUG/s1] Queue._after_fork()
[DEBUG/s1] Queue._after_fork()
[DEBUG/s1] Queue._after_fork()
[INFO/s1] child process calling self.run()


Starting s1
s1 done and ready for stage 2
stage_2[1] running


[INFO/s1] process shutting down


stage_2[2] running


[DEBUG/s1] running all "atexit" finalizers with priority >= 0
[INFO/stage_2[1]] process shutting down
[INFO/stage_2[2]] process shutting down
[DEBUG/s1] running the remaining "atexit" finalizers
[DEBUG/stage_2[1]] running all "atexit" finalizers with priority >= 0
[DEBUG/stage_2[1]] running the remaining "atexit" finalizers
[DEBUG/stage_2[2]] running all "atexit" finalizers with priority >= 0
[INFO/stage_2[1]] process exiting with exitcode 0
[DEBUG/stage_2[2]] running the remaining "atexit" finalizers
[INFO/s1] process exiting with exitcode 0
[INFO/stage_2[2]] process exiting with exitcode 0
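
The example above depends on time.sleep() to make sure the stage_2 processes are already waiting when s1 sends its notification. A common way to drop that dependency is to pair the condition with a shared flag and wait with wait_for(). The sketch below is an illustration under assumptions: the names ready, out, and run_demo are hypothetical, the flag is a multiprocessing Value, and the fork start method is requested explicitly, which assumes a Unix-like system.

```python
import multiprocessing

def stage_2(cond, ready, out):
    """Wait until the shared flag is set, then report whether it was seen."""
    with cond:
        # wait_for() checks the predicate first, so a notification sent
        # before this process started waiting is not lost.
        ok = cond.wait_for(lambda: ready.value == 1, timeout=5)
    out.put(ok)

def run_demo():
    ctx = multiprocessing.get_context('fork')  # assumption: Unix-like OS
    cond = ctx.Condition()
    ready = ctx.Value('i', 0)                  # shared integer flag
    out = ctx.Queue()
    p = ctx.Process(target=stage_2, args=(cond, ready, out))
    p.start()
    with cond:
        ready.value = 1                        # publish the state change...
        cond.notify_all()                      # ...and wake up any waiters
    result = out.get(timeout=10)
    p.join()
    return result

if __name__ == '__main__':
    print('stage 2 saw the flag:', run_demo())
```

Because the flag records that the notification has happened, the order in which the two processes reach the condition no longer matters.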


#### Limiting concurrent access to resources

A Semaphore can limit the number of processes that run (or have access to something) at the same time, for example to cap the number of simultaneous connections or parallel downloads.
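
The counting behaviour itself can be seen in a single process, without starting any workers. A minimal sketch with a made-up limit of two slots; the variable names are hypothetical:

```python
import multiprocessing

slots = multiprocessing.Semaphore(2)   # at most two holders at a time

# Three non-blocking attempts: the first two get a slot, the third does not.
got = [slots.acquire(block=False) for _ in range(3)]

slots.release()                        # one holder leaves...
got_after_release = slots.acquire(block=False)   # ...so a slot is free again

print(got, got_after_release)
```

In real code the acquire is normally blocking (for instance through a with block), so extra processes wait for a free slot instead of giving up.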

In [16]:
import random
import multiprocessing
import time

class ActivePool:
    def __init__(self):
        super(ActivePool, self).__init__()
        self.mgr = multiprocessing.Manager()
        self.active = self.mgr.list()
        self.lock = multiprocessing.Lock()
    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
    def __str__(self):
        with self.lock:
            return str(self.active)

def worker(s, pool):
    name = multiprocessing.current_process().name
    with s:
        pool.makeActive(name)
        print('Now running: %s' % str(pool))
        time.sleep(random.random())
        pool.makeInactive(name)

if __name__ == '__main__':
    pool = ActivePool()
    s = multiprocessing.Semaphore(3)
    jobs = [
        multiprocessing.Process(target=worker, name=str(i), args=(s, pool))
        for i in range(10)
        ]

    for j in jobs:
        j.start()

    for j in jobs:
        j.join()
        print('Now running: %s' % str(pool))

[DEBUG/SyncManager-46] Queue._after_fork()
[DEBUG/SyncManager-46] Queue._after_fork()
[DEBUG/SyncManager-46] Queue._after_fork()
[INFO/SyncManager-46] child process calling self.run()
[INFO/SyncManager-46] created temp directory /tmp/pymp-ckdkmmww
[INFO/SyncManager-46] manager serving at '/tmp/pymp-ckdkmmww/listener-hv63xapc'
[DEBUG/MainProcess] requesting creation of a shared 'list' object
[DEBUG/SyncManager-46] 'list' callable returned object with id '7f69841fe588'
[DEBUG/MainProcess] INCREF '7f69841fe588'
[DEBUG/MainProcess] created semlock with handle 140091336818688
[DEBUG/MainProcess] created semlock with handle 140091336814592
[DEBUG/0] Queue._after_fork()
[DEBUG/2] Queue._after_fork()
[DEBUG/1] Queue._after_fork()
[DEBUG/8] Queue._after_fork()
[DEBUG/0] Queue._after_fork()
[DEBUG/3] Queue._after_fork()
[DEBUG/2] Queue._after_fork()
[DEBUG/5] Queue._after_fork()
[DEBUG/7] Queue._after_fork()
[DEBUG/6] Queue._after_fork()
[DEBUG/4] Queue._after_fork()
[DEBUG/3] Queue._after_fork(

Now running: ['3']


[DEBUG/5] making connection to manager
[DEBUG/SyncManager-46] starting server thread to service '5'
[DEBUG/8] thread 'MainThread' does not own a connection


Now running: ['3', '5']


[DEBUG/8] making connection to manager
[DEBUG/SyncManager-46] starting server thread to service '8'


Now running: ['3', '5', '8']


[INFO/3] process shutting down
[DEBUG/6] thread 'MainThread' does not own a connection
[DEBUG/3] running all "atexit" finalizers with priority >= 0
[DEBUG/3] DECREF '7f69841fe588'
[DEBUG/6] making connection to manager
[DEBUG/3] thread 'MainThread' has no more proxies so closing conn
[DEBUG/SyncManager-46] starting server thread to service '6'
[DEBUG/3] running the remaining "atexit" finalizers
[DEBUG/SyncManager-46] got EOF -- exiting thread serving '3'


Now running: ['5', '8', '6']


[INFO/3] process exiting with exitcode 0
[INFO/5] process shutting down
[DEBUG/2] thread 'MainThread' does not own a connection
[DEBUG/5] running all "atexit" finalizers with priority >= 0
[DEBUG/2] making connection to manager
[DEBUG/5] DECREF '7f69841fe588'
[DEBUG/SyncManager-46] starting server thread to service '2'
[DEBUG/5] thread 'MainThread' has no more proxies so closing conn
[DEBUG/5] running the remaining "atexit" finalizers


Now running: ['8', '6', '2']


[DEBUG/SyncManager-46] got EOF -- exiting thread serving '5'
[INFO/5] process exiting with exitcode 0
[DEBUG/7] thread 'MainThread' does not own a connection
[INFO/8] process shutting down
[DEBUG/7] making connection to manager
[DEBUG/8] running all "atexit" finalizers with priority >= 0
[DEBUG/8] DECREF '7f69841fe588'
[DEBUG/SyncManager-46] starting server thread to service '7'
[INFO/2] process shutting down
[DEBUG/8] thread 'MainThread' has no more proxies so closing conn
[DEBUG/0] thread 'MainThread' does not own a connection


Now running: ['6', '2', '7']


[DEBUG/0] making connection to manager
[DEBUG/2] running all "atexit" finalizers with priority >= 0
[DEBUG/8] running the remaining "atexit" finalizers
[INFO/8] process exiting with exitcode 0
[DEBUG/2] DECREF '7f69841fe588'
[DEBUG/SyncManager-46] got EOF -- exiting thread serving '8'
[DEBUG/SyncManager-46] starting server thread to service '0'
[DEBUG/2] thread 'MainThread' has no more proxies so closing conn


Now running: ['6', '7', '0']


[DEBUG/SyncManager-46] got EOF -- exiting thread serving '2'
[DEBUG/2] running the remaining "atexit" finalizers
[INFO/2] process exiting with exitcode 0
[INFO/6] process shutting down
[DEBUG/4] thread 'MainThread' does not own a connection
[DEBUG/6] running all "atexit" finalizers with priority >= 0
[DEBUG/6] DECREF '7f69841fe588'
[DEBUG/4] making connection to manager
[DEBUG/6] thread 'MainThread' has no more proxies so closing conn
[DEBUG/SyncManager-46] starting server thread to service '4'
[DEBUG/6] running the remaining "atexit" finalizers
[INFO/6] process exiting with exitcode 0
[DEBUG/SyncManager-46] got EOF -- exiting thread serving '6'


Now running: ['7', '0', '4']


[INFO/4] process shutting down
[DEBUG/1] thread 'MainThread' does not own a connection
[DEBUG/4] running all "atexit" finalizers with priority >= 0
[DEBUG/1] making connection to manager
[DEBUG/4] DECREF '7f69841fe588'
[DEBUG/SyncManager-46] starting server thread to service '1'


Now running: ['7', '0', '1']


[DEBUG/4] thread 'MainThread' has no more proxies so closing conn
[DEBUG/SyncManager-46] got EOF -- exiting thread serving '4'
[DEBUG/4] running the remaining "atexit" finalizers
[INFO/4] process exiting with exitcode 0
[INFO/7] process shutting down
[DEBUG/9] thread 'MainThread' does not own a connection
[DEBUG/9] making connection to manager
[DEBUG/7] running all "atexit" finalizers with priority >= 0
[DEBUG/7] DECREF '7f69841fe588'
[DEBUG/SyncManager-46] starting server thread to service '9'
[DEBUG/7] thread 'MainThread' has no more proxies so closing conn
[DEBUG/7] running the remaining "atexit" finalizers
[DEBUG/SyncManager-46] got EOF -- exiting thread serving '7'


Now running: ['0', '1', '9']


[INFO/7] process exiting with exitcode 0
[INFO/9] process shutting down
[DEBUG/9] running all "atexit" finalizers with priority >= 0
[DEBUG/9] DECREF '7f69841fe588'
[DEBUG/9] thread 'MainThread' has no more proxies so closing conn
[DEBUG/SyncManager-46] got EOF -- exiting thread serving '9'
[DEBUG/9] running the remaining "atexit" finalizers
[INFO/9] process exiting with exitcode 0
[INFO/0] process shutting down
[DEBUG/0] running all "atexit" finalizers with priority >= 0
[DEBUG/0] DECREF '7f69841fe588'
[DEBUG/0] thread 'MainThread' has no more proxies so closing conn
[DEBUG/SyncManager-46] got EOF -- exiting thread serving '0'
[DEBUG/0] running the remaining "atexit" finalizers
[INFO/0] process exiting with exitcode 0
[DEBUG/MainProcess] thread 'MainThread' does not own a connection
[DEBUG/MainProcess] making connection to manager
[DEBUG/SyncManager-46] starting server thread to service 'MainProcess'
[INFO/1] process shutting down
[DEBUG/1] running all "atexit" finalizers with priorit

Now running: ['1']
Now running: []
Now running: []
Now running: []
Now running: []
Now running: []
Now running: []
Now running: []
Now running: []
Now running: []


#### Managing shared state

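The Manager class provides, among other things, dict and list objects that can be shared between processes (it was also used in the previous example). Each process works with a proxy, while the actual data lives in the manager's server process.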

In [17]:
import multiprocessing

def worker(d, key, value):
    d[key] = value

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(target=worker, args=(d, i, i * 2))
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Results:', d)

[DEBUG/SyncManager-57] Queue._after_fork()
[DEBUG/SyncManager-57] Queue._after_fork()
[DEBUG/SyncManager-57] Queue._after_fork()
[DEBUG/SyncManager-57] INCREF '7f69841fe588'
[INFO/SyncManager-57] child process calling self.run()
[INFO/SyncManager-57] created temp directory /tmp/pymp-ayb6hj9w
[DEBUG/MainProcess] requesting creation of a shared 'dict' object
[INFO/SyncManager-57] manager serving at '/tmp/pymp-ayb6hj9w/listener-rx9hpr31'
[DEBUG/SyncManager-57] 'dict' callable returned object with id '7f6984290508'
[DEBUG/MainProcess] INCREF '7f6984290508'
[DEBUG/Process-59] Queue._after_fork()
[DEBUG/Process-60] Queue._after_fork()
[DEBUG/Process-58] Queue._after_fork()
[DEBUG/Process-61] Queue._after_fork()
[DEBUG/Process-60] Queue._after_fork()
[DEBUG/Process-59] Queue._after_fork()
[DEBUG/Process-61] Queue._after_fork()
[DEBUG/Process-63] Queue._after_fork()
[DEBUG/Process-58] Queue._after_fork()
[DEBUG/Process-64] Queue._after_fork()
[DEBUG/Process-59] Queue._after_fork()
[DEBUG/Proce

Results: {0: 0, 1: 2, 2: 4, 3: 6, 4: 8, 5: 10, 6: 12, 7: 14, 8: 16, 9: 18}
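
The same pattern applies to the list type that Manager provides. The following is only a minimal sketch and not part of the original lecture: the names `append_square` and `collect_squares` are made up for illustration, and the `fork` start method is requested explicitly on the assumption of a Unix system (as in the logs above).

```python
import multiprocessing


def append_square(shared, n):
    # `shared` is a proxy; append() is forwarded to the manager process.
    shared.append(n * n)


def collect_squares(count):
    # Assumption: Unix, where the "fork" start method is available.
    ctx = multiprocessing.get_context("fork")
    with ctx.Manager() as mgr:
        shared = mgr.list()
        jobs = [
            ctx.Process(target=append_square, args=(shared, i))
            for i in range(count)
        ]
        for j in jobs:
            j.start()
        for j in jobs:
            j.join()
        # Copy the data out before the manager shuts down. The order of
        # the appends depends on scheduling, so sort for a stable result.
        return sorted(shared)


if __name__ == "__main__":
    print("Results:", collect_squares(5))
```

Using the manager as a context manager shuts its server process down when the block is left, so the result is copied into a plain list first.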


#### Shared namespaces

A Manager can also create a shared Namespace. Any name added to it becomes visible to every process that holds the Namespace proxy.

In [18]:
import multiprocessing

def producer(ns, event):
    ns.value = 'This is the value'
    event.set()

def consumer(ns, event):
    try:
        value = ns.value
    except Exception as err:
        print('Before event, consumer got:', str(err))
    event.wait()
    print('After event, consumer got:', ns.value)

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(target=producer, args=(namespace, event))
    c = multiprocessing.Process(target=consumer, args=(namespace, event))
    
    c.start()
    p.start()
    
    c.join()
    p.join()

[DEBUG/SyncManager-68] Queue._after_fork()
[DEBUG/SyncManager-68] Queue._after_fork()
[DEBUG/SyncManager-68] Queue._after_fork()
[DEBUG/SyncManager-68] INCREF '7f69841fe588'
[DEBUG/SyncManager-68] INCREF '7f6984290508'
[INFO/SyncManager-68] child process calling self.run()
[INFO/SyncManager-68] created temp directory /tmp/pymp-btloyphn
[DEBUG/MainProcess] requesting creation of a shared 'Namespace' object
[INFO/SyncManager-68] manager serving at '/tmp/pymp-btloyphn/listener-lef4au2q'
[DEBUG/SyncManager-68] 'Namespace' callable returned object with id '7f6987303390'
[DEBUG/MainProcess] INCREF '7f6987303390'
[DEBUG/MainProcess] created semlock with handle 140091336810496
[DEBUG/MainProcess] created semlock with handle 140091335471104
[DEBUG/MainProcess] created semlock with handle 140091335467008
[DEBUG/MainProcess] created semlock with handle 140091335462912
[DEBUG/MainProcess] created semlock with handle 140091335458816
[DEBUG/Process-70] Queue._after_fork()
[DEBUG/Process-69] Queue._a

Before event, consumer got: 'Namespace' object has no attribute 'value'


[DEBUG/SyncManager-68] starting server thread to service 'Process-69'


After event, consumer got: This is the value


[INFO/Process-69] process shutting down
[DEBUG/Process-69] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-69] DECREF '7f6987303390'
[INFO/Process-70] process shutting down
[DEBUG/Process-69] thread 'MainThread' has no more proxies so closing conn
[DEBUG/Process-70] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-70] DECREF '7f6987303390'
[DEBUG/Process-69] DECREF '7f6984290508'
[DEBUG/Process-69] DECREF '7f69841fe588'
[DEBUG/Process-69] running the remaining "atexit" finalizers
[INFO/Process-69] process exiting with exitcode 0
[DEBUG/SyncManager-68] got EOF -- exiting thread serving 'Process-69'
[DEBUG/Process-70] thread 'MainThread' has no more proxies so closing conn
[DEBUG/Process-70] DECREF '7f6984290508'
[DEBUG/SyncManager-68] got EOF -- exiting thread serving 'Process-70'
[DEBUG/Process-70] DECREF '7f69841fe588'
[DEBUG/Process-70] running the remaining "atexit" finalizers
[INFO/Process-70] process exiting with exitcode 0


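Keep in mind that in-place updates to mutable values stored in the namespace (as opposed to adding new names) are not propagated automatically: reading an attribute returns a local copy, so the modified value has to be explicitly assigned to the namespace again.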

In [19]:
import multiprocessing

def producer(ns, event):
    ns.my_list.append('This is the value')  # modifies a local copy only; the shared value is NOT updated
    event.set()

def consumer(ns, event):
    print('Before event, consumer got:', ns.my_list)
    event.wait()
    print('After event, consumer got:', ns.my_list)

if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []
    
    event = multiprocessing.Event()
    p = multiprocessing.Process(target=producer, args=(namespace, event))
    c = multiprocessing.Process(target=consumer, args=(namespace, event))
    
    c.start()
    p.start()
    
    c.join()
    p.join()

[DEBUG/SyncManager-71] Queue._after_fork()
[DEBUG/SyncManager-71] Queue._after_fork()
[DEBUG/SyncManager-71] Queue._after_fork()
[DEBUG/SyncManager-71] INCREF '7f69841fe588'
[DEBUG/SyncManager-71] INCREF '7f6984290508'
[DEBUG/SyncManager-71] INCREF '7f6987303390'
[INFO/SyncManager-71] child process calling self.run()
[INFO/SyncManager-71] created temp directory /tmp/pymp-7qg1ez5k
[DEBUG/MainProcess] requesting creation of a shared 'Namespace' object
[INFO/SyncManager-71] manager serving at '/tmp/pymp-7qg1ez5k/listener-k6ue89mb'
[DEBUG/SyncManager-71] 'Namespace' callable returned object with id '7f6984259978'
[DEBUG/MainProcess] INCREF '7f6984259978'
[DEBUG/MainProcess] thread 'MainThread' does not own a connection
[DEBUG/MainProcess] making connection to manager
[DEBUG/SyncManager-71] starting server thread to service 'MainProcess'
[DEBUG/MainProcess] created semlock with handle 140091335454720
[DEBUG/MainProcess] created semlock with handle 140091335450624
[DEBUG/MainProcess] created

Before event, consumer got: []


[DEBUG/Process-72] Queue._after_fork()
[DEBUG/Process-72] Queue._after_fork()
[DEBUG/Process-72] Queue._after_fork()
[DEBUG/Process-72] INCREF '7f69841fe588'
[DEBUG/Process-72] INCREF '7f6984290508'
[DEBUG/Process-72] INCREF '7f6984259978'
[INFO/Process-72] child process calling self.run()
[DEBUG/Process-72] thread 'MainThread' does not own a connection
[DEBUG/Process-72] making connection to manager
[DEBUG/SyncManager-71] starting server thread to service 'Process-72'


After event, consumer got: []


[INFO/Process-72] process shutting down
[INFO/Process-73] process shutting down
[DEBUG/Process-72] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-73] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-72] DECREF '7f6984259978'
[DEBUG/Process-73] DECREF '7f6984259978'
[DEBUG/Process-72] thread 'MainThread' has no more proxies so closing conn
[DEBUG/Process-73] thread 'MainThread' has no more proxies so closing conn
[DEBUG/Process-73] DECREF '7f6984290508'
[DEBUG/Process-72] DECREF '7f6984290508'
[DEBUG/SyncManager-71] got EOF -- exiting thread serving 'Process-72'
[DEBUG/SyncManager-71] got EOF -- exiting thread serving 'Process-73'
[DEBUG/Process-72] DECREF '7f69841fe588'
[DEBUG/Process-73] DECREF '7f69841fe588'
[DEBUG/Process-72] running the remaining "atexit" finalizers
[INFO/Process-72] process exiting with exitcode 0
[DEBUG/Process-73] running the remaining "atexit" finalizers
[INFO/Process-73] process exiting with exitcode 0
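
The run above shows the update being lost. A minimal sketch of the fix, assuming the same Namespace setup, is to read the list, change the local copy, and assign it back to the attribute. The helper names `append_in_place`, `append_and_reassign` and `run` are hypothetical, and the `fork` start method is an assumption (Unix only).

```python
import multiprocessing


def append_in_place(ns, value):
    # ns.my_list returns a copy, so this append never reaches the manager.
    ns.my_list.append(value)


def append_and_reassign(ns, value):
    items = ns.my_list      # fetch a local copy of the list
    items.append(value)     # modify the copy
    ns.my_list = items      # assignment sends the new value to the manager


def run(worker):
    # Assumption: Unix, where the "fork" start method is available.
    ctx = multiprocessing.get_context("fork")
    with ctx.Manager() as mgr:
        ns = mgr.Namespace()
        ns.my_list = []
        p = ctx.Process(target=worker, args=(ns, "This is the value"))
        p.start()
        p.join()
        return ns.my_list


if __name__ == "__main__":
    print("In-place append :", run(append_in_place))
    print("Reassignment    :", run(append_and_reassign))
```

Note that the read-modify-write sequence is not atomic: if several processes update the same attribute, it presumably needs to be protected by a lock.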


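#### Process pools

* A Pool maintains a fixed set of worker processes.
* Pool.map() behaves like the built-in map(), but the calls are executed in the worker processes and the result is returned as a list.
* The maxtasksperchild argument makes the pool replace a worker with a fresh process after it has completed that many tasks.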

In [20]:
import multiprocessing

def do_calculation(data):
    return data * 2

def start_process():
    print('Starting', multiprocessing.current_process().name)

if __name__ == '__main__':
    inputs = list(range(10))
    print('Input   :', inputs)
    
    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', list(builtin_outputs))
    
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size,
                                initializer=start_process,
                                )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks

    print('Pool    :', pool_outputs)

[DEBUG/MainProcess] created semlock with handle 140091336810496
[DEBUG/MainProcess] created semlock with handle 140091335471104
[DEBUG/MainProcess] created semlock with handle 140091335467008
[DEBUG/MainProcess] created semlock with handle 140091335462912


Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-74] Queue._after_fork()
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-75] Queue._after_fork()
[DEBUG/ForkPoolWorker-74] Queue._after_fork()
[DEBUG/ForkPoolWorker-74] Queue._after_fork()
[DEBUG/ForkPoolWorker-75] Queue._after_fork()
[DEBUG/ForkPoolWorker-75] Queue._after_fork()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-76] Queue._after_fork()
[DEBUG/ForkPoolWorker-74] INCREF '7f69841fe588'
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-77] Queue._after_fork()
[DEBUG/ForkPoolWorker-75] INCREF '7f69841fe588'
[DEBUG/ForkPoolWorker-77] Queue._after_fork()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-77] Queue._after_fork()
[DEBUG/ForkPoolWorker-77] INCREF '7f69841fe588'
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-76] Queue._after_fork()
[DEBUG/ForkPoolWorker-79] Queue._after_fork()
[DEBUG/ForkPoolWorker-79] Queue._after_fork()
[DEBUG/ForkPoolWorker-76] Q

Starting ForkPoolWorker-74


[DEBUG/ForkPoolWorker-79] INCREF '7f69841fe588'
[DEBUG/ForkPoolWorker-76] INCREF '7f69841fe588'
[DEBUG/ForkPoolWorker-75] INCREF '7f6984259978'
[DEBUG/ForkPoolWorker-77] INCREF '7f6984290508'
[DEBUG/ForkPoolWorker-80] Queue._after_fork()
[DEBUG/ForkPoolWorker-76] INCREF '7f6984290508'
[INFO/ForkPoolWorker-75] child process calling self.run()
[DEBUG/ForkPoolWorker-80] Queue._after_fork()
[DEBUG/ForkPoolWorker-77] INCREF '7f6984259978'


Starting ForkPoolWorker-75


[DEBUG/MainProcess] DECREF '7f69841fe588'
[INFO/ForkPoolWorker-77] child process calling self.run()
[DEBUG/ForkPoolWorker-80] INCREF '7f69841fe588'
[DEBUG/ForkPoolWorker-78] INCREF '7f6984290508'
[DEBUG/MainProcess] thread 'MainThread' has no more proxies so closing conn
[DEBUG/ForkPoolWorker-78] Queue._after_fork()
[DEBUG/ForkPoolWorker-79] INCREF '7f6984290508'
[DEBUG/ForkPoolWorker-76] INCREF '7f6984259978'
[INFO/MainProcess] sending shutdown message to manager
[DEBUG/ForkPoolWorker-81] Queue._after_fork()
[INFO/ForkPoolWorker-76] child process calling self.run()
[DEBUG/ForkPoolWorker-78] Queue._after_fork()


Starting ForkPoolWorker-77


[DEBUG/ForkPoolWorker-79] INCREF '7f6984259978'
[DEBUG/ForkPoolWorker-80] INCREF '7f6984290508'


Starting ForkPoolWorker-76


[DEBUG/ForkPoolWorker-78] Queue._after_fork()
[DEBUG/ForkPoolWorker-81] Queue._after_fork()
[INFO/ForkPoolWorker-79] child process calling self.run()
[DEBUG/ForkPoolWorker-81] Queue._after_fork()


Starting ForkPoolWorker-79


[DEBUG/ForkPoolWorker-78] INCREF '7f69841fe588'
[DEBUG/ForkPoolWorker-80] INCREF '7f6984259978'
[DEBUG/ForkPoolWorker-81] INCREF '7f69841fe588'
[DEBUG/ForkPoolWorker-78] INCREF '7f6984259978'
[INFO/ForkPoolWorker-80] child process calling self.run()
[DEBUG/ForkPoolWorker-81] INCREF '7f6984290508'


Starting ForkPoolWorker-80


[INFO/ForkPoolWorker-78] child process calling self.run()


Starting ForkPoolWorker-78


[DEBUG/ForkPoolWorker-81] INCREF '7f6984259978'
[INFO/ForkPoolWorker-81] child process calling self.run()


Starting ForkPoolWorker-81


[DEBUG/MainProcess] closing pool
[DEBUG/MainProcess] joining pool
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/ForkPoolWorker-78] worker got sentinel -- exiting
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/ForkPoolWorker-78] worker exiting after 2 tasks
[INFO/ForkPoolWorker-78] process shutting down
[DEBUG/ForkPoolWorker-78] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-78] DECREF '7f6984259978'
[DEBUG/ForkPoolWorker-75] worker got sentinel -- exiting
[DEBUG/ForkPoolWorker-80] worker got sentinel -- exiting
[DEBUG/ForkPoolWorker-76] worker got sentinel -- exiting
[DEBUG/ForkPoolWorker-79] worker got sentin

Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
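
A Pool can also be used as a context manager. The sketch below is an illustration rather than the lecture's own code: `double_all` is a made-up helper, and the `fork` start method is an assumption (Unix only). It contrasts `map()`, which keeps the input order, with `imap_unordered()`, which yields results as they complete.

```python
import multiprocessing


def do_calculation(data):
    return data * 2


def double_all(inputs, workers=2):
    # Assumption: Unix, where the "fork" start method is available.
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(processes=workers) as pool:
        # map() blocks until all results are ready and keeps input order.
        ordered = pool.map(do_calculation, inputs)
        # imap_unordered() yields results in completion order, so they
        # are sorted here only to make the output reproducible.
        unordered = sorted(pool.imap_unordered(do_calculation, inputs))
    return ordered, unordered


if __name__ == "__main__":
    ordered, unordered = double_all(range(10))
    print("map            :", ordered)
    print("imap_unordered :", unordered)
```

Leaving the `with` block calls `terminate()` on the pool, so all results have to be collected inside the block.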


In [21]:
import multiprocessing

def do_calculation(data):
    return data * 2

def start_process():
    print('Starting', multiprocessing.current_process().name)

if __name__ == '__main__':
    inputs = list(range(10))
    print('Input   :', inputs)
    
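    builtin_outputs = map(do_calculation, inputs)
    # In Python 3, map() returns a lazy iterator, so this prints the map
    # object itself; wrap it in list() to print the values.
    print('Built-in:', builtin_outputs)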
    
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size,
                                initializer=start_process,
                                maxtasksperchild=2,
                                )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    
    print('Pool    :', pool_outputs)

[DEBUG/MainProcess] created semlock with handle 140091336818688
[DEBUG/MainProcess] created semlock with handle 140091335458816
[DEBUG/MainProcess] created semlock with handle 140091335434240
[DEBUG/MainProcess] created semlock with handle 140091335430144


Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x7f6984259940>


[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-82] Queue._after_fork()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-83] Queue._after_fork()
[DEBUG/ForkPoolWorker-83] Queue._after_fork()
[DEBUG/ForkPoolWorker-82] Queue._after_fork()
[DEBUG/MainProcess] added worker
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-83] Queue._after_fork()
[DEBUG/ForkPoolWorker-82] Queue._after_fork()
[DEBUG/ForkPoolWorker-85] Queue._after_fork()
[DEBUG/ForkPoolWorker-83] INCREF '7f6984290508'
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-86] Queue._after_fork()
[DEBUG/ForkPoolWorker-85] Queue._after_fork()
[DEBUG/ForkPoolWorker-84] Queue._after_fork()
[DEBUG/ForkPoolWorker-86] Queue._after_fork()
[DEBUG/ForkPoolWorker-85] Queue._after_fork()
[DEBUG/MainProcess] added worker
[DEBUG/ForkPoolWorker-83] INCREF '7f6984259978'
[DEBUG/ForkPoolWorker-84] Queue._after_fork()
[DEBUG/ForkPoolWorker-88] Queue._after_fork()
[DEBUG/ForkPoolWorker-82] INC

Starting ForkPoolWorker-83


[DEBUG/ForkPoolWorker-84] Queue._after_fork()
[DEBUG/ForkPoolWorker-85] INCREF '7f6984290508'
[DEBUG/ForkPoolWorker-87] Queue._after_fork()
[DEBUG/ForkPoolWorker-88] Queue._after_fork()
[DEBUG/ForkPoolWorker-87] Queue._after_fork()
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] joining worker handler
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
[DEBUG/ForkPoolWorker-86] INCREF '7f6984290508'
[DEBUG/ForkPoolWorker-88] Queue._after_fork()
[DEBUG/ForkPoolWorker-89] Queue._after_fork()
[DEBUG/ForkPoolWorker-82] INCREF '7f6984259978'
[DEBUG/ForkPoolWorker-84] INCREF '7f6984290508'
[DEBUG/ForkPoolWorker-85] INCREF '7f6984259978'
[INFO/ForkPoolWorker-82] child process calling self.run()
[DEBUG/ForkPoolWorker-87] Queue._after_fork()
[

Starting ForkPoolWorker-82


[INFO/ForkPoolWorker-86] child process calling self.run()
[DEBUG/ForkPoolWorker-89] INCREF '7f6984290508'


Starting ForkPoolWorker-85


[DEBUG/ForkPoolWorker-83] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-88] INCREF '7f6984290508'
[DEBUG/ForkPoolWorker-87] INCREF '7f6984259978'
[DEBUG/ForkPoolWorker-85] worker exiting after 2 tasks
[DEBUG/ForkPoolWorker-84] INCREF '7f6984259978'
[DEBUG/ForkPoolWorker-83] DECREF '7f6984259978'
[INFO/ForkPoolWorker-87] child process calling self.run()
[INFO/ForkPoolWorker-85] process shutting down
[DEBUG/ForkPoolWorker-89] INCREF '7f6984259978'


Starting ForkPoolWorker-87


[INFO/ForkPoolWorker-89] child process calling self.run()
[DEBUG/ForkPoolWorker-82] worker exiting after 2 tasks
[DEBUG/ForkPoolWorker-85] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-87] worker exiting after 2 tasks


Starting ForkPoolWorker-86


[DEBUG/ForkPoolWorker-85] DECREF '7f6984259978'


Starting ForkPoolWorker-89


[DEBUG/MainProcess] closing pool
[INFO/ForkPoolWorker-82] process shutting down
[DEBUG/MainProcess] joining pool
[INFO/ForkPoolWorker-87] process shutting down
[INFO/ForkPoolWorker-84] child process calling self.run()
[DEBUG/ForkPoolWorker-89] worker exiting after 2 tasks
[DEBUG/ForkPoolWorker-87] running all "atexit" finalizers with priority >= 0
[INFO/ForkPoolWorker-89] process shutting down
[DEBUG/ForkPoolWorker-87] DECREF '7f6984259978'
[DEBUG/ForkPoolWorker-89] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-89] DECREF '7f6984259978'
[DEBUG/ForkPoolWorker-88] INCREF '7f6984259978'
[DEBUG/ForkPoolWorker-83] DECREF '7f6984290508'
[DEBUG/ForkPoolWorker-82] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-85] DECREF '7f6984290508'
[INFO/ForkPoolWorker-88] child process calling self.run()
[DEBUG/ForkPoolWorker-83] running the remaining "atexit" finalizers


Starting ForkPoolWorker-84


[DEBUG/ForkPoolWorker-82] DECREF '7f6984259978'
[DEBUG/ForkPoolWorker-87] DECREF '7f6984290508'
[DEBUG/ForkPoolWorker-85] running the remaining "atexit" finalizers


Starting ForkPoolWorker-88


[INFO/ForkPoolWorker-83] process exiting with exitcode 0
[DEBUG/ForkPoolWorker-89] DECREF '7f6984290508'
[DEBUG/ForkPoolWorker-82] DECREF '7f6984290508'
[INFO/ForkPoolWorker-85] process exiting with exitcode 0
[DEBUG/ForkPoolWorker-87] running the remaining "atexit" finalizers
[DEBUG/ForkPoolWorker-89] running the remaining "atexit" finalizers
[INFO/ForkPoolWorker-87] process exiting with exitcode 0
[DEBUG/ForkPoolWorker-82] running the remaining "atexit" finalizers
[INFO/ForkPoolWorker-89] process exiting with exitcode 0
[INFO/ForkPoolWorker-82] process exiting with exitcode 0
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache

Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
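
The worker replacement caused by `maxtasksperchild` is hard to spot in the log above. The following sketch makes it visible by recording which worker handled each task. It assumes a single-worker pool and the `fork` start method (Unix only); `worker_name` and `names_per_task` are hypothetical helper names.

```python
import multiprocessing


def worker_name(_):
    # Report the name of the process that executes this task.
    return multiprocessing.current_process().name


def names_per_task(tasks, max_tasks):
    # Assumption: Unix, where the "fork" start method is available.
    ctx = multiprocessing.get_context("fork")
    # A single worker at a time, so tasks are handled strictly in sequence.
    with ctx.Pool(processes=1, maxtasksperchild=max_tasks) as pool:
        # chunksize=1 makes every input item a separate task; otherwise
        # a whole chunk of items counts as one task for maxtasksperchild.
        return pool.map(worker_name, range(tasks), chunksize=1)


if __name__ == "__main__":
    for name in names_per_task(tasks=6, max_tasks=2):
        print(name)
```

With these settings each worker name should appear exactly twice in a row before the pool replaces the worker with a new process.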




#### Further reading

* https://docs.python.org/3/library/multiprocessing.html
* https://pymotw.com/3/multiprocessing/index.html
* https://habrahabr.ru/post/167503/
* https://www.ibm.com/developerworks/aix/library/au-multiprocessing/index.html

## A few interesting libraries for parallel execution

* IPython shell
* pp (Parallel Python)
* pyMPI
* and others.

Further reading on the libraries and approaches:
* https://wiki.python.org/moin/ParallelProcessing
* http://www.python-course.eu/forking.php
* http://www.davekuhlman.org/python_multiprocessing_01.html
* http://materials.jeremybejarano.com/MPIwithPython/
* https://pythonhosted.org/mpi4py/