From d8d4c805d35df25f32da28053f8c4747d6eaba6d Mon Sep 17 00:00:00 2001
From: Cristhian Garcia
Date: Tue, 6 Feb 2024 14:31:39 -0500
Subject: [PATCH] chore: use paginator for querysets

Paginate the querysets used by the dump_data_to_clickhouse management
command with Django's Paginator so objects are fetched and dumped in
batches instead of loading everything into memory at once. Add a
--start_pk option to resume a dump from a given primary key, order
querysets by pk, and use select_related("user") in UserProfileSink to
avoid per-row user lookups.
---
 .../commands/dump_data_to_clickhouse.py      | 48 +++++++------------
 event_sink_clickhouse/sinks/base_sink.py     | 25 ++++++++--
 .../sinks/course_published.py                |  2 +-
 .../sinks/user_profile_sink.py               |  3 ++
 4 files changed, 43 insertions(+), 35 deletions(-)

diff --git a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
index 8d6bd3d..08f8424 100644
--- a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
+++ b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
@@ -28,6 +28,7 @@
 def dump_target_objects_to_clickhouse(
     sink=None,
+    start_pk=None,
     object_ids=[],
     objects_to_skip=[],
     force=False,
@@ -50,32 +51,27 @@ def dump_target_objects_to_clickhouse(
 
     submitted_objects = []
     skipped_objects = []
-    index = 0
     objects_to_submit = []
 
-    for object_id, should_be_dumped, reason in sink.fetch_target_items(
-        object_ids, objects_to_skip, force
-    ):
-        log.info(f"Iteration {index}: {object_id}")
-        index += 1
+    for obj, should_be_dumped, reason in sink.fetch_target_items(
+        start_pk, object_ids, objects_to_skip, force, batch_size
+    ):
 
         if not should_be_dumped:
-            skipped_objects.append(object_id)
+            skipped_objects.append(obj.pk)
             log.info(
-                f"{sink.model} {index}: Skipping object {object_id}, reason: '{reason}'"
+                f"{sink.model}: Skipping object {obj.pk}, reason: '{reason}'"
             )
         else:
-            log.info(
-                f"{sink.model} {index}: Submitting {object_id} for dump to ClickHouse, reason '{reason}'."
-            )
-            objects_to_submit.append(object_id)
+            objects_to_submit.append(obj)
             if len(objects_to_submit) % batch_size == 0:
                 sink.dump(objects_to_submit, many=True)
                 submitted_objects.extend(objects_to_submit)
                 objects_to_submit = []
+                log.info(f"Last dumped id: {obj.pk}")
                 time.sleep(sleep_time)
 
-            submitted_objects.append(str(object_id))
+            submitted_objects.append(str(obj.pk))
 
         if limit and len(submitted_objects) == limit:
             log.info(
@@ -86,7 +82,6 @@
 
     if objects_to_submit:
         sink.dump(objects_to_submit, many=True)
 
-    return submitted_objects, skipped_objects
 
 
 class Command(BaseCommand):
@@ -127,6 +122,12 @@ def add_arguments(self, parser):
             type=str,
             help="the type of object to dump",
         )
+        parser.add_argument(
+            "--start_pk",
+            type=int,
+            help="the primary key to start from (exclusive)",
+            default=None,
+        )
         parser.add_argument(
             "--ids",
             metavar="KEY",
@@ -198,26 +199,13 @@ def handle(self, *args, **options):
         Sink = get_sink_by_model(options["object"])
         sink = Sink(connection_overrides, log)
 
-        submitted_objects, skipped_objects = dump_target_objects_to_clickhouse(
+        dump_target_objects_to_clickhouse(
             sink,
+            options["start_pk"],
             [object_id.strip() for object_id in ids],
             [object_id.strip() for object_id in ids_to_skip],
             options["force"],
             options["limit"],
             options["batch_size"],
+            options["sleep_time"],
         )
-
-        log.info(
-            "%d objects submitted for export to ClickHouse. %d objects skipped.",
%d objects skipped.", - len(submitted_objects), - len(skipped_objects), - ) - - if not submitted_objects: - log.info("No objects submitted for export to ClickHouse at all!") - else: - log.info( # pylint: disable=logging-not-lazy - "These objects were submitted for dump to ClickHouse successfully:\n\t" - + "\n\t".join(submitted_objects) - ) - diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py index 4a68745..eaabde1 100644 --- a/event_sink_clickhouse/sinks/base_sink.py +++ b/event_sink_clickhouse/sinks/base_sink.py @@ -8,6 +8,7 @@ import requests from django.conf import settings +from django.core.paginator import Paginator from edx_toggles.toggles import WaffleFlag from event_sink_clickhouse.utils import get_model @@ -151,11 +152,14 @@ def get_model(self): """ return get_model(self.model) - def get_queryset(self): + def get_queryset(self, start_pk=None): """ Return the queryset to be used for the insert """ - return self.get_model().objects.all() + if start_pk: + return self.get_model().objects.filter(pk__gt=start_pk).order_by("pk") + else: + return self.get_model().objects.all().order_by("pk") def dump(self, item_id, many=False, initial=None): """ @@ -272,19 +276,32 @@ def send_item(self, serialized_item, many=False): self._send_clickhouse_request(request) - def fetch_target_items(self, ids=None, skip_ids=None, force_dump=False): + def fetch_target_items(self, start_pk=None, ids=None, skip_ids=None, force_dump=False, batch_size=None): """ Fetch the items that should be dumped to ClickHouse """ if ids: item_keys = [self.convert_id(item_id) for item_id in ids] else: - item_keys = [item.id for item in self.get_queryset()] + print("load queryset") + item_keys = self.get_queryset(start_pk) skip_ids = ( [str(item_id) for item_id in skip_ids] if skip_ids else [] ) + if batch_size: + paginator = Paginator(item_keys, batch_size) + page = paginator.page(1) + while page.has_next(): + page = paginator.page(page.next_page_number()) + yield from self.iter_target_items(page.object_list, skip_ids, force_dump) + else: + yield from self.iter_target_items(item_keys, skip_ids, force_dump) + def iter_target_items(self, item_keys, skip_ids, force_dump): + """ + Iterate through the items that should be dumped to ClickHouse + """ for item_key in item_keys: if str(item_key) in skip_ids: yield item_key, False, f"{self.name} is explicitly skipped" diff --git a/event_sink_clickhouse/sinks/course_published.py b/event_sink_clickhouse/sinks/course_published.py index 6fd0569..c69fef0 100644 --- a/event_sink_clickhouse/sinks/course_published.py +++ b/event_sink_clickhouse/sinks/course_published.py @@ -220,6 +220,6 @@ def get_course_last_published(self, course_key): def convert_id(self, item_id): return CourseKey.from_string(item_id) - def get_queryset(self): + def get_queryset(self, start_pk=None): modulestore = get_modulestore() return modulestore.get_course_summaries() diff --git a/event_sink_clickhouse/sinks/user_profile_sink.py b/event_sink_clickhouse/sinks/user_profile_sink.py index ff652f6..e8bf8f2 100644 --- a/event_sink_clickhouse/sinks/user_profile_sink.py +++ b/event_sink_clickhouse/sinks/user_profile_sink.py @@ -14,3 +14,6 @@ class UserProfileSink(ModelBaseSink): # pylint: disable=abstract-method timestamp_field = "time_last_dumped" name = "User Profile" serializer_class = UserProfileSerializer + + def get_queryset(self, start_pk=None): + return super().get_queryset(start_pk).select_related("user")