# Pipeline

In [1]:
__author__ = 'Nathaniel Starkman'
__version__ = 'Apr 22, 2020'

**Pipeline Function**

- decorates a function by call or with pie (`@`) syntax.
- looks exactly like the original function
- can log function I/O
- changes function docstring to say that it is a pipeline function
- changes function signature to document
- the decorator can be disabled, as with `wrapt`'s `enabled` argument.
- function's output (if decorator enabled):
    - puts function output into dictionary (or some key-accessible container)
    - knows output variable names or constructs names, for the output dictionary
    - gives all variable names unique hashes so can refer to output of function called many times
    - has output defaults for the storage dict, if a function doesn't always return a value (ex return x if y is None else return x, y)
    - can accept a hook function that mediates from function output to the storage dictionary  
      this is done so that simpler functions can be decorated.
      this should actually be a registry, so different hooks can be specified for different downstream functions,
      like decorate(func1, out_hooks={func2: hook1to2, func3: hook1to3})
      note the hook function can be used for modifying values, which is both useful and dangerous.
    - somehow deal with fact that functions can return very different things. Maybe delegate to hook function.
    - note hook functions can themselves be pipelines.

- function's input (if decorator enabled):
    - introspects function signature
    - use a bind_partial so can have defaults that don't need to pass.
    - can accept a hook function that mediates from the previous function's output storage dictionary  
      this is done so that simpler functions can be decorated.
      this should actually be a registry, so different hooks can be specified for different upstream functions,
      like decorate(func3, in_hooks={func1: hook3from1, func2: hook3from2})
      note the hook function can be used for modifying values, which is both useful and dangerous.
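
A minimal standard-library sketch of the input and output behaviour listed above. This is not the `utilipy` implementation: the names `pipeline_function`, `output_names`, `add`, and `add_two` are hypothetical, `functools.wraps` stands in for `wrapt`, and hooks and unique hashes are left out.

```python
import functools
import inspect


def pipeline_function(output_names, input_defaults=None, enabled=True):
    """Hypothetical decorator: bind inputs partially, store named outputs."""

    def decorator(func):
        if not enabled:  # disabled: hand back the undecorated function
            return func

        sig = inspect.signature(func)

        @functools.wraps(func)  # keeps name, docstring, and reported signature
        def wrapper(storage=None, **kwargs):
            # Precedence: input defaults < previous node's storage < overrides.
            available = {**(input_defaults or {}), **(storage or {}), **kwargs}
            # Keep only names the function accepts; bind_partial tolerates
            # missing arguments that the function's own defaults will fill.
            accepted = {k: v for k, v in available.items() if k in sig.parameters}
            bound = sig.bind_partial(**accepted)
            result = func(*bound.args, **bound.kwargs)
            # Normalise the return value to a tuple and name each element.
            values = result if isinstance(result, tuple) else (result,)
            return dict(zip(output_names, values))

        return wrapper

    return decorator


@pipeline_function(output_names=("z",), input_defaults={"x": 1, "y": 3})
def add(x, y):
    """Add two numbers."""
    return x + y


@pipeline_function(output_names=("z",))
def add_two(z):
    return z + 2


first = add(x=2)         # y falls back to its input default of 3
second = add_two(first)  # consumes the storage dict from the first node
```

Here `first` is the storage dictionary of the first node and `second` is built from it, so the second function never needs to know where `z` came from.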


**Pipeline**

The actual pipeline of functions.

- accepts a graph of pipeline functions, each feeding into the next.
    - either by constructing step by step  
      - one idea
      ```
      pipe = Pipeline()
      rgnum1 = pipe.startswith(func1)  # returns unique hash for func1
      rgnum2 = pipe.connect(func1, func2)  # can only use func1 as name if never used again, otherwise use unique registration number
      rgnum3, rgnum4 = pipe.connect(rgnum2, [func3, func4])  # will pass output of func2 to both
      pipe.connect(rgnum3, func5)
      pipe.connect(rgnum4, func5, output_hook=lambda: `change variable names`)
      pipe.endswith(func5)
      ```
      - another idea
      ```
      pipe = Pipeline(functions=[func1, (func2, 2), func3], startswith=func1, endswith=func3)  # tuple gives the number of times used; each use gets a different rgnum
      rgnum1, (rgnum2_1, rgnum2_2), rgnum3 = pipe.rgnums
      pipe.connect(rgnum1, rgnum2_1)
      pipe.connect(rgnum2_1, rgnum3)
      ```
    - or by a .ini file
    - or by a graphviz diagram mini-language string
- acts like a function: calling it starts at the beginning and progresses through the graph
- resolves call order, so can do something like
  ```
  func1 -> func2 -> func3 -> func4 --> func6
       |                            |
        -> func5 -------------------
  ```
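
One way the call order for the diagram above could be resolved is a topological sort. The sketch below uses the standard-library `graphlib` module (Python 3.9+) and plain string names in place of real pipeline functions; nothing here is part of `utilipy`.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Map each node to the set of nodes whose output it needs.
dependencies = {
    "func2": {"func1"},
    "func3": {"func2"},
    "func4": {"func3"},
    "func5": {"func1"},
    "func6": {"func4", "func5"},
}

# static_order() yields every node after all of its predecessors.
call_order = list(TopologicalSorter(dependencies).static_order())
```

`func1` always comes first and `func6` last; where `func5` falls between them is not fixed, since it only depends on `func1`.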

<br><br>

- - - 


## Prepare

### Imports

In [2]:
import wrapt
import os
import typing as T

from utilipy.utils import functools, inspect

from utilipy import ipython
ipython.set_autoreload(2)

from utilipy.pipeline import PipelineNode

set autoreload to 1
set autoreload to 2


In [3]:

outputs = inspect.FullerSignature(
    parameters=[
        inspect.Parameter(name='z', kind=inspect.POSITIONAL_OR_KEYWORD),
        inspect.Parameter(name='test', kind=inspect.POSITIONAL_OR_KEYWORD, default='TEST')
    ],
    return_annotation="output."
)

def input_changer(sig):  # not applied because this node runs first
    sig = sig.modify_parameter('x', annotation='CHANGED')
    return sig
# /def
    

@PipelineNode.decorator(
    outputs=outputs, input_defaults={'x': 1, 'y': 3},
    input_hook=input_changer, enabled=True
)
def test_function(x, y, _return_test=False):
    """Hello"""
    if _return_test:
        return x + y, 'new val'
    return x + y

out = test_function.run(None, x=2)

def input_changer(sig):  # applied here: this node receives the upstream signature
    sig = sig.modify_parameter('test', annotation='CHANGED Test')
    return sig
# /def

def output_changer(sig):
    sig = sig.modify_parameter('z', annotation='CHANGED')
    return sig
# /def

@PipelineNode.decorator(
    input_defaults={'z': -1}, outputs=['z'],
    input_hook=input_changer, output_hook=output_changer,
    carry_through_unused=True
)
def test_function2(z):
    return z + 2

test_function2.run(out)

<FullerSignature (z: 'CHANGED' = 7, test: 'CHANGED Test' = 'TEST') -> '300928423'>

In [4]:
test_function._hash

300928342

<br><br>

- - - 


In [5]:
class Pipeline:
    pass

class SequentialPipeline(Pipeline):
    """A simplified pipeline."""
    
    def __init__(self, order=()):
        """Initialize the pipeline.

        Parameters
        ----------
        order : sequence of pipeline nodes, optional
            the order of the functions.

        Notes
        -----
        Each function gets a unique hash by hashing its internal hash
        together with the order in which its connection was added.
        This allows a function to be connected to many things and
        still have a unique hash.

        """
        self._funcs = {}
        self._order = []

        # An immutable default avoids sharing one list between instances.
        for i, func in enumerate(order):
            key = hash((func._hash, i))
            self._funcs[key] = func
            self._order.append(key)

    # /def
    
    def run(self, signature, **kwargs):
        """Run as pipeline function.
        
        Parameters
        ----------
        signature : :class:`~utilipy.utils.inspect.Signature`, optional
            if included, ignores `kwargs`
        kwargs : Any
            keyword arguments used to modify the signature.
            deprecate (?)
        
        Returns
        -------
        :class:`~utilipy.utils.inspect.FullerSignature`

        """
        out = self._funcs[self._order[0]].run(signature, **kwargs)
        
        for key in self._order[1:]:
            out = self._funcs[key].run(out)
            
        return out

    # /def

# /class
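
A small sketch of the keying scheme described in the Notes: hashing `(func._hash, i)` gives every position its own key, even when one node is reused. `FakeNode` and the integer hashes are arbitrary, hypothetical stand-ins for real pipeline nodes.

```python
class FakeNode:
    """Hypothetical stand-in for a pipeline node; only `_hash` matters here."""

    def __init__(self, node_hash):
        self._hash = node_hash


node_a = FakeNode(111)  # arbitrary illustrative hashes
node_b = FakeNode(222)

order = [node_a, node_b, node_b, node_b]  # node_b is reused three times
keys = [hash((node._hash, i)) for i, node in enumerate(order)]

# Each position gets its own key even though node_b appears repeatedly.
n_unique = len(set(keys))
```

Without the position `i` in the tuple, the three uses of `node_b` would share one key and overwrite each other in `self._funcs`.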

In [6]:
pipe = SequentialPipeline([test_function, *(test_function2,)*3])

pipe.run(None, x=0, y=0)

<FullerSignature (z: 'CHANGED' = 6, test: 'CHANGED Test' = 'TEST') -> '300928423'>

<br><br>

- - - 
- - - 

<span style='font-size:40px;font-weight:650'>
    END
</span>

In [7]:
out

<FullerSignature (z=5, test='TEST') -> 'output.'>