Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
fix: fix initial page, tests, and quality
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian2012 committed Feb 7, 2024
1 parent b61cf4d commit 51abf4c
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 294 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,15 @@
from django.core.management.base import BaseCommand, CommandError

from event_sink_clickhouse.sinks.base_sink import ModelBaseSink
from event_sink_clickhouse.utils import get_sink_by_model

log = logging.getLogger(__name__)


def dump_target_objects_to_clickhouse(
sink=None,
start_pk=None,
object_ids=[],
objects_to_skip=[],
object_ids=None,
objects_to_skip=None,
force=False,
limit=None,
batch_size=1000,
Expand All @@ -54,20 +53,19 @@ def dump_target_objects_to_clickhouse(

objects_to_submit = []

for object, should_be_dumped, reason in sink.fetch_target_items(
for obj, should_be_dumped, reason in sink.fetch_target_items(
start_pk, object_ids, objects_to_skip, force, batch_size
):
if not should_be_dumped:
skipped_objects.append(object.pk)
log.info(f"{sink.model}: Skipping object {object.pk}, reason: '{reason}'")
skipped_objects.append(obj.pk)
log.info(f"{sink.model}: Skipping object {obj.pk}, reason: '{reason}'")
else:
objects_to_submit.append(object)

objects_to_submit.append(obj)
count += 1
if len(objects_to_submit) % batch_size == 0:
sink.dump(objects_to_submit, many=True)
count += len(objects_to_submit)
objects_to_submit = []
log.info(f"Last IDs: {object.pk}")
log.info(f"Last IDs: {obj.pk}")
time.sleep(sleep_time)

if limit and count == limit:
Expand All @@ -78,6 +76,7 @@ def dump_target_objects_to_clickhouse(

if objects_to_submit:
sink.dump(objects_to_submit, many=True)
count += len(objects_to_submit)

log.info(f"Dumped {count} objects to ClickHouse")

Expand Down Expand Up @@ -194,7 +193,7 @@ def handle(self, *args, **options):
log.error(message)
raise CommandError(message)

Sink = get_sink_by_model(options["object"])
Sink = ModelBaseSink.get_sink_by_model(options["object"])
sink = Sink(connection_overrides, log)
dump_target_objects_to_clickhouse(
sink,
Expand Down
16 changes: 13 additions & 3 deletions event_sink_clickhouse/sinks/base_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,8 @@ def fetch_target_items(self, start_pk=None, ids=None, skip_ids=None, force_dump=
)
if batch_size:
paginator = Paginator(item_keys, batch_size)
page = paginator.page(1)
while page.has_next():
page = paginator.page(page.next_page_number())
for i in range(1, paginator.num_pages):
page = paginator.page(i)
yield from self.iter_target_items(page.object_list, skip_ids, force_dump)
else:
yield from self.iter_target_items(item_keys, skip_ids, force_dump)
Expand Down Expand Up @@ -367,3 +366,14 @@ def is_enabled(cls):
)

return enabled or waffle_flag.is_enabled()

@classmethod
def get_sink_by_model(cls, model):
"""
Return the sink instance for the given model
"""
for sink in cls.__subclasses__():
if sink.model == model:
return sink

return None
1 change: 0 additions & 1 deletion event_sink_clickhouse/sinks/course_published.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,3 @@ def get_course_last_published(self, course_key):

def convert_id(self, item_id):
return CourseKey.from_string(item_id)

11 changes: 0 additions & 11 deletions event_sink_clickhouse/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,3 @@ def get_detached_xblock_types(): # pragma: no cover
from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES

return DETACHED_XBLOCK_TYPES


def get_sink_by_model(model):
"""Get a sink by model."""
from event_sink_clickhouse.sinks.base_sink import ModelBaseSink

for sink in ModelBaseSink.__subclasses__():
if sink.model == model:
return sink

return None
236 changes: 0 additions & 236 deletions tests/commands/test_dump_courses_command.py

This file was deleted.

Loading

0 comments on commit 51abf4c

Please sign in to comment.