# A Quick `multiprocessing` Discussion

In this assignment, we will use `multiprocessing`. Each process has its own memory, so we can expect that variables are not shared unless we explicitly arrange for them to be. As a downside, however, `multiprocessing` does not play very nicely with Jupyter notebooks, so we will have to reference other Python files in this part. All of these files are generated from this notebook and saved in `tutorial/demos`.

In [35]:
# these will be in the demos folder. Make sure it exists
from pathlib import Path

demofol = Path() / "demos"
if not demofol.exists():
    demofol.mkdir()

The course material includes a discussion of `Pool`s. You can also spawn processes directly. The latter is recommended here, as passing queues to pools requires `global` variable magic.

## Example 1: A `multiprocessing` Event Loop

Building on the [`Queue`](https://se-for-sci.github.io/content/week11_omp/concepts.html#queue), we can create a `multiprocessing` version of the [event loop](https://se-for-sci.github.io/content/week11_omp/concepts.html#event-loop), shown below.

Rather than a list of generators, as in the course example, we provide a queue to read from, so that a child process can dynamically pick up new tasks from the main process.

```python
    def instructions(
        self,
    ):
        held = None

        while True:
            # wait until further instructions
            command = self.commands_input.get()

            # student is told that we're finished with the demonstration
            if command.header == CommandType.THANKS_FOR_HELPING:
                self.say("You're welcome!")
                break

            # student is asked what their number is
            if command.header == CommandType.WHATS_YOUR_NUMBER:
                if held is None:
                    self.say("I'm not holding anything.")
                else:
                    self.say(f"I have a {held}")

            # student is asked to take this new number
            elif command.header == CommandType.HOLD_THIS:
                if held is None:
                    self.say(f"A {command.extra}? Got it.")
                    held = command.extra
                else:
                    self.say(f"A {command.extra}? I'll just drop this {held}.")
                    held = command.extra
```

Note that with a `student` instance, `student.instructions` is a bound method: calling it is shorthand for `Student.instructions(student)`. So if `instructions` took some arguments, say `instructions(self, my_name)`, then only the arguments after `self` would be given to the `Process` constructor (through its `args` parameter).
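To make the bound-method point concrete, here is a minimal sketch. The `Greeter` class and its `greeting` and `greet` methods are hypothetical names made up for illustration; only `Process(target=..., args=...)` comes from `multiprocessing` itself.

```python
from dataclasses import dataclass
from multiprocessing import Process


@dataclass(frozen=True)
class Greeter:
    name: str

    def greeting(self, my_name: str) -> str:
        return f"[{self.name}]  Nice to meet you, {my_name}!"

    def greet(self, my_name: str) -> None:
        print(self.greeting(my_name))


greeter = Greeter("Alice")

# calling through the instance and through the class gives the same result,
# because the bound method already carries `self`
assert greeter.greeting("Bob") == Greeter.greeting(greeter, "Bob")

if __name__ == "__main__":
    # only the arguments after `self` go into `args`
    process = Process(target=greeter.greet, args=("Bob",))
    process.start()
    process.join()
```

Note that `args` must be a tuple, so a single argument needs the trailing comma.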

In [36]:
%%writefile demos/1_queue.py
import time
from dataclasses import dataclass
from enum import IntEnum
from multiprocessing import Process, Queue
from typing import Any


class CommandType(IntEnum):
    THANKS_FOR_HELPING = 0
    WHATS_YOUR_NUMBER = 1
    HOLD_THIS = 2


@dataclass(frozen=True)
class Command:
    header: CommandType
    extra: Any = None


@dataclass(frozen=True)
class Student:
    name: str
    commands_input: Queue

    def say(self, message: str):
        print(f"[{self.name}]  {message}")

    def instructions(
        self,
    ):
        held = None

        while True:
            # wait until further instructions
            command = self.commands_input.get()

            # student is told that we're finished with the demonstration
            if command.header == CommandType.THANKS_FOR_HELPING:
                self.say("You're welcome!")
                break

            # student is asked what their number is
            if command.header == CommandType.WHATS_YOUR_NUMBER:
                if held is None:
                    self.say("I'm not holding anything.")
                else:
                    self.say(f"I have a {held}")

            # student is asked to take this new number
            elif command.header == CommandType.HOLD_THIS:
                if held is None:
                    self.say(f"A {command.extra}? Got it.")
                    held = command.extra
                else:
                    self.say(f"A {command.extra}? I'll just drop this {held}.")
                    held = command.extra


def command_student(student: Student, command: Command):
    print(
        f"[Teacher]  {student.name}, {command.header.name} {'' if command.extra is None else command.extra}"
    )
    student.commands_input.put(command)


if __name__ == "__main__":
    # these students volunteered
    students = {
        name: Student(
            name=name, commands_input=Queue(),
        )
        for name in ["Alice", "Bob", "Charlie"]
    }
    processes = {}

    try:
        for student in students.values():
            # give each student their instructions. Everyone gets the same instructions.
            process = Process(target=student.instructions)
            process.start()

            processes[student.name] = process

        # ======================================
        #   demo begins
        #   note that if this was a function, you could use the @contextmanager
        #   decorator and yield here so that these commands could be made
        #   dynamically

        # give out numbers and ask Charlie what his is.
        command_student(students["Alice"], Command(CommandType.HOLD_THIS, 4))
        command_student(students["Bob"], Command(CommandType.HOLD_THIS, 5))
        command_student(
            students["Charlie"], Command(CommandType.WHATS_YOUR_NUMBER)
        )
        time.sleep(2)

        # give Alice a new number
        command_student(students["Alice"], Command(CommandType.HOLD_THIS, 3))
        time.sleep(2)

        print("[Teacher]  And that's about it!")

    finally:
        # wrap up
        for student_name, process in processes.items():
            command_student(
                students[student_name], Command(CommandType.THANKS_FOR_HELPING)
            )
            process.join()


Overwriting demos/1_queue.py


In [37]:
!python demos/1_queue.py

[Teacher]  Alice, HOLD_THIS 4
[Teacher]  Bob, HOLD_THIS 5
[Teacher]  Charlie, WHATS_YOUR_NUMBER 
[Alice]  A 4? Got it.
[Bob]  A 5? Got it.
[Charlie]  I'm not holding anything.
[Teacher]  Alice, HOLD_THIS 3
[Alice]  A 3? I'll just drop this 4.
[Teacher]  And that's about it!
[Teacher]  Alice, THANKS_FOR_HELPING 
[Alice]  You're welcome!
[Teacher]  Bob, THANKS_FOR_HELPING 
[Bob]  You're welcome!
[Teacher]  Charlie, THANKS_FOR_HELPING 
[Charlie]  You're welcome!


## Example 2: With a Context Manager

Independently of the context manager, we also added a `return_queue` field and changed `say` so that it puts the message on that queue instead of calling `print`.
```python
class Student:
    ...
    return_queue: Queue
    
    def say(self, message: str):
        self.return_queue.put(f"[{self.name}]  {message}")
```
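Since `say` no longer prints, the main process has to read the messages back off the queue. The sketch below shows one way to drain a queue until it stays empty, similar in spirit to `wait_for_responses` in the script that follows. The helper name `drain` and the 0.2-second timeout are illustrative assumptions, not fixed parts of the demo.

```python
from multiprocessing import Queue
from queue import Empty as QueueEmpty


def drain(message_queue, timeout: float = 0.2) -> list:
    """Collect messages until none arrives within `timeout` seconds."""
    messages = []
    while True:
        try:
            messages.append(message_queue.get(timeout=timeout))
        except QueueEmpty:
            # nothing arrived in time, so we treat the queue as empty
            return messages


audience_queue = Queue()
audience_queue.put("[Alice]  A 4? Got it.")
audience_queue.put("[Bob]  A 5? Got it.")

for message in drain(audience_queue):
    print(message)
```

A timeout is a heuristic: a slow child could still respond after the main process has given up waiting, so the value trades responsiveness against the risk of missing a late message.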

For the main change, we used [`contextlib`'s](https://docs.python.org/3/library/contextlib.html) `contextmanager` decorator to convert the function
```python
@contextmanager
def demo_context(audience_queue, student_iter: Collection[Student]):
    processes = []
    for student in student_iter:
        process = Process(target=student.instructions)
        process.start()

        processes.append(process)

    try:
        yield
    finally:
        print("[Teacher]  And that's about it!")
        for student in student_iter:
            command_student(student, Command(CommandType.THANKS_FOR_HELPING))

        wait_for_responses(audience_queue)

        for process in processes:
            process.join()
```
into a context manager (entered with `with`). Note that you can make a class into a context manager by implementing the [`__enter__`](https://docs.python.org/3/reference/datamodel.html#object.__enter__) and [`__exit__`](https://docs.python.org/3/reference/datamodel.html#object.__exit__) dunder methods. That is the purpose of
```python
    def __enter__(self):
        # =================
        #    populate me
        # =================
        raise NotImplementedError
        # =================
        #  end populate me
        # =================
        return self

    def __exit__(self, type, value, traceback):
        # =================
        #    populate me
        # =================
        raise NotImplementedError
        # =================
        #  end populate me
        # =================
```
in the provided `DistributedDomain` skeleton.
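As a rough illustration of how the two styles correspond, the sketch below writes the same setup and teardown once as a class and once as a generator. `Session`, `session`, and `run` are hypothetical names for this example only and are unrelated to the `DistributedDomain` skeleton.

```python
from contextlib import contextmanager


class Session:
    """Class-based context manager that records when it starts and stops."""

    def __init__(self, log: list):
        self.log = log

    def __enter__(self):
        self.log.append("start")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # runs whether or not the `with` body raised
        self.log.append("stop")
        return False  # do not swallow the exception


@contextmanager
def session(log: list):
    log.append("start")
    try:
        yield
    finally:
        # without try/finally, this line would be skipped on an exception
        log.append("stop")


def run(manager_factory) -> list:
    log = []
    try:
        with manager_factory(log):
            log.append("work")
            raise RuntimeError("demo interrupted")
    except RuntimeError:
        log.append("error seen by caller")
    return log


print(run(Session))
print(run(session))
# both print ['start', 'work', 'stop', 'error seen by caller']
```

In both versions the teardown still runs when the body raises, which is what lets a context manager reliably shut down child processes.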

In [38]:
%%writefile demos/2_context.py
from collections.abc import Collection
from contextlib import contextmanager
from dataclasses import dataclass
from enum import IntEnum
from multiprocessing import Process, Queue
from queue import Empty as QueueEmpty
from typing import Any


class CommandType(IntEnum):
    THANKS_FOR_HELPING = 0
    WHATS_YOUR_NUMBER = 1
    HOLD_THIS = 2


@dataclass(frozen=True)
class Command:
    header: CommandType
    extra: Any = None


@dataclass(frozen=True)
class Student:
    name: str
    commands_input: Queue
    return_queue: Queue

    def say(self, message: str):
        self.return_queue.put(f"[{self.name}]  {message}")

    def instructions(
        self,
    ):
        held = None

        while True:
            # wait until further instructions
            command = self.commands_input.get()

            # student is told that we're finished with the demonstration
            if command.header == CommandType.THANKS_FOR_HELPING:
                self.say("You're welcome!")
                break

            # student is asked what their number is
            if command.header == CommandType.WHATS_YOUR_NUMBER:
                if held is None:
                    self.say("I'm not holding anything.")
                else:
                    self.say(f"I have a {held}")

            # student is asked to take this new number
            elif command.header == CommandType.HOLD_THIS:
                if held is None:
                    self.say(f"A {command.extra}? Got it.")
                    held = command.extra
                else:
                    self.say(f"A {command.extra}? I'll just drop this {held}.")
                    held = command.extra


def command_student(student: Student, command: Command):
    print(
        f"[Teacher]  {student.name}, {command.header.name} {'' if command.extra is None else command.extra}"
    )
    student.commands_input.put(command)


def wait_for_responses(audience_queue: Queue):
    try:
        while True:
            # number of seconds until we conclude the queue is empty
            print(audience_queue.get(timeout=0.2))
    except QueueEmpty:
        # safely catch this error -- we emptied the queue
        ...



@contextmanager
def demo_context(audience_queue, student_iter: Collection[Student]):
    processes = []
    for student in student_iter:
        process = Process(target=student.instructions)
        process.start()

        processes.append(process)
    try:
        yield
    finally:
        print("[Teacher]  And that's about it!")
        for student in student_iter:
            command_student(student, Command(CommandType.THANKS_FOR_HELPING))

        wait_for_responses(audience_queue)

        for process in processes:
            process.join()


if __name__ == "__main__":
    audience_queue = Queue()
    # these students volunteered
    students = {
        name: Student(
            name=name, commands_input=Queue(), return_queue=audience_queue
        )
        for name in ["Alice", "Bob", "Charlie"]
    }

    with demo_context(audience_queue, students.values()):
        command_student(students["Alice"], Command(CommandType.HOLD_THIS, 4))
        command_student(students["Bob"], Command(CommandType.HOLD_THIS, 5))

        wait_for_responses(audience_queue)

        command_student(
            students["Charlie"], Command(CommandType.WHATS_YOUR_NUMBER)
        )
        wait_for_responses(audience_queue)

        # give Alice a new number
        command_student(students["Alice"], Command(CommandType.HOLD_THIS, 3))
        wait_for_responses(audience_queue)

Overwriting demos/2_context.py


In [39]:
!python demos/2_context.py

[Teacher]  Alice, HOLD_THIS 4
[Teacher]  Bob, HOLD_THIS 5
[Alice]  A 4? Got it.
[Bob]  A 5? Got it.
[Teacher]  Charlie, WHATS_YOUR_NUMBER 
[Charlie]  I'm not holding anything.
[Teacher]  Alice, HOLD_THIS 3
[Alice]  A 3? I'll just drop this 4.
[Teacher]  And that's about it!
[Teacher]  Alice, THANKS_FOR_HELPING 
[Teacher]  Bob, THANKS_FOR_HELPING 
[Teacher]  Charlie, THANKS_FOR_HELPING 
[Alice]  You're welcome!
[Charlie]  You're welcome!
[Bob]  You're welcome!


## Example 3: Inter-Process Communication

Using `SharedMemory`, as mentioned in the [`multiprocessing` class material](https://se-for-sci.github.io/content/week11_omp/threading.html#multiprocessing-in-python), is straightforward. You can also pass the `Connection`s returned by [`Pipe`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Pipe) into a `Process`, just as we did for `Queue`s.
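For a feel of both mechanisms, here is a minimal sketch that exercises them inside a single process, so it can run without spawning anything. The function names `shared_memory_round_trip` and `pipe_round_trip` are made up for illustration, and the sketch uses `struct` rather than `numpy` to stay within the standard library. In real use, the second handle and the other end of the pipe would live in a different process.

```python
import struct
from multiprocessing import Pipe, shared_memory


def shared_memory_round_trip(value: int) -> int:
    # the owner creates a block big enough for one 64-bit integer
    owner = shared_memory.SharedMemory(create=True, size=struct.calcsize("q"))
    try:
        struct.pack_into("q", owner.buf, 0, value)

        # a second handle attaches by name, as another process would
        visitor = shared_memory.SharedMemory(name=owner.name)
        try:
            (seen,) = struct.unpack_from("q", visitor.buf, 0)
        finally:
            visitor.close()
    finally:
        owner.close()
        owner.unlink()  # only the owner removes the block
    return seen


def pipe_round_trip(message):
    # with duplex=False, the first end only receives and the second only sends
    receiving_end, sending_end = Pipe(duplex=False)
    try:
        sending_end.send(message)
        return receiving_end.recv()
    finally:
        sending_end.close()
        receiving_end.close()


print(shared_memory_round_trip(42))
print(pipe_round_trip({"held": 5}))
```

Every handle calls `close()`, but `unlink()` is called exactly once, by whichever side is responsible for the block's lifetime.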

In the script for this example, each student's held number lives in `SharedMemory` so that students can put a number in each other's hands. To prevent values from being overwritten mid-pass, we employ a `Barrier` that lets every student ready up before anyone passes.
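The ready-then-go idea can be sketched on its own. To stay self-contained, the sketch below swaps in threads and a plain list for the processes and shared memory used in the script; `threading.Barrier` exposes the same `wait()` method as the `multiprocessing` barrier. The function name `pass_around` is hypothetical.

```python
from threading import Barrier, Thread


def pass_around(hands: list[int]) -> list[int]:
    """Each participant hands its value to the next one, all at once."""
    hands = list(hands)
    barrier = Barrier(parties=len(hands))

    def student(index: int) -> None:
        # phase 1: remember our own value before anyone can overwrite it
        readied = hands[index]

        # nobody proceeds until everyone has readied
        barrier.wait()

        # phase 2: place the readied value in the next participant's hand
        hands[(index + 1) % len(hands)] = readied

    threads = [Thread(target=student, args=(i,)) for i in range(len(hands))]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return hands


print(pass_around([4, 5, 6]))  # → [6, 4, 5]
```

Without the barrier, a fast participant could write into a neighbour's hand before that neighbour had read its own value, and one of the numbers would be lost.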

In [40]:
%%writefile demos/3_shared_memory.py
from collections.abc import Collection
from contextlib import contextmanager
from dataclasses import dataclass
from enum import IntEnum
from multiprocessing import Barrier, Process, Queue, managers, shared_memory
from queue import Empty as QueueEmpty
from threading import Barrier as BarrierType
from typing import Any

import numpy as np


class CommandType(IntEnum):
    THANKS_FOR_HELPING = 0
    WHATS_YOUR_NUMBER = 1
    HOLD_THIS = 2
    PASS_IT_ON = 3
    YOU_PASS_TO = 4


@dataclass(frozen=True)
class Command:
    header: CommandType
    extra: Any = None


@dataclass(frozen=True)
class Student:
    name: str
    commands_input: Queue
    return_queue: Queue
    barrier: BarrierType

    def say(self, message: str):
        print(f"[{self.name}]  {message}")

    def instructions(
        self,
    ):
        # link a numpy array to a buffer
        shmem = shared_memory.SharedMemory(
            name=f"{self.name}'s hand",
            create=True,
            size=(np.array([0], dtype=np.int64)).nbytes,
        )
        held = np.ndarray((1,), dtype=np.int64, buffer=shmem.buf)

        # -1 will replace our `None`
        held[0] = -1

        # this is the student who will get passed to next.
        next_student: str | None = None

        while True:
            # wait until further instructions
            command = self.commands_input.get()

            # student is told that we're finished with the demonstration
            if command.header == CommandType.THANKS_FOR_HELPING:
                del held
                shmem.close()
                shmem.unlink()
                self.say("You're welcome!")
                break

            # student is asked what their number is
            if command.header == CommandType.WHATS_YOUR_NUMBER:
                if held[0] == -1:
                    self.say("I'm not holding anything.")
                else:
                    self.say(f"I have a {held[0]:d}")

            # student is asked to take this new number
            elif command.header == CommandType.HOLD_THIS:
                if held[0] == -1:
                    self.say(f"A {command.extra}? Got it.")
                    held[0] = command.extra
                else:
                    self.say(
                        f"A {command.extra}? I'll just drop this {held[0]:d}."
                    )
                    held[0] = command.extra

            # student is told who they should be passing to
            elif command.header == CommandType.YOU_PASS_TO:
                next_student = command.extra
                self.say(f"I pass to {next_student}? Got it.")

            # student is asked to pass their number on
            elif command.header == CommandType.PASS_IT_ON:
                # store value so it's not overwritten
                readied = held.copy()

                # we've readied, so it's not held anymore?
                # the analogy breaks down a little, here
                held[0] = -1

                self.say("Ready!")
                self.barrier.wait()
                self.say("Go!")
                if readied[0] != -1 and next_student is not None:
                    next_held = shared_memory.SharedMemory(
                        name=f"{next_student}'s hand"
                    )
                    self.say(f"Here, {next_student}!")
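                    # some platforms round the block size up to a full page,
                    # so write only the bytes we actually need
                    next_held.buf[: readied.nbytes] = readied.tobytes()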
                    next_held.close()
                else:
                    self.say("...")


def command_student(student: Student, command: Command):
    print(
        f"[Teacher]  {student.name}, {command.header.name} {'' if command.extra is None else command.extra}"
    )
    student.commands_input.put(command)


def command_all_students(students: Collection[Student], command: Command):
    print(
        f"[Teacher]  Everyone, {command.header.name} {'' if command.extra is None else command.extra}"
    )
    for student in students:
        student.commands_input.put(command)


def wait_for_responses(audience_queue: Queue):
    try:
        while True:
            # number of seconds until we conclude the queue is empty
            print(audience_queue.get(timeout=0.2))
    except QueueEmpty:
        # safely catch this error -- we emptied the queue
        ...


@contextmanager
def demo_context(audience_queue, student_iter: Collection[Student]):
    processes = []

    # https://docs.python.org/3/library/multiprocessing.shared_memory.html#multiprocessing.shared_memory.SharedMemory
    # SharedMemoryManager only cleans up blocks created through the manager itself.
    # Each student creates its own block, so it also unlinks it when thanked.
    with managers.SharedMemoryManager():
        for student in student_iter:
            process = Process(target=student.instructions)
            process.start()

            processes.append(process)
        try:
            yield
        finally:
            print("[Teacher]  And that's about it!")
            for student in student_iter:
                command_student(
                    student, Command(CommandType.THANKS_FOR_HELPING)
                )

            wait_for_responses(audience_queue)

            for process in processes:
                process.join()


if __name__ == "__main__":
    audience_queue = Queue()
    student_names = ["Alice", "Bob", "Charlie"]

    student_barrier = Barrier(parties=len(student_names))
    # these students volunteered
    students = {
        name: Student(
            name=name,
            commands_input=Queue(),
            return_queue=audience_queue,
            barrier=student_barrier,
        )
        for name in student_names
    }

    with demo_context(audience_queue, students.values()):
        command_student(students["Alice"], Command(CommandType.HOLD_THIS, 4))
        command_student(students["Bob"], Command(CommandType.HOLD_THIS, 5))
        command_student(students["Charlie"], Command(CommandType.HOLD_THIS, 6))

        wait_for_responses(audience_queue)

        command_student(
            students["Alice"], Command(CommandType.YOU_PASS_TO, "Bob")
        )
        command_student(
            students["Bob"], Command(CommandType.YOU_PASS_TO, "Charlie")
        )
        wait_for_responses(audience_queue)

        command_all_students(students.values(), Command(CommandType.PASS_IT_ON))
        wait_for_responses(audience_queue)
        command_all_students(
            students.values(), Command(CommandType.WHATS_YOUR_NUMBER)
        )
        wait_for_responses(audience_queue)

        command_student(
            students["Charlie"], Command(CommandType.YOU_PASS_TO, "Alice")
        )
        wait_for_responses(audience_queue)

        command_all_students(students.values(), Command(CommandType.PASS_IT_ON))
        wait_for_responses(audience_queue)

        command_all_students(
            students.values(), Command(CommandType.WHATS_YOUR_NUMBER)
        )
        wait_for_responses(audience_queue)


Overwriting demos/3_shared_memory.py


In [41]:
!python demos/3_shared_memory.py

[Teacher]  Alice, HOLD_THIS 4
[Teacher]  Bob, HOLD_THIS 5
[Teacher]  Charlie, HOLD_THIS 6
[Alice]  A 4? Got it.
[Bob]  A 5? Got it.
[Charlie]  A 6? Got it.
[Teacher]  Alice, YOU_PASS_TO Bob
[Teacher]  Bob, YOU_PASS_TO Charlie
[Alice]  I pass to Bob? Got it.
[Bob]  I pass to Charlie? Got it.
[Teacher]  Everyone, PASS_IT_ON 
[Alice]  Ready!
[Charlie]  Ready!
[Bob]  Ready!
[Bob]  Go!
[Alice]  Go!
[Charlie]  Go!
[Charlie]  ...
[Bob]  Here, Charlie![Alice]  Here, Bob!

[Teacher]  Everyone, WHATS_YOUR_NUMBER 
[Charlie]  I have a 5
[Bob]  I have a 4
[Alice]  I'm not holding anything.
[Teacher]  Charlie, YOU_PASS_TO Alice
[Charlie]  I pass to Alice? Got it.
[Teacher]  Everyone, PASS_IT_ON 
[Charlie]  Ready!
[Alice]  Ready!
[Bob]  Ready!
[Bob]  Go!
[Charlie]  Go!
[Alice]  Go!
[Alice]  ...
[Bob]  Here, Charlie!
[Charlie]  Here, Alice!
[Teacher]  Everyone, WHATS_YOUR_NUMBER 
[Charlie]  I have a 4
[Bob]  I'm not holding anything.
[Alice]  I have a 5
[Teacher]  And that's about it!
[Teacher]  Alice