[Bug]: Failed to get vector store from OpenSearch within an async endpoint with v0.10.16 #11661

Closed
mw19930312 opened this issue Mar 5, 2024 · 11 comments · Fixed by #11707
Labels
bug Something isn't working triage Issue needs to be triaged/prioritized

Comments

@mw19930312

Bug Description

I'm migrating llama_index to v0.10.16. However, I cannot get a vector store from OpenSearch inside an endpoint that is triggered asynchronously. The code below shows the function, which is called from an async endpoint.

Version

v0.10.16

Steps to Reproduce

@sentry_sdk.trace
def get_unstructure_data_retrieval_engine(system_prompt: str, config: TextSearchQueryEngineConfig) -> BaseQueryEngine:
    vector_store = opensearch_client.get_vector_store(config.index_name)

Relevant Logs/Tracebacks

2024-03-05T19:35:38.663638Z::error::uvicorn.error::Exception in ASGI application
 {
  "type": "log",
  "level_num": 40,
  "level_name": "error",
  "env": "dev",
  "aws_region": "us-east-2"
}
Traceback (most recent call last):
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/anyio/streams/memory.py", line 98, in receive
    return self.receive_nowait()
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/anyio/streams/memory.py", line 93, in receive_nowait
    raise WouldBlock
anyio.WouldBlock

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/middleware/base.py", line 159, in call_next
    message = await recv_stream.receive()
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/anyio/streams/memory.py", line 118, in receive
    raise EndOfStream
anyio.EndOfStream

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 419, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/middleware/base.py", line 191, in __call__
    response = await self.dispatch_func(request, call_next)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Desktop/platform/gateway/gateway/api/middleware/log_middleware.py", line 48, in dispatch
    response = await call_next(request)
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/middleware/base.py", line 165, in call_next
    raise app_exc
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/middleware/base.py", line 151, in coro
    await self.app(scope, receive_or_disconnect, send_no_error)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/middleware/cors.py", line 83, in __call__
    await self.app(scope, receive, send)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 62, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/routing.py", line 758, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/routing.py", line 778, in app
    await route.handle(scope, receive, send)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/routing.py", line 299, in handle
    await self.app(scope, receive, send)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/routing.py", line 79, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/starlette/routing.py", line 74, in app
    response = await func(request)
               ^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/fastapi/routing.py", line 299, in app
    raise e
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/fastapi/routing.py", line 294, in app
    raw_response = await run_endpoint_function(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/fastapi/routing.py", line 191, in run_endpoint_function
    return await dependant.call(**values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Desktop/platform/gateway/gateway/api/endpoints/chat.py", line 44, in stream
    return await handle_chat_stream(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/sentry_sdk/tracing_utils_py3.py", line 47, in func_with_tracing
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Desktop/platform/gateway/gateway/messaging.py", line 525, in handle_chat_stream
    chat_engine = get_chat_engine(
                  ^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/sentry_sdk/tracing_utils_py3.py", line 70, in func_with_tracing
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Desktop/platform/gateway/gateway/llama_index/engines.py", line 181, in get_chat_engine
    citation_query_engine = get_unstructure_data_retrieval_engine(system_prompt, config)
                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/sentry_sdk/tracing_utils_py3.py", line 70, in func_with_tracing
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Desktop/platform/gateway/gateway/llama_index/engines.py", line 134, in get_unstructure_data_retrieval_engine
    vector_store = opensearch_client.get_vector_store(config.index_name)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Desktop/platform/gateway/gateway/client/opensearch.py", line 61, in get_vector_store
    store_client = OpensearchVectorClient(
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/llama_index/vector_stores/opensearch/base.py", line 325, in __init__
    event_loop.run_until_complete(
  File "uvloop/loop.pyx", line 1511, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1504, in uvloop.loop.Loop.run_until_complete
  File "uvloop/loop.pyx", line 1377, in uvloop.loop.Loop.run_forever
  File "uvloop/loop.pyx", line 518, in uvloop.loop.Loop._run
RuntimeError: this event loop is already running.
@mw19930312 mw19930312 added bug Something isn't working triage Issue needs to be triaged/prioritized labels Mar 5, 2024

dosubot bot commented Mar 5, 2024

Hey @mw19930312! Great to see you back here, navigating the twists and turns of LlamaIndex with us. Let's dive into this new challenge together.

The error you're encountering, "RuntimeError: this event loop is already running", typically happens when you're trying to run an asyncio event loop within an already running event loop in the same thread. This can occur when using asyncio.get_event_loop().run_until_complete() or similar functions to run asynchronous code from a synchronous context, but the event loop is already running.
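A minimal reproduction of this failure mode, using only the standard library, may make the explanation concrete. The names `sync_wrapper` and `endpoint` are hypothetical stand-ins and are not taken from LlamaIndex; the sketch only assumes that a synchronous helper calls `run_until_complete()` on the loop that is already serving the request.

```python
import asyncio


def sync_wrapper() -> str:
    """Synchronous facade that tries to drive a coroutine on the current loop."""
    loop = asyncio.get_event_loop()
    coro = asyncio.sleep(0, result="done")
    try:
        return loop.run_until_complete(coro)
    except RuntimeError:
        # The coroutine was never scheduled; close it to avoid a warning.
        coro.close()
        raise


async def endpoint() -> str:
    # The loop is already running here, so the nested call fails.
    try:
        return sync_wrapper()
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"


message = asyncio.run(endpoint())
print(message)
```

The same call to `sync_wrapper()` succeeds when it is made from plain synchronous code with no loop running, which is why the problem only shows up inside async endpoints.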

In your case, the error is likely caused by the use of asyncio.get_event_loop().run_until_complete() within methods like add, delete, and query of the OpensearchVectorStore class. These methods are designed to provide a synchronous interface to asynchronous operations by starting and running an event loop. However, if these methods are called from an asynchronous context where an event loop is already running (for example, within an async function or an already running event loop), the attempt to start a new event loop with run_until_complete() will lead to the "RuntimeError: this event loop is already running" error.

To resolve this issue, you should refactor your code to avoid starting a new event loop inside an already running one. One approach is to directly use the asynchronous versions of these methods (async_add, adelete, aquery) in asynchronous contexts, thereby avoiding the need to start a new event loop. For synchronous contexts where it's necessary to run asynchronous code, consider using asyncio.run() if you're working with Python 3.7 or newer, which is designed to run an asynchronous coroutine and properly manage the event loop. However, asyncio.run() cannot be called when an event loop is already running, so its use is primarily for entry points into asynchronous programs.
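One generic way to call loop-driving synchronous code from an async endpoint is to push it onto a worker thread, where no loop is running. The sketch below assumes Python 3.9 or newer for `asyncio.to_thread`; `build_store_blocking` and `_create_index` are hypothetical stand-ins, and this pattern has not been verified against `OpensearchVectorClient`, whose async client may be bound to the loop it was created on.

```python
import asyncio


async def _create_index() -> str:
    # Stand-in for async setup work such as an index-existence check.
    await asyncio.sleep(0)
    return "index ready"


def build_store_blocking() -> str:
    """Hypothetical sync constructor that needs its own event loop."""
    # asyncio.run() creates a fresh loop; this is safe because the
    # worker thread has no loop running.
    return asyncio.run(_create_index())


async def endpoint() -> str:
    # Run the blocking constructor in a worker thread so it does not
    # collide with the loop that is serving this request.
    return await asyncio.to_thread(build_store_blocking)


result = asyncio.run(endpoint())
print(result)
```

Where the library offers native async methods, awaiting those directly is usually simpler than the thread hop shown here.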

Additionally, the use of nest_asyncio.apply() in the __init__ method of OpensearchVectorStore suggests an attempt to work around the limitations of running nested event loops. While nest_asyncio can be a useful workaround in some scenarios (e.g., in Jupyter notebooks or certain application frameworks that manage their own event loop), it's generally better to structure the code to avoid the need for nested event loops when possible, especially in library or framework code that might be used in a variety of contexts.

You can find more details about this in the source code of the OpensearchVectorStore class.

As for the get_vector_store method in the opensearch_client, its implementation is as follows:

async def get_vector_store(self) -> "OpensearchVectorStore":
    """
    Get the vector store.

    Returns:
        OpensearchVectorStore: The vector store instance.
    """
    if self._vector_store is None:
        self._vector_store = OpensearchVectorStore(client=self)
    return self._vector_store

This method is asynchronous, which means it should be awaited when called within an async function. If you're calling this method from a synchronous context, you'll need to ensure that it's run within an event loop. However, as mentioned above, care should be taken to avoid starting a new event loop if one is already running.

I hope this helps! If you have any further questions or need more clarification, feel free to ask.


@logan-markewich
Collaborator

logan-markewich commented Mar 5, 2024

@mw19930312 try setting the loop type to asyncio

i.e. I would do uvicorn api.main:start --loop asyncio or similar

Then add nest_asyncio:

import nest_asyncio
nest_asyncio.apply()

@mw19930312
Author

@logan-markewich Thanks for the reply! Sorry, I'm not fully following. Where should I add these lines? I put them in the Python file where the async endpoint is called in the gateway, but now the gateway no longer starts:

Process SpawnProcess-1:
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/python@3.11/3.11.7_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/opt/homebrew/Cellar/python@3.11/3.11.7_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/uvicorn/_subprocess.py", line 78, in subprocess_started
    target(sockets=sockets)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/uvicorn/server.py", line 62, in run
    return asyncio.run(self.serve(sockets=sockets))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.7_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.7_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "uvloop/loop.pyx", line 1517, in uvloop.loop.Loop.run_until_complete
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/uvicorn/server.py", line 69, in serve
    config.load()
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/uvicorn/config.py", line 458, in load
    self.loaded_app = import_from_string(self.app)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/uvicorn/importer.py", line 21, in import_from_string
    module = importlib.import_module(module_str)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.7_1/Frameworks/Python.framework/Versions/3.11/lib/python3.11/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1204, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1176, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1147, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 690, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/Users/weimiao/Desktop/platform/gateway/gateway/main.py", line 23, in <module>
    from gateway.api.api import api_router
  File "/Users/weimiao/Desktop/platform/gateway/gateway/api/api.py", line 3, in <module>
    from gateway.api.endpoints import chat, conversation, datasource, docs, health, log, models, prompt, token
  File "/Users/weimiao/Desktop/platform/gateway/gateway/api/endpoints/chat.py", line 13, in <module>
    from gateway.messaging import capture_stream, get_openai_response, handle_chat_stream
  File "/Users/weimiao/Desktop/platform/gateway/gateway/messaging.py", line 52, in <module>
    nest_asyncio.apply()
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/nest_asyncio.py", line 19, in apply
    _patch_loop(loop)
  File "/Users/weimiao/Library/Caches/pypoetry/virtualenvs/gateway-7LwWi3Lq-py3.11/lib/python3.11/site-packages/nest_asyncio.py", line 193, in _patch_loop
    raise ValueError('Can\'t patch loop of type %s' % type(loop))
ValueError: Can't patch loop of type <class 'uvloop.Loop'>
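The `ValueError` above suggests that `nest_asyncio` only patches loops built on the standard library's base loop class. The sketch below approximates that check under the assumption that it is an `isinstance` test against `asyncio.BaseEventLoop`; `is_patchable` and `ForeignLoop` are hypothetical names, with `ForeignLoop` standing in for a third-party loop such as `uvloop.Loop`.

```python
import asyncio


def is_patchable(loop: asyncio.AbstractEventLoop) -> bool:
    """Approximate nest_asyncio's type check (an assumption, not its API)."""
    return isinstance(loop, asyncio.BaseEventLoop)


class ForeignLoop(asyncio.AbstractEventLoop):
    """Stand-in for a third-party loop that does not extend BaseEventLoop."""


stdlib_loop = asyncio.new_event_loop()
try:
    stdlib_ok = is_patchable(stdlib_loop)
finally:
    stdlib_loop.close()

foreign_ok = is_patchable(ForeignLoop())
print(stdlib_ok, foreign_ok)
```

This is consistent with the advice in the thread: the server has to be running the standard asyncio loop before `nest_asyncio.apply()` can take effect.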

@mw19930312
Author

@logan-markewich My endpoint in gateway looks like:

@sentry_sdk.trace
async def handle_chat_stream(xxxx):
    # ... some dummy code ...
    a = get_chat_stream(xxx)
    # ... some other code ...

where get_chat_stream contains the code that initializes the OpenSearch vector store and causes the issue. If I add the nest_asyncio import at the top of this code block, the gateway no longer starts and fails with the errors shown above.

@mw19930312
Author

@logan-markewich Can we have more insights on this issue? This is a blocker for us at this time to leverage all new features introduced after v0.10.

@logan-markewich
Collaborator

@mw19930312 you are running in fastapi right? You need to set the loop type to asyncio

@logan-markewich
Collaborator

If you are using fastapi, how are you launching it?

You can add the loop type on the CLI with
uvicorn api.main:start --loop asyncio

or from code with
uvicorn.run(app, loop="asyncio")

@rc19
Contributor

rc19 commented Mar 6, 2024

Hey @logan-markewich I see that the async opensearch support was added in llama-index-vector-stores-opensearch = "^0.1.4". Is it possible to use a lower integration version but still be able to use the latest llama-index-core?

I see that this issue is expected to be fixed in v0.10.16. But for future purposes, how do we track the version dependency of integrations vs. the core package?

@logan-markewich
Collaborator

Each integration package should (in theory) work with any version of core, unless it specifically requires a version of core

In a production app, I would be locking all my versions and only upgrading when I need to, at my own pace

@mw19930312
Author

@logan-markewich I added loop="asyncio" to uvicorn.run, but I'm still encountering the same error.

uvicorn.run(
    "gateway.main:app",
    host="0.0.0.0",
    port=8000,
    reload=live_reload,
    workers=settings.UVICORN_WORKER_COUNT,
    log_config=None,
    loop="asyncio",
)

The error is

> /Users/weimiao/Desktop/platform/gateway/gateway/llama_index/engines.py(134)get_unstructure_data_retrieval_engine()
-> vector_store = opensearch_client.get_vector_store(config.index_name)
(Pdb) n
RuntimeError: This event loop is already running

@mw19930312
Author

@logan-markewich Thanks for all the replies! Just an FYI, we got this resolved by setting llama-index-vector-stores-opensearch = "0.1.4" instead of 0.1.15 (which is the default dependency when we use llama-index-core 0.10.16). Not sure what is going on between these two versions.
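Since the fix here came down to which integration version was installed, it can help to confirm at startup what the environment actually resolved. The following is a small sketch using the standard library's `importlib.metadata`; the helper names `installed_version` and `check_pin` are hypothetical, and the distribution names are the ones mentioned in this thread.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def check_pin(dist_name: str, expected: str) -> bool:
    """True when the installed version equals the pinned one."""
    return installed_version(dist_name) == expected


# Print what is actually installed for the packages discussed above.
for name in ("llama-index-core", "llama-index-vector-stores-opensearch"):
    print(name, installed_version(name))
```

A call such as `check_pin("llama-index-vector-stores-opensearch", "0.1.4")` could then be logged or asserted during startup to catch an unintended upgrade of the integration package.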
