-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Framework] Port 7567 bug ocean integration is being terminated due to OOM #528
[Framework] Port 7567 bug ocean integration is being terminated due to OOM #528
Conversation
c79920d
to
40bd6fc
Compare
…ntegration-is-being-terminated-due-to-OOM # Conflicts: # port_ocean/core/handlers/entity_processor/jq_entity_processor.py
…ntegration-is-being-terminated-due-to-OOM
…ntegration-is-being-terminated-due-to-OOM
…ntegration-is-being-terminated-due-to-OOM
…ntegration-is-being-terminated-due-to-OOM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bump version and add changelog
…ntegration-is-being-terminated-due-to-OOM
port_ocean/core/utils.py
Outdated
elif obj != before_dict[key]: | ||
modified.append(obj) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not using is_same_entity
?
using trying to evaluate the full object is much more expensive than just evaluating identifier
and blueprint
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
saw that no need for the elif, could be replace simply with else
async def process_in_queue( | ||
objects_to_process: list[Any], | ||
func: Callable[..., Coroutine[Any, Any, T]], | ||
*args: Any, | ||
concurrency: int = 50, | ||
) -> list[T]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this function will be available for integrations as well ( as its in the port_ocean/utils
), lets make sure we add documentation to the function and an example, super important to add the why & when to use the function.
port_ocean/utils/queue_utils.py
Outdated
for i in range(concurrency): | ||
await queue.put(None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so the worker thread will exit the loop and stop working, if we wont do that then raw_params = await queue.get()
will forever wait for more tasks and leave 50 workers threads open
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then add comment as it is not straight forward
…ntegration-is-being-terminated-due-to-OOM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor
port_ocean/utils/queue_utils.py
Outdated
for i in range(concurrency): | ||
await queue.put(None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
then add comment as it is not straight forward
Description
What - ocean integration is being terminated due to OOM whene dealing with large amount of entities to process
Why - With asyncio.gather, all items are gathered into a single list before being processed, potentially causing memory issues if the list is too large, while asyncio.queue doesn't require holding all items in memory at once.
Type of change
Please leave one option from the following and delete the rest:
Screenshots
Include screenshots from your environment showing how the resources of the integration will look.
API Documentation
Provide links to the API documentation used for this integration.