WIP - Improve memory performance on syncing #432
Conversation
https://pulp.plan.io/issues/5688 Required PR: pulp/pulpcore#432 closes #5688
```diff
-if self._using_context_manager and self._last_save_time:
-    if now - self._last_save_time >= datetime.timedelta(milliseconds=BATCH_INTERVAL):
+if saving_periodically and self._last_save_time:
+    if now - self._last_save_time >= datetime.timedelta(milliseconds=batch_interval):
```
In some loops, this was issuing a lot of queries just to update the done count.
Slight improvement in memory consumption.
Is the issue that the default BATCH_INTERVAL is too low? If so, can we increase it? That is exactly the situation the throttling of progress reports was designed for. Maybe set BATCH_INTERVAL to 2000?
We can increase it, but the problem is that currently BATCH_INTERVAL is only taken into account when the ProgressReport is used as a context manager, and for RPM almost all ProgressReports are not context managers.
The context manager is what I recommend when using progress reports. Can those usages of ProgressReport adopt it?
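To illustrate the trade-off being discussed, here is a minimal, self-contained sketch of the throttling that the context-manager path performs. The class and attribute names (`ThrottledReport`, `_save_db`) are illustrative stand-ins, not pulpcore's actual internals; the point is that saves inside the `with` block are rate-limited by BATCH_INTERVAL, with a final flush on exit.

```python
import datetime

BATCH_INTERVAL = 2000  # milliseconds; mirrors pulpcore's throttle window

class ThrottledReport:
    """Illustrative stand-in for a ProgressReport with throttled saves."""

    def __init__(self):
        self.done = 0
        self.db_writes = 0  # counts simulated database saves
        self._last_save_time = None
        self._using_context_manager = False

    def __enter__(self):
        self._using_context_manager = True
        self._last_save_time = datetime.datetime.now()
        return self

    def __exit__(self, exc_type, exc, tb):
        self._using_context_manager = False
        self._save_db()  # final save flushes the last counts

    def _save_db(self):
        self.db_writes += 1  # pretend this is a real UPDATE query

    def save(self):
        now = datetime.datetime.now()
        if self._using_context_manager and self._last_save_time:
            # Inside the context manager: hit the DB at most once per interval.
            if now - self._last_save_time >= datetime.timedelta(milliseconds=BATCH_INTERVAL):
                self._save_db()
                self._last_save_time = now
        else:
            self._save_db()  # outside the context manager: save every time

    def increment(self):
        self.done += 1
        self.save()

report = ThrottledReport()
with report:
    for _ in range(1000):
        report.increment()  # throttled: very few database writes
```

Without the `with` block, the same 1000 increments would trigger 1000 simulated writes, which is the situation the comment above describes for non-context-manager usage.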
```python
for k, v in all_kwargs.items():
    key = k
    value = v[0]

    if len(v) > 1:
        key = f"{k}__in"
        value = v

for artifact in Artifact.objects.filter(all_artifacts_q):
```
As @dralley suggested on the email thread, OR clauses are sometimes inefficient for queries, so I gathered the values to build an IN clause instead.
Slight improvement in memory consumption.
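The transformation described here can be sketched in isolation. This is a pure-Python illustration of the idea, not the PR's exact code: the hypothetical `build_filter_kwargs` helper groups values per field and emits a single `field__in` lookup instead of many OR'd equality conditions, producing kwargs in the shape a Django `Q(**filters)` would accept.

```python
def build_filter_kwargs(all_kwargs):
    """Turn {field: [values]} into ORM-style filter kwargs.

    A single value stays an equality lookup; multiple values collapse
    into one `field__in` lookup instead of many OR'd conditions.
    """
    filters = {}
    for key, values in all_kwargs.items():
        if len(values) > 1:
            filters[f"{key}__in"] = values
        else:
            filters[key] = values[0]
    return filters

result = build_filter_kwargs({"sha256": ["abc", "def"], "size": [10]})
```

On most databases a single `WHERE sha256 IN ('abc', 'def')` plans better than a chain of OR'd equality predicates, which is the motivation the comment cites.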
I'm concerned about these changes in this module and the code complexity. This code is already pretty complicated, and having it be less readable will make it more difficult to maintain in the future. Do we know how much this improves the memory/runtime?
```python
for k, v in all_kwargs.items():
    for key, value in v.items():
        _key = key
        _value = value[0]

        if len(value) > 1:
            _key = f"{key}__in"
            _value = value

        content_q_by_type[k] |= Q(**{_key: _value})
```
As @dralley suggested on the email thread, OR clauses are sometimes inefficient for queries, so I gathered the values to build an IN clause instead.
Slight improvement in memory consumption.
```diff
 for model_type in content_q_by_type.keys():
-    for result in model_type.objects.filter(content_q_by_type[model_type]):
+    for result in model_type.objects.filter(content_q_by_type[model_type]).iterator():
```
To avoid caching the query results.
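The memory effect behind `.iterator()` can be shown without Django: iterating a QuerySet directly materializes and caches every row, while `.iterator()` streams rows and keeps none. The two plain-Python functions below are analogues of those behaviors, not Django code.

```python
def cached_results(n):
    # Analogue of iterating a QuerySet directly: every row ends up
    # held in memory at once, in the QuerySet's result cache.
    return [i for i in range(n)]

def streamed_results(n):
    # Analogue of QuerySet.iterator(): rows are yielded one at a
    # time and nothing is retained after each is consumed.
    for i in range(n):
        yield i

total = 0
for row in streamed_results(1_000_000):
    total += row  # only one row is materialized at a time
```

For a sync that touches hundreds of thousands of content units, the difference between holding the whole result set and streaming it is exactly the memory win the comment claims.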
```python
all_artifacts_q |= Q(**{key: value})

for artifact in Artifact.objects.filter(all_artifacts_q).iterator():
```
To avoid caching the query results.
```diff
@@ -130,7 +148,7 @@ def _add_to_pending(coro):
         content_get_task = None
     else:
         pb.done += task.result()  # download_count
-        pb.save()
+        pb.save(batch_interval=2000)
```
It was saving around 50 times in a row.
CHANGES/5688.bugfix
```diff
@@ -0,0 +1 @@
+Improve memory performance.
```
Can it also call out the speed improvements?
```diff
@@ -177,7 +179,7 @@ def __exit__(self, type, value, traceback):
             self.state = TASK_STATES.FAILED
         self.save()

-    def increment(self):
+    def increment(self, *args, **kwargs):
```
I didn't expect we'd want to pass through *args and **kwargs here.
```diff
@@ -188,7 +190,7 @@ def increment(self):
         if self.total:
             if self.done > self.total:
                 _logger.warning(_('Too many items processed for ProgressReport %s') % self.message)
-        self.save()
+        self.save(*args, **kwargs)
```
I didn't expect these args/kwargs either.
```diff
@@ -13,6 +12,23 @@
 __all__ = ["remove_duplicates"]


+def batch_qs(qs, batch_size=1000):
```
Can we keep this internal by renaming batch_qs -> _batch_qs? Once plugins need it, we can add it to the plugin API, making it public.
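For context, a helper like the `batch_qs` being discussed is typically a small slicing generator. This is a sketch of that common pattern, not necessarily the PR's exact implementation; a list stands in for the queryset here, since Python slicing mirrors the ORM's LIMIT/OFFSET slicing.

```python
def batch_qs(qs, batch_size=1000):
    """Yield successive fixed-size slices of qs.

    Only one batch's worth of rows is held in memory at a time.
    For a real Django QuerySet, len(qs) would be qs.count() and
    each qs[start:end] would issue a LIMIT/OFFSET query.
    """
    total = len(qs)
    for start in range(0, total, batch_size):
        end = min(start + batch_size, total)
        yield qs[start:end]

batch_sizes = [len(b) for b in batch_qs(list(range(2500)), batch_size=1000)]
```

Leading the name with an underscore, as suggested, would signal to plugin authors that the helper is not yet part of the supported plugin API.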
```diff
@@ -81,7 +81,7 @@ class MyStage(Stage):
             log.debug(_('%(name)s - next: %(content)s.'), {'name': self, 'content': content})
             yield content

-    async def batches(self, minsize=50):
+    async def batches(self, minsize=500):
```
👍
```diff
@@ -163,7 +163,7 @@ def __str__(self):
         return '[{id}] {name}'.format(id=id(self), name=self.__class__.__name__)


-async def create_pipeline(stages, maxsize=100):
+async def create_pipeline(stages, maxsize=1000):
```
👍
```python
for item in repository_version.added():
    detail_item = item.cast()
    if detail_item.repo_key_fields == ():
        repository_type = repository_version.repository.pulp_type.split(".")[-1]
```
I was actually working on this in parallel since there was an issue filed for it. Here's my implementation https://github.com/pulp/pulpcore/compare/master...dralley:performance?diff=split&expand=1#diff-fb9784727f42efcd0b0bcae38b9daa30R34-R59
@dralley your solution is much more elegant, I think. From my solution we could keep the part where I check that the queryset is not empty before calling remove_content, and maybe the batched queryset as well.
Currently remove_content does not check whether the queryset is empty, and because of that it consumes a lot of time.
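The guard being proposed is simple to sketch. Below, `FakeQuerySet` and `remove_content` are illustrative stand-ins for Django's `QuerySet.exists()` and pulpcore's `RepositoryVersion.remove_content()`; the point is that empty querysets never reach the expensive call.

```python
class FakeQuerySet:
    """Stand-in for a Django QuerySet, just enough for the guard."""

    def __init__(self, rows):
        self._rows = rows

    def exists(self):
        # Django implements this as a cheap `SELECT ... LIMIT 1`
        return bool(self._rows)

calls = []

def remove_content(qs):
    calls.append(qs)  # pretend this runs the costly removal queries

for qs in (FakeQuerySet([]), FakeQuerySet([1, 2])):
    if qs.exists():  # the guard: skip the expensive call when empty
        remove_content(qs)
```

In a real sync, most repository versions have nothing to remove, so a cheap `exists()` check up front avoids the fixed overhead of the removal path entirely.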
Can we bring the implementation from @dralley into this PR, or remove this PR's changes and let the other PR go forward in parallel?
I will remove this change from this PR. This PR is totally WIP: I tested some tweaks, and when they reduced memory consumption, I pushed them. When I am able to sync everything without commenting out any part of the code, I will start to clean up the code. Probably many things will be removed from this PR.
Closing in favor of #440
```diff
@@ -71,3 +71,20 @@ def get_view_name_for_model(model_obj, view_action):
         if registered_viewset is viewset:
             return '-'.join((base_name, view_action))
     raise LookupError('view not found')


+def batch_qs(qs, batch_size=1000):
```
https://pulp.plan.io/issues/5688
ref #5688
Please be sure you have read our documentation on creating PRs:
https://docs.pulpproject.org/en/3.0/nightly/contributing/pull-request-walkthrough.html