Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pass request-headers as metadata #6126

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion jina/constants.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime as _datetime
import os as _os
import sys as _sys
from pathlib import Path as _Path
import datetime as _datetime

__windows__ = _sys.platform == 'win32'
__uptime__ = _datetime.datetime.now().isoformat()
Expand Down Expand Up @@ -53,6 +53,7 @@
__args_executor_func__ = {
'docs',
'parameters',
'headers',
'docs_matrix',
}
__args_executor_init__ = {'metas', 'requests', 'runtime_args'}
Expand Down
5 changes: 4 additions & 1 deletion jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,10 @@
)
app_kwargs['response_class'] = DocArrayResponse

from fastapi import Request

Check warning on line 190 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L190

Added line #L190 was not covered by tests

@app.api_route(**app_kwargs)
async def post(body: input_model, response: Response):
async def post(body: input_model, response: Response, request: Request):

Check warning on line 193 in jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/http_fastapi_app_docarrayv2.py#L193

Added line #L193 was not covered by tests
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this means the header will only be available when gateway is enabled?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway - having access to HTTP header is very important for our production application - we are an infra team supporting different businesses, and we use headers to communicate extra information like auth, tracing id, etc.

@JoanFM

Copy link
Contributor Author

@NarekA NarekA Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should work for standalone-fast-api deployments too. See this change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hey @cenhao ,

The feature should be available with and without gateway.

Btw, would be cool to know about ur usecase and how u use/plan to use Jina

target_executor = None
req_id = None
if body.header is not None:
Expand All @@ -208,6 +210,7 @@
docs,
exec_endpoint=endpoint_path,
parameters=body.parameters,
headers=request.headers,
target_executor=target_executor,
request_id=req_id,
return_results=True,
Expand Down
18 changes: 18 additions & 0 deletions jina/serve/runtimes/gateway/streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
AsyncIterator,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -209,6 +210,7 @@
exec_endpoint: Optional[str] = None,
target_executor: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
results_in_order: bool = False,
return_type: Type[DocumentArray] = DocumentArray,
) -> AsyncIterator[Tuple[Union[DocumentArray, 'Request'], 'ExecutorError']]:
Expand All @@ -221,6 +223,7 @@
:param exec_endpoint: The Executor endpoint to which to send the Documents
:param target_executor: A regex expression indicating the Executors that should receive the Request
:param parameters: Parameters to be attached to the Requests
:param headers: Http request headers
:param results_in_order: return the results in the same order as the request_iterator
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
:yield: tuple of Documents or Responses and unpacked error from Executors if any
Expand All @@ -232,6 +235,7 @@
exec_endpoint=exec_endpoint,
target_executor=target_executor,
parameters=parameters,
headers=headers,
results_in_order=results_in_order,
return_type=return_type,
):
Expand All @@ -256,6 +260,7 @@
exec_endpoint: Optional[str] = None,
target_executor: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
request_id: Optional[str] = None,
return_type: Type[DocumentArray] = DocumentArray,
) -> AsyncIterator[Tuple[Union[DocumentArray, 'Request'], 'ExecutorError']]:
Expand All @@ -267,6 +272,7 @@
:param exec_endpoint: The Executor endpoint to which to send the Documents
:param target_executor: A regex expression indicating the Executors that should receive the Request
:param parameters: Parameters to be attached to the Requests
:param headers: Http request headers
:param request_id: Request ID to add to the request streamed to Executor. Only applicable if request_size is equal or less to the length of the docs
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
:yield: tuple of Documents or Responses and unpacked error from Executors if any
Expand All @@ -282,6 +288,8 @@
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers

Check warning on line 292 in jina/serve/runtimes/gateway/streamer.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/streamer.py#L291-L292

Added lines #L291 - L292 were not covered by tests

async for result in self.rpc_stream_doc(request=req, return_type=return_type):
error = None
Expand All @@ -306,6 +314,7 @@
exec_endpoint: Optional[str] = None,
target_executor: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
results_in_order: bool = False,
request_id: Optional[str] = None,
return_type: Type[DocumentArray] = DocumentArray,
Expand All @@ -319,6 +328,7 @@
:param exec_endpoint: The Executor endpoint to which to send the Documents
:param target_executor: A regex expression indicating the Executors that should receive the Request
:param parameters: Parameters to be attached to the Requests
:param headers: Http request headers
:param results_in_order: return the results in the same order as the request_iterator
:param request_id: Request ID to add to the request streamed to Executor. Only applicable if request_size is equal or less to the length of the docs
:param return_type: the DocumentArray type to be returned. By default, it is `DocumentArray`.
Expand All @@ -339,6 +349,8 @@
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers

Check warning on line 353 in jina/serve/runtimes/gateway/streamer.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/streamer.py#L352-L353

Added lines #L352 - L353 were not covered by tests
yield req
else:
from docarray import BaseDoc
Expand All @@ -361,6 +373,8 @@
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers

Check warning on line 377 in jina/serve/runtimes/gateway/streamer.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/streamer.py#L376-L377

Added lines #L376 - L377 were not covered by tests
yield req
else:
req = DataRequest()
Expand All @@ -374,6 +388,8 @@
req.header.target_executor = target_executor
if parameters:
req.parameters = parameters
if headers:
req.headers = headers

Check warning on line 392 in jina/serve/runtimes/gateway/streamer.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/gateway/streamer.py#L391-L392

Added lines #L391 - L392 were not covered by tests
yield req

async for resp in self.rpc_stream(
Expand Down Expand Up @@ -438,6 +454,7 @@
request_size: int = 100,
on: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
return_type: Type[DocumentArray] = DocumentArray,
**kwargs,
):
Expand Down Expand Up @@ -505,6 +522,7 @@
inputs: 'Document',
on: Optional[str] = None,
parameters: Optional[Dict] = None,
headers: Optional[Mapping[str, str]] = None,
**kwargs,
):
req: SingleDocumentRequest = SingleDocumentRequest(inputs.to_protobuf())
Expand Down
6 changes: 5 additions & 1 deletion jina/serve/runtimes/worker/http_fastapi_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,18 @@

app_kwargs['response_class'] = DocArrayResponse

from fastapi import Request

Check warning on line 89 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L89

Added line #L89 was not covered by tests

@app.api_route(**app_kwargs)
async def post(body: input_model, response: Response):
async def post(body: input_model, response: Response, request: Request):

Check warning on line 92 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L92

Added line #L92 was not covered by tests

req = DataRequest()
if body.header is not None:
req.header.request_id = body.header.request_id

if body.parameters is not None:
req.parameters = body.parameters
req.headers = request.headers

Check warning on line 100 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L100

Added line #L100 was not covered by tests
req.header.exec_endpoint = endpoint_path
data = body.data
if isinstance(data, list):
Expand Down Expand Up @@ -149,6 +152,7 @@
body = Document.from_pydantic_model(body)
req = DataRequest()
req.header.exec_endpoint = endpoint_path
req.headers = request.headers

Check warning on line 155 in jina/serve/runtimes/worker/http_fastapi_app.py

View check run for this annotation

Codecov / codecov/patch

jina/serve/runtimes/worker/http_fastapi_app.py#L155

Added line #L155 was not covered by tests
if not docarray_v2:
req.data.docs = DocumentArray([body])
else:
Expand Down
Loading
Loading