Replace ProcessPoolExecutor with asyncio page splitting #92


Merged
17 commits merged into main from mike/async-page-splitting on May 30, 2024

Conversation

micmarty-deepsense
Contributor

@micmarty-deepsense micmarty-deepsense commented May 23, 2024

  • update the documentation
  • use aiohttp/httpx instead of the synchronous requests library
  • add rate limiting or another way of controlling concurrency
  • remove all references to threads/processes

How to verify that this PR works

Unit & Integration Tests

make install && make test

Manually

make install
pip install --editable .
python -m timeit --repeat 10 --verbose "$(cat test-client.py)"

Where test-client.py has the following contents:

import os

import unstructured_client
from unstructured_client import UnstructuredClient
from unstructured_client.models import shared
from unstructured_client.models.errors import SDKError

# Confirm which installation of the package is being exercised
print(unstructured_client.__file__)

s = UnstructuredClient(
    api_key_auth=os.environ["UNS_API_KEY"],
    server_url="http://localhost:8000",
)

filename = "_sample_docs/layout-parser-paper.pdf"

with open(filename, "rb") as f:
    files = shared.Files(
        content=f.read(),
        file_name=filename,
    )

req = shared.PartitionParameters(
    files=files,
    strategy="fast",
    languages=["eng"],
    split_pdf_page=True,
    split_pdf_concurrency_level=1,
)

try:
    resp = s.general.partition(req)
except SDKError as e:
    raise SystemExit(e)

ids = [e.element_id for e in resp.elements]
print(ids)

Comment on lines 260 to 261
loop = asyncio.get_event_loop()
responses = loop.run_until_complete(asyncio.gather(*prt_requests))


I don't understand how this hook flow works, but I feel like you should start the event loop when you define the tasks, and here just wait for them to complete.

Contributor Author

@micmarty-deepsense micmarty-deepsense May 28, 2024


I was considering that, but as we discussed last week, in such a scenario before_request would also need to be an async method, which can't be done: Speakeasy requires hooks to be regular (synchronous) methods.

@micmarty-deepsense micmarty-deepsense force-pushed the mike/async-page-splitting branch from 250c28f to c85a2d3 Compare May 23, 2024 23:21
@micmarty-deepsense micmarty-deepsense force-pushed the mike/async-page-splitting branch from 668c9c7 to 976b238 Compare May 24, 2024 10:50
@micmarty-deepsense micmarty-deepsense changed the title [WIP] Replace ProcessPoolExecutor with asyncio page splitting Replace ProcessPoolExecutor with asyncio page splitting May 24, 2024

@pytest.mark.parametrize("split_pdf", [True, False])
@pytest.mark.parametrize("error_code", [500, 403])
def test_partition_handling_server_error(error_code, split_pdf, monkeypatch, doc_path):
Contributor Author


https://github.com/Unstructured-IO/unstructured-python-client/actions/runs/9225821357/job/25384186276?pr=92

I managed to pass all tests except for this one. @badGarnet, if you're able to identify the source of the problem with a quick glance, let me know. I haven't managed to figure it out myself in time.

Contributor Author


Now this test is passing; I just had to mock the httpx response 😅

gen.yaml Outdated
Comment on lines 19 to 20
httpx: ">=0.27.0"
aiolimiter: ">=1.1.0"
Contributor Author


Two new dependencies: httpx is BSD-3-licensed, aiolimiter is MIT-licensed.

Contributor Author


No longer being used; resolving.

@@ -135,43 +137,93 @@ def before_request(
fallback_value=DEFAULT_CONCURRENCY_LEVEL,
max_allowed=MAX_CONCURRENCY_LEVEL,
)
limiter = AsyncLimiter(max_rate=concurrency_level, time_period=1)
Contributor Author


So by default it's 15 requests per second.

Contributor


Don't we want to set a hard limit on it? If the limit is per second, then we could end up with more than 15 pending requests with this approach.
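For comparison, a hard cap on in-flight requests can be sketched with the stdlib `asyncio.Semaphore`, which bounds simultaneous requests regardless of elapsed time. The values and names below are illustrative only; this is not the code in the PR.

```python
import asyncio

async def run_requests(total: int, cap: int) -> int:
    sem = asyncio.Semaphore(cap)  # hard limit on simultaneous requests
    active = 0
    peak = 0

    async def send_one(i: int) -> None:
        nonlocal active, peak
        async with sem:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the real page request
            active -= 1

    await asyncio.gather(*(send_one(i) for i in range(total)))
    return peak  # never exceeds cap

print(asyncio.run(run_requests(total=60, cap=15)))
```

Unlike a rate limiter, the semaphore guarantees the number of concurrently pending requests never exceeds the cap, even if individual requests are slow.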

Comment on lines +161 to +164
response = requests.Response()
response.status_code = status_code
response._content = json.dumps(json_response).encode() # pylint: disable=W0212
response.headers["Content-Type"] = "application/json"
return response
Contributor Author

@micmarty-deepsense micmarty-deepsense May 24, 2024


I wish Speakeasy supported async client generation...
They rely on the synchronous requests library, forcing me to do the conversion here.

@micmarty-deepsense micmarty-deepsense force-pushed the mike/async-page-splitting branch from 74613b8 to 473560e Compare May 28, 2024 14:11
Contributor

@mpolomdeepsense mpolomdeepsense left a comment


LGTM 👍

page_number = page_index + starting_page_number
# Check if this page is the last one
print(f"Page {page_number} of {all_pages_number}")
if page_index == all_pages_number - 1:
Contributor Author


@ds-filipknefel you mentioned that there's a bug on the main branch. I'll incorporate your fix on Friday.

Collaborator

@badGarnet badGarnet left a comment


Blocking for now, as scale testing reveals some potential issues when using this code:

  • the Locust base setup can't run the client with page splitting: it raises the error "There is no current event loop in thread 'Dummy-2'". Investigating this more at the moment.

Collaborator

@badGarnet badGarnet left a comment


The issue with Locust was due to nesting of async event loops; it's not a problem for the user experience.
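A minimal, generic illustration of the failure mode and its fix, independent of Locust or the SDK: in a non-main thread there is no current event loop, so one has to be created explicitly instead of relying on `asyncio.get_event_loop()`.

```python
import asyncio
import threading

results: list[str] = []

async def work() -> str:
    await asyncio.sleep(0)
    return "done"

def run_in_worker_thread() -> None:
    # asyncio.get_event_loop() would fail here with
    # "There is no current event loop in thread ...", so create one.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        results.append(loop.run_until_complete(work()))
    finally:
        loop.close()

t = threading.Thread(target=run_in_worker_thread, name="Dummy-2")
t.start()
t.join()
print(results)  # ['done']
```

Each worker thread owning its own loop avoids both the missing-loop error and the nested-loop problem described above.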

Update readme to highlight how to use the new page splitting logic safely.
@badGarnet badGarnet merged commit cb94407 into main May 30, 2024
@badGarnet badGarnet deleted the mike/async-page-splitting branch May 30, 2024 22:01