Skip to content

Commit

Permalink
feat: support provider endpoint in jina executor (#6149)
Browse files Browse the repository at this point in the history
Co-authored-by: Joan Martinez <joan.fontanals.martinez@jina.ai>
Co-authored-by: Jina Dev Bot <dev-bot@jina.ai>
  • Loading branch information
3 people authored Mar 8, 2024
1 parent 69bf8f6 commit 176f953
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 6 deletions.
2 changes: 2 additions & 0 deletions jina/orchestrate/deployments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ def __init__(
prefer_platform: Optional[str] = None,
protocol: Optional[Union[str, List[str]]] = ['GRPC'],
provider: Optional[str] = ['NONE'],
provider_endpoint: Optional[str] = None,
py_modules: Optional[List[str]] = None,
quiet: Optional[bool] = False,
quiet_error: Optional[bool] = False,
Expand Down Expand Up @@ -387,6 +388,7 @@ def __init__(
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param py_modules: The customized python modules need to be imported before loading the executor
Note that the recommended way is to only import a single module - a simple python file, if your
Expand Down
9 changes: 9 additions & 0 deletions jina/orchestrate/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def __init__(
prefetch: Optional[int] = 1000,
protocol: Optional[Union[str, List[str]]] = ['GRPC'],
provider: Optional[str] = ['NONE'],
provider_endpoint: Optional[str] = None,
proxy: Optional[bool] = False,
py_modules: Optional[List[str]] = None,
quiet: Optional[bool] = False,
Expand Down Expand Up @@ -274,6 +275,7 @@ def __init__(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway
Expand Down Expand Up @@ -465,6 +467,7 @@ def __init__(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway
Expand Down Expand Up @@ -872,6 +875,7 @@ def add(
prefer_platform: Optional[str] = None,
protocol: Optional[Union[str, List[str]]] = ['GRPC'],
provider: Optional[str] = ['NONE'],
provider_endpoint: Optional[str] = None,
py_modules: Optional[List[str]] = None,
quiet: Optional[bool] = False,
quiet_error: Optional[bool] = False,
Expand Down Expand Up @@ -972,6 +976,7 @@ def add(
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param py_modules: The customized python modules need to be imported before loading the executor
Note that the recommended way is to only import a single module - a simple python file, if your
Expand Down Expand Up @@ -1135,6 +1140,7 @@ def add(
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param py_modules: The customized python modules need to be imported before loading the executor
Note that the recommended way is to only import a single module - a simple python file, if your
Expand Down Expand Up @@ -1330,6 +1336,7 @@ def config_gateway(
prefetch: Optional[int] = 1000,
protocol: Optional[Union[str, List[str]]] = ['GRPC'],
provider: Optional[str] = ['NONE'],
provider_endpoint: Optional[str] = None,
proxy: Optional[bool] = False,
py_modules: Optional[List[str]] = None,
quiet: Optional[bool] = False,
Expand Down Expand Up @@ -1401,6 +1408,7 @@ def config_gateway(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway
Expand Down Expand Up @@ -1501,6 +1509,7 @@ def config_gateway(
Used to control the speed of data input into a Flow. 0 disables prefetch (1000 requests is the default)
:param protocol: Communication protocol of the server exposed by the Gateway. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param proxy: If set, respect the http_proxy and https_proxy environment variables. otherwise, it will unset these proxy variables before start. gRPC seems to prefer no proxy
:param py_modules: The customized python modules need to be imported before loading the gateway
Expand Down
7 changes: 7 additions & 0 deletions jina/parsers/orchestrate/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,13 @@ def mixin_pod_runtime_args_parser(arg_group, pod_type='worker'):
help=f'If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: {[provider.to_string() for provider in list(ProviderType)]}.',
)

arg_group.add_argument(
'--provider-endpoint',
type=str,
default=None,
help=f'If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.',
)

arg_group.add_argument(
'--monitoring',
action='store_true',
Expand Down
22 changes: 16 additions & 6 deletions jina/serve/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -622,17 +622,25 @@ def _validate_sagemaker(self):
if '/invocations' in self.requests:
return

if (
hasattr(self.runtime_args, 'provider_endpoint')
and self.runtime_args.provider_endpoint
):
endpoint_to_use = ('/' + self.runtime_args.provider_endpoint).lower()
if endpoint_to_use in list(self.requests.keys()):
self.logger.warning(
f'Using "{endpoint_to_use}" as "/invocations" route'
)
self.requests['/invocations'] = self.requests[endpoint_to_use]
return

if len(self.requests) == 1:
route = list(self.requests.keys())[0]
self.logger.warning(
f'No "/invocations" route found. Using "{route}" as "/invocations" route'
)
self.logger.warning(f'Using "{route}" as "/invocations" route')
self.requests['/invocations'] = self.requests[route]
return

raise ValueError(
'No "/invocations" route found. Please define a "/invocations" route'
)
raise ValueError('Cannot identify the endpoint to use for "/invocations"')

def _add_dynamic_batching(self, _dynamic_batching: Optional[Dict]):
if _dynamic_batching:
Expand Down Expand Up @@ -994,6 +1002,7 @@ def serve(
prefer_platform: Optional[str] = None,
protocol: Optional[Union[str, List[str]]] = ['GRPC'],
provider: Optional[str] = ['NONE'],
provider_endpoint: Optional[str] = None,
py_modules: Optional[List[str]] = None,
quiet: Optional[bool] = False,
quiet_error: Optional[bool] = False,
Expand Down Expand Up @@ -1094,6 +1103,7 @@ def serve(
:param prefer_platform: The preferred target Docker platform. (e.g. "linux/amd64", "linux/arm64")
:param protocol: Communication protocol of the server exposed by the Executor. This can be a single value or a list of protocols, depending on your chosen Gateway. Choose the convenient protocols from: ['GRPC', 'HTTP', 'WEBSOCKET'].
:param provider: If set, Executor is translated to a custom container compatible with the chosen provider. Choose the convenient providers from: ['NONE', 'SAGEMAKER'].
:param provider_endpoint: If set, Executor endpoint will be explicitly chosen and used in the custom container operated by the provider.
:param py_modules: The customized python modules need to be imported before loading the executor
Note that the recommended way is to only import a single module - a simple python file, if your
Expand Down
1 change: 1 addition & 0 deletions jina/serve/runtimes/worker/request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ def _load_executor(
'replicas': self.args.replicas,
'name': self.args.name,
'provider': self.args.provider,
'provider_endpoint': self.args.provider_endpoint,
'metrics_registry': metrics_registry,
'tracer_provider': tracer_provider,
'meter_provider': meter_provider,
Expand Down
4 changes: 4 additions & 0 deletions jina_cli/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
'--protocol',
'--protocols',
'--provider',
'--provider-endpoint',
'--monitoring',
'--port-monitoring',
'--retries',
Expand Down Expand Up @@ -180,6 +181,7 @@
'--protocol',
'--protocols',
'--provider',
'--provider-endpoint',
'--monitoring',
'--port-monitoring',
'--retries',
Expand Down Expand Up @@ -438,6 +440,7 @@
'--protocol',
'--protocols',
'--provider',
'--provider-endpoint',
'--monitoring',
'--port-monitoring',
'--retries',
Expand Down Expand Up @@ -512,6 +515,7 @@
'--protocol',
'--protocols',
'--provider',
'--provider-endpoint',
'--monitoring',
'--port-monitoring',
'--retries',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# SampleColbertExecutor

Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Executor configuration loaded via the Pod/Flow `--uses` flag.
jtype: SampleColbertExecutor
# Python modules imported before the executor class is loaded.
py_modules:
  - executor.py
metas:
  name: SampleColbertExecutor
  # description/url left empty intentionally for this sample executor.
  description:
  url:
  keywords: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import numpy as np
from docarray import BaseDoc, DocList
from docarray.typing import NdArray
from pydantic import Field
from typing import Union, Optional, List
from jina import Executor, requests


class TextDoc(BaseDoc):
    """A document wrapping a single piece of text."""

    text: str = Field(default="", description="The text of the document")


class RerankerInput(BaseDoc):
    """Request schema for the '/rank' endpoint."""

    # Query to rank the candidate documents against (raw string or TextDoc).
    query: Union[str, TextDoc]

    # Candidate documents to be reranked.
    documents: List[TextDoc]

    # Maximum number of results to return.
    # NOTE(review): no default is declared — under pydantic v2 an Optional
    # field without a default is required; confirm this is intended.
    top_n: Optional[int]


class RankedObjectOutput(BaseDoc):
    """One entry of a rerank response: position, document and its score."""

    # Rank position within the result list (0 = best match in this sample).
    index: int
    # The ranked document; may be omitted.
    document: Optional[TextDoc]

    # Relevance score for the document — presumably higher means more
    # relevant; the sample below emits negative placeholders. TODO confirm scale.
    relevance_score: float


class EmbeddingResponseModel(TextDoc):
    """Response schema for '/encode': the input text plus its embedding."""

    # Embedding produced for `text`; the sample executor emits shape (1, 64).
    embeddings: NdArray


class RankedOutput(BaseDoc):
    """Response schema for '/rank': the ordered list of reranked results."""

    results: DocList[RankedObjectOutput]


class SampleColbertExecutor(Executor):
    """Toy executor exposing a '/rank' and an '/encode' endpoint.

    Both endpoints return canned/random data; the class exists to exercise
    the provider-endpoint plumbing, not to do real reranking or encoding.
    """

    @requests(on="/rank")
    def foo(self, docs: DocList[RerankerInput], **kwargs) -> DocList[RankedOutput]:
        """Return the same two fixed ranked results for every input doc."""
        outputs = [
            RankedOutput(
                results=[
                    RankedObjectOutput(
                        id=doc.id,
                        index=0,
                        document=TextDoc(text="first result"),
                        relevance_score=-1,
                    ),
                    RankedObjectOutput(
                        id=doc.id,
                        index=1,
                        document=TextDoc(text="second result"),
                        relevance_score=-2,
                    ),
                ]
            )
            for doc in docs
        ]
        return DocList[RankedOutput](outputs)

    @requests(on="/encode")
    def bar(self, docs: DocList[TextDoc], **kwargs) -> DocList[EmbeddingResponseModel]:
        """Echo each document back with a random (1, 64) embedding."""
        responses = [
            EmbeddingResponseModel(
                id=doc.id,
                text=doc.text,
                embeddings=np.random.random((1, 64)),
            )
            for doc in docs
        ]
        return DocList[EmbeddingResponseModel](responses)
Empty file.
86 changes: 86 additions & 0 deletions tests/integration/docarray_v2/sagemaker/test_colbert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import csv
import io
import os

import requests
from jina.orchestrate.pods import Pod
from jina.parsers import set_pod_parser

sagemaker_port = 8080


def test_provider_sagemaker_pod_rank():
    """Serve the sample executor with --provider-endpoint=rank and check that
    '/invocations' is routed to the executor's '/rank' endpoint."""
    config_path = os.path.join(
        os.path.dirname(__file__), "SampleColbertExecutor", "config.yml"
    )
    cli_args = [
        "--uses",
        config_path,
        "--provider",
        "sagemaker",
        "--provider-endpoint",
        "rank",
        "serve",  # This is added by sagemaker
    ]
    args, _ = set_pod_parser().parse_known_args(cli_args)
    with Pod(args):
        # `GET /ping` is the health-check route jina adds for sagemaker.
        ping = requests.get(f"http://localhost:{sagemaker_port}/ping")
        assert ping.status_code == 200
        assert ping.json() == {}

        # `POST /invocations` is mapped to '/rank' via --provider-endpoint,
        # so the canned "first result" ranking must come back.
        payload = {
            "data": {
                "documents": [
                    {"text": "the dog is in the house"},
                    {"text": "hey Peter"},
                ],
                "query": "where is the dog",
                "top_n": 2,
            }
        }
        resp = requests.post(
            f"http://localhost:{sagemaker_port}/invocations",
            json=payload,
        )
        assert resp.status_code == 200
        body = resp.json()
        assert len(body["data"]) == 1
        assert body["data"][0]["results"][0]["document"]["text"] == "first result"


def test_provider_sagemaker_pod_encode():
    """Serve the sample executor with --provider-endpoint=encode and check that
    '/invocations' is routed to the executor's '/encode' endpoint."""
    config_path = os.path.join(
        os.path.dirname(__file__), "SampleColbertExecutor", "config.yml"
    )
    cli_args = [
        "--uses",
        config_path,
        "--provider",
        "sagemaker",
        "--provider-endpoint",
        "encode",
        "serve",  # This is added by sagemaker
    ]
    args, _ = set_pod_parser().parse_known_args(cli_args)
    with Pod(args):
        # `GET /ping` is the health-check route jina adds for sagemaker.
        ping = requests.get(f"http://localhost:{sagemaker_port}/ping")
        assert ping.status_code == 200
        assert ping.json() == {}

        # `POST /invocations` is mapped to '/encode' via --provider-endpoint,
        # so a 64-dimensional embedding must come back per input document.
        resp = requests.post(
            f"http://localhost:{sagemaker_port}/invocations",
            json={
                "data": [
                    {"text": "hello world"},
                ]
            },
        )
        assert resp.status_code == 200
        body = resp.json()
        assert len(body["data"]) == 1
        assert len(body["data"][0]["embeddings"][0]) == 64

0 comments on commit 176f953

Please sign in to comment.