Hello, today we are exploring async and concurrent processing in Python.

# Distributed Job Queue

First, we'll implement a toy distributed job queue in several ways.

That is, we'll implement:

- a queue to which "jobs" can be submitted and from which "jobs" can be picked up;
- a handful of submitters;
- a handful of executors.

## asyncio

The standard-library module `asyncio` is, as it turns out, well suited to this.

In [1]:
import asyncio

queue = asyncio.Queue(maxsize=10)
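
The `maxsize` argument bounds the queue: once it is full, `put()` waits and `put_nowait()` raises `asyncio.QueueFull`. The sketch below illustrates this with a smaller bound; the function name `demo_bounded_queue` and the bound of 2 are assumptions made for the illustration, not part of the notebook.

```python
import asyncio


async def demo_bounded_queue() -> tuple[int, bool]:
    # A small bound keeps the demonstration short; the notebook uses maxsize=10.
    q: asyncio.Queue = asyncio.Queue(maxsize=2)

    q.put_nowait(1)
    q.put_nowait(2)

    # The queue is now full, so a non-blocking put raises QueueFull.
    try:
        q.put_nowait(3)
        rejected = False
    except asyncio.QueueFull:
        rejected = True

    return q.qsize(), rejected
```

In a notebook cell this can be run with `await demo_bounded_queue()`; in a script, with `asyncio.run(demo_bounded_queue())`.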

Let's define a class that submits jobs to this queue on a set cadence.

For simplicity's sake, let's say that "jobs" are just integers, and that "executing" a job entails nothing more than logging its value.

In [2]:
from datetime import timedelta
from typing import Optional

import uuid
import logging

logging.getLogger().setLevel(logging.INFO)

class PeriodicSubmitter:
    def __init__(
        self, 
        queue: asyncio.Queue, 
        *,
        cadence: timedelta = timedelta(seconds=2), 
        timeout: Optional[timedelta] = timedelta(seconds=10),
        value: int = 100,
        n: int = 20,
    ):
        self.queue = queue
        self.cadence = cadence
        self.timeout = timeout
        self.value = value
        
        self.id: uuid.UUID = uuid.uuid4()

        self.n = n

        self.task = None
        
        logging.info("Created submitter %s submitting value %s", self.id, self.value)

    async def put(self) -> None:
        try:
            logging.info("Submitter %s putting %s", self.id, self.value)
            
            await asyncio.wait_for(
                self.queue.put(self.value), 
                timeout=self.timeout.total_seconds() if self.timeout else None,
            )
        except asyncio.TimeoutError:
            logging.info("Submitter %s timed out; dropping", self.id)
            return

        logging.info("Submitter %s successfully put %s", self.id, self.value)

    def start(self):
        self.task = asyncio.create_task(self._run())

    async def _run(self) -> None:
        for _ in range(self.n):
            await self.put()
            # Wait one cadence interval before the next submission.
            await asyncio.sleep(self.cadence.total_seconds())
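
To see the timeout path of `put` in isolation, here is a minimal sketch that wraps `queue.put()` in `asyncio.wait_for` against a queue nobody is consuming from. The helper names `put_or_drop` and `demo_timeout`, the queue bound of 1, and the 0.05-second timeout are all assumptions chosen for the illustration.

```python
import asyncio
from typing import Optional


async def put_or_drop(q: asyncio.Queue, value: int, timeout: Optional[float]) -> bool:
    """Return True if the value was enqueued, False if the put timed out."""
    try:
        await asyncio.wait_for(q.put(value), timeout=timeout)
    except asyncio.TimeoutError:
        return False
    return True


async def demo_timeout() -> tuple[bool, bool, int]:
    q: asyncio.Queue = asyncio.Queue(maxsize=1)

    # The first put fits; the second finds the queue full and, with no
    # consumer running, gives up once the timeout expires.
    first = await put_or_drop(q, 100, timeout=0.05)
    second = await put_or_drop(q, 100, timeout=0.05)

    return first, second, q.qsize()
```

Passing `timeout=None`, as the notebook does further down, makes the put wait indefinitely for a free slot instead of dropping the job.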

In [None]:
p = PeriodicSubmitter(queue, timeout=None)
p.start()

INFO:root:Created submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 submitting value 100


INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 putting 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 successfully put 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 putting 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 successfully put 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 putting 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 successfully put 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 putting 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 successfully put 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 putting 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 successfully put 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 putting 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 successfully put 100
INFO:root:Submitter fc20cd9d-5cfc-49ab-a09e-0424b97b4891 putting 100
INFO:root:Submitter fc20cd9d-5

In [5]:
class GreedyProcessor:
    def __init__(
        self,
        queue,
    ):
        self.queue = queue
        self.id: uuid.UUID = uuid.uuid4()

        logging.info("Created processor %s", self.id)

        self.task = None

    async def get(self):
        val = await self.queue.get()
        logging.info("Processor %s got value %s", self.id, val)

    def start(self):
        self.task = asyncio.create_task(self._run_forever())
        return self.task

    async def _run_forever(self):
        while True:
            await self.get()

In [6]:
processor = GreedyProcessor(queue)

INFO:root:Created processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25


In [8]:
processor.start()

<Task pending name='Task-6' coro=<GreedyProcessor._run_forever() running at /var/folders/14/sqncpjld5kv7qvfzb1tpds680000gn/T/ipykernel_44201/1050128745.py:21>>

INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:root:Processor b04884b4-2b22-46fe-a7e7-f9d4ca49ce25 got value 100
INFO:r

In [9]:
queue.qsize()

0
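
Outside a notebook there is no event loop already running, so the same idea needs an explicit entry point and a way to shut down. The following is a rough standalone sketch under those assumptions, not the notebook's classes: the function names `submit`, `process`, and `main`, the two job values, and the short cadence are made up for the example. It uses `task_done()` and `join()` to wait until every submitted job has been handled, then cancels the processors.

```python
import asyncio


async def submit(q: asyncio.Queue, value: int, n: int, cadence: float) -> None:
    # Put `value` on the queue n times, pausing `cadence` seconds between puts.
    for _ in range(n):
        await q.put(value)
        await asyncio.sleep(cadence)


async def process(q: asyncio.Queue, seen: list[int]) -> None:
    # Drain the queue forever; the caller cancels this task when work is done.
    while True:
        value = await q.get()
        seen.append(value)
        q.task_done()


async def main() -> list[int]:
    q: asyncio.Queue = asyncio.Queue(maxsize=10)
    seen: list[int] = []

    processors = [asyncio.create_task(process(q, seen)) for _ in range(2)]
    submitters = [
        asyncio.create_task(submit(q, value, n=3, cadence=0.01))
        for value in (100, 200)
    ]

    await asyncio.gather(*submitters)  # every job has been submitted
    await q.join()                     # every submitted job has been processed

    # The processors loop forever, so stop them explicitly.
    for task in processors:
        task.cancel()
    await asyncio.gather(*processors, return_exceptions=True)

    return seen
```

Running `asyncio.run(main())` returns the processed values; their order depends on how the submitters interleave, so only the multiset of values is predictable.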