
Processor


class Processor

This class represents the nodes of the pipeline. Each Processor instance is tasked with performing an operation. An operation can be anything, for example -

  1. Reading input data from a source. (see InputProcessor)
  2. Processing, i.e. applying a coroutine to an input element.
  3. Providing outputs for the pipeline.
                           ┌-----------------------[Processor]---------------------------┐     
 ┌-----------┐       ┌-----└---┐                                                    ┌----┘----┐
 | input_src |  -->  |         |      ┌-------------┐         ┌--------------┐      |         |
 └-----------┘       |  input  | ---> | input_queue |         | output_queue | ---> | output  |
                     | handler |      └-------------┘         └--------------┘      | handler |
 ┌-----------┐       |         |        |     ┌----------------┐    ^               |         |
 | input_src |  -->  |         |        └---> | processor_coro | ---┘               |         |
 └-----------┘       └----┐----┘              └----------------┘    ┌-------------┐ └----┐----┘
                          |                                         | accumulator |      |     
                          └-----------------------------------------└-------------┘------┘     

Attributes

_name : str

Contains the name string for that particular node.

_uuid : str

A unique identifier for that node.

_input_queue : asyncio.Queue

The input Queue for the node. This queue contains tuples of the form (input1, input2, ...) with each input coming from the output of one of the input sources.

_output_queue : asyncio.Queue

The output Queue for the node. This queue contains the output of the _processor_coro coroutine.

_processor_coro : coroutine object

The coroutine associated with this Processor instance. This is the coroutine that does the actual processing expected of that node. The function signature of this coroutine is -

async def coro(self:Processor, q_elt:tuple, *args, **kwargs):
    ...
    return *something*

Here, self is the Processor instance that coro is running on and q_elt is the tuple popped off the _input_queue. coro can also be defined as a @classmethod to group related coroutines together, as in the demo.

In which case, the signature becomes -

@classmethod
async def coro(cls, self:Processor, q_elt:tuple, *args, **kwargs)
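
For illustration, a coroutine grouped on a helper class might look like the following minimal sketch (the MathCoros class and double coroutine are hypothetical, not part of the library) -

class MathCoros:
    # Hypothetical container class grouping related processor coroutines.
    @classmethod
    async def double(cls, self, q_elt:tuple, *args, **kwargs):
        # q_elt is the tuple assembled by the _input_handler, one element per
        # input source; a single input source is assumed here.
        (value,) = q_elt
        # The returned value is what _process puts on the node's output_queue.
        return 2 * value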

The signature is also different for instances of class InputProcessor, where q_elt:tuple is replaced by q_out:asyncio.Queue, a reference to the output_queue. The difference between Processor._process and InputProcessor._process lies in the way they expect self._processor_coro to behave. This is evident in their function signatures -

# Processor._processor_coro
async def coro(self, q_element, *args, **kwargs)

# InputProcessor._processor_coro
async def coro(self, output_queue, *args, **kwargs)

That is, Processor._processor_coro works with input data, while InputProcessor._processor_coro needs a reference to the output_queue in order to fill it up.
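
As a hedged sketch (the read_lines name and the file source are assumptions, not part of the library), an InputProcessor-style coroutine might push elements straight onto the queue it is handed -

async def read_lines(self, output_queue, *args, **kwargs):
    # Hypothetical source coroutine: instead of returning a value, it fills
    # the output_queue directly, one element per line of the file
    # (blocking file I/O used here only for simplicity).
    with open("input.txt") as f:
        for line in f:
            await output_queue.put(line.rstrip("\n"))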


_output_accumulator : list

An empty list that can be used for persisting values across calls; no specific use is currently defined.

_input_srcs : list

A list of asyncio.Queues representing the input sources for that particular node. These queues are set up by the Plumber that is instrumenting the pipeline and are used for passing data between processors.

_input_srcs is used by the _input_handler to populate the _input_queue of the instance.

_output_dests : list

Similar to _input_srcs, this is a list of asyncio.Queues representing the output destinations for that particular node. The _output_handler is responsible for populating these queues with copies of the output of _process.

_env_vars : dict

This is a dict passed by the Plumber instrumenting the pipeline to each Processor node, containing "environment variables" i.e. variables from contexts that are not directly visible to the Processor member functions/variables.


Methods

__init__(...)

def __init__(self, name:str = None, input_queue:asyncio.Queue = None,
             output_queue:asyncio.Queue = None, coro = None,
             input_srcs:list = None, output_dests:list = None,
             env_vars:dict = None, 
             *args, **kwargs):
    ...

The __init__ constructor sets the values of the attributes based on the arguments provided and sets up 3 tasks -

  1. _input_handler_task : _input_handler coroutine
  2. _processor_task : _processor coroutine
  3. _output_handler_task : _output_handler coroutine
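
As a rough usage sketch (the double coroutine and the manual queue wiring are assumptions; in practice the Plumber sets up the queues), constructing a node could look like -

import asyncio

async def double(self, q_elt, *args, **kwargs):
    # Hypothetical processing coroutine: doubles its single input element.
    (value,) = q_elt
    return 2 * value

src_q = asyncio.Queue()   # would be filled by an upstream node / InputProcessor
dst_q = asyncio.Queue()   # would be drained by a downstream node

node = Processor(name="doubler", coro=double,
                 input_srcs=[src_q], output_dests=[dst_q])

Since __init__ also schedules the three handler tasks, this would normally be done from within a running event loop.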

_input_handler(...)

async def _input_handler(self, input_src:list=None):
    ...

_input_handler waits on each of the queues in the input_srcs list and puts a tuple (elt1, elt2, ...) into the input_queue of the object, where each element comes from one of the queues in input_srcs (in order of insertion).

When input_srcs is an empty list or None, the _input_handler_task stops.
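
A minimal sketch of this behaviour (not the library's actual implementation) -

async def _input_handler(self, input_srcs:list = None):
    # Sketch: take one element from every source queue, in order of
    # insertion, and forward them as a single tuple to the input_queue.
    if not input_srcs:
        return    # empty list or None: the task stops
    while True:
        elts = tuple([await q.get() for q in input_srcs])
        await self._input_queue.put(elts)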

_output_handler(...)

async def _output_handler(self, output_dest:list=None):
    ...

_output_handler pops an element off the output_queue and puts the same in each of the queues in output_dests.

When output_dests is an empty list or None, the _output_handler_task stops.
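
Again as a sketch of the described behaviour -

async def _output_handler(self, output_dests:list = None):
    # Sketch: fan every processed element out to all destination queues.
    if not output_dests:
        return    # empty list or None: the task stops
    while True:
        elt = await self._output_queue.get()
        for q in output_dests:
            await q.put(elt)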

_process(...)

async def _process(self, *args, **kwargs):
    ...

_processor_task is the task responsible for the processing of input data in each node. _process waits on the _input_queue of the node and, for each element it pops off the queue, does the following -

  • Create a task _processor_task(input_element, *args, **kwargs) that takes in the element (input) and calls
await self._processor_coro(self, input_element, *args, **kwargs)

Note that,

  1. *args, **kwargs were passed in the Processor.__init__ call
  2. self._processor_coro needs to return a value, which is put into the node's output_queue.
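
Putting the above together, a simplified sketch of _process (assuming asyncio is imported; the real implementation may differ) -

async def _process(self, *args, **kwargs):
    # Sketch: for every tuple popped off the input_queue, spawn a task that
    # runs the processing coroutine and pushes its return value onto the
    # output_queue.
    async def run_one(q_elt):
        result = await self._processor_coro(self, q_elt, *args, **kwargs)
        await self._output_queue.put(result)
    while True:
        q_elt = await self._input_queue.get()
        asyncio.create_task(run_one(q_elt))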