In [None]:
#| default_exp query

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
#|hide
from nbdev import nbdev_export
from fastcore.test import test_eq, test_fail

# query
> Reference API for `QueryModel` and related code

In [None]:
#|export
import os
from typing import Optional, Dict, Callable, List, Tuple, Union
from pandas import DataFrame
from fastcore.utils import patch
from vespa.io import VespaQueryResponse
from vespa.application import Vespa

## Match Filters

In [None]:
#|export
class MatchFilter:
    def __init__(self) -> None:
        "Abstract base class that every match filter derives from."

In [None]:
#|export
#|hide
@patch
def create_match_filter(
    self: MatchFilter,
    query: str,  # Query string the filter expression is built for.
) -> str:  # Fragment of the YQL expression that implements the filter.
    "Abstract hook: concrete filters override this to build their part of the YQL expression."
    raise NotImplementedError()

In [None]:
#|export
#|hide
@patch
def get_query_properties(
    self: MatchFilter,
    query: Optional[str] = None,  # Query string the properties are derived from.
) -> Dict:  # Request properties that the filter needs to be sent along with the query.
    "Abstract hook: concrete filters override this to return the request properties they rely on."
    raise NotImplementedError()

In [None]:
#|export
class AND(MatchFilter):
    def __init__(self) -> None:
        "Filter that matches documents containing all the query terms."
        super().__init__()

Usage: The `AND` filter is usually used when specifying query models.

In [None]:
and_filter = AND()

In [None]:
#|export
#|hide
@patch
def create_match_filter(
    self: AND,
    query: str,  # Query string placed inside `userInput`.
) -> str:  # Fragment of the YQL expression produced by the AND filter.
    "Build the `userInput(...)` clause used by the AND filter."
    # NOTE(review): `query` is interpolated verbatim, so a double quote inside it
    # would end the YQL string literal early -- confirm callers sanitize input.
    return f'(userInput("{query}"))'

In [None]:
#|export
#|hide
@patch
def get_query_properties(
    self: AND,
    query: Optional[str] = None,  # Query string; not used by this filter.
) -> Dict:  # Always an empty dict: the AND filter adds no request properties.
    "Return the request properties required by the AND filter (none)."
    return dict()


In [None]:
#|hide
# The query, including its double space, must appear unchanged inside `userInput`.
query = "this is  a test"
test_eq(
    and_filter.create_match_filter(query),
    '(userInput("this is  a test"))',
)
# The AND filter contributes no extra request properties.
test_eq(and_filter.get_query_properties(query), {})

In [None]:
#|export
class OR(MatchFilter):
    def __init__(self) -> None:
        "Filter that matches any document containing at least one query term."
        super().__init__()

Usage: The `OR` filter is usually used when specifying query models.

In [None]:
or_filter = OR()

In [None]:
#|export 
#|hide
@patch
def create_match_filter(
    self: OR,
    query: str,  # Query string placed inside `userInput`.
) -> str:  # Fragment of the YQL expression produced by the OR filter.
    'Build the `userInput(...)` clause annotated with `grammar: "any"` used by the OR filter.'
    # NOTE(review): `query` is interpolated verbatim, so a double quote inside it
    # would end the YQL string literal early -- confirm callers sanitize input.
    return f'({{grammar: "any"}}userInput("{query}"))'

In [None]:
#|export
#|hide
@patch
def get_query_properties(
    self: OR,
    query: Optional[str] = None,  # Query string; not used by this filter.
) -> Dict:  # Always an empty dict: the OR filter adds no request properties.
    "Return the request properties required by the OR filter (none)."
    return dict()

In [None]:
#|hide
query = "this is  a test"
test_eq(
    or_filter.create_match_filter(query=query),
    '({grammar: "any"}userInput("this is  a test"))',
)
test_eq(or_filter.get_query_properties(query=query), {})

In [None]:
#|export
class WeakAnd(MatchFilter):
    def __init__(
        self,
        hits: int,  # Lower bound on the number of hits to be retrieved.
        field: str = "default",  # Name of the Vespa field to search.
    ) -> None:
        """
        Match documents according to the weakAND algorithm.

        Reference: [https://docs.vespa.ai/en/using-wand-with-vespa.html](https://docs.vespa.ai/en/using-wand-with-vespa.html)
        """
        super().__init__()
        self.hits, self.field = hits, field

Usage: The `WeakAnd` filter is usually used when specifying query models.

In [None]:
weakand_filter = WeakAnd(hits=10, field="default")

In [None]:
#|export
#|hide
@patch
def create_match_filter(
    self: WeakAnd,
    query: str,  # Query string; split on single spaces into terms.
) -> str:  # Fragment of the YQL expression produced by the WeakAnd filter.
    "Build a `weakAnd(...)` clause with one `contains` term per query token."
    # Splitting on a literal " " keeps empty tokens when the query has
    # consecutive spaces, so those show up as `contains ""` terms.
    clauses = []
    for token in query.split(" "):
        clauses.append(f'{self.field} contains "{token}"')
    joined = ", ".join(clauses)
    return f"({{targetHits: {self.hits}}}weakAnd({joined}))"


In [None]:
#|export
#|hide
@patch
def get_query_properties(
    self: WeakAnd,
    query: Optional[str] = None,  # Query string; not used by this filter.
) -> Dict:  # Always an empty dict: the WeakAnd filter adds no request properties.
    "Return the request properties required by the WeakAnd filter (none)."
    return dict()

In [None]:
#|hide
weakand_filter = WeakAnd(hits=10, field="field_name")
# The double space makes `split(" ")` emit an empty token, hence the `contains ""` term below.
query = "this is  a test"
test_eq(
    weakand_filter.create_match_filter(query=query),
    '({targetHits: 10}weakAnd(field_name contains "this", field_name contains "is", field_name contains "", '
    'field_name contains "a", field_name contains "test"))',
)
# The WeakAnd filter contributes no extra request properties.
test_eq(weakand_filter.get_query_properties(query=query), {})

In [None]:
#|export
class ANN(MatchFilter):
    def __init__(
        self,
        doc_vector: str,  # Document field used in the distance calculation.
        query_vector: str,  # Query field used in the distance calculation.
        hits: int,  # Lower bound on the number of hits to return.
        label: str,  # Label identifying this specific operator instance.
        approximate: bool = True,  # True for approximate nearest neighbor, False for brute force.
    ) -> None:
        """
        Match documents according to the nearest neighbor operator.

        Reference: [https://docs.vespa.ai/en/reference/query-language-reference.html](https://docs.vespa.ai/en/reference/query-language-reference.html)
        """
        super().__init__()
        self.doc_vector, self.query_vector = doc_vector, query_vector
        self.hits, self.label = hits, label
        self.approximate = approximate
        # Only the literal `True` maps to "true"; every other value yields "false".
        self._approximate = "false" if approximate is not True else "true"

Usage: The `ANN` filter is usually used when specifying query models.

By default, the `ANN` operator uses approximate nearest neighbor:

In [None]:
match_filter = ANN(
    doc_vector="doc_vector",
    query_vector="query_vector",
    hits=10,
    label="label",
)

Brute-force can be used by specifying `approximate=False`:

In [None]:
ann_filter = ANN(
    doc_vector="doc_vector",
    query_vector="query_vector",
    hits=10,
    label="label",
    approximate=False,
)

In [None]:
#|export
#|hide
@patch
def create_match_filter(
    self: ANN,
    query: str,  # Accepted for interface compatibility; ignored by the ANN filter.
) -> str:  # Fragment of the YQL expression produced by the ANN filter.
    "Build the annotated `nearestNeighbor(...)` clause from the filter's own settings."
    annotations = f'targetHits: {self.hits}, label: "{self.label}", approximate: {self._approximate}'
    operator = f"nearestNeighbor({self.doc_vector}, {self.query_vector})"
    return "({" + annotations + "}" + operator + ")"

In [None]:
#|export
#|hide
@patch
def get_query_properties(
    self: ANN,
    query: Optional[str] = None,  # Accepted for interface compatibility; ignored by the ANN filter.
) -> Dict[str, str]:  # Always an empty dict: the ANN filter adds no request properties.
    "Return the request properties required by the ANN filter (none)."
    return dict()

In [None]:
#|hide
query = "query string has no effect here"
match_filter = ANN(
    doc_vector="doc_vector",
    query_vector="query_vector",
    hits=10,
    label="label",
)
test_eq(
    match_filter.create_match_filter(query=query),
    '({targetHits: 10, label: "label", approximate: true}nearestNeighbor(doc_vector, query_vector))',
)
test_eq(
    match_filter.get_query_properties(query=query),
    {},
)

In [None]:
#|hide
query = "query string has no effect here"
match_filter = ANN(
    doc_vector="doc_vector",
    query_vector="query_vector",
    hits=10,
    label="label",
    approximate=False,
)
test_eq(
    match_filter.create_match_filter(query=query),
    '({targetHits: 10, label: "label", approximate: false}nearestNeighbor(doc_vector, query_vector))',
)
test_eq(
    match_filter.get_query_properties(query=query),
    {},
)

In [None]:
#|export
# NOTE(review): this class rebinds the module-level name `Union` that the imports
# cell brings in from `typing`; annotations written as `Union[...]` after this
# point resolve to this class instead of `typing.Union`.
class Union(MatchFilter):
    def __init__(
        self, 
        *args: MatchFilter  # Match filters whose results are combined.
    ) -> None:
        "Match documents that belong to the union of several match filters."
        super().__init__()
        # Kept as the tuple of filters in the order they were passed.
        self.operators = args

Usage: The `Union` filter is usually used when specifying query models.

In [None]:
union_filter = Union(
    WeakAnd(hits=10, field="field_name"),
    ANN(
        doc_vector="doc_vector",
        query_vector="query_vector",
        hits=10,
        label="label",
    ),
)

In [None]:
#|export
#|hide
@patch
def create_match_filter(
    self: Union,
    query: str,  # Query string forwarded to every wrapped filter.
) -> str:  # Fragment of the YQL expression produced by the Union filter.
    "Join the YQL fragments of the wrapped filters with `or`."
    fragments = (
        wrapped.create_match_filter(query=query) for wrapped in self.operators
    )
    # Filters that return None contribute nothing to the expression.
    return " or ".join([fragment for fragment in fragments if fragment is not None])

In [None]:
#|export
#|hide
@patch
def get_query_properties(
    self: Union,
    query: Optional[str] = None,  # Query string forwarded to every wrapped filter.
) -> Dict[str, str]:  # Merged request properties of all wrapped filters.
    "Get the relevant request properties associated with the Union filter."
    merged = {}
    for wrapped in self.operators:
        # On duplicate keys the filter listed later overrides the earlier one.
        merged.update(wrapped.get_query_properties(query=query))
    return merged

In [None]:
#|hide
query = "this is  a test"
match_filter = Union(
    WeakAnd(hits=10, field="field_name"),
    ANN(
        doc_vector="doc_vector",
        query_vector="query_vector",
        hits=10,
        label="label",
    ),
)
test_eq(
    match_filter.create_match_filter(query=query),
    '({targetHits: 10}weakAnd(field_name contains "this", field_name contains "is", '
    'field_name contains "", '
    'field_name contains "a", field_name contains "test")) or '
    '({targetHits: 10, label: "label", approximate: true}nearestNeighbor(doc_vector, query_vector))',
)
test_eq(
    match_filter.get_query_properties(query=query),
    {},
)

## Ranking

In [None]:
#|export
class Ranking:
    def __init__(
        self,
        name: str = "default",  # Name of the rank profile as defined in a Vespa search definition.
        list_features: bool = False,  # Whether the ranking features should be returned.
    ) -> None:
        "Define the rank profile to be used during ranking."
        self.name = name
        # Stored as the strings "true"/"false" rather than as a bool.
        self.list_features = "true" if list_features else "false"

Usage: `Ranking` is usually used when specifying query models.

In [None]:
ranking = Ranking(name="bm25", list_features=True)

In [None]:
#|hide
ranking = Ranking(name="rank_profile", list_features=True)
test_eq(ranking.name, "rank_profile")
test_eq(ranking.list_features, "true")

## Query properties

In [None]:
#|export
class QueryProperty:
    def __init__(self) -> None:
        "Abstract base class that every query property derives from."

In [None]:
#|export
#|hide
@patch
def get_query_properties(
    self: QueryProperty,
    query: Optional[str] = None,  # Query string the properties are derived from.
) -> Dict:  # Request properties to be included in the query.
    "Abstract hook: concrete query properties override this to return their request properties."
    raise NotImplementedError()


In [None]:
#|export
class QueryRankingFeature(QueryProperty):
    def __init__(
        self,
        name: str,  # Name of the feature.
        mapping: Callable[[str], List[float]],  # Function turning a query string into a list of floats.
    ) -> None:
        "Include ranking.feature.query into a Vespa query."
        super().__init__()
        self.name, self.mapping = name, mapping

Usage: `QueryRankingFeature` is usually used when specifying query models.

In [None]:
query_property = QueryRankingFeature(
    name="query_vector", mapping=lambda x: [1, 2, 3]
)

In [None]:
#|export
#|hide
@patch
def get_query_properties(
    self: QueryRankingFeature,
    query: Optional[str] = None,  # Query string handed to `mapping`.
) -> Dict[str, str]:  # Single-entry dict to be included in the query.
    "Apply `mapping` to the query and expose the result as a `ranking.features.query(<name>)` property."
    feature_value = self.mapping(query)
    # The mapped value is sent in its `str()` form, e.g. "[1, 2, 3]".
    return {f"ranking.features.query({self.name})": str(feature_value)}

In [None]:
#|hide
query = "this is  a test"
query_property = QueryRankingFeature(
    name="query_vector", mapping=lambda x: [1, 2, 3]
)
test_eq(
    query_property.get_query_properties(query=query),
    {"ranking.features.query(query_vector)": "[1, 2, 3]"},
)

## Query model

In [None]:
#|export
class QueryModel(object):
    def __init__(
        self,
        name: str = "default_name",  # Name of the query model. Used to tag model-related quantities, like evaluation metrics.
        query_properties: Optional[List[QueryProperty]] = None,  # Query properties to be included in the queries.
        match_phase: Optional[MatchFilter] = None,  # Define the match criteria. A new `AND()` is used when omitted.
        ranking: Optional[Ranking] = None,  # Define the rank criteria. A new `Ranking()` is used when omitted.
        body_function: Optional[Callable[[str], Dict]] = None,  # Function that takes the query as parameter and returns the body of a Vespa query.
    ) -> None:
        """
        Define a query model.

        A `QueryModel` is an abstraction that encapsulates all the relevant information
        controlling how a Vespa app matches and ranks documents.
        """
        self.name = name
        self.query_properties = query_properties if query_properties is not None else []
        # Defaults are created per instance: a default written in the signature is
        # evaluated once and would be shared (and mutable) across every `QueryModel`.
        self.match_phase = match_phase if match_phase is not None else AND()
        self.ranking = ranking if ranking is not None else Ranking()
        self.body_function = body_function


Usage:

Specify a query model with default configurations:

In [None]:
query_model = QueryModel()

Specify match phase, ranking phase and properties used by them.

In [None]:
query_model = QueryModel(
    query_properties=[
        QueryRankingFeature(name="query_embedding", mapping=lambda x: [1, 2, 3])
    ],
    match_phase=ANN(
        doc_vector="document_embedding",
        query_vector="query_embedding",
        hits=10,
        label="label",
    ),
    ranking=Ranking(name="bm25_plus_embeddings", list_features=True),
)

Specify a query model based on a function that outputs Vespa YQL.

In [None]:
def body_function(query):
    body = {
        "yql": "select * from sources * where userQuery();",
        "query": query,
        "type": "any",
        "ranking": {"profile": "bm25", "listFeatures": "true"},
    }
    return body

query_model = QueryModel(body_function=body_function)

In [None]:
#|export
#|hide
@patch
def create_body(
    self: QueryModel,
    query: str,  # Query string.
) -> Dict[str, str]:  # Request body; note that its `ranking` entry is itself a dict.
    "Build the request body to be sent to Vespa for `query`."

    # A user-supplied body function takes precedence over everything else.
    if self.body_function:
        return self.body_function(query)

    # Properties from the explicit query properties first, then from the match phase.
    request_properties = {}
    for prop in self.query_properties:
        request_properties.update(prop.get_query_properties(query=query))
    request_properties.update(self.match_phase.get_query_properties(query=query))

    where_clause = self.match_phase.create_match_filter(query=query)

    # Properties are unpacked last so they override "yql"/"ranking" on a key clash.
    return {
        "yql": "select * from sources * where {};".format(where_clause),
        "ranking": {
            "profile": self.ranking.name,
            "listFeatures": self.ranking.list_features,
        },
        **request_properties,
    }

In [None]:
#|hide
query = "this is  a test"
query_model = QueryModel()
test_eq(
    query_model.create_body(query=query),
    {
        "yql": 'select * from sources * where (userInput("this is  a test"));',
        "ranking": {"profile": "default", "listFeatures": "false"},
    },
)

In [None]:
#|hide
query = "this is  a test"
def body_function(query):
    body = {
        "yql": "select * from sources * where userQuery();",
        "query": query,
        "type": "any",
        "ranking": {"profile": "bm25", "listFeatures": "true"},
    }
    return body

query_model = QueryModel(body_function=body_function)
test_eq(
    query_model.create_body(query=query),
    {
        "yql": "select * from sources * where userQuery();",
        "query": "this is  a test",
        "type": "any",
        "ranking": {"profile": "bm25", "listFeatures": "true"},
    },
)

In [None]:
#|hide
query = "this is  a test"
query_model = QueryModel(
    query_properties=[
        QueryRankingFeature(name="query_vector", mapping=lambda x: [1, 2, 3])
    ],
    match_phase=OR(),
    ranking=Ranking(name="bm25", list_features=True),
)
test_eq(
    query_model.create_body(query=query),
    {
        "yql": 'select * from sources * where ({grammar: "any"}userInput("this is  a test"));',
        "ranking": {"profile": "bm25", "listFeatures": "true"},
        "ranking.features.query(query_vector)": "[1, 2, 3]",
    },
)

In [None]:
#|hide
query = "this is  a test"
query_model = QueryModel(
    query_properties=[
        QueryRankingFeature(name="query_vector", mapping=lambda x: [1, 2, 3])
    ],
    match_phase=ANN(
        doc_vector="doc_vector",
        query_vector="query_vector",
        hits=10,
        label="label",
    ),
    ranking=Ranking(name="bm25", list_features=True),
)
test_eq(
    query_model.create_body(query=query),
    {
        "yql": 'select * from sources * where ({targetHits: 10, label: "label", approximate: true}nearestNeighbor(doc_vector, query_vector));',
        "ranking": {"profile": "bm25", "listFeatures": "true"},
        "ranking.features.query(query_vector)": "[1, 2, 3]",
    },
)

## Send query with QueryModel

In [None]:
#|hide

In [None]:
#|export
def _build_query_body(
    query: str,
    query_model: QueryModel,
    recall: Optional[Tuple] = None,
    **kwargs,
) -> Dict:
    """
    Build a request body from `query_model`, then add `recall` and extra parameters.

    `recall` is a (field, values) pair rendered as "+(field:v1 field:v2 ...)".
    `kwargs` are merged last, so they override keys already present in the body.
    """
    assert query_model is not None, "No 'query_model' specified."
    request_body = query_model.create_body(query=query)
    if recall is not None:
        recall_terms = " ".join(f"{recall[0]}:{str(doc)}" for doc in recall[1])
        request_body["recall"] = f"+({recall_terms})"
    request_body.update(kwargs)
    return request_body

In [None]:
#|export
def send_query(
    app: Vespa,  # Connection to a Vespa application
    body: Optional[Dict] = None,  # Contains all the request parameters. None when using `query_model`.
    query: Optional[str] = None,  # Query string. None when using `body`.
    query_model: Optional[QueryModel] = None,  # Query model. None when using `body`.
    debug_request: bool = False,  # Return request body for debugging instead of sending the request.
    recall: Optional[Tuple] = None,  # Tuple of size 2 where the first element is the name of the field to use to recall and the second element is a list of the values to be recalled.
    **kwargs,  # Additional parameters to be sent along the request.
) -> VespaQueryResponse:  # Either a response wrapping only the request body if debug_request is True or the result from the Vespa application.
    """
    Send a query request to a Vespa application.

    Either send 'body' containing all the request parameters or specify 'query' and 'query_model'.
    When 'body' is given it is used as is: 'recall' and the extra keyword arguments are not applied to it.
    """
    if body is None:
        body = _build_query_body(query, query_model, recall, **kwargs)
    if not debug_request:
        return app.query(body=body)
    # Debug mode: hand back the body in an otherwise empty response, without calling the app.
    return VespaQueryResponse(json={}, status_code=None, url=None, request_body=body)

Usage: Assume `app` is a Vespa connection.

In [None]:
#|hide
# setup a vespa app
from vespa.package import ApplicationPackage, Field, FieldSet, RankProfile
from vespa.deployment import VespaDocker

In [None]:
#|hide
# Application package with a `doc_id` attribute and a bm25-enabled `text` field.
app_package = ApplicationPackage(name="QueryApp")
app_package.schema.add_fields(
    Field(name="doc_id", type="string", indexing=["attribute", "summary"]),
    Field(name="text", type="string", indexing=["index", "summary"], index="enable-bm25"),
)
# Field set named "default" made up of the `text` field.
app_package.schema.add_field_set(
    FieldSet(name="default", fields=["text"])
)
# Rank profile "bm25" that also exposes `bm25(text)` as a summary feature.
app_package.schema.add_rank_profile(
    RankProfile(
        name="bm25",
        first_phase="bm25(text)",
        summary_features=["bm25(text)"]
    )
)
# NOTE(review): presumably needs a local Docker daemon to deploy -- confirm before running.
vespa_docker = VespaDocker(port=8083, cfgsrv_port=19073)
app = vespa_docker.deploy(application_package=app_package)
# Ten small documents, fed with `doc_id` as the id field.
df = DataFrame(
    data={
        "doc_id": [f"{i}" for i in range(10)],
        "text": [f"this is title {i}" for i in range(10)],
    }
)
responses = app.feed_df(
    df=df, 
    include_id=True, 
    id_field="doc_id"
)

Send request body.

In [None]:
body = {"yql": "select * from sources * where test"}
result = send_query(app=app, body=body)

In [None]:
#|hide
# With `debug_request=True`, `send_query` returns the request body without calling `app.query`.
app_test = Vespa(url="http://localhost", port=8080)

body = {"yql": "select * from sources * where test"}
# An explicit `body` must come back unchanged.
test_eq(
    send_query(app=app_test, body=body, debug_request=True).request_body, body
)

Use `query` and `query_model`:

In [None]:
result = send_query(
    app=app,
    query="this is a test",
    query_model=QueryModel(
        match_phase=OR(), 
        ranking=Ranking()
    ),
    hits=10,
)

Debug the output of the `QueryModel` by setting `debug_request=True`:

In [None]:
send_query(
    app=app,
    query="this is a test",
    query_model=QueryModel(match_phase=OR(), ranking=Ranking()),
    debug_request=True,
    hits=10,
).request_body

In [None]:
#|hide
test_eq(
    send_query(
        app=app_test,
        query="this is a test",
        query_model=QueryModel(match_phase=OR(), ranking=Ranking()),
        debug_request=True,
        hits=10,
    ).request_body,
    {
        "yql": 'select * from sources * where ({grammar: "any"}userInput("this is a test"));',
        "ranking": {"profile": "default", "listFeatures": "false"},
        "hits": 10,
    },
)

Recall documents using the `id` field:

In [None]:
result = send_query(
    app=app,
    query="this is a test",
    query_model=QueryModel(match_phase=OR(), ranking=Ranking()),
    hits=10,
    recall=("id", [1, 5]),
)

In [None]:
#|hide
test_eq(
    send_query(
        app=app_test,
        query="this is a test",
        query_model=QueryModel(match_phase=OR(), ranking=Ranking()),
        debug_request=True,
        hits=10,
        recall=("id", [1, 5]),
    ).request_body,
    {
        "yql": 'select * from sources * where ({grammar: "any"}userInput("this is a test"));',
        "ranking": {"profile": "default", "listFeatures": "false"},
        "hits": 10,
        "recall": "+(id:1 id:5)",
    },
)

Use a `body_function` to specify a `QueryModel`:

In [None]:
def body_function(query):
    body = {
        "yql": "select * from sources * where userQuery();",
        "query": query,
        "type": "any",
        "ranking": {"profile": "bm25", "listFeatures": "true"},
    }
    return body

query_model = QueryModel(body_function=body_function)

result = send_query(
        app=app,
        query="this is a test",
        query_model=query_model,
        hits=10
)

In [None]:
#|hide
def body_function(query):
    body = {
        "yql": "select * from sources * where userQuery();",
        "query": query,
        "type": "any",
        "ranking": {"profile": "bm25", "listFeatures": "true"},
    }
    return body

query_model = QueryModel(body_function=body_function)

test_eq(
    send_query(
        app=app,
        query="this is a test",
        query_model=query_model,
        debug_request=True,
        hits=10,
        recall=("id", [1, 5]),
    ).request_body,
    {
        "yql": "select * from sources * where userQuery();",
        "query": "this is a test",
        "type": "any",
        "ranking": {"profile": "bm25", "listFeatures": "true"},
        "hits": 10,
        "recall": "+(id:1 id:5)",
    },
)

In [None]:
#|hide
# Without `body`, the request is built from `query_model`; omitting it trips the
# assertion in `_build_query_body`.
test_fail(
    send_query, 
    kwargs={"app": app, "query": "this should not work"}, 
    contains="No 'query_model' specified."
)

In [None]:
#|export
def send_query_batch(
    app,  # Connection to a Vespa application
    body_batch: Optional[List[Dict]] = None,  # Contains all the request parameters. Set to None if using 'query_batch'.
    query_batch: Optional[List[str]] = None,  # Query strings. Set to None if using 'body_batch'.
    query_model: Optional[QueryModel] = None,  # Query model to use when sending query strings. Set to None if using 'body_batch'.
    recall_batch: Optional[List[Tuple]] = None,  # One tuple for each query. Tuple of size 2 where the first element is the name of the field to use to recall and the second element is a list of the values to be recalled.
    asynchronous=True,  # Set True to send data in async mode. Default to True.
    connections: Optional[int] = 100,  # Number of allowed concurrent connections, valid only if `asynchronous=True`.
    total_timeout: int = 100,  # Total timeout in secs for each of the concurrent requests when using `asynchronous=True`.
    **kwargs,  # Additional parameters to be sent along the request. Only applied when the bodies are built from 'query_batch'.
) -> List[VespaQueryResponse]:  # HTTP POST responses.
    """
    Send queries in batch to a Vespa app.

    Either pass ready-made request bodies through 'body_batch', or pass 'query_batch'
    together with a 'query_model' (and optionally 'recall_batch').
    Raises `ValueError` when there is neither a 'body_batch' nor a non-empty 'query_batch'.
    """

    if body_batch:
        assert (
            query_batch is None
        ), "'query_batch' has no effect if 'body_batch' is not None."
    elif query_batch:
        assert (
            body_batch is None
        ), "'body_batch' has no effect if 'query_batch' is not None."
        assert (
            query_model is not None
        ), "Specify a 'query_model' when using 'query_batch' argument."

        if recall_batch:
            assert len(recall_batch) == len(
                query_batch
            ), "Specify one recall tuple for each query in the batch."
            body_batch = [
                _build_query_body(
                    query=query, query_model=query_model, recall=recall, **kwargs
                )
                for query, recall in zip(query_batch, recall_batch)
            ]
        else:
            # `recall` is deliberately not passed here so that a `recall` entry in
            # `kwargs` keeps working as a recall shared by every query.
            body_batch = [
                _build_query_body(query=query, query_model=query_model, **kwargs)
                for query in query_batch
            ]
    elif body_batch is None:
        # Previously the exception was constructed but never raised, so `None`
        # was silently forwarded to `app.query_batch`. An explicitly empty
        # 'body_batch' is still passed through unchanged.
        raise ValueError("Specify either 'query_batch' or 'body_batch'.")

    return app.query_batch(
        body_batch=body_batch,
        asynchronous=asynchronous,
        connections=connections,
        total_timeout=total_timeout,
    )

Use `body_batch` to send a batch of body requests.

In [None]:
body_batch = [
    {"yql": "select * from sources * where test"},
    {"yql": "select * from sources * where test2"}
]
result = send_query_batch(app=app, body_batch=body_batch)

In [None]:
#|hide
body_batch = [
    {"yql": "select * from sources * where test"},
    {"yql": "select * from sources * where test2"}
]
result = send_query_batch(app=app, body_batch=body_batch)
test_eq(len(result), 2)

Use `query_batch` to send a batch of query strings to be ranked according to a `QueryModel`.

In [None]:
result = send_query_batch(
    app=app,
    query_batch=["this is a test", "this is a test 2"],
    query_model=QueryModel(
        match_phase=OR(), 
        ranking=Ranking()
    ),
    hits=10,
)

In [None]:
#|hide
result = send_query_batch(
    app=app,
    query_batch=["this is a test", "this is a test 2"],
    query_model=QueryModel(
        match_phase=OR(), 
        ranking=Ranking()
    ),
    hits=10,
)
test_eq(len(result), 2)

Use `recall_batch` to send one tuple for each query in `query_batch`.

In [None]:
result = send_query_batch(
    app=app,
    query_batch=["this is a test", "this is a test 2"],
    query_model=QueryModel(match_phase=OR(), ranking=Ranking()),
    hits=10,
    recall_batch=[("doc_id", [2, 7]), ("doc_id", [0, 5])],
)

In [None]:
#|hide
result = send_query_batch(
    app=app,
    query_batch=["this is a test 2", "this is a test 0"],
    query_model=QueryModel(match_phase=OR(), ranking=Ranking()),
    hits=10,
    recall_batch=[("doc_id", [2, 7]), ("doc_id", [0, 5])],
)
test_eq(['2', '7', '0', '5'], [result[i].hits[j]["fields"]["doc_id"] for i in range(2) for j in range(2)])

## Collect Vespa features

In [None]:
#|export
def _annotate_data(
    hits, query_id, id_field, relevant_id, fields, relevant_score, default_score
):
    data = []
    for h in hits:
        record = {}
        record.update({"document_id": h["fields"][id_field]})
        record.update({"query_id": query_id})
        record.update(
            {
                "label": relevant_score
                if h["fields"][id_field] == relevant_id
                else default_score
            }
        )
        for field in fields:
            field_value = h["fields"].get(field, None)
            if field_value:
                if isinstance(field_value, dict):
                    record.update(field_value)
                else:
                    record.update({field: field_value})
        data.append(record)
    return data


In [None]:
#|export
def collect_vespa_features(
    app,  # Application the queries are sent to (forwarded to `send_query_batch`).
    labeled_data: Union[List[Dict], DataFrame],  # Labelled data containing query, query_id and relevant ids. See examples about data format.
    id_field: str,  # The Vespa field representing the document id.
    query_model: QueryModel,  # Query model.
    number_additional_docs: int,  # Number of additional documents to retrieve for each relevant document. Duplicate documents will be dropped.
    fields: List[str],  # Vespa fields to collect, e.g. ["rankfeatures", "summaryfeatures"]
    keep_features: Optional[List[str]] = None,  # List containing the names of the features that should be returned. Default to None, which return all the features contained in the 'fields' argument.
    relevant_score: int = 1,  # Score to assign to relevant documents that do not carry their own "score". Default to 1.
    default_score: int = 0,  # Score to assign to the additional documents that are not relevant. Default to 0.
    **kwargs,  # Extra keyword arguments to be included in the Vespa Query.
) -> DataFrame:  # DataFrame containing document id (document_id), query id (query_id), label (label) and vespa rank features returned by the Query model RankProfile used.
    """
    Collect Vespa features based on a set of labelled data.

    One query is sent per (query, relevant document) pair with recall restricted
    to that document. When `number_additional_docs > 0`, the same queries are sent
    again without recall to collect additional hits. Each (query_id, document_id)
    pair appears once in the output. A document listed as relevant for a query
    always receives its relevant score, even when it is also returned as an
    additional hit for another relevant document of the same query.

    labeled_data can be a DataFrame or a List of Dict:

    >>> labeled_data_df = DataFrame(
    ...     data={
    ...         "qid": [0, 0, 1, 1],
    ...         "query": ["Intrauterine virus infections and congenital heart disease", "Intrauterine virus infections and congenital heart disease", "Clinical and immunologic studies in identical twins discordant for systemic lupus erythematosus", "Clinical and immunologic studies in identical twins discordant for systemic lupus erythematosus"],
    ...         "doc_id": [0, 3, 1, 5],
    ...         "relevance": [1,1,1,1]
    ...     }
    ... )

    >>> labeled_data = [
    ...     {
    ...         "query_id": 0,
    ...         "query": "Intrauterine virus infections and congenital heart disease",
    ...         "relevant_docs": [{"id": 0, "score": 1}, {"id": 3, "score": 1}]
    ...     },
    ...     {
    ...         "query_id": 1,
    ...         "query": "Clinical and immunologic studies in identical twins discordant for systemic lupus erythematosus",
    ...         "relevant_docs": [{"id": 1, "score": 1}, {"id": 5, "score": 1}]
    ...     }
    ... ]
    """

    if isinstance(labeled_data, DataFrame):
        labeled_data = parse_labeled_data(df=labeled_data)

    # One entry per (query, relevant document) pair:
    # (query_id, query, relevant document id, score of that document).
    flat_data = [
        (
            data["query_id"],
            data["query"],
            relevant_doc["id"],
            relevant_doc.get("score", relevant_score),
        )
        for data in labeled_data
        for relevant_doc in data["relevant_docs"]
    ]

    # Label of every judged (query_id, document id) pair. Used below so that a
    # judged document keeps its score no matter which query of the batch returned it.
    judged_labels = {
        (query_id, relevant_id): score
        for query_id, _, relevant_id, score in flat_data
    }

    queries = [x[1] for x in flat_data]
    # First pass: recall restricted to the relevant document of each entry.
    response_batches = [
        send_query_batch(
            app=app,
            query_batch=queries,
            query_model=query_model,
            recall_batch=[(id_field, [x[2]]) for x in flat_data],
            **kwargs,
        )
    ]
    # Optional second pass: same queries without recall, to gather additional hits.
    if number_additional_docs > 0:
        response_batches.append(
            send_query_batch(
                app=app,
                query_batch=queries,
                query_model=query_model,
                hits=number_additional_docs,
                **kwargs,
            )
        )

    result = []
    for responses in response_batches:
        for (query_id, _, relevant_id, score), query_result in zip(
            flat_data, responses
        ):
            records = _annotate_data(
                hits=query_result.hits,
                query_id=query_id,
                id_field=id_field,
                relevant_id=relevant_id,
                fields=fields,
                relevant_score=score,
                default_score=default_score,
            )
            for record in records:
                # A hit may be relevant for this query through another entry of
                # flat_data; without this lookup it would be labelled default_score
                # here and end up in the output with two conflicting labels.
                record["label"] = judged_labels.get(
                    (query_id, record["document_id"]), record["label"]
                )
            result.extend(records)

    df = DataFrame.from_records(result)
    # Labels are now a function of (query_id, document_id), so one row per pair is kept.
    df = df.drop_duplicates(["document_id", "query_id"])
    df = df.sort_values("query_id")
    if keep_features:
        df = df[["document_id", "query_id", "label"] + keep_features]
    return df

In [None]:
#|hide
# Two queries, each with a single judged document.
labeled_data = [
    {
        "query_id": 0,
        "query": "give me title 1",
        "relevant_docs": [{"id": "1", "score": 1}],
    },
    {
        "query_id": 1,
        "query": "give me title 3",
        "relevant_docs": [{"id": "3", "score": 1}],
    },
]
# No `keep_features` given, so every feature found under "rankfeatures" is returned.
rank_features = collect_vespa_features(
    app=app,
    labeled_data=labeled_data,
    id_field="doc_id",
    query_model=QueryModel(
        match_phase=OR(), ranking=Ranking(name="bm25", list_features=True)
    ),
    number_additional_docs=2,
    fields=["rankfeatures"],
)
# More than two rows: additional hits are expected on top of the two judged documents.
test_eq(rank_features.shape[0] > 2, True)
# More than three columns: document_id, query_id and label plus at least one feature.
test_eq(rank_features.shape[1] > 3, True)

In [None]:
#|hide
# Two queries, each with a single judged document.
labeled_data = [
    {
        "query_id": 0,
        "query": "give me title 1",
        "relevant_docs": [{"id": "1", "score": 1}],
    },
    {
        "query_id": 1,
        "query": "give me title 3",
        "relevant_docs": [{"id": "3", "score": 1}],
    },
]
rank_features = collect_vespa_features(
    app=app,
    labeled_data=labeled_data,
    id_field="doc_id",
    query_model=QueryModel(
        match_phase=OR(), ranking=Ranking(name="bm25", list_features=True)
    ),
    number_additional_docs=2,
    fields=["rankfeatures"],
    keep_features=["textSimilarity(text).score"],
)
# More than two rows: additional hits are expected on top of the two judged documents.
test_eq(rank_features.shape[0] > 2, True)
# Exactly four columns: document_id, query_id, label and the single feature
# selected through `keep_features`.
test_eq(rank_features.shape[1], 4)

In [None]:
#|hide
# A single query with a single judged document.
labeled_data = [
    {
        "query_id": 0,
        "query": "give me title 1",
        "relevant_docs": [{"id": "1", "score": 1}],
    }
]
# Ask for 12 additional documents so the judged document can show up again
# among the additional hits.
rank_features = collect_vespa_features(
    app=app,
    labeled_data=labeled_data,
    id_field="doc_id",
    query_model=QueryModel(
        match_phase=OR(), ranking=Ranking(name="bm25", list_features=True)
    ),
    number_additional_docs=12,
    fields=["rankfeatures"],
    keep_features=["textSimilarity(text).score"],
)
# With only one query, every document id must appear exactly once.
document_ids = rank_features.document_id.tolist()
test_eq(len(document_ids), len(set(document_ids)))

In [None]:
#|export
def store_vespa_features(
    app,  # Application the queries are sent to (forwarded to `collect_vespa_features`).
    output_file_path: str,  # Path of the .csv output file. The file is created if it does not exist; otherwise the features are appended to it.
    labeled_data,  # Labelled data containing query, query_id and relevant ids, as a DataFrame or a list of dicts.
    id_field: str,  # The Vespa field representing the document id.
    query_model: QueryModel,  # Query model.
    number_additional_docs: int,  # Number of additional documents to retrieve for each relevant document.
    fields: List[str],  # Vespa fields to collect, e.g. ["rankfeatures", "summaryfeatures"]
    keep_features: Optional[List[str]] = None,  # Names of the features that should be stored. Default to None, which stores all the features contained in the 'fields' argument.
    relevant_score: int = 1,  # Score to assign to relevant documents. Default to 1.
    default_score: int = 0,  # Score to assign to the additional documents that are not relevant. Default to 0.
    batch_size=1000,  # Number of labeled data points processed per batch.
    **kwargs,  # Extra keyword arguments to be included in the Vespa Query.
):  # Returns 0 upon success.
    """
    Retrieve Vespa rank features and store them in a .csv file.

    The labeled data is processed in slices of `batch_size` entries. The features
    of each slice are written right after they are collected, and a progress
    message is printed per slice. The header row is written only when the output
    file does not exist yet; later writes append rows without a header.
    """

    if isinstance(labeled_data, DataFrame):
        labeled_data = parse_labeled_data(df=labeled_data)

    batch_starts = range(0, len(labeled_data), batch_size)
    number_of_batches = len(batch_starts)
    for batch_number, start in enumerate(batch_starts, start=1):
        vespa_features = collect_vespa_features(
            app=app,
            labeled_data=labeled_data[start : start + batch_size],
            id_field=id_field,
            query_model=query_model,
            number_additional_docs=number_additional_docs,
            fields=fields,
            keep_features=keep_features,
            relevant_score=relevant_score,
            default_score=default_score,
            **kwargs,
        )
        # Checked on every batch: the first write creates the file with a header,
        # every following write (or a pre-existing file) appends without one.
        append_to_existing = os.path.isfile(output_file_path)
        vespa_features.to_csv(
            path_or_buf=output_file_path,
            header=not append_to_existing,
            index=False,
            mode="a" if append_to_existing else "w",
        )
        print(
            "Rows collected: {}.\nBatch progress: {}/{}.".format(
                vespa_features.shape[0],
                batch_number,
                number_of_batches,
            )
        )
    return 0


In [None]:
#|hide
from pandas import read_csv

features_csv = "vespa_features.csv"
# store_vespa_features appends to an existing file, so a file left over from a
# previous run would leak stale rows into this check. Start from a clean slate.
if os.path.isfile(features_csv):
    os.remove(features_csv)

# Two queries, each with a single judged document.
labeled_data = [
    {
        "query_id": 0,
        "query": "give me title 1",
        "relevant_docs": [{"id": "1", "score": 1}],
    },
    {
        "query_id": 1,
        "query": "give me title 3",
        "relevant_docs": [{"id": "3", "score": 1}],
    },
]

store_vespa_features(
    app=app,
    output_file_path=features_csv,
    labeled_data=labeled_data,
    id_field="doc_id",
    query_model=QueryModel(
        match_phase=OR(), ranking=Ranking(name="bm25", list_features=True)
    ),
    number_additional_docs=2,
    fields=["rankfeatures", "summaryfeatures"],
)
rank_features = read_csv(features_csv)
# Remove the artifact before asserting so it is cleaned up even when a check fails.
os.remove(features_csv)
# more than two rows: additional hits on top of the two judged documents
test_eq(rank_features.shape[0] > 2, True)
# at least one feature besides document_id, query_id and label
test_eq(rank_features.shape[1] > 3, True)

In [None]:
#|hide
# Tear down the container held by `vespa_docker` (created earlier in this
# notebook): stop it first, passing `timeout=600`, then remove it.
vespa_docker.container.stop(timeout=600)
vespa_docker.container.remove()

In [None]:
#|hide
# Run nbdev's export step; this notebook declares `#| default_exp query` at the top.
nbdev_export()