Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: wait for floating Executor tasks #5004

Merged
merged 29 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
75a943e
feat: wait for hanging tasks
JoanFM Jul 11, 2022
fda8c5c
test: add tests for feat
JoanFM Jul 11, 2022
c5e2ccb
refactor: make class private
JoanFM Jul 11, 2022
2e6ea86
fix: fix deadlock
JoanFM Jul 11, 2022
2b154b6
fix: fix deadlock
JoanFM Jul 11, 2022
bbc0e3a
fix: fix deadlock with empty request iterator
JoanFM Jul 11, 2022
276d7e3
test: add new test in the floating deployments
JoanFM Jul 12, 2022
ff312a7
fix: fix deadlock hanging
JoanFM Jul 12, 2022
ce3dbce
Merge branch 'master' of https://github.com/jina-ai/jina into feat-fl…
JoanFM Jul 12, 2022
d1b60d4
refactor: rename some variables
JoanFM Jul 12, 2022
86c6ef5
feat: wait all floating tasks at gateway shutdown
JoanFM Jul 12, 2022
b18f5a7
Merge branch 'master' of https://github.com/jina-ai/jina into feat-fl…
JoanFM Jul 12, 2022
62b5250
test: add a failing test with floating exec
JoanFM Jul 12, 2022
77e9da3
feat: do not wait for endpoint discovery, just schedule task
JoanFM Jul 13, 2022
0b448d9
Merge branch 'master' of https://github.com/jina-ai/jina into feat-fl…
JoanFM Jul 14, 2022
509c9bd
fix: fix the async request iterator prefetching behavior
JoanFM Jul 14, 2022
8f07526
refactor: apply suggestions
JoanFM Jul 14, 2022
4c556a0
Merge branch 'master' of https://github.com/jina-ai/jina into feat-fl…
JoanFM Jul 14, 2022
f8b862e
Merge branch 'master' of https://github.com/jina-ai/jina into feat-fl…
JoanFM Jul 19, 2022
2e122a7
Merge branch 'master' of https://github.com/jina-ai/jina into feat-fl…
JoanFM Jul 19, 2022
14053dc
docs: document the floating executor feature
JoanFM Jul 19, 2022
777a114
test: assert floating receives
JoanFM Jul 19, 2022
f20811e
docs: improve feature documentation
JoanFM Jul 20, 2022
ff47714
fix: fix complex topologies
JoanFM Jul 20, 2022
85931b7
fix: fix complex topologies
JoanFM Jul 20, 2022
a74453f
docs: apply suggestions from code review
JoanFM Jul 20, 2022
628b1d3
docs: apply suggestions from code review
JoanFM Jul 20, 2022
9bad700
fix: fix deploy netlify error
JoanFM Jul 20, 2022
7fc28a4
Merge branch 'feat-floating-exec' of https://github.com/jina-ai/jina …
JoanFM Jul 20, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
114 changes: 114 additions & 0 deletions docs/fundamentals/flow/add-executors.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,120 @@ for further details
```


(floating-executors)=
### Floating Executors

Normally, all Executors form a pipeline that handles and transforms a given request until it is finally returned to the Client.

However, you can add Executors that do not feed their outputs back to the pipeline. Therefore, this output will not form the response for the Client.

This way of adding Executors in a Flow can be used for asynchronous backround tasks that may take some time and that are not needed for the response of the service you are building. For instance,
logging specific information in external services, storing partial results, etc.
JoanFM marked this conversation as resolved.
Show resolved Hide resolved

Those Executors are marked with the `floating` keyword when added to a `Flow`
JoanFM marked this conversation as resolved.
Show resolved Hide resolved

```python
import time
from jina import Flow, Executor, requests, DocumentArray


class FastChangingExecutor(Executor):
@requests()
def foo(self, docs, **kwargs):
for doc in docs:
doc.text = 'Hello World'


class SlowChangingExecutor(Executor):
@requests()
def foo(self, docs, **kwargs):
time.sleep(2)
print(f' Received {docs.texts}')
for doc in docs:
doc.text = 'Change the document but will not affect response'


f = (
Flow()
.add(name='executor0', uses=FastChangingExecutor)
.add(
name='floating_executor',
uses=SlowChangingExecutor,
needs=['gateway'],
floating=True,
)
)
with f:
f.post(on='/endpoint', inputs=DocumentArray.empty(1)) # we need to send a first
JohannesMessner marked this conversation as resolved.
Show resolved Hide resolved
start_time = time.time()
response = f.post(on='/endpoint', inputs=DocumentArray.empty(2))
end_time = time.time()
print(f' Response time took {end_time - start_time}s')
print(f' {response.texts}')
```

```text
Response time took 0.011997222900390625s
['Hello World', 'Hello World']
Received ['Hello World', 'Hello World']
```

In this example you can see how the response is returned without waiting for the `floating` Executor to complete. However, the Flow is not closed until
the request has been handled also by it.


You can plot the Flow and observe how the Executor is floating disconnected from the **Gateway**.

```{figure} flow_floating.svg
:width: 70%

```
A floating Executor can never come before a non-floating Executor in the {ref}`topology <flow-complex-topologies>` of your Flow.

This leads to the following behaviors:

- **Implicit reordering**: When adding a non-floating Executor after a floating Executor without specifying its `needs` parameter, the non-floating Executor is chained after the previous non-floating one.
```python
from jina import Flow

f = Flow().add().add(name='middle', floating=True).add()
f.plot()
```

```{figure} flow_middle_1.svg
:width: 70%

```

- **Chaining floating Executors**: If you want to chain more than one floating Executor, you need to add all of them with the `floating` flag, and explicitly specify the `needs` argument.

```python
from jina import Flow

f = Flow().add().add(name='middle', floating=True).add(needs=['middle'], floating=True)
f.plot()
```

```{figure} flow_chain_floating.svg
:width: 70%

```

- **Overriding of `floating` flag**: If you try to add a floating Executor as part of `needs` parameter of a non-floating Executor, then the floating Executor is not considered floating anymore.

```python
from jina import Flow

f = Flow().add().add(name='middle', floating=True).add(needs=['middle'])
f.plot()
```

```{figure} flow_cancel_floating.svg
:width: 70%

```


## Set configs
You can set and override {class}`~jina.Executor` configs when adding them into a {class}`~jina.Flow`.

Expand Down
1 change: 1 addition & 0 deletions docs/fundamentals/flow/flow_cancel_floating.svg
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.