Skip to content

Commit

Permalink
Jobergum/streaming mode (#629)
Browse files Browse the repository at this point in the history
* support streaming mode with embedders

* Add support for streaming mode

* wrong import

* too much autocomplete
  • Loading branch information
jobergum committed Nov 12, 2023
1 parent 5b1ee4e commit dd409f5
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 27 deletions.
79 changes: 53 additions & 26 deletions vespa/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,26 +264,27 @@ def get_model_endpoint(self, model_id: Optional[str] = None) -> Optional[Respons

def query(
self,
body: Optional[Dict] = None, **kwargs
body: Optional[Dict] = None, groupname:str=None, **kwargs
) -> VespaQueryResponse:
"""
Send a query request to the Vespa application.
Send 'body' containing all the request parameters.
:param body: Dict containing request parameters.
:param groupname: The groupname used with streaming search.
param kwargs: Extra Vespa Query API parameters.
:return: The response from the Vespa application.
"""
#Use one connection as this is a single query
with VespaSync(self,pool_maxsize=1, pool_connections=1) as sync_app:
return sync_app.query(
body=body, **kwargs
body=body, groupname=groupname, **kwargs
)


def feed_data_point(
self, schema: str, data_id: str, fields: Dict, namespace: str = None, **kwargs
self, schema: str, data_id: str, fields: Dict, namespace: str = None, groupname:str = None, **kwargs
) -> VespaResponse:
"""
Feed a data point to a Vespa app. Will create a new VespaSync with
Expand All @@ -294,6 +295,7 @@ def feed_data_point(
:param data_id: Unique id associated with this data point.
:param fields: Dict containing all the fields required by the `schema`.
:param namespace: The namespace that we are sending data to.
:param groupname: The groupname that we are sending data
:return: VespaResponse of the HTTP POST request.
"""
if not namespace:
Expand All @@ -302,7 +304,7 @@ def feed_data_point(
# single data point
with VespaSync(app=self, pool_connections=1,pool_maxsize=1) as sync_app:
return sync_app.feed_data_point(
schema=schema, data_id=data_id, fields=fields, namespace=namespace, **kwargs
schema=schema, data_id=data_id, fields=fields, namespace=namespace, groupname=groupname, **kwargs
)

def feed_iterable(self,
Expand Down Expand Up @@ -394,15 +396,21 @@ def _submit(doc:dict, sync_session:VespaSync) -> Tuple[str, Union[VespaResponse,
return id, VespaResponse(status_code=499,
json={"id":id, "message":"Missing fields in input dict"},
url="n/a", operation_type=operation_type)
groupname = doc.get("groupname", None)
try:
if operation_type == "feed":
response:VespaResponse = sync_session.feed_data_point(schema=schema, namespace=namespace, data_id=id, fields=fields, **kwargs)
response:VespaResponse = sync_session.feed_data_point(
schema=schema, namespace=namespace,
groupname=groupname, data_id=id, fields=fields, **kwargs)
return (id, response)
elif operation_type == "update":
response:VespaResponse = sync_session.update_data(schema=schema, namespace=namespace, data_id=id, fields=fields, **kwargs)
response:VespaResponse = sync_session.update_data(
schema=schema, namespace=namespace,
groupname=groupname, data_id=id, fields=fields, **kwargs)
return (id, response)
elif operation_type == "delete":
response:VespaResponse = sync_session.delete_data(schema=schema, namespace=namespace, data_id=id, **kwargs)
response:VespaResponse = sync_session.delete_data(
schema=schema, namespace=namespace, data_id=id, groupname=groupname, **kwargs)
return (id, response)
except Exception as e:
return (id, e)
Expand Down Expand Up @@ -433,21 +441,22 @@ def _handle_result_callback(future:Future, callback:Callable):
consumer_thread.join()

def delete_data(
self, schema: str, data_id: str, namespace: str = None, **kwargs
self, schema: str, data_id: str, namespace: str = None, groupname:str=None, **kwargs
) -> VespaResponse:
"""
Delete a data point from a Vespa app.
:param schema: The schema that we are deleting data from.
:param data_id: Unique id associated with this data point.
:param namespace: The namespace that we are deleting data from. If no namespace is provided the schema is used.
:param groupname: The groupname that we are deleting data from.
:param kwargs: Additional arguments to be passed to the HTTP DELETE request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters
:return: Response of the HTTP DELETE request.
"""

with VespaSync(self, pool_connections=1, pool_maxsize=1) as sync_app:
return sync_app.delete_data(
schema=schema, data_id=data_id, namespace=namespace, **kwargs
schema=schema, data_id=data_id, namespace=namespace, groupname=groupname, **kwargs
)


Expand Down Expand Up @@ -475,7 +484,7 @@ def delete_all_docs(
)

def get_data(
self, schema:str, data_id: str, namespace: str = None, raise_on_not_found:Optional[bool]=False, **kwargs
self, schema:str, data_id: str, namespace: str = None, groupname:str=None, raise_on_not_found:Optional[bool]=False, **kwargs
) -> VespaResponse:
"""
Get a data point from a Vespa app.
Expand All @@ -484,14 +493,16 @@ def get_data(
:param schema: The schema that we are getting data from. Will attempt to infer schema name if not provided.
:param data_id: Unique id associated with this data point.
:param namespace: The namespace that we are getting data from. If no namespace is provided the schema is used.
:param groupname: The groupname that we are getting data from.
:param raise_on_not_found: Raise an exception if the data_id is not found. Default is False.
:param kwargs: Additional arguments to be passed to the HTTP GET request https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters
:return: Response of the HTTP GET request.
"""

with VespaSync(self,pool_connections=1,pool_maxsize=1) as sync_app:
return sync_app.get_data(
schema=schema, data_id=data_id, namespace=namespace, raise_on_not_found=raise_on_not_found, **kwargs
schema=schema, data_id=data_id, namespace=namespace, groupname=groupname,
raise_on_not_found=raise_on_not_found, **kwargs
)

def update_data(
Expand All @@ -501,6 +512,7 @@ def update_data(
fields: Dict,
create: bool = False,
namespace: str = None,
groupname:str=None,
**kwargs
) -> VespaResponse:
"""
Expand All @@ -511,6 +523,7 @@ def update_data(
:param fields: Dict containing all the fields you want to update.
:param create: If true, updates to non-existent documents will create an empty document to update
:param namespace: The namespace that we are updating data. If no namespace is provided the schema is used.
:param groupname: The groupname that we are updating data.
:param kwargs: Additional arguments to be passed to the HTTP PUT request. https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters
:return: Response of the HTTP PUT request.
"""
Expand All @@ -523,6 +536,7 @@ def update_data(
fields=fields,
create=create,
namespace=namespace,
groupname=groupname,
**kwargs
)

Expand Down Expand Up @@ -668,7 +682,7 @@ def predict(self, model_id, function_name, encoded_tokens):
return response

def feed_data_point(
self, schema: str, data_id: str, fields: Dict, namespace: str = None, **kwargs
self, schema: str, data_id: str, fields: Dict, namespace: str = None, groupname:str=None, **kwargs
) -> VespaResponse:
"""
Feed a data point to a Vespa app.
Expand All @@ -677,12 +691,13 @@ def feed_data_point(
:param data_id: Unique id associated with this data point.
:param fields: Dict containing all the fields required by the `schema`.
:param namespace: The namespace that we are sending data to. If no namespace is provided the schema is used.
:param groupname: The group that we are sending data to.
:param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters)
:return: Response of the HTTP POST request.
:raises HTTPError: if one occurred
"""

path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)
Expand All @@ -699,6 +714,7 @@ def feed_data_point(
def query(
self,
body: Optional[Dict] = None,
groupname:str=None,
**kwargs
) -> VespaQueryResponse:
"""
Expand All @@ -707,18 +723,22 @@ def query(
Send 'body' containing all the request parameters.
:param body: Dict containing all the request parameters.
:param groupname: The groupname used in streaming search
:param kwargs: Additional Valid Vespa HTTP Query Api parameters (https://docs.vespa.ai/en/reference/query-api-reference.html)
:return: Either the request body if debug_request is True or the result from the Vespa application
:raises HTTPError: if one occurred
"""

if groupname:
kwargs["streaming.groupname"] = groupname
response = self.http_session.post(self.app.search_end_point, json=body, params=kwargs)
raise_for_status(response)
return VespaQueryResponse(
json=response.json(), status_code=response.status_code, url=str(response.url)
)

def delete_data(
self, schema: str, data_id: str, namespace: str = None,
self, schema: str, data_id: str, namespace: str = None, groupname:str=None,
**kwargs
) -> VespaResponse:
"""
Expand All @@ -732,11 +752,10 @@ def delete_data(
:raises HTTPError: if one occurred
"""

path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)

response = self.http_session.delete(end_point, params=kwargs)
raise_for_status(response)
return VespaResponse(
Expand Down Expand Up @@ -793,20 +812,22 @@ def delete_slice(slice_id):


def get_data(
self, schema: str, data_id: str, namespace: str = None, raise_on_not_found: Optional[bool]=False, **kwargs
self, schema: str, data_id: str, namespace: str = None, groupname:str = None,
raise_on_not_found: Optional[bool]=False, **kwargs
) -> VespaResponse:
"""
Get a data point from a Vespa app.
:param schema: The schema that we are getting data from.
:param data_id: Unique id associated with this data point.
:param namespace: The namespace that we are getting data from.
:param groupname: The groupname used to get data
:param raise_on_not_found: Raise an exception if the document is not found.
:param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters)
:return: Response of the HTTP GET request.
:raises HTTPError: if one occurred
"""
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)
Expand All @@ -827,6 +848,7 @@ def update_data(
fields: Dict,
create: bool = False,
namespace: str = None,
groupname:str = None,
**kwargs
) -> VespaResponse:
"""
Expand All @@ -837,12 +859,13 @@ def update_data(
:param fields: Dict containing all the fields you want to update.
:param create: If true, updates to non-existent documents will create an empty document to update
:param namespace: The namespace that we are updating data.
:param groupname: The groupname used to update data
:param kwargs: Additional HTTP request parameters (https://docs.vespa.ai/en/reference/document-v1-api-reference.html#request-parameters)
:return: Response of the HTTP PUT request.
:raises HTTPError: if one occurred
"""

path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}?create={}".format(
self.app.end_point, path, str(create).lower()
)
Expand Down Expand Up @@ -914,19 +937,22 @@ async def _wait(f, args, **kwargs):
async def query(
self,
body: Optional[Dict] = None,
groupname:str = None,
**kwargs
) -> VespaQueryResponse:
if groupname:
kwargs["streaming.groupname"] = groupname
r = await self.aiohttp_session.post(self.app.search_end_point, json=body, params=kwargs)
return VespaQueryResponse(
json=await r.json(), status_code=r.status, url=str(r.url)
)

@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
async def feed_data_point(
self, schema: str, data_id: str, fields: Dict, namespace: str = None, **kwargs
self, schema: str, data_id: str, fields: Dict, namespace: str = None, groupname:str=None, **kwargs
) -> VespaResponse:

path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)
Expand All @@ -941,9 +967,9 @@ async def feed_data_point(

@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
async def delete_data(
self, schema: str, data_id: str, namespace: str = None, **kwargs
self, schema: str, data_id: str, namespace: str = None, groupname:str=None, **kwargs
) -> VespaResponse:
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)
Expand All @@ -957,9 +983,9 @@ async def delete_data(

@retry(wait=wait_exponential(multiplier=1), stop=stop_after_attempt(3))
async def get_data(
self, schema: str, data_id: str, namespace: str = None, **kwargs
self, schema: str, data_id: str, namespace: str = None, groupname:str=None, **kwargs
) -> VespaResponse:
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}".format(
self.app.end_point, path
)
Expand All @@ -979,9 +1005,10 @@ async def update_data(
fields: Dict,
create: bool = False,
namespace: str = None,
groupname:str=None,
**kwargs
) -> VespaResponse:
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace)
path = self.app.get_document_v1_path(id=data_id, schema=schema, namespace=namespace, group=groupname)
end_point = "{}{}?create={}".format(
self.app.end_point, path, str(create).lower()
)
Expand Down
6 changes: 6 additions & 0 deletions vespa/templates/services.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
{% if schemas %}
<search></search>
<document-api></document-api>
<document-processing></document-processing>
{% endif %}
{% if components %}
{% for component in components %}
Expand All @@ -36,13 +37,18 @@
<content id="{{ application_name }}_content" version="1.0">
<redundancy>1</redundancy>
<documents>
{% set streaming_modes = namespace(total = 0)%}
{% for schema in schemas %}
{% if schema.global_document %}
<document type="{{ schema.name }}" mode="index" global="true"></document>
{% else %}
<document type="{{ schema.name }}" mode="{{ schema.mode }}"></document>
{% endif %}
{% if schema.mode == "streaming" %}{% set streaming_modes.total = 1 + streaming_modes.total %}{% endif %}
{% endfor %}
{% if streaming_modes.total > 0 %}
<document-processing chain="indexing" cluster="{{ application_name }}_container" />
{% endif %}
</documents>
<nodes>
<node distribution-key="0" hostalias="node1"></node>
Expand Down

0 comments on commit dd409f5

Please sign in to comment.