Skip to content

Commit

Permalink
feat(gateway): websocket based streaming and client (#1608)
Browse files Browse the repository at this point in the history
* feat(gateway): websocket streaming in gateway server

* feat(gateway): websocket streaming client

* feat(gateway): enabling flow with websocket client

* feat(gateway): enabling flow with websocket client

* test(restful): added restful to flow unit tests

* test(restful): added restful to eval flow integration tests

* test(restful): added restful to evaluation integration tests

* test(restful): added restful to gateway non blocking integration tests

* test(restful): added restful to other integration tests

* ci: change tests timeout to 40 mins

* fix(gateway): allow all encodings

* fix(gateway): send response based on received encodings

* style: fix coding style

* style: fix coding style

* style: fix coding style

* style: fix coding style

* style: fix coding style

* fix(indexer): fix indexer keys assert

* style: fix coding style

Co-authored-by: Han Xiao <artex.xh@gmail.com>
  • Loading branch information
deepankarm and hanxiao committed Jan 9, 2021
1 parent d6d14f2 commit e5df803
Show file tree
Hide file tree
Showing 57 changed files with 773 additions and 292 deletions.
19 changes: 14 additions & 5 deletions .github/workflows/cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ jobs:
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
- name: Run test
- name: Prepare enviroment
run: |
docker login docker.pkg.github.com -u $GITHUB_ACTOR -p $GITHUB_TOKEN
docker pull docker.pkg.github.com/jina-ai/jina/jina:test-pip
Expand All @@ -171,6 +171,10 @@ jobs:
pip install ".[cicd,test]" --no-cache-dir
jina check
export JINA_LOG_VERBOSITY="ERROR"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Test
run: |
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml -n 1 --timeout=120 -v --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/${{ matrix.test-path }}
timeout-minutes: 20
env:
Expand Down Expand Up @@ -207,20 +211,25 @@ jobs:
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
- name: Prepare enviroment
run: |
docker login docker.pkg.github.com -u $GITHUB_ACTOR -p $GITHUB_TOKEN
docker pull docker.pkg.github.com/jina-ai/jina/jina:test-pip
docker tag docker.pkg.github.com/jina-ai/jina/jina:test-pip jinaai/jina:test-pip
python -m pip install --upgrade pip
pip install ".[cicd,daemon,test]" --no-cache-dir
docker build --build-arg PIP_TAG="[devel]" -f Dockerfiles/pip.Dockerfile -t jinaai/jina:test-pip .
pip install ".[cicd,test,daemon]" --no-cache-dir
jina check
export JINA_LOG_VERBOSITY="ERROR"
pytest --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml -n 1 --timeout=120 -v tests/jinad/${{ matrix.test-path }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Test
run: |
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml -n 1 --timeout=120 -v tests/jinad/${{ matrix.test-path }}
timeout-minutes: 20
env:
JINAHUB_USERNAME: ${{ secrets.JINAHUB_USERNAME }}
JINAHUB_PASSWORD: ${{ secrets.JINAHUB_PASSWORD }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Upload coverage from test to Codecov
uses: codecov/codecov-action@v1
if: ${{ matrix.python-version }} == 3.7
Expand Down
8 changes: 6 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ jobs:
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
- name: Prepare enviroment
run: |
docker login docker.pkg.github.com -u $GITHUB_ACTOR -p $GITHUB_TOKEN
docker pull docker.pkg.github.com/jina-ai/jina/jina:test-pip
Expand All @@ -134,13 +134,17 @@ jobs:
pip install ".[cicd,test]" --no-cache-dir
jina check
export JINA_LOG_VERBOSITY="ERROR"
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Test
run: |
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml -n 1 --timeout=120 -v --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/${{ matrix.test-path }}
timeout-minutes: 20
env:
JINAHUB_USERNAME: ${{ secrets.JINAHUB_USERNAME }}
JINAHUB_PASSWORD: ${{ secrets.JINAHUB_PASSWORD }}
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Check file existence
- name: Check codecov file
id: check_files
uses: andstor/file-existence-action@v1
with:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/latency-tracking.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
cd latency
docker build --build-arg JINA_VER=master . -t latency-tracking
docker run -v $(pwd)/output:/workspace/output -v $(pwd)/original:/workspace/original latency-tracking
bash batch.sh 3
bash batch.sh 2
pip install prettytable
python ppstat.py > comment.txt
- id: get-comment-body
Expand Down
2 changes: 1 addition & 1 deletion daemon/api/endpoints/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def __init__(self):
detail=f'Invalid yaml file.')
except FlowStartException as e:
raise HTTPException(status_code=404,
detail=f'Flow couldn\'t get started: {repr(e)}')
detail=f'Flow couldn\'t get started: {e!r}')

return {
'status_code': status.HTTP_200_OK,
Expand Down
4 changes: 2 additions & 2 deletions daemon/api/endpoints/pea.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ async def _create(
pea_id = pea_store._create(pea_arguments=pea_arguments)
except PeaStartException as e:
raise HTTPException(status_code=404,
detail=f'Pea couldn\'t get started: {repr(e)}')
detail=f'Pea couldn\'t get started: {e!r}')
except Exception as e:
daemon_logger.error(f'Got an error while creating a pea {repr(e)}')
daemon_logger.error(f'Got an error while creating a pea {e!r}')
raise HTTPException(status_code=404,
detail=f'Something went wrong')
return {
Expand Down
4 changes: 2 additions & 2 deletions daemon/api/endpoints/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ async def _create(
pod_id = pod_store._create(pod_arguments=pod_arguments)
except PodStartException as e:
raise HTTPException(status_code=404,
detail=f'Pod couldn\'t get started: {repr(e)}')
detail=f'Pod couldn\'t get started: {e!r}')
except Exception as e:
daemon_logger.error(f'Got an error while creating a pod {repr(e)}')
daemon_logger.error(f'Got an error while creating a pod {e!r}')
raise HTTPException(status_code=404,
detail=f'Something went wrong')
return {
Expand Down
10 changes: 5 additions & 5 deletions daemon/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ def _create(self,
JAML.register(Flow)
flow = JAML.load(yamlspec)
except Exception as e:
self.logger.error(f'Got error while loading from yaml {repr(e)}')
self.logger.error(f'Got error while loading from yaml {e!r}')
raise FlowYamlParseException
elif isinstance(config, list):
try:
flow = self._build_with_pods(pod_args=config)
except Exception as e:
self.logger.error(f'Got error while creating flows via pods: {repr(e)}')
self.logger.error(f'Got error while creating flows via pods: {e!r}')
raise FlowCreationException
else:
raise FlowBadInputException(f'Not valid Flow config input {type(config)}')
Expand All @@ -97,7 +97,7 @@ def _create(self,
flow_id = uuid.UUID(flow.args.log_id)
flow = self._start(context=flow)
except Exception as e:
self.logger.critical(f'Got following error while starting the flow: {repr(e)}')
self.logger.critical(f'Got following error while starting the flow: {e!r}')
raise FlowStartException(repr(e))

self._store[flow_id] = {}
Expand Down Expand Up @@ -160,7 +160,7 @@ def _create(self, pod_arguments: Union[Dict, Namespace]):
pod = Pod(pod_arguments)
pod = self._start(context=pod)
except Exception as e:
self.logger.critical(f'Got following error while starting the pod: {repr(e)}')
self.logger.critical(f'Got following error while starting the pod: {e!r}')
raise PodStartException(repr(e))

self._store[pod_id] = {}
Expand Down Expand Up @@ -195,7 +195,7 @@ def _create(self, pea_arguments: Union[Dict, Namespace]):
pea = Pea(pea_arguments)
pea = self._start(context=pea)
except Exception as e:
self.logger.critical(f'Got following error while starting the pea: {repr(e)}')
self.logger.critical(f'Got following error while starting the pea: {e!r}')
raise PeaStartException(repr(e))

self._store[pea_id] = {}
Expand Down
3 changes: 2 additions & 1 deletion extra-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ mock: test
requests: http, devel, test, daemon
prettytable: devel, test
sseclient-py: test
websockets: http, devel, test, daemon
websockets: http, devel, test, daemon, ws
wsproto: http, devel, test, ws, daemon
pydantic: http, devel, test, daemon
python-multipart: http, devel, test, daemon
pytest-custom_exit_code: cicd, test
64 changes: 53 additions & 11 deletions jina/clients/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@

from . import request
from .base import BaseClient, CallbackFnType, InputFnType
from .websockets import WebSocketClientMixin
from .helper import callback_exec
from .request import GeneratorSourceType
from ..enums import RequestType
from ..helper import run_async, deprecated_alias


class Client(BaseClient):
"""A simple Python client for connecting to the gateway.
It manges the asyncio eventloop internally, so all interfaces are synchronous from the outside.
"""A simple Python client for connecting to the gRPC gateway.
It manages the asyncio eventloop internally, so all interfaces are synchronous from the outside.
"""

@deprecated_alias(buffer='input_fn', callback='on_done', output_fn='on_done')
Expand Down Expand Up @@ -70,10 +71,10 @@ def index(self, input_fn: InputFnType = None,

@deprecated_alias(buffer='input_fn', callback='on_done', output_fn='on_done')
def update(self, input_fn: InputFnType = None,
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""
:param input_fn: the input function that generates the content
Expand All @@ -88,10 +89,10 @@ def update(self, input_fn: InputFnType = None,

@deprecated_alias(buffer='input_fn', callback='on_done', output_fn='on_done')
def delete(self, input_fn: InputFnType = None,
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
on_done: CallbackFnType = None,
on_error: CallbackFnType = None,
on_always: CallbackFnType = None,
**kwargs) -> None:
"""
:param input_fn: the input function that generates the content
Expand All @@ -102,4 +103,45 @@ def delete(self, input_fn: InputFnType = None,
:return:
"""
self.mode = RequestType.DELETE
return run_async(self._get_results, input_fn, on_done, on_error, on_always, **kwargs)
return run_async(self._get_results, input_fn, on_done, on_error, on_always, **kwargs)


class WebSocketClient(Client, WebSocketClientMixin):
"""A Python Client to stream requests from a Flow with a RESTGateway
:class:`WebSocketClient` shares the same interface as :class:`Client` and provides methods like
:meth:`index`, "meth:`search`, :meth:`train`, :meth:`update` & :meth:`delete`.
It is used by default while running operations when we create a `Flow` with `rest_api=True`
.. highlight:: python
.. code-block:: python
from jina.flow import Flow
f = Flow(rest_api=True).add().add()
with f:
f.index(['abc'])
:class:`WebSocketClient` can also be used to run operations for a remote Flow
.. highlight:: python
.. code-block:: python
# A Flow running on remote
from jina.flow import Flow
f = Flow(rest_api=True, port_expose=34567).add().add()
with f:
f.block()
# Local WebSocketClient running index & search
from jina.clients import WebSocketClient
client = WebSocketClient(...)
client.index(...)
client.search(...)
:class:`WebSocketClient` internally handles an event loop to run operations asynchronously.
"""
44 changes: 44 additions & 0 deletions jina/clients/asyncio.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .base import InputFnType, BaseClient, CallbackFnType
from .websockets import WebSocketClientMixin
from ..enums import RequestType
from ..helper import deprecated_alias

Expand Down Expand Up @@ -97,3 +98,46 @@ async def index(self, input_fn: InputFnType = None,
"""
self.mode = RequestType.INDEX
return await self._get_results(input_fn, on_done, on_error, on_always, **kwargs)


class AsyncWebSocketClient(AsyncClient, WebSocketClientMixin):
"""
:class:`AsyncWebSocketClient` is the asynchronous version of the :class:`WebSocketClient`.
They share the same interface, except in :class:`AsyncWebSocketClient` :meth:`train`, :meth:`index`, :meth:`search`
methods are coroutines (i.e. declared with the async/await syntax), simply calling them will not schedule them to be executed.
To actually run a coroutine, user need to put them in an eventloop, e.g. via ``asyncio.run()``,
``asyncio.create_task()``.
:class:`AsyncWebSocketClient` can be very useful in the integration settings, where Jina/Flow/Client is NOT the
main logic, but rather served as a part of other program. In this case, users often do not want to let Jina control
the ``asyncio.eventloop``. On contrary, :class:`WebSocketClient` is controlling and wrapping the eventloop
internally, making the Client looks synchronous from outside.
For example, say you have the Flow running in remote. You want to use Client to connect to it do
some index and search, but meanwhile you have some other IO-bounded jobs and want to do them concurrently.
You can use :class:`AsyncWebSocketClient`,
.. highlight:: python
.. code-block:: python
from jina.clients.asyncio import AsyncWebSocketClient
ac = AsyncWebSocketClient(...)
async def jina_client_query():
await ac.search(...)
async def heavylifting():
await other_library.download_big_files(...)
async def concurrent_main():
await asyncio.gather(jina_client_query(), heavylifting())
if __name__ == '__main__':
# under python
asyncio.run(concurrent_main())
One can think of :class:`WebSocketClient` as Jina-managed eventloop,
whereas :class:`AsyncWebSocketClient` is self-managed eventloop.
"""
3 changes: 2 additions & 1 deletion jina/clients/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ def input_fn(self, bytes_gen: InputFnType) -> None:
else:
self._input_fn = bytes_gen

async def _get_results(self, input_fn: Callable,
async def _get_results(self,
input_fn: Callable,
on_done: Callable,
on_error: Callable = None,
on_always: Callable = None, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion jina/clients/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def arg_wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as ex:
err_msg = f'uncaught exception in callback {func.__name__}(): {repr(ex)}'
err_msg = f'uncaught exception in callback {func.__name__}(): {ex!r}'
if continue_on_error:
logger.error(err_msg)
else:
Expand Down
2 changes: 1 addition & 1 deletion jina/clients/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def _generate(data: GeneratorSourceType,
yield req
except Exception as ex:
# must be handled here, as grpc channel wont handle Python exception
default_logger.critical(f'input_fn is not valid! {repr(ex)}', exc_info=True)
default_logger.critical(f'input_fn is not valid! {ex!r}', exc_info=True)


def index(*args, **kwargs):
Expand Down
Loading

0 comments on commit e5df803

Please sign in to comment.