[Bug]: OpenSearch Vector Store have a lot of connection problem after #11513 #13692
To address the connection problems with the OpenSearch vector store after #11513, especially when using LlamaIndex with FastAPI, you need to ensure that the event loop is managed correctly within the FastAPI context. The errors you're encountering, such as "This event loop is already running" and "ConnectionError(Timeout context manager should be used inside a task) caused by: RuntimeError(Timeout context manager should be used inside a task)", are typically caused by improper handling of asynchronous operations.
Below is an example of how you can modify your code to handle these issues:

```python
import asyncio
import logging
import uuid
from typing import Generator, List

import pytest

from llama_index.legacy.schema import NodeRelationship, RelatedNodeInfo, TextNode
from llama_index.legacy.vector_stores.opensearch import (
    OpensearchVectorClient,
    OpensearchVectorStore,
)
from llama_index.legacy.vector_stores.types import VectorStoreQuery

logging.basicConfig(level=logging.DEBUG)

try:
    from opensearchpy import AsyncOpenSearch

    async def check_opensearch():
        os_client = AsyncOpenSearch("localhost:9200")
        await os_client.info()
        await os_client.close()

    asyncio.run(check_opensearch())
    opensearch_not_available = False
except Exception:  # ImportError or any connection failure
    opensearch_not_available = True


@pytest.mark.skipif(opensearch_not_available, reason="opensearch is not available")
def test_connection() -> None:
    assert True


@pytest.fixture()
def index_name() -> str:
    """Return the index name."""
    return f"test_{uuid.uuid4().hex}"


@pytest.fixture()
def os_store(index_name: str) -> Generator[OpensearchVectorStore, None, None]:
    client = OpensearchVectorClient(
        endpoint="localhost:9200",
        index=index_name,
        dim=3,
    )
    yield OpensearchVectorStore(client)
    # Teardown: delete the index, then close the client's aiohttp session.
    asyncio.run(client._os_client.indices.delete(index=index_name))
    asyncio.run(client._os_client.close())


@pytest.fixture(scope="session")
def node_embeddings() -> List[TextNode]:
    return [
        TextNode(
            text="lorem ipsum",
            id_="c330d77f-90bd-4c51-9ed2-57d8d693b3b0",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-0")},
            metadata={
                "author": "Stephen King",
                "theme": "Friendship",
            },
            embedding=[1.0, 0.0, 0.0],
        ),
        TextNode(
            text="lorem ipsum",
            id_="c3d1e1dd-8fb4-4b8f-b7ea-7fa96038d39d",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-1")},
            metadata={
                "director": "Francis Ford Coppola",
                "theme": "Mafia",
            },
            embedding=[0.0, 1.0, 0.0],
        ),
        TextNode(
            text="lorem ipsum",
            id_="c3ew11cd-8fb4-4b8f-b7ea-7fa96038d39d",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-2")},
            metadata={
                "director": "Christopher Nolan",
            },
            embedding=[0.0, 0.0, 1.0],
        ),
        TextNode(
            text="I was taught that the way of progress was neither swift nor easy.",
            id_="0b31ae71-b797-4e88-8495-031371a7752e",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-3")},
            metadata={
                "author": "Marie Curie",
            },
            embedding=[0.0, 0.0, 0.9],
        ),
        TextNode(
            text=(
                "The important thing is not to stop questioning."
                + " Curiosity has its own reason for existing."
            ),
            id_="bd2e080b-159a-4030-acc3-d98afd2ba49b",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-4")},
            metadata={
                "author": "Albert Einstein",
            },
            embedding=[0.0, 0.0, 0.5],
        ),
        TextNode(
            text=(
                "I am no bird; and no net ensnares me;"
                + " I am a free human being with an independent will."
            ),
            id_="f658de3b-8cef-4d1c-8bed-9a263c907251",
            relationships={NodeRelationship.SOURCE: RelatedNodeInfo(node_id="test-5")},
            metadata={
                "author": "Charlotte Bronte",
            },
            embedding=[0.0, 0.0, 0.3],
        ),
    ]


def count_docs_in_index(os_store: OpensearchVectorStore) -> int:
    """Refresh indices and return the count of documents in the index."""
    asyncio.run(
        os_store.client._os_client.indices.refresh(index=os_store.client._index)
    )
    count = asyncio.run(os_store.client._os_client.count(index=os_store.client._index))
    return count["count"]


@pytest.mark.skipif(opensearch_not_available, reason="opensearch is not available")
def test_functionality(
    os_store: OpensearchVectorStore, node_embeddings: List[TextNode]
) -> None:
    # add
    assert len(os_store.add(node_embeddings)) == len(node_embeddings)
    # query
    exp_node = node_embeddings[3]
    query = VectorStoreQuery(query_embedding=exp_node.embedding, similarity_top_k=1)
    query_result = os_store.query(query)
    assert query_result.nodes
    assert query_result.nodes[0].get_content() == exp_node.text
    # delete one node using its associated doc_id
    os_store.delete("test-1")
    assert count_docs_in_index(os_store) == len(node_embeddings) - 1
```

This approach ensures that the event loop is properly managed and avoids the "This event loop is already running" and "Timeout context manager should be used inside a task" errors. If the issue persists, please provide more specific logs or tracebacks to further diagnose the problem.
It would be nice to support async in this vector store. If you have any ideas for improvements, open to PRs 😇
@logan-markewich I agree with the async approach, but I think it would be great if the changes were backward compatible, or at least optional, so they don't break existing implementations. Unfortunately, I'm not very experienced with OpenSearch connections or Python async handling, so I can't help with this one.
Bug Description
I noticed that after #11513, my OpenSearch vector store connection became unstable, and this is confirmed by the recent trend of related bug reports (#13358, #11661, etc.).
I have also seen it suggested to use asyncio, but even with that we can still get the errors.
I also notice that this error gets worse when we use LlamaIndex with FastAPI.
Version
latest
Steps to Reproduce
Just use the OpenSearch vector store long enough and you'll notice it becomes unstable.
Relevant Logs/Tracebacks
No response