Plumber

Ahaan Dabholkar edited this page Jul 12, 2022 · 7 revisions

class Plumber

An instance of this class creates the pipeline from Processor objects and the provided input graph. It sets up producer-consumer relationships between the Processor nodes, using asyncio.Queue objects, as specified in the input graph.
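The producer-consumer relationship described above can be sketched with plain asyncio, independent of Plumber itself. This is only an illustration: the producer and consumer coroutines and the _DONE end-of-stream marker are hypothetical stand-ins and are not part of the Plumber or Processor API.

```python
import asyncio

_DONE = object()  # hypothetical end-of-stream marker, not part of Plumber


async def producer(out_q: asyncio.Queue) -> None:
    """Upstream node: emit a few values, then signal completion."""
    for value in range(3):
        await out_q.put(value)
    await out_q.put(_DONE)


async def consumer(in_q: asyncio.Queue) -> list:
    """Downstream node: read until the end-of-stream marker arrives."""
    received = []
    while True:
        item = await in_q.get()
        if item is _DONE:
            return received
        received.append(item * 10)


async def run_pair() -> list:
    """Link the two nodes with a single queue and run them concurrently."""
    q = asyncio.Queue()
    _, results = await asyncio.gather(producer(q), consumer(q))
    return results


if __name__ == "__main__":
    print(asyncio.run(run_pair()))  # [0, 10, 20]
```

Each edge of the input graph plays the role of q here: one node puts its output on the queue and the next node reads from it.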

Attributes

__liason_q_graph : list

This attribute is set by the Plumber._parse_input_graph method and holds the liaison queue graph for the specified input_d. Liaison queues are the asyncio.Queues that build the producer-consumer relationships between the nodes of the graph. The structure resembles an adjacency matrix: the entry at row i, column j holds the queue linking node i to node j, or None where the graph has no such edge.

As an example, a dummy input_d and its resulting __liason_q_graph are shown below.

input_d = \
{
    'nodes': {
        'n1': {'coro': ..., 'args': {...} },
        'n2': {'coro': ...},
        'n3': {'coro': ...},
        'n4': {'coro': ..., 'properties': {'aggregate_inputs': False }},
    },
    'graph': {
        'n1': ('n2', ),
        'n2': ('n3', 'n4'),
        'n3': ('n4', ),
        'n4': None,
    },
   
}
self.__liason_q_graph = \
[[None, <asyncio.Queue at 0xadd1>, None, None],
 [None, None, <asyncio.Queue at 0xadd2>, <asyncio.Queue at 0xadd3>],
 [None, None, None, <asyncio.Queue at 0xadd3>],
 [None, None, None, None]] 

Note that when aggregate_inputs is set to False, the inputs to that node are treated as mutually independent, which is consistent with both edges into n4 holding the same queue (0xadd3) in the example above. For that graph, the input queue of node n4 would look like this:

# with aggregate_inputs: False         with aggregate_inputs: True

   ┌-------------┐                     ┌------------------------┐
   | (output n2) |                     | (output n2, output n3) |
   ├-------------┤                     ├------------------------┤
   | (output n3) |                     | (output n2, output n3) |
   ├-------------┤                     └------------------------┘
   | (output n3) |
   ├-------------┤
   | (output n2) |
   └-------------┘
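The two queue layouts can be reproduced with a small standalone sketch. It assumes that independent inputs share one queue and that aggregated inputs are combined by taking one item from each upstream node. How Plumber implements aggregation internally is not described on this page, so the emit helper and both functions below are hypothetical.

```python
import asyncio


async def emit(name: str, out_q: asyncio.Queue, count: int) -> None:
    """Stand-in upstream node that puts `count` labelled outputs on a queue."""
    for i in range(count):
        await out_q.put(f"{name}-{i}")


async def independent_inputs() -> list:
    """aggregate_inputs=False: n2 and n3 feed one shared queue, item by item."""
    shared_q = asyncio.Queue()
    await asyncio.gather(emit("n2", shared_q, 2), emit("n3", shared_q, 2))
    # Drain whatever arrived; the order depends on task scheduling.
    return [shared_q.get_nowait() for _ in range(shared_q.qsize())]


async def aggregated_inputs() -> list:
    """aggregate_inputs=True (assumed): pair one output from each upstream node."""
    q_n2, q_n3 = asyncio.Queue(), asyncio.Queue()
    await asyncio.gather(emit("n2", q_n2, 2), emit("n3", q_n3, 2))
    return [(q_n2.get_nowait(), q_n3.get_nowait()) for _ in range(2)]


if __name__ == "__main__":
    print(asyncio.run(independent_inputs()))
    print(asyncio.run(aggregated_inputs()))
```

In the first case n4 would see four separate items, in the second it would see two tuples, which matches the left and right columns of the diagram.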

__node_list : list

A list of Processor instances corresponding to each node.

__coro_map : function

Function that takes the input_d['nodes']['<node_name>']['coro'] as input and returns the appropriate coroutine as output.
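A coro_map can be as simple as a dictionary lookup. The sketch below assumes that the 'coro' values in input_d are strings. The fetch and transform coroutines and their catch-all signatures are hypothetical, since the arguments a node coroutine receives are determined by Processor and are not documented here.

```python
import asyncio


async def fetch(*args, **kwargs):
    """Hypothetical node coroutine; the real signature depends on Processor."""
    return "fetched"


async def transform(*args, **kwargs):
    """Second hypothetical node coroutine."""
    return "transformed"


# Registry from the names used in input_d['nodes'][...]['coro'] to coroutines.
_COROS = {"fetch": fetch, "transform": transform}


def coro_map(name: str):
    """Return the coroutine function registered under `name`."""
    try:
        return _COROS[name]
    except KeyError:
        raise ValueError(f"unknown coro name: {name!r}") from None


if __name__ == "__main__":
    print(asyncio.run(coro_map("fetch")()))  # fetched
```

Failing loudly on an unknown name is a design choice of this sketch. It surfaces typos in input_d when the pipeline is built instead of when a node first runs.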

__env_vars : dict

A dictionary containing variables that the caller of Plumber might want to pass to the Processor instances. This dict is set as the Processor._env_vars attribute of every node in the graph.

__input_d : dict

The input data structure containing the node descriptions and the connecting graph.

input_d = \
{
    'nodes': {...},
    'graph': {...},
}

Each item in the nodes dict is in turn a dict describing the characteristics of that particular node. The following characteristics can be set for each node -

  1. coro ~> The value of this key is fed as input to the coro_map to point to the function associated with that particular node.
  2. args ~> This maps directly to the arguments of the Processor._processor coroutine.
  3. properties ~> This dict is used to manipulate the creation of the liason_q_graph. The only property currently supported is aggregate_inputs.

Methods

__init__(...)

def __init__(self, input_d:dict=None, coro_map=None, env_vars:dict=None):
    ...

The Plumber class constructor takes three arguments -

  1. input_d ~> The input data structure containing node info and the graph for the pipeline
  2. coro_map ~> Function to map the input_d['nodes']['<node_name>']['coro'] specified in the input_d to actual coroutines
  3. env_vars ~> Dictionary for passing local scope variables to Processor instances.

The __init__ function also calls the _parse_input_graph function to create the liason_q_graph for the given input.

create_pipeline()

def create_pipeline(self):

Calls _create_pipeline with __input_d['nodes'] and __liason_q_graph as the arguments.

_create_pipeline(...)

def _create_pipeline(self, nodes_d:dict=None, liason_g:list=None):

Given the liaison queue graph, this method instantiates the Processor / InputProcessor classes with the coroutine objects returned by __coro_map(nodes_d['<node_name>']['coro']) and the arguments provided by nodes_d['<node_name>']['args'].

The env_vars dict is also passed to each Processor / InputProcessor instance.

_parse_input_graph(...)

def _parse_input_graph(self, input_d:dict):

This method parses the input_d dict and builds the __liason_q_graph. The properties key of each input_d['nodes']['<node_name>'] is used here to manipulate the graph creation process.
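The parsing step can be sketched as a standalone function that reproduces the matrix shown for __liason_q_graph. This is not Plumber's actual implementation. The function name build_liason_q_graph is hypothetical, and the sketch assumes that aggregate_inputs defaults to True and that a node with aggregate_inputs set to False receives all of its inputs through one shared queue.

```python
import asyncio


def build_liason_q_graph(input_d: dict) -> list:
    """Build an adjacency-matrix style list of queues from input_d."""
    names = list(input_d["nodes"])  # row/column order follows 'nodes'
    index = {name: i for i, name in enumerate(names)}
    size = len(names)
    matrix = [[None] * size for _ in range(size)]
    shared_in = {}  # one shared input queue per non-aggregating node

    for src, targets in input_d["graph"].items():
        for dst in targets or ():  # None means the node has no outgoing edges
            props = input_d["nodes"][dst].get("properties", {})
            if props.get("aggregate_inputs", True):  # assumed default
                queue = asyncio.Queue()  # a separate queue for every edge
            else:
                queue = shared_in.setdefault(dst, asyncio.Queue())
            matrix[index[src]][index[dst]] = queue
    return matrix


# The dummy input_d from the attribute description, with '...' kept as-is.
example_d = {
    "nodes": {
        "n1": {"coro": ..., "args": {}},
        "n2": {"coro": ...},
        "n3": {"coro": ...},
        "n4": {"coro": ..., "properties": {"aggregate_inputs": False}},
    },
    "graph": {
        "n1": ("n2",),
        "n2": ("n3", "n4"),
        "n3": ("n4",),
        "n4": None,
    },
}

if __name__ == "__main__":
    for row in build_liason_q_graph(example_d):
        print(row)
```

With example_d, the entries for n2 to n4 and n3 to n4 are the same queue object, and the row for n4 contains only None.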