# Basic feature explanation

## import libraries

In [21]:
import sys,os
sys.path.append(os.getcwd())
import utflow
import functools

import logging
logging.basicConfig(level=logging.DEBUG)

## Basic structure of task definition and execution.

### Step1: first run.

In [22]:
# No 'start'/'end' is given, so all three tasks below run in order.
flow = utflow.TaskFlow()

# '1st' populates the context with x and y.
with flow.next("1st") as task:
    with task.start() as _:
        _.x = 5
        _.y = 3
        print("1st: ", _.x, _.y)
        # context '_' is saved as pickle file in '.taskflow' directory after task execution.

# '2nd' reads the values set by '1st', then overwrites x and adds z.
with flow.next("2nd") as task:
    with task.start() as _:
        print("2nd: %s,%s"%(_.x, _.y))
        _.x = 10
        _.z = 20

# 'last' sees x and z as they were left by '2nd' (x=10, z=20 in the recorded output).
with flow.next("last") as task:
    with task.start() as _:
        print("3rd: x=%s"%_.x)
        print("3rd: z=%s"%_.z)

DEBUG:utflow.task:Dump state of task '1st'
DEBUG:utflow.task:Dump state of task '2nd'
DEBUG:utflow.task:Dump state of task 'last'


1st:  5 3
2nd: 5,3
3rd: x=10
3rd: z=20


### Step2: Restart from '2nd' task

In [23]:
# start="2nd": execution resumes at task '2nd'.
flow = utflow.TaskFlow(start="2nd")

# Precedes the start task, so this body is skipped (the log reports "Skipped task '1st'"
# and nothing is printed for it).
with flow.next("1st") as task:
    with task.start() as _:
        _.x = 5
        _.y = 3
        print("1st: ", _.x, _.y)

# First executed task; per the DEBUG log the saved state of '1st' is loaded beforehand.
# NOTE(review): the recorded output prints x=10 here although '1st' assigns x=5 --
# confirm which saved state is actually restored.
with flow.next("2nd") as task:
    with task.start() as _:
        print("2nd: %s,%s"%(_.x, _.y))
        _.x = 10
        _.z = 20

# Runs normally after '2nd'.
with flow.next("last") as task:
    with task.start() as _:
        print("3rd: x=%s"%_.x)
        print("3rd: z=%s"%_.z)

DEBUG:utflow.taskflow: Skipped task '1st'
DEBUG:utflow.task:Load state of previous task '1st'
DEBUG:utflow.task:Dump state of task '2nd'
DEBUG:utflow.task:Dump state of task 'last'


2nd: 10,3
3rd: x=10
3rd: z=20


### Step3: restart from 'last' task 

In [24]:
# start="last": execution resumes at the final task.
flow = utflow.TaskFlow(start="last")

# Skipped: precedes the start task.
with flow.next("1st") as task:
    with task.start() as _:
        _.x = 5
        _.y = 3
        print("1st: ", _.x, _.y)

# Skipped as well; per the DEBUG log its saved state is what 'last' loads.
with flow.next("2nd") as task:
    with task.start() as _:
        print("2nd: %s,%s"%(_.x, _.y))
        _.x = 10
        _.z = 20

# Only this body executes; x and z come from the restored state of '2nd'.
with flow.next("last") as task:
    with task.start() as _:
        print("3rd: x=%s"%_.x)
        print("3rd: z=%s"%_.z)

DEBUG:utflow.taskflow: Skipped task '1st'
DEBUG:utflow.taskflow: Skipped task '2nd'
DEBUG:utflow.task:Load state of previous task '2nd'
DEBUG:utflow.task:Dump state of task 'last'


3rd: x=10
3rd: z=20


### Step4: stop at '2nd' task
Only '1st' task is executed.

In [25]:
# end="2nd": the named task itself is not executed -- the log shows '2nd' and 'last'
# skipped and only '1st' dumped.
flow = utflow.TaskFlow(end="2nd")

# The only task that runs in this cell.
with flow.next("1st") as task:
    with task.start() as _:
        _.x = 5
        _.y = 3
        print("1st: ", _.x, _.y)

# Skipped: this is the 'end' task.
with flow.next("2nd") as task:
    with task.start() as _:
        print("2nd: %s,%s"%(_.x, _.y))
        _.x = 10
        _.z = 20

# Skipped: comes after the 'end' task.
with flow.next("last") as task:
    with task.start() as _:
        print("3rd: x=%s"%_.x)
        print("3rd: z=%s"%_.z)

DEBUG:utflow.task:Dump state of task '1st'
DEBUG:utflow.taskflow: Skipped task '2nd'
DEBUG:utflow.taskflow: Skipped task 'last'


1st:  5 3


## Conditional execution of Task

In [26]:
flow = utflow.TaskFlow()

# Sets x=5, which decides which of the two '2nd' variants below is executed.
with flow.next("1st") as task:
    with task.start() as _:
        _.x = 5
        _.y = 3
        print("1st: ", _.x, _.y)

# Task is executed if 'x' in context is less than 5.
# 'when' receives the context and returns a truth value. Here x == 5, so this variant is
# conditionally skipped; the log shows the state of '1st' being duplicated as '2nd'.
with flow.next("2nd", when=lambda _: _.x < 5) as task:
    with task.start() as _:
        print("2nd -- x<5")
        print("2nd: %s,%s"%(_.x, _.y))
        _.x = 1
        _.z = 2

# Task is executed if 'x' in context is greater than or equal to 5
# Both variants share the task name '2nd'; this is the one that runs in the recorded output.
with flow.next("2nd", when=lambda _: _.x >=5) as task:
    with task.start() as _:
        print("2nd -- x>=5")
        print("2nd: %s,%s"%(_.x, _.y))
        _.x = 9
        _.z = 4

# Sees the values written by whichever '2nd' variant ran (x=9, z=4 here).
with flow.next("last") as task:
    with task.start() as _:
        print("3rd: x=%s"%_.x)
        print("3rd: z=%s"%_.z)

DEBUG:utflow.task:Dump state of task '1st'
DEBUG:utflow.task:Duplicate state of task '1st' as '2nd'
DEBUG:utflow.taskflow: Conditionally skipped task '2nd'
DEBUG:utflow.task:Dump state of task '2nd'
DEBUG:utflow.task:Dump state of task 'last'


1st:  5 3
2nd -- x>=5
2nd: 5,3
3rd: x=9
3rd: z=4


## Substitution of context using '<<' operator

In [27]:
# Plain dict whose entries are merged into the task context below.
kwargs = {"test1": 1, "test2": 2}
flow = utflow.TaskFlow()

# Stand-alone Context object that is merged in the same way.
state = utflow.Context()
state.xxx = "XXX"

# No task name is given; the DEBUG log shows the task being dumped as '0'.
with flow.next() as task:
    with task.start() as _:
        # '<<' brings the entries of a dict or another Context into '_'; it can be chained.
        _ << kwargs << state
        print(_.test1)
        print(_.xxx)

DEBUG:utflow.task:Dump state of task '0'


1
XXX


## Pipe-like interface for Context and functions using the '>>' operator

In [28]:
# Flow without 'start'/'end' arguments; the single unnamed task in this cell is logged as '0'.
flow = utflow.TaskFlow()

def hello(greeting, name):
    """Build a greeting message and a doubled variant of it.

    Returns a dict with two keys: 'text' holds "<greeting>, <name>" and
    'text2' holds that same message joined to itself with '-'.
    """
    message = "%s, %s" % (greeting, name)
    doubled = "%s-%s" % (message, message)
    return dict(text=message, text2=doubled)

def to_list(text):
    """Wrap ``text`` in a new single-element list."""
    wrapped = [text]
    return wrapped

def print_text(text):
    """Print ``text`` to stdout and hand the very same object back."""
    shown = text
    print(shown)
    return shown

with flow.next() as task:
    with task.start() as _:
        
        # Attribute names match the parameter names of hello(greeting, name).
        _.greeting = "Hello"
        _.name     = "world"
        
        # Pass the context to a function as its arguments and get the result wrapped in a context.
        c = _ >> hello
        print(c.text)
        print(c.text2)
        print()
        
        # If the result of a function call is a dict, '>>' puts all of its keys into the target context.
        _ >> hello >> _
        print(_.text)
        print(_.text2)
        print()
        
        # Functions can be piped.
        # If the result of a function call is a dict, '>>' selects the appropriate keys from it
        # and passes them as '**kwargs' to the next function call.
        c = _ >> hello >> to_list
        
        # If the result of a function call is not a dict, '>>' stores it in the '_' attribute.
        print(c._)
        print()
        
        # If the result of a function call is a list, '>>' passes it as '*args' to the next function call.
        _ >> hello >> to_list >> print_text
        print()
        
        # If the result of a function call is neither a list nor a dict, '>>' passes the '_' value
        # as the first argument of the next function call.
        _ >> hello >> to_list >> print_text >> print_text

DEBUG:utflow.task:Dump state of task '0'


Hello, world
Hello, world-Hello, world

Hello, world
Hello, world-Hello, world

['Hello, world']

Hello, world

Hello, world
Hello, world


## Task definition by decorator and function
Instead of writing a `with task.start()` block, you can apply the `@flow.start_next()` decorator to a function to define a task.

The decorated function is called immediately.

### Step1: Start from first task

In [29]:
flow = utflow.TaskFlow()

# The decorator turns the function into the next task and runs it right away.
# No explicit name is passed; the DEBUG log shows the task under the function name 'test1'.
@flow.start_next()
def test1(_):
    print("Assignment")
    _.a = "call_test"
    _.b = 100

# Reads the attributes that 'test1' stored in the context.
@flow.start_next()
def test2(_):
    print(_.a)
    print(_.b)

DEBUG:utflow.task:Dump state of task 'test1'
DEBUG:utflow.task:Dump state of task 'test2'


Assignment
call_test
100


### Step2: Skipped test1

In [30]:
# Resume at 'test2'; task names are the function names, as the DEBUG log shows.
flow = utflow.TaskFlow(start="test2")

# Skipped in this run: "Assignment" is not printed and the log reports "Skipped task 'test1'".
@flow.start_next()
def test1(_):
    print("Assignment")
    _.a = "call_test"
    _.b = 100

# Executes with the state of 'test1' loaded from its earlier dump (see the DEBUG log).
@flow.start_next()
def test2(_):
    print(_.a)
    print(_.b)

DEBUG:utflow.taskflow: Skipped task 'test1'
DEBUG:utflow.task:Load state of previous task 'test1'
DEBUG:utflow.task:Dump state of task 'test2'


call_test
100


## Partial binding of function parameters

In [31]:
import functools
from utflow.functools import partial_lazy

def hello(greeting, name):
    """Return {'text': '<greeting>, <name>', 'text2': '<text>-<text>'}."""
    text = "%s, %s" % (greeting, name)
    result = {"text": text}
    result["text2"] = "%s-%s" % (text, text)
    return result

def to_obj(text):
    """Identity helper: return ``text`` unchanged, as a bare (non-dict, non-list) value."""
    value = text
    return value

def test3(text, text2):
    print("test3: %s: %s"%(text, text2))
    
def test4(text1, text2):
    print("test4: %s: %s"%(text1, text2))

def endl():
    """Emit a single empty line on stdout; takes no arguments and returns None."""
    print("")

flow = utflow.TaskFlow()

# Single task that walks through how bound arguments (functools.partial vs. partial_lazy)
# combine with the values that '>>' passes along the pipe.
@flow.start_next()
def test1(_):
    _.greeting = "Hello"
    _.name = "world"
    
    # case1: 'text' and 'text2' are used as input arguments for test3 
    print("case1")
    _ >> hello >> test3 >> endl
    
    # case2: 'text' is given by 'functools.partial' as first positional argument.
    #        'text' field in the result of 'hello' is overwritten.
    print("case2: functools.partial")
    print("       bound positional(text), and additionaly set by kwargs (text, text2)")
    _ >> hello >> functools.partial(test3, ", is the greeting") >> endl
    
    # case3: 'text' and 'text2' are used as input arguments for test3.
    #        bound parameter of 'partial_lazy' is ignored.
    print("case3: partial_lazy")
    print("       bound positional(wildcard), and additionaly set by kwargs (text, text2)")
    print("       positional argument is ignored.")
    _ >> hello >> partial_lazy(test3, ", is the greeting") >> endl
    
    # case4: 'text1' is given by 'functools.partial' as the first positional argument.
    #        'text2' receives the result of 'to_obj' as the second positional argument.
    print("case4: functools.partial, bound positional (text1)")
    _ >> hello >> to_obj >> functools.partial(test4, ", is the greeting") >> endl

    # case5: 'text1' is given as a first positional args, which is passed from result of 'to_obj',
    #        'text2' is given by 'functools.partial' as keyword argument. 
    print("case5: functools.partial, bound keyword (text2)")
    _ >> hello >> to_obj >> functools.partial(test4, text2=", is the greeting") >> endl

    # case6: non-named result of 'to_obj' is treated as a first positional args for test4,
    #        and 'text2' is given by 'partial_lazy' as the second positional argument.
    print("case6: partial_lazy, bound positional (wildcard)")
    print("       positional argument is interpreted as second positional variable")
    _ >> hello >> to_obj >> partial_lazy(test4, ", is the greeting") >> endl

    # case7: when using 'partial_lazy' with kwargs, result is same as case5 and case6.
    print("case7: partial_lazy, bound keyword (text2)")
    _ >> hello >> to_obj >> partial_lazy(test4, text2=", is the greeting") >> endl

DEBUG:utflow.task:Dump state of task 'test1'


case1
test3: Hello, world: Hello, world-Hello, world

case2: functools.partial
       bound positional(text), and additionaly set by kwargs (text, text2)
test3: , is the greeting: Hello, world-Hello, world

case3: partial_lazy
       bound positional(wildcard), and additionaly set by kwargs (text, text2)
       positional argument is ignored.
test3: Hello, world: Hello, world-Hello, world

case4: functools.partial, bound positional (text1)
test4: , is the greeting: Hello, world

case5: functools.partial, bound keyword (text2)
test4: Hello, world: , is the greeting

case6: partial_lazy, bound positional (wildcard)
       positional argument is interpreted as second positional variable
test4: Hello, world: , is the greeting

case7: partial_lazy, bound keyword (text2)
test4: Hello, world: , is the greeting



## Task definition by lambda

In [32]:
import functools
from utflow.functools import partial_lazy

def hello(greeting, name):
    """Combine greeting and name into 'text', and 'text-text' into 'text2'."""
    joined = "%s, %s" % (greeting, name)
    return dict(
        text=joined,
        text2="%s-%s" % (joined, joined),
    )

def to_obj(text):
    """Pass ``text`` through untouched so the pipe carries a bare value."""
    passthrough = text
    return passthrough

def test3(text, text2):
    print("test3: %s: %s"%(text, text2))
    
def test4(text1, text2):
    print("test4: %s: %s"%(text1, text2))

def endl():
    """Print one blank line; used as the terminating stage of a pipe."""
    print("")

flow = utflow.TaskFlow()

# start_next(name) returns the decorator, so applying it to a lambda defines and runs the task.
# Calling the context with keyword arguments sets 'greeting' and 'name' before piping.
flow.start_next("test1")(
    lambda _: _(greeting = "Goodby", name = "guys") >> hello >> test3 >> endl
)

# 'test2' still sees greeting/name as set in 'test1' (see the recorded output).
# This is the last expression of the cell, so its return value (a Context) is echoed as output.
flow.start_next("test2")(
    lambda _: _ >> hello >> to_obj >> partial_lazy(test4, ", is the greeting") >> endl
)

DEBUG:utflow.task:Dump state of task 'test1'
DEBUG:utflow.task:Dump state of task 'test2'


test3: Goodby, guys: Goodby, guys-Goodby, guys

test4: Goodby, guys: , is the greeting



<utflow.context.Context at 0x7f190c637550>

# Best practice

## Play with session related resource
If a `context` object is passed when constructing `TaskFlow`, it is treated as the session Context.

All attributes of the session Context can be referenced through the `Context` object passed to each task; however, their values are not saved after task execution.

TCP connections, file handles, and other resources that belong to the process should be stored in the session Context object.

### Step1: first run of the connection-managed tasks.

In [33]:
import tempfile
import os

class Session:
    """Toy stand-in for a process-bound session (a credential and an open file handle).

    ``login`` stores a credential used by ``play1``/``play2``; ``login2`` opens a
    temporary file that plays the role of a live connection; ``logout`` releases it.
    """

    def __init__(self):
        # Define every attribute up front so that methods called before
        # login()/login2() see None instead of raising AttributeError.
        self.credential = None
        self.path = None
        self.connection = None

    def login(self, credential):
        """Remember ``credential`` for later ``play1``/``play2`` calls."""
        # Do some initialization to connect other services.
        self.credential = credential

    def login2(self, *args, **kwargs):
        """Open a writable temporary file that stands in for a connection.

        Positional and keyword arguments are accepted and ignored.
        """
        # Release a previous connection first so a repeated login2() does not
        # leak the earlier handle and temporary file.
        self.logout()
        # mkstemp() creates the file atomically. The deprecated mktemp() only
        # returns a name, which lets another process create the file first.
        fd, self.path = tempfile.mkstemp()
        self.connection = os.fdopen(fd, "w")

    def logout(self):
        """Close the connection (if any) and delete its temporary file.

        Safe to call when ``login2`` was never called, and safe to call twice.
        """
        # Close before removing: an open handle is a resource leak, and some
        # platforms refuse to delete a file that is still open.
        if self.connection is not None:
            self.connection.close()
        if self.path is not None:
            os.remove(self.path)
            self.path = None

    def play1(self, greeting, name):
        """Print the combined greeting and return it as a dict.

        Returns ``{"text": "<greeting> <name>", "text2": "<<<...>>>"}``.
        """
        result = "%s %s"%(greeting, name)
        print("Test play1[%s]: %s"%(self.credential, result))
        return {"text": result, "text2": "<<<%s>>>"%result }

    def play2(self, text, text2):
        """Print both texts together with the current credential; returns None."""
        print("Test play2[%s]: %s, %s"%(self.credential, text, text2))
        return None

def endl():
    """Print one blank line; used as the final stage of the pipes below."""
    print("")

flow = utflow.TaskFlow()


# Run the initializer task. Its result is not stored (no "Dump state" entry appears
# for it in the DEBUG log), so resources bound to the process should be initialized here.
# You can use `flow.start_init()` for decorator-style task definition.

with flow.init() as task:
    with task.start() as _:
        _.session = Session()
        _.session.login("XXXXXX")


# _(...) sets greeting/name on the context, then pipes it through the session's bound methods.
with flow.next("test1") as task:
    with task.start() as _:
        _(greeting="Good morning,", name = "sir") >> _.session.play1 >> _.session.play2 >> endl

# The session created in the initializer is still reachable as '_.session' here.
with flow.next("test2") as task:
    with task.start() as _:
        _(greeting="Good afternoon,", name = "dear") >> _.session.play1 >> _.session.play2 >> endl


DEBUG:utflow.task:Dump state of task 'test1'
DEBUG:utflow.task:Dump state of task 'test2'


Test play1[XXXXXX]: Good morning, sir
Test play2[XXXXXX]: Good morning, sir, <<<Good morning, sir>>>

Test play1[XXXXXX]: Good afternoon, dear
Test play2[XXXXXX]: Good afternoon, dear, <<<Good afternoon, dear>>>



### Step2: restart from 'test2' task with another connection

In [34]:
# Resume at 'test2'; 'test1' is skipped and its saved state is loaded (see the DEBUG log).
flow = utflow.TaskFlow(start="test2")


# In the second run of the process, another one-time token 'YYYYYY' is used.
# The initializer still runs although the flow starts at 'test2': the output shows the new token.
with flow.init() as task:
    with task.start() as _:
        _.session = Session()
        _.session.login("YYYYYY")


# Skipped in this run.
with flow.next("test1") as task:
    with task.start() as _:
        _(greeting="Good morning,", name = "sir") >> _.session.play1 >> _.session.play2 >> endl

# Executes with the freshly created session rather than one restored from disk.
with flow.next("test2") as task:
    with task.start() as _:
        _(greeting="Good afternoon,", name = "dear") >> _.session.play1 >> _.session.play2 >> endl


DEBUG:utflow.taskflow: Skipped task 'test1'
DEBUG:utflow.task:Load state of previous task 'test1'
DEBUG:utflow.task:Dump state of task 'test2'


Test play1[YYYYYY]: Good afternoon, dear
Test play2[YYYYYY]: Good afternoon, dear, <<<Good afternoon, dear>>>



## Anti patterns

### Anti pattern 1: storing session object in the task.

A `TypeError` exception is raised at the end of `test1` because a file / connection object cannot be serialized.

In [15]:
flow = utflow.TaskFlow()
# login2() attaches an open file handle to the session object.
session = Session()
session.login2()

# Anti-pattern: the session is stored in the task context, so dumping the context at the
# end of 'test1' fails with the TypeError shown in the output.
@flow.start_next()
def test1(_):
    _.session = session

# Not reached in the recorded run because 'test1' already raised.
@flow.start_next()
def test2(_):
    _.session.connection.write("test")

# Also not reached.
# NOTE(review): writing to the connection after logout() looks unintended -- confirm.
@flow.start_next()
def test3(_):
    _.session.logout()
    _.session.connection.write("test")

DEBUG:utflow.task:Dump state of task 'test1'


TypeError: cannot serialize '_io.TextIOWrapper' object