# Tasks and Flows

In [1]:
# Required imports
from prefect import Task, task
from prefect import Flow
from prefect import Parameter


## Defining a Task
A Task can be defined either as a decorated function or as a subclass of `Task`.

In [2]:
# Prefect task defined from a function
@task
def add(x: int, y: int) -> int:
    return x + y


In [3]:
# Prefect task defined as a class
class AddTask(Task):

    def __init__(self, default: int, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.default = default

    def run(self, x: int, y: int=None) -> int:
        if y is None:
            y = self.default
        return x + y

_add = AddTask(default=1)

## Defining a Flow
A Flow can be defined with either the **Functional API** or the **Imperative API**.

In [4]:
# [Functional API]

with Flow("My first flow!") as functionalFlow:
    first_result = add(1, 2)
    second_result = add(first_result, 100)


# Run the flow
state = functionalFlow.run()
assert state.is_successful()

first_task_state = state.result[first_result]
assert first_task_state.is_successful()
print("first result: ", first_task_state.result)
assert first_task_state.result == 3


second_task_state = state.result[second_result]
assert second_task_state.is_successful()
print("second result: ", second_task_state.result)
assert second_task_state.result == 103

[2021-08-30 17:24:31+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'My first flow!'
[2021-08-30 17:24:31+0900] INFO - prefect.TaskRunner | Task 'add': Starting task run...
[2021-08-30 17:24:31+0900] INFO - prefect.TaskRunner | Task 'add': Finished task run for task with final state: 'Success'
[2021-08-30 17:24:31+0900] INFO - prefect.TaskRunner | Task 'add': Starting task run...
[2021-08-30 17:24:31+0900] INFO - prefect.TaskRunner | Task 'add': Finished task run for task with final state: 'Success'
[2021-08-30 17:24:31+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
first result:  3
second result:  103


In [5]:
# [Imperative API]

imperativeFlow = Flow("My imperative flow!")
second_add = add.copy()

# add our tasks to the flow
imperativeFlow.add_task(add)
imperativeFlow.add_task(second_add)

# create data bindings
add.bind(x=1, y=2, flow=imperativeFlow)
second_add.bind(x=add, y=100, flow=imperativeFlow)


# Run the flow
state = imperativeFlow.run()
assert state.is_successful()

first_task_state = state.result[add]
assert first_task_state.is_successful()
print("first result: ", first_task_state.result)
assert first_task_state.result == 3

second_task_state = state.result[second_add]
assert second_task_state.is_successful()
print("second result: ", second_task_state.result)
assert second_task_state.result == 103


[2021-08-30 17:24:31+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'My imperative flow!'
[2021-08-30 17:24:31+0900] INFO - prefect.TaskRunner | Task 'add': Starting task run...
[2021-08-30 17:24:31+0900] INFO - prefect.TaskRunner | Task 'add': Finished task run for task with final state: 'Success'
[2021-08-30 17:24:31+0900] INFO - prefect.TaskRunner | Task 'add': Starting task run...
[2021-08-30 17:24:31+0900] INFO - prefect.TaskRunner | Task 'add': Finished task run for task with final state: 'Success'
[2021-08-30 17:24:31+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
first result:  3
second result:  103
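
Both flows above describe the same two-step dependency graph: the second `add` consumes the result of the first. As a rough mental model only (this is not how Prefect is implemented), the sketch below builds that graph with the standard library and runs it in dependency order. The `Ref` class, the node names `first` and `second`, and `run_graph` are hypothetical names introduced for illustration.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter
from typing import Any, Callable, Dict, Tuple


@dataclass(frozen=True)
class Ref:
    """Hypothetical marker: fill this argument with another node's result."""
    node: str


def add(x: int, y: int) -> int:
    return x + y


# Each node maps to (function, keyword arguments), similar in spirit to
# the data bindings created with `bind` in the imperative example.
graph: Dict[str, Tuple[Callable[..., Any], Dict[str, Any]]] = {
    "first": (add, {"x": 1, "y": 2}),
    "second": (add, {"x": Ref("first"), "y": 100}),
}


def run_graph(graph):
    # For every node, collect the nodes it depends on.
    deps = {
        name: {v.node for v in kwargs.values() if isinstance(v, Ref)}
        for name, (_, kwargs) in graph.items()
    }
    results: Dict[str, Any] = {}
    # static_order() yields each node only after all of its dependencies.
    for name in TopologicalSorter(deps).static_order():
        fn, kwargs = graph[name]
        resolved = {
            key: results[value.node] if isinstance(value, Ref) else value
            for key, value in kwargs.items()
        }
        results[name] = fn(**resolved)
    return results


results = run_graph(graph)
print(results)
```

The functional API infers these edges from ordinary call syntax, while the imperative API makes each edge explicit; under this model the resulting graph is the same.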


## Defining a Parameter
A `Parameter` declares a value that is supplied when the Flow is run.

In [6]:
# Define the task
@task
def say_hello(person: str) -> None:
    print("Hello, {}!".format(person))

# Define the flow
with Flow("Say Hello!") as flow:
    # Create a Parameter called "name"
    name = Parameter("name")
    say_hello(name)

# Run the flow
flow.run(name="ryoasu")

[2021-08-30 17:24:32+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Say Hello!'
[2021-08-30 17:24:32+0900] INFO - prefect.TaskRunner | Task 'name': Starting task run...
[2021-08-30 17:24:32+0900] INFO - prefect.TaskRunner | Task 'name': Finished task run for task with final state: 'Success'
[2021-08-30 17:24:32+0900] INFO - prefect.TaskRunner | Task 'say_hello': Starting task run...
Hello, ryoasu!
[2021-08-30 17:24:32+0900] INFO - prefect.TaskRunner | Task 'say_hello': Finished task run for task with final state: 'Success'
[2021-08-30 17:24:32+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded


<Success: "All reference tasks succeeded.">

## Registering and running a Flow with a local Prefect Server
Start a server locally and register a Flow with it.
Then start an agent and run the Flow.

### Starting the local server and registering a flow

In [7]:
# Use the local server as the backend (for Prefect Cloud, run `prefect backend cloud` instead)
!prefect backend server
# Start the server in detached mode
!prefect server start -d

Backend switched to server
Creating network "prefect-server" with the default driver

                                            WELCOME TO

   _____  _____  ______ ______ ______ _____ _______    _____ ______ _______      ________ _____
  |  __ \|  __ \|  ____|  ____|  ____/ ____|__   __|  / ____|  ____|  __ \ \    / /  ____|  __ \
  | |__) | |__) | |__  | |__  | |__ | |       | |    | (___ | |__  | |__) \ \  / /| |__  | |__) |
  |  ___/|  _  /|  __| |  __| |  __|| |       | |     \___ \|  __| |  _  / \ \/ / |  __| |  _  /
  | |    | | \ \| |____| |    | |___| |____   | |     ____) | |____| | \ \  \  /  | |____| | \ \
  |_|    |_|  \_\______|_|    |______\_____|  |_|    |_____/|______|_|  \_\  \/   |______|_|  \_\

   Visit http://localhost:8080 to get started, or check out the docs at https://docs.prefect.io


The dashboard of the running server is available at http://localhost:8080.
![img](../assets/images/image_1_1.jpg)

In [8]:
# Create a project
!prefect create project "tutorial"

tutorial created


In [9]:
# To register a Flow with the Prefect CLI instead:
# !prefect register --project tutorial -p <path to a file or a directory containing the flow(s)>

# Register the flow
print(flow.serialized_hash())
flow.register(
    project_name="tutorial",
    labels=["dev"],
    add_default_labels=False,
    # If the hash matches the previously registered Flow, no new version is created
    idempotency_key=flow.serialized_hash(),
)

b91c95c873917abcd57fb9ebb835376807c157d56da839f79d7e1bb50970b6b8
Flow URL: http://localhost:8080/default/flow/3f07ac05-4fe3-4845-9628-a425d2c7113f
 └── ID: 308a0729-b7cb-4cd1-8497-513a3baba176
 └── Project: tutorial
 └── Labels: ['dev']


'308a0729-b7cb-4cd1-8497-513a3baba176'
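
The effect of `idempotency_key` can be illustrated with a small in-memory model: a new version is recorded only when the key differs from the one used last time. This is a sketch of the idea only. `FakeRegistry` and `content_hash` are hypothetical stand-ins, not Prefect Server or `flow.serialized_hash()`, and the dictionaries are made-up flow descriptions.

```python
import hashlib
import json
from typing import Optional


class FakeRegistry:
    """Hypothetical in-memory registry that tracks a single flow's versions."""

    def __init__(self) -> None:
        self.version = 0
        self.last_key: Optional[str] = None

    def register(self, idempotency_key: Optional[str] = None) -> int:
        # Same key as the previous registration: keep the current version.
        if idempotency_key is not None and idempotency_key == self.last_key:
            return self.version
        # Otherwise record a new version and remember the key.
        self.version += 1
        self.last_key = idempotency_key
        return self.version


def content_hash(definition: dict) -> str:
    """Stable hash of a JSON-serializable definition (keys sorted first)."""
    payload = json.dumps(definition, sort_keys=True).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


registry = FakeRegistry()
flow_v1 = {"name": "Say Hello!", "tasks": ["name", "say_hello"]}
flow_v2 = {"name": "Say Hello!", "tasks": ["name", "say_hello", "say_goodbye"]}

v_a = registry.register(content_hash(flow_v1))  # first registration
v_b = registry.register(content_hash(flow_v1))  # unchanged definition
v_c = registry.register(content_hash(flow_v2))  # changed definition
print(v_a, v_b, v_c)
```

Using a content hash as the key means that re-running the registration cell without changing the Flow does not keep bumping the version number.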

The registered Flow also shows up in the UI:

| Flow list page | Flow detail page |
| :---: | :---: |
| ![img](../assets/images/image_1_2.jpg) | ![img](../assets/images/image_1_3.jpg) |


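### Starting an agent and running the Flow
Use the Prefect CLI to create an Agent that executes Flows.
The agent cannot be run in the background from the notebook, so start it outside the notebook with the following command:
```sh
# Give the agent the same label so that it picks up Flows labeled `dev`
$ prefect agent local start -l dev
```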


With the Python API, the equivalent is:
```python
from prefect.agent.local import LocalAgent
LocalAgent(labels=["dev"]).start()
```
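
The reason the agent is started with the `dev` label can be sketched as a simple set check. The rule assumed here is that an agent picks up a flow run when the flow's labels are a subset of the agent's labels. Treat that as an assumption about Prefect 1.x label matching rather than a statement of its exact behavior; `agent_can_run` is a hypothetical helper.

```python
from typing import Iterable


def agent_can_run(flow_labels: Iterable[str], agent_labels: Iterable[str]) -> bool:
    """Return True if every flow label is also carried by the agent (assumed rule)."""
    return set(flow_labels) <= set(agent_labels)


# The flow registered above has labels ["dev"]; the agent was started with -l dev.
print(agent_can_run(["dev"], ["dev"]))
# An agent with extra labels still matches.
print(agent_can_run(["dev"], ["dev", "gpu"]))
# An agent missing one of the flow's labels does not.
print(agent_can_run(["dev", "gpu"], ["dev"]))
```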

In [10]:
# Run the registered Flow from the CLI (ref: https://docs.prefect.io/api/latest/cli/run.html)
!prefect run --project tutorial -n "Say Hello!" --param name=ryoasu --run-name "say-hello2" --watch

Looking up flow metadata... Done
Creating run for flow 'Say Hello!'... Done
└── Name: say-hello2
└── UUID: acd9b7a1-aab5-45c6-8a63-f07c827ee2dc
└── Labels: ['dev']
└── Parameters: {'name': 'ryoasu'}
└── Context: {}
└── URL: http://localhost:8080/default/flow-run/acd9b7a1-aab5-45c6-8a63-f07c827ee2dc
Watching flow run execution...
└── 08:17:31 | INFO    | Entered state <Scheduled>: Flow run scheduled.
└── 08:17:35 | INFO    | Entered state <Submitted>: Submitted for execution
└── 08:17:35 | INFO    | Submitted for execution: PID: 59970
└── 08:17:36 | INFO    | Beginning Flow run for 'Say Hello!'
└── 08:17:36 | INFO    | Entered state <Running>: Running flow.
└── 08:17:36 | INFO    | Task 'name': Starting task run...
└── 08:17:36 | INFO    | Task 'name': Finished task run for task with final state: 'Success'
└── 08:17:36 | INFO    | Task 'say_hello': Starting task run...
└── 08:1

The UI also shows that the Flow ran:
![img](../assets/images/image_1_4.jpg)