[Bug]: Async version of opensearch module not implemented #10360

Closed
ahs8w opened this issue Jan 30, 2024 · 4 comments
Labels: bug (Something isn't working), triage (Issue needs to be triaged/prioritized)

Comments

ahs8w (Contributor) commented Jan 30, 2024

Bug Description

Neither OpensearchVectorClient nor OpensearchVectorStore implements async methods.

Version

0.9.28.post2

Steps to Reproduce

Follow the instructions here to set up OpenSearch and define query_engine.
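For completeness, a minimal setup sketch; the endpoint, index name, embedding dimension, and sample queries below are placeholders, not taken from the linked instructions:

import asyncio

from llama_index import VectorStoreIndex
from llama_index.vector_stores import OpensearchVectorClient, OpensearchVectorStore

# Placeholder connection details -- substitute your own endpoint/index/dim
client = OpensearchVectorClient("http://localhost:9200", "demo_index", dim=1536)
vector_store = OpensearchVectorStore(client)
index = VectorStoreIndex.from_vector_store(vector_store)
query_engine = index.as_retriever()  # a retriever; exposes the aretrieve() used below

queries = ["q1", "q2"]  # sample queries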

async def task(query: str):
    print(f"task {query} started.")
    # await asyncio.sleep(3)  # testing method
    await query_engine.aretrieve(query)  # this currently blocks
    print(f"task {query} finished.")

async def spawn_task(queries):
    task_list = [asyncio.create_task(task(n)) for n in queries]
    await asyncio.gather(*task_list)

asyncio.run(spawn_task(queries))
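Because the synchronous opensearch-py client performs its network I/O without yielding to the event loop, the tasks cannot interleave. Illustrative ordering with the two sample queries above:

# Expected with a truly async client (tasks overlap):
#   task q1 started.
#   task q2 started.
#   task q1 finished.
#   task q2 finished.
# Observed (each aretrieve() call blocks the event loop, so tasks run serially):
#   task q1 started.
#   task q1 finished.
#   task q2 started.
#   task q2 finished.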

Relevant Logs/Tracebacks

No response

ahs8w added the bug and triage labels on Jan 30, 2024
dosubot (bot) commented Jan 30, 2024

🤖

Hello @ahs8w! I'm Dosu, an AI assistant here to help you while we wait for a human maintainer. I'm pretty handy with bug squashing, answering questions, and guiding you on your journey to becoming a contributor. I'm currently looking into your issue and will be back with a more complete answer shortly. Hang tight!

ahs8w (Contributor, Author) commented Jan 30, 2024

We currently have this fixed in our code by creating new Async* classes that inherit from the opensearch module classes, together with a monkeypatched module-level function _get_opensearch_client() that instantiates the AsyncOpenSearch client from the opensearch-py library. The aquery methods are identical to the synchronous versions except for the addition of the async and await keywords.

import warnings

from typing import cast, Any, Optional, List
from opensearchpy import AsyncOpenSearch

from llama_index.vector_stores import OpensearchVectorClient, OpensearchVectorStore
from llama_index.vector_stores.types import (
    MetadataFilters,
    VectorStoreQuery,
    VectorStoreQueryMode,
    VectorStoreQueryResult,
)
from llama_index.vector_stores.opensearch import (
    INVALID_HYBRID_QUERY_ERROR,
    _hybrid_search_query,
    _knn_search_query,
    metadata_dict_to_node,
    TextNode
)
# Import module to monkeypatch global function _get_opensearch_client()
from llama_index.vector_stores import opensearch 

def _get_opensearch_client(opensearch_url: str, **kwargs: Any) -> Any:
    """Get OpenSearch client from the opensearch_url, otherwise raise error."""

    try:
        client = AsyncOpenSearch(opensearch_url, **kwargs)
    except ValueError as e:
        raise ValueError(
            f"OpenSearch client string provided is not in proper format. "
            f"Got error: {e} "
        )
    return client


class AsyncOpensearchVectorClient(OpensearchVectorClient):
    def __init__(self, *args, **kwargs):
        # monkeypatch global function in different module
        # Globals in Python are global to a module, not across all modules!!!
        opensearch._get_opensearch_client = _get_opensearch_client
        with warnings.catch_warnings():
            # RuntimeWarning: coroutine 'IndicesClient.get' was never awaited
            # Don't care about awaiting init functions right now
            warnings.simplefilter("ignore")
            super().__init__(*args, **kwargs)

    async def aquery(
        self,
        query_mode: VectorStoreQueryMode,
        query_str: Optional[str],
        query_embedding: List[float],
        k: int,
        filters: Optional[MetadataFilters] = None,
    ) -> VectorStoreQueryResult:
        if query_mode == VectorStoreQueryMode.HYBRID:
            if query_str is None or self._search_pipeline is None:
                raise ValueError(INVALID_HYBRID_QUERY_ERROR)
            search_query = _hybrid_search_query(
                self._text_field,
                query_str,
                self._embedding_field,
                query_embedding,
                k,
                filters=filters,
            )
            params = {"search_pipeline": self._search_pipeline}
        else:
            search_query = _knn_search_query(
                self._embedding_field, query_embedding, k, filters=filters
            )
            params = None

        res = await self._os_client.search(
            index=self._index, body=search_query, params=params
        )
        nodes = []
        ids = []
        scores = []
        for hit in res["hits"]["hits"]:
            source = hit["_source"]
            node_id = hit["_id"]
            text = source[self._text_field]
            metadata = source.get("metadata", None)

            try:
                node = metadata_dict_to_node(metadata)
                node.text = text
            except Exception:
                # TODO: Legacy support for old nodes
                node_info = source.get("node_info")
                relationships = source.get("relationships") or {}
                start_char_idx = None
                end_char_idx = None
                if isinstance(node_info, dict):
                    start_char_idx = node_info.get("start", None)
                    end_char_idx = node_info.get("end", None)

                node = TextNode(
                    text=text,
                    metadata=metadata,
                    id_=node_id,
                    start_char_idx=start_char_idx,
                    end_char_idx=end_char_idx,
                    relationships=relationships,
                    extra_info=source,
                )
            ids.append(node_id)
            nodes.append(node)
            scores.append(hit["_score"])
        return VectorStoreQueryResult(nodes=nodes, ids=ids, similarities=scores)


class AsyncOpensearchVectorStore(OpensearchVectorStore):
    async def aquery(
        self, query: VectorStoreQuery, **kwargs: Any
    ) -> VectorStoreQueryResult:
        """Query index for top k most similar nodes.

        Args:
            query (VectorStoreQuery): Store query object.

        """
        query_embedding = cast(List[float], query.query_embedding)

        return await self._client.aquery(
            query.mode,
            query.query_str,
            query_embedding,
            query.similarity_top_k,
            filters=query.filters,
        )
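With these classes in place, usage mirrors the synchronous setup. A sketch, again with placeholder connection details:

from llama_index import VectorStoreIndex

client = AsyncOpensearchVectorClient("http://localhost:9200", "demo_index", dim=1536)
vector_store = AsyncOpensearchVectorStore(client)
index = VectorStoreIndex.from_vector_store(vector_store)
query_engine = index.as_retriever()  # aretrieve() no longer blocks the event loop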

ahs8w (Contributor, Author) commented Jan 30, 2024

This should be a relatively simple implementation on the llama-index side, which I may be able to help with. We were only interested in asynchronous query methods, though, so I haven't looked into the other DB functions or how to make them async.

dosubot (bot) commented Jan 30, 2024

🤖

Thank you for your detailed report and the proposed solution. It seems like you've identified a gap in the current implementation of the OpensearchVectorClient and OpensearchVectorStore classes in the LlamaIndex codebase, specifically the lack of asynchronous functionality.

Your proposed solution of creating new AsyncOpensearchVectorClient and AsyncOpensearchVectorStore classes that inherit from the existing classes and override the query methods with asynchronous versions seems like a viable approach. The monkeypatching of the _get_opensearch_client() function to use the AsyncOpenSearch client from the opensearch-py library is also a clever workaround.

However, it's important to note that while this solution may work for your specific use case, it might not be the best approach for integrating asynchronous functionality into the LlamaIndex codebase as a whole. The introduction of asynchronous methods would likely require a more comprehensive refactoring of the codebase to ensure compatibility and maintainability.

I can't create pull requests or issues myself. However, I would recommend that you create a pull request with your proposed changes. This would allow the maintainers and contributors of the LlamaIndex repository to review your code and potentially integrate it into the main codebase.

Before you do that, please make sure to thoroughly test your code to ensure it works as expected and doesn't introduce any new issues. Also, please follow the contribution guidelines of the LlamaIndex repository when preparing your pull request.

For more information on how to create a pull request, you can refer to the GitHub documentation.

I hope this helps! If you have any other questions or need further clarification, feel free to ask.

