Skip to content

Commit

Permalink
Merge branch 'master' into feat-real-delete-kv
Browse files Browse the repository at this point in the history
  • Loading branch information
cristianmtr committed Mar 2, 2021
2 parents c0556eb + ab537dc commit c2e896a
Show file tree
Hide file tree
Showing 54 changed files with 481 additions and 370 deletions.
2 changes: 1 addition & 1 deletion .github/i18n/README.de.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ from jina.flow import Flow
f = Flow.load_config('index.yml')

with f:
f.index(input_fn)
f.index(inputs)
```

</td>
Expand Down
2 changes: 1 addition & 1 deletion .github/i18n/README.fr.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ from jina.flow import Flow
f = Flow.load_config('index.yml')

with f:
f.index(input_fn)
f.index(inputs)
```

</td>
Expand Down
2 changes: 1 addition & 1 deletion .github/i18n/README.ja.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ from jina.flow import Flow
f = Flow.load_config('index.yml')

with f:
f.index(input_fn)
f.index(inputs)
```

</td>
Expand Down
2 changes: 1 addition & 1 deletion .github/i18n/README.ru.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ from jina.flow import Flow
f = Flow.load_config('index.yml')

with f:
f.index(input_fn)
f.index(inputs)
```

</td>
Expand Down
2 changes: 1 addition & 1 deletion .github/i18n/README.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ from jina.flow import Flow
f = Flow.load_config('index.yml')

with f:
f.index(input_fn)
f.index(inputs)
```

</td>
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -624,13 +624,13 @@ Unlike `Flow`, the CRUD of `AsyncFlow` accepts input and output functions as [as
```python
from jina import AsyncFlow

async def input_fn():
async def input_function():
for _ in range(10):
yield Document()
await asyncio.sleep(0.1)

with AsyncFlow().add() as f:
async for resp in f.index(input_fn):
async for resp in f.index(input_function):
print(resp)
```

Expand Down
62 changes: 31 additions & 31 deletions jina/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Iterable

from . import request
from .base import BaseClient, CallbackFnType, InputFnType
from .base import BaseClient, CallbackFnType, InputType
from .helper import callback_exec
from .request import GeneratorSourceType
from .websocket import WebSocketClientMixin
Expand All @@ -28,96 +28,96 @@ async def _get_results(self, *args, **kwargs):
if self.args.return_results:
return result

@deprecated_alias(buffer=('input_fn', 1), callback=('on_done', 1), output_fn=('on_done', 1))
def train(self, input_fn: InputFnType,
@deprecated_alias(input_fn=('inputs', 0), buffer=('inputs', 1), callback=('on_done', 1), output_fn=('on_done', 1))
def train(self, inputs: InputType,
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""Issue 'train' request to the Flow.
:param input_fn: the input function that generates the content
:param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document
:param on_done: the function to be called when the :class:`Request` object is resolved.
:param on_error: the function to be called when the :class:`Request` object is rejected.
:param on_always: the function to be called when the :class:`Request` object is is either resolved or rejected.
:param kwargs:
:return:
:param kwargs: additional parameters
:return: None
"""
self.mode = RequestType.TRAIN
return run_async(self._get_results, input_fn, on_done, on_error, on_always, **kwargs)
return run_async(self._get_results, inputs, on_done, on_error, on_always, **kwargs)

@deprecated_alias(buffer=('input_fn', 1), callback=('on_done', 1), output_fn=('on_done', 1))
def search(self, input_fn: InputFnType,
@deprecated_alias(input_fn=('inputs', 0), buffer=('inputs', 1), callback=('on_done', 1), output_fn=('on_done', 1))
def search(self, inputs: InputType,
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""Issue 'search' request to the Flow.
:param input_fn: the input function that generates the content
:param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document
:param on_done: the function to be called when the :class:`Request` object is resolved.
:param on_error: the function to be called when the :class:`Request` object is rejected.
:param on_always: the function to be called when the :class:`Request` object is is either resolved or rejected.
:param kwargs:
:return:
:param kwargs: additional parameters
:return: None
"""
self.mode = RequestType.SEARCH
self.add_default_kwargs(kwargs)
return run_async(self._get_results, input_fn, on_done, on_error, on_always, **kwargs)
return run_async(self._get_results, inputs, on_done, on_error, on_always, **kwargs)

@deprecated_alias(buffer=('input_fn', 1), callback=('on_done', 1), output_fn=('on_done', 1))
def index(self, input_fn: InputFnType,
@deprecated_alias(input_fn=('inputs', 0), buffer=('inputs', 1), callback=('on_done', 1), output_fn=('on_done', 1))
def index(self, inputs: InputType,
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""Issue 'index' request to the Flow.
:param input_fn: the input function that generates the content
:param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document
:param on_done: the function to be called when the :class:`Request` object is resolved.
:param on_error: the function to be called when the :class:`Request` object is rejected.
:param on_always: the function to be called when the :class:`Request` object is is either resolved or rejected.
:param kwargs:
:return:
:param kwargs: additional parameters
:return: None
"""
self.mode = RequestType.INDEX
return run_async(self._get_results, input_fn, on_done, on_error, on_always, **kwargs)
return run_async(self._get_results, inputs, on_done, on_error, on_always, **kwargs)

@deprecated_alias(buffer=('input_fn', 1), callback=('on_done', 1), output_fn=('on_done', 1))
def update(self, input_fn: InputFnType,
@deprecated_alias(input_fn=('inputs', 0), buffer=('inputs', 1), callback=('on_done', 1), output_fn=('on_done', 1))
def update(self, inputs: InputType,
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""Issue 'update' request to the Flow.
:param input_fn: the input function that generates the content
:param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document
:param on_done: the function to be called when the :class:`Request` object is resolved.
:param on_error: the function to be called when the :class:`Request` object is rejected.
:param on_always: the function to be called when the :class:`Request` object is is either resolved or rejected.
:param kwargs:
:return:
:param kwargs: additional parameters
:return: None
"""
self.mode = RequestType.UPDATE
return run_async(self._get_results, input_fn, on_done, on_error, on_always, **kwargs)
return run_async(self._get_results, inputs, on_done, on_error, on_always, **kwargs)

@deprecated_alias(buffer=('input_fn', 1), callback=('on_done', 1), output_fn=('on_done', 1))
def delete(self, input_fn: Iterable[str],
@deprecated_alias(input_fn=('inputs', 0), buffer=('inputs', 1), callback=('on_done', 1), output_fn=('on_done', 1))
def delete(self, inputs: Iterable[str],
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""Issue 'update' request to the Flow.
:param input_fn: the input function that generates the content
:param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document
:param on_done: the function to be called when the :class:`Request` object is resolved.
:param on_error: the function to be called when the :class:`Request` object is rejected.
:param on_always: the function to be called when the :class:`Request` object is is either resolved or rejected.
:param kwargs:
:return:
:param kwargs: additional parameters
:return: None
"""
self.mode = RequestType.DELETE
return run_async(self._get_results, input_fn, on_done, on_error, on_always, **kwargs)
return run_async(self._get_results, inputs, on_done, on_error, on_always, **kwargs)


class WebSocketClient(Client, WebSocketClientMixin):
Expand Down
62 changes: 31 additions & 31 deletions jina/clients/asyncio.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Module wrapping AsyncIO ops for clients."""
from typing import Iterable

from .base import InputFnType, BaseClient, CallbackFnType
from .base import InputType, BaseClient, CallbackFnType
from .websocket import WebSocketClientMixin
from ..enums import RequestType
from ..helper import deprecated_alias
Expand Down Expand Up @@ -51,100 +51,100 @@ async def concurrent_main():
One can think of :class:`Client` as Jina-managed eventloop, whereas :class:`AsyncClient` is self-managed eventloop.
"""

@deprecated_alias(buffer=('input_fn', 1), callback=('on_done', 1), output_fn=('on_done', 1))
async def train(self, input_fn: InputFnType,
@deprecated_alias(input_fn=('inputs', 0), buffer=('inputs', 1), callback=('on_done', 1), output_fn=('on_done', 1))
async def train(self, inputs: InputType,
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""Issue 'train' request to the Flow.
:param input_fn: the input function that generates the content
:param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document
:param on_done: the function to be called when the :class:`Request` object is resolved.
:param on_error: the function to be called when the :class:`Request` object is rejected.
:param on_always: the function to be called when the :class:`Request` object is is either resolved or rejected.
:param kwargs:
:return:
:param kwargs: additional parameters
:yield: result
"""
self.mode = RequestType.TRAIN
async for r in self._get_results(input_fn, on_done, on_error, on_always, **kwargs):
async for r in self._get_results(inputs, on_done, on_error, on_always, **kwargs):
yield r

@deprecated_alias(buffer=('input_fn', 1), callback=('on_done', 1), output_fn=('on_done', 1))
async def search(self, input_fn: InputFnType,
@deprecated_alias(input_fn=('inputs', 0), buffer=('inputs', 1), callback=('on_done', 1), output_fn=('on_done', 1))
async def search(self, inputs: InputType,
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""Issue 'search' request to the Flow.
:param input_fn: the input function that generates the content
:param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document
:param on_done: the function to be called when the :class:`Request` object is resolved.
:param on_error: the function to be called when the :class:`Request` object is rejected.
:param on_always: the function to be called when the :class:`Request` object is is either resolved or rejected.
:param kwargs:
:return:
:param kwargs: additional parameters
:yield: result
"""
self.mode = RequestType.SEARCH
self.add_default_kwargs(kwargs)
async for r in self._get_results(input_fn, on_done, on_error, on_always, **kwargs):
async for r in self._get_results(inputs, on_done, on_error, on_always, **kwargs):
yield r

@deprecated_alias(buffer=('input_fn', 1), callback=('on_done', 1), output_fn=('on_done', 1))
async def index(self, input_fn: InputFnType,
@deprecated_alias(input_fn=('inputs', 0), buffer=('inputs', 1), callback=('on_done', 1), output_fn=('on_done', 1))
async def index(self, inputs: InputType,
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""Issue 'index' request to the Flow.
:param input_fn: the input function that generates the content
:param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document
:param on_done: the function to be called when the :class:`Request` object is resolved.
:param on_error: the function to be called when the :class:`Request` object is rejected.
:param on_always: the function to be called when the :class:`Request` object is is either resolved or rejected.
:param kwargs:
:return:
:param kwargs: additional parameters
:yield: result
"""
self.mode = RequestType.INDEX
async for r in self._get_results(input_fn, on_done, on_error, on_always, **kwargs):
async for r in self._get_results(inputs, on_done, on_error, on_always, **kwargs):
yield r

@deprecated_alias(buffer=('input_fn', 1), callback=('on_done', 1), output_fn=('on_done', 1))
async def delete(self, input_fn: Iterable[str],
@deprecated_alias(input_fn=('inputs', 0), buffer=('inputs', 1), callback=('on_done', 1), output_fn=('on_done', 1))
async def delete(self, inputs: Iterable[str],
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""Issue 'delete' request to the Flow.
:param input_fn: the input function that generates the content
:param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document
:param on_done: the function to be called when the :class:`Request` object is resolved.
:param on_error: the function to be called when the :class:`Request` object is rejected.
:param on_always: the function to be called when the :class:`Request` object is is either resolved or rejected.
:param kwargs:
:return:
:param kwargs: additional parameters
:yield: result
"""
self.mode = RequestType.DELETE
async for r in self._get_results(input_fn, on_done, on_error, on_always, **kwargs):
async for r in self._get_results(inputs, on_done, on_error, on_always, **kwargs):
yield r

@deprecated_alias(buffer=('input_fn', 1), callback=('on_done', 1), output_fn=('on_done', 1))
async def update(self, input_fn: InputFnType,
@deprecated_alias(input_fn=('inputs', 0), buffer=('inputs', 1), callback=('on_done', 1), output_fn=('on_done', 1))
async def update(self, inputs: InputType,
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""Issue 'update' request to the Flow.
:param input_fn: the input function that generates the content
:param inputs: input data which can be an Iterable, a function which returns an Iterable, or a single Document
:param on_done: the function to be called when the :class:`Request` object is resolved.
:param on_error: the function to be called when the :class:`Request` object is rejected.
:param on_always: the function to be called when the :class:`Request` object is is either resolved or rejected.
:param kwargs:
:return:
:param kwargs: additional parameters
:yield: result
"""
self.mode = RequestType.UPDATE
async for r in self._get_results(input_fn, on_done, on_error, on_always, **kwargs):
async for r in self._get_results(inputs, on_done, on_error, on_always, **kwargs):
yield r


Expand Down

0 comments on commit c2e896a

Please sign in to comment.