In [1]:
from synthora.workflows import task
from synthora.workflows.base_task import BaseTask
from synthora.workflows.context.base import BaseContext
from synthora.workflows.scheduler.base import BaseScheduler
from synthora.workflows.scheduler.process_pool import ProcessPoolScheduler
from synthora.workflows.scheduler import ThreadPoolScheduler

workflow的组成：
scheduler调度器，task，context

task是基本执行单元，通常是一个函数
调度器会按照特定的策略，并行或串行调用task
context允许task在运行过程中读取，存储信息，同时实现循环，条件判断等高级功能

如何初始化task,两种方式

In [2]:
@task
def add1(x: int, y: int) -> int:
    return x + y


@task(name="add2")
def add2(x: int, y: int) -> int:
    return x + y

def _add3(x: int, y: int) -> int:
    return x + y

add3 = BaseTask(_add3, name="add3")

Task签名
task签名允许提前设置task参数，允许用户实现更复杂的功能

In [3]:
add1 = add1.s(1)
# or
# add1 = add1.signature(1)

add1(2) # will return 3

3

可以添加immutable参数，使得task忽略外部参数

In [4]:
try:
    add1(1, 2) # will raise an error
except TypeError as e:
    print(e)

add1() takes 2 positional arguments but 3 were given


In [5]:
add2 = add2.si(1, 2)
# or
# add2 = add2.signature(1, 2, immutable=True)
# or
# add2 = add2.signature_immutable(1, 2)
add2(3, 3) # will return 3, because the arguments are ignored

3

串行和并行

有两种添加串行任务的方法，第一种是使用函数

In [6]:



flow = (
    BaseScheduler()
    .add_task(BaseTask(_add3))
    .add_tasks(BaseTask(_add3).s(1), BaseTask(_add3).s(2))
)

flow也可以使用签名

In [7]:
flow = flow.s(1)
flow.run(1)

5

为什么会输出5？

这是因为workflow会将上一个task的返回值作为参数添加到下一个任务中

First Step: flow get input: (1, 1), 传入第一个task中
第一个task返回1 + 1 = 2
第二个task我们预先指定了输入为1，与上一个task的返回值2一起作为参数
第二个task返回2 + 1 = 3
同理，第三个task返回3 + 2 = 5


为了简化workflow的创建流程，你可以使用python运算符声明串行workflow

In [8]:
flow = BaseTask(_add3) >> BaseTask(_add3).s(1) >> BaseTask(_add3).s(2)

flow可以嵌套

In [9]:
flow = flow >> BaseTask(_add3).s(3)

并行任务同样有两种添加方式

In [10]:


flow = ThreadPoolScheduler().add_task_group([
    BaseTask(_add3).s(0),
    BaseTask(_add3).s(1),
    BaseTask(_add3).s(2),
    BaseTask(_add3).s(3),
])

In [11]:
flow.run(1)

[1, 2, 3, 4]

与串行任务不同，并行任务的输入参数会分别传入所有task

task1收到的参数是0, 1
task2收到的是1, 1
...

串行任务同样可以通过表达式声明

In [12]:
flow = BaseTask(_add3).s(0) | BaseTask(_add3).s(1) | BaseTask(_add3).s(2) | BaseTask(_add3).s(3)

并行和串行可以一起使用

In [13]:
flow = (BaseTask(_add3) | BaseTask(_add3)) >> BaseTask(_add3)
flow.run(1, 2)

6

BaseTask(_add3) | BaseTask(_add3) 都会接收到1, 2作为参数，返回1 + 2 = 3
最后一个任务会接受两个3作为输入，返回3 + 3 = 6

为了实现workflow的高级用法，我们引入了context允许跨task的数据管理。
要使用context，请将其作为函数的第一个参数并标注类型

In [14]:



def add(ctx: BaseContext, x: int, y: int) -> int:
    with ctx:
        print(ctx.get_state(f"{x + y}"))
        if "ans" not in ctx:
            ctx["ans"] = [x + y]
        else:
            ctx["ans"] = ctx["ans"] + [x + y]
    return x + y


flow = ProcessPoolScheduler()
(flow | BaseTask(add, "2").s(1) | BaseTask(add, "3").s(2) | BaseTask(add, "4").s(3))


print(flow.run(1), flow.get_context()["ans"])

TaskState.RUNNING
TaskState.RUNNING
TaskState.RUNNING
[2, 3, 4] [2, 3, 4]


您可以使用任何Scheduler，Synthroa会根据scheduler类型自动选择匹配的context

下面让我们使用context完成高级功能，例如loop和if

In [15]:
def add(ctx: BaseContext) -> int:
    with ctx:
        a = ctx.get("a", 1)
        b = ctx.get("b", 1)
        cnt = ctx.get("cnt", 0)
        print(f"This is {cnt}th time")
        ctx["cnt"] = cnt + 1
        ctx["ans"] = a + b
        ctx["a"] = a + 1
        ctx["b"] = b + 1

        if a + b < 5:
            ctx.set_cursor(-1)
    return int(a + b)


flow = BaseTask(add) >> BaseTask(add).si()


print(flow.run(), flow.get_context()["ans"])

This is 0th time
This is 1th time
This is 2th time
This is 3th time
8 8


In [16]:
from typing import Any, List
from synthora.types.enums import TaskState

@task(name="skip")
def task1(a:int, b: int) -> int:
    return a + b


@task
def task2(a: int, b: int) -> int:
    print("hello", a, b)
    return a + b

@task
def task0() -> List[int]:
    print("task0")
    return [1, 2]


@task
def bratch(ctx: BaseContext, previous: Any) -> int:
    print("bratch", previous)
    with ctx:
        ctx.set_state("skip", TaskState.SKIPPED)
    
    return previous

In [17]:
bratch.flat_result = True
flow = task0 >> bratch >> (task1 | task2)
flow.run()

task0
bratch [1, 2]
hello 1 2


[3, 3]

这里我们使用task的name，在bratch中手动跳过了task1的执行