diff --git a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
index b096944..9a3b83c 100644
--- a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
+++ b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py
@@ -22,7 +22,6 @@
 from django.core.management.base import BaseCommand, CommandError
 
 from event_sink_clickhouse.sinks.base_sink import ModelBaseSink
-from event_sink_clickhouse.utils import get_sink_by_model
 
 log = logging.getLogger(__name__)
 
@@ -30,8 +29,8 @@
 def dump_target_objects_to_clickhouse(
     sink=None,
     start_pk=None,
-    object_ids=[],
-    objects_to_skip=[],
+    object_ids=None,
+    objects_to_skip=None,
     force=False,
     limit=None,
     batch_size=1000,
@@ -54,20 +53,19 @@ def dump_target_objects_to_clickhouse(
 
     objects_to_submit = []
 
-    for object, should_be_dumped, reason in sink.fetch_target_items(
+    for obj, should_be_dumped, reason in sink.fetch_target_items(
         start_pk, object_ids, objects_to_skip, force, batch_size
     ):
         if not should_be_dumped:
-            skipped_objects.append(object.pk)
-            log.info(f"{sink.model}: Skipping object {object.pk}, reason: '{reason}'")
+            skipped_objects.append(obj.pk)
+            log.info(f"{sink.model}: Skipping object {obj.pk}, reason: '{reason}'")
         else:
-            objects_to_submit.append(object)
-
+            objects_to_submit.append(obj)
+            count += 1
         if len(objects_to_submit) % batch_size == 0:
             sink.dump(objects_to_submit, many=True)
-            count += len(objects_to_submit)
             objects_to_submit = []
-            log.info(f"Last IDs: {object.pk}")
+            log.info(f"Last IDs: {obj.pk}")
             time.sleep(sleep_time)
 
         if limit and count == limit:
@@ -78,6 +76,7 @@ def dump_target_objects_to_clickhouse(
 
     if objects_to_submit:
         sink.dump(objects_to_submit, many=True)
+        count += len(objects_to_submit)
 
     log.info(f"Dumped {count} objects to ClickHouse")
 
@@ -194,7 +193,7 @@ def handle(self, *args, **options):
             log.error(message)
             raise CommandError(message)
 
-        Sink = get_sink_by_model(options["object"])
+        Sink = ModelBaseSink.get_sink_by_model(options["object"])
         sink = Sink(connection_overrides, log)
         dump_target_objects_to_clickhouse(
             sink,
diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py
index 2d09c4a..2fbdb06 100644
--- a/event_sink_clickhouse/sinks/base_sink.py
+++ b/event_sink_clickhouse/sinks/base_sink.py
@@ -290,9 +290,8 @@ def fetch_target_items(self, start_pk=None, ids=None, skip_ids=None, force_dump=
         )
         if batch_size:
             paginator = Paginator(item_keys, batch_size)
-            page = paginator.page(1)
-            while page.has_next():
-                page = paginator.page(page.next_page_number())
+            for i in range(1, paginator.num_pages):
+                page = paginator.page(i)
                 yield from self.iter_target_items(page.object_list, skip_ids, force_dump)
         else:
             yield from self.iter_target_items(item_keys, skip_ids, force_dump)
@@ -367,3 +366,14 @@ def is_enabled(cls):
         )
 
         return enabled or waffle_flag.is_enabled()
+
+    @classmethod
+    def get_sink_by_model(cls, model):
+        """
+        Return the sink instance for the given model
+        """
+        for sink in cls.__subclasses__():
+            if sink.model == model:
+                return sink
+
+        return None
diff --git a/event_sink_clickhouse/sinks/course_published.py b/event_sink_clickhouse/sinks/course_published.py
index db15054..d99e003 100644
--- a/event_sink_clickhouse/sinks/course_published.py
+++ b/event_sink_clickhouse/sinks/course_published.py
@@ -219,4 +219,3 @@ def get_course_last_published(self, course_key):
 
     def convert_id(self, item_id):
         return CourseKey.from_string(item_id)
-
diff --git a/event_sink_clickhouse/utils.py b/event_sink_clickhouse/utils.py
index 6bc10a0..28745c3 100644
--- a/event_sink_clickhouse/utils.py
+++ b/event_sink_clickhouse/utils.py
@@ -57,14 +57,3 @@ def get_detached_xblock_types():  # pragma: no cover
     from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES
 
     return DETACHED_XBLOCK_TYPES
-
-
-def get_sink_by_model(model):
-    """Get a sink by model."""
-    from event_sink_clickhouse.sinks.base_sink import ModelBaseSink
-
-    for sink in ModelBaseSink.__subclasses__():
-        if sink.model == model:
-            return sink
-
-    return None
diff --git a/tests/commands/test_dump_courses_command.py b/tests/commands/test_dump_courses_command.py
deleted file mode 100644
index 56946ba..0000000
--- a/tests/commands/test_dump_courses_command.py
+++ /dev/null
@@ -1,236 +0,0 @@
-"""
-Tests for the dump_courses_to_clickhouse management command.
-"""
-from collections import namedtuple
-from datetime import datetime, timedelta
-from unittest.mock import patch
-
-import django.core.management.base
-import pytest
-from django.core.management import call_command
-
-from test_utils.helpers import FakeCourse, course_str_factory, fake_course_overview_factory
-
-
-@pytest.fixture
-def mock_common_calls():
-    """
-    Mock out calls that we test elsewhere and aren't relevant to the command tests.
-    """
-    command_path = "event_sink_clickhouse.management.commands.dump_courses_to_clickhouse"
-    with patch(command_path+".dump_course_to_clickhouse") as mock_dump_course:
-        with patch(command_path+".CourseOverviewSink.get_model") as mock_get_course_overview_model:
-            with patch("event_sink_clickhouse.sinks.course_published.get_modulestore") as mock_modulestore:
-                with patch(command_path+".CourseOverviewSink.get_last_dumped_timestamp") as mock_last_dump_time:
-                    # Set a reasonable default last dump time a year in the past
-                    mock_last_dump_time.return_value = \
-                        (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d %H:%M:%S.%f+00:00")
-                    yield mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time
-
-
-def dump_command_clickhouse_options():
-    """
-    Pytest params for all the different ClickHouse options.
-
-    Just making sure every option gets passed through correctly.
- """ - options = [ - {}, - {'url': "https://foo/"}, - {'username': "Foo"}, - {'password': "F00"}, - {'database': "foo"}, - {'timeout_secs': 60}, - {'url': "https://foo/", 'username': "Foo", 'password': "F00", 'database': "foo", 'timeout_secs': 60}, - ] - - for option in options: - yield option - - -@pytest.mark.parametrize("option_combination", dump_command_clickhouse_options()) -def test_dump_courses_to_clickhouse_db_options( - option_combination, - mock_common_calls, - caplog -): - mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls - - course_id = course_str_factory() - - fake_modulestore_courses = [FakeCourse(course_id)] - mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses - fake_overview = fake_course_overview_factory(modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00")) - mock_get_course_overview_model.return_value.get_from_id.return_value = fake_overview - - call_command( - 'dump_courses_to_clickhouse', - **option_combination - ) - - # Make sure that our mocks were called as expected - assert mock_modulestore.call_count == 1 - assert mock_dump_course.apply_async.call_count == 1 - mock_dump_course.apply_async.assert_called_once_with(kwargs=dict( - course_key_string=course_id, - connection_overrides=option_combination - )) - assert "Course has been published since last dump time" in caplog.text - assert "These courses were submitted for dump to ClickHouse successfully" in caplog.text - - -CommandOptions = namedtuple("TestCommandOptions", ["options", "expected_num_submitted", "expected_logs"]) - - -def dump_command_basic_options(): - """ - Pytest params for all the different non-ClickHouse command options. - """ - options = [ - CommandOptions( - options={"courses_to_skip": [course_str_factory()]}, - expected_num_submitted=0, - expected_logs=[ - "0 courses submitted for export to ClickHouse. 
-                "Course Overview is explicitly skipped"
-            ]
-        ),
-        CommandOptions(
-            options={"limit": 1},
-            expected_num_submitted=1,
-            expected_logs=["Limit of 1 eligible course has been reached, quitting!"]
-        ),
-        CommandOptions(
-            options={"courses": [course_str_factory()]},
-            expected_num_submitted=1,
-            expected_logs=[
-                "Course has been published since last dump time",
-                "These courses were submitted for dump to ClickHouse successfully"
-            ]
-        ),
-        CommandOptions(
-            options={"force": True},
-            expected_num_submitted=1,
-            expected_logs=["Force is set"]
-        ),
-    ]
-
-    for option in options:
-        yield option
-
-
-@pytest.mark.parametrize("test_command_option", dump_command_basic_options())
-def test_dump_courses_options(
-    test_command_option,
-    mock_common_calls,
-    caplog
-):
-    mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls
-
-    option_combination, expected_num_submitted, expected_outputs = test_command_option
-    course_id = course_str_factory()
-
-    fake_modulestore_courses = [FakeCourse(course_id), ]
-    mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses
-    mock_get_course_overview_model.return_value.get_from_id.return_value = fake_course_overview_factory(
-        modified=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00")
-    )
-
-    call_command(
-        'dump_courses_to_clickhouse',
-        **option_combination
-    )
-
-    # Make sure that our mocks were called as expected
-    if "courses" not in option_combination:
-        # Modulestore will only be called here if we're not passing in a list of courses
-        assert mock_modulestore.call_count == 1
-    assert mock_dump_course.apply_async.call_count == expected_num_submitted
-    for expected_output in expected_outputs:
-        assert expected_output in caplog.text
-
-
-def dump_command_invalid_options():
-    """
-    Pytest params for all the different non-ClickHouse command options.
- """ - options = [ - CommandOptions( - options={"force": True, "limit": 100}, - expected_num_submitted=0, - expected_logs=[ - "The 'limit' option cannot be used with 'force'", - ] - ), - CommandOptions( - options={"limit": -1}, - expected_num_submitted=0, - expected_logs=["'limit' must be greater than 0!"] - ), - ] - - for option in options: - yield option - - -@pytest.mark.parametrize("test_command_option", dump_command_invalid_options()) -def test_invalid_dump_command_options( - test_command_option, - mock_common_calls, - caplog -): - mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls - option_combination, expected_num_submitted, expected_outputs = test_command_option - - with pytest.raises(django.core.management.base.CommandError): - call_command( - 'dump_courses_to_clickhouse', - **option_combination - ) - - # Just to make sure we're not calling things more than we need to - assert mock_modulestore.call_count == 0 - assert mock_dump_course.apply_async.call_count == 0 - for expected_output in expected_outputs: - assert expected_output in caplog.text - - -def test_multiple_courses_different_times( - mock_common_calls, - caplog -): - mock_dump_course, mock_get_course_overview_model, mock_modulestore, mock_last_dump_time = mock_common_calls - - test_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f+00:00") - - course_id_1 = course_str_factory("course_1") - course_id_2 = course_str_factory("course_2") - course_id_3 = course_str_factory("course_3") - - fake_modulestore_courses = [FakeCourse(course_id_1), FakeCourse(course_id_2), FakeCourse(course_id_3)] - mock_modulestore.return_value.get_course_summaries.return_value = fake_modulestore_courses - - fake_overview = fake_course_overview_factory(modified=test_timestamp) - mock_get_course_overview_model.return_value.get_from_id.return_value = fake_overview - - # Each time last_dump_time is called it will get a different date so we can test - # them all together - mock_last_dump_time.side_effect = [ - # One year ago - (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d %H:%M:%S.%f+00:00"), - # A magic date that matches the last published date of our test course - test_timestamp, - # Course not dumped to ClickHouse yet - None, - ] - - call_command( - 'dump_courses_to_clickhouse' - ) - - assert mock_modulestore.call_count == 1 - assert mock_last_dump_time.call_count == 3 - assert "Course has been published since last dump time" in caplog.text - assert "Course has NOT been published since last dump time" in caplog.text - assert "Course is not present in ClickHouse" in caplog.text - assert "2 courses submitted for export to ClickHouse. 1 courses skipped." in caplog.text diff --git a/tests/commands/test_dump_data_to_clickhouse.py b/tests/commands/test_dump_data_to_clickhouse.py index db124a6..42715da 100644 --- a/tests/commands/test_dump_data_to_clickhouse.py +++ b/tests/commands/test_dump_data_to_clickhouse.py @@ -31,6 +31,10 @@ def __init__(self, id): self.id = id self.created = datetime.now() + @property + def pk(self): + return self.id + return DummyModel @@ -44,11 +48,15 @@ class DummySerializer: Dummy serializer for testing. 
""" - def __init__(self, model): + def __init__(self, model, many=False, initial=None): self.model = model + self.many = many + self.initial = initial @property def data(self): + if self.many: + return [{"id": item, "created": datetime.now()} for item in self.model] return {"id": self.model.id, "created": self.model.created} return DummySerializer @@ -65,18 +73,19 @@ class DummySink(ModelBaseSink): serializer_class = dummy_serializer_factory() timestamp_field = "created" clickhouse_table_name = "dummy_table" + factory = dummy_model_factory() - def get_queryset(self): - return [dummy_model_factory()(id) for id in range(1, 5)] - - def convert_id(self, item_id): - return int(item_id) + def get_queryset(self, start_pk=None): + return [self.factory(id) for id in range(1, 5)] def should_dump_item(self, unique_key): - if unique_key % 2 == 0: - return True, "Even number" - else: - return False, "Odd number" + return True, "No reason" + + def send_item_and_log(self, item_id, serialized_item, many): + pass + + def get_object(self, item_id): + return self.factory(item_id) def dump_command_basic_options(): @@ -85,28 +94,24 @@ def dump_command_basic_options(): """ options = [ CommandOptions( - options={"object": "dummy", "ids_to_skip": ["1", "2", "3", "4"]}, - expected_num_submitted=0, - expected_logs=[ - "submitted for export to ClickHouse", - ], + options={"object": "dummy", "batch_size": 1, "sleep_time": 0}, + expected_num_submitted=4, + expected_logs=["Dumped 3 objects to ClickHouse",], ), CommandOptions( - options={"object": "dummy", "limit": 1}, + options={"object": "dummy", "limit": 1, "batch_size": 1, "sleep_time": 0}, expected_num_submitted=1, expected_logs=["Limit of 1 eligible objects has been reached, quitting!"], ), CommandOptions( - options={"object": "dummy", "ids": ["1", "2", "3", "4"]}, + options={"object": "dummy", "batch_size": 2, "sleep_time": 0}, expected_num_submitted=2, - expected_logs=[ - "These objects were submitted for dump to ClickHouse successfully", - ], + expected_logs=["Now dumping 2 Dummy to ClickHouse",], ), CommandOptions( - options={"object": "dummy", "force": True}, - expected_num_submitted=4, - expected_logs=["Force is set"], + options={"object": "dummy", "batch_size": 1, "sleep_time": 0}, + expected_num_submitted=3, + expected_logs=["Now dumping 1 Dummy to ClickHouse", "Dumped 3 objects to ClickHouse"], ), ] @@ -115,17 +120,13 @@ def dump_command_basic_options(): @pytest.mark.parametrize("test_command_option", dump_command_basic_options()) -@patch( - "event_sink_clickhouse.management.commands.dump_data_to_clickhouse.dump_data_to_clickhouse" -) -def test_dump_courses_options(mock_dump_data, test_command_option, caplog): +def test_dump_courses_options(test_command_option, caplog): option_combination, expected_num_submitted, expected_outputs = test_command_option assert DummySink.model in [cls.model for cls in ModelBaseSink.__subclasses__()] call_command("dump_data_to_clickhouse", **option_combination) - assert mock_dump_data.apply_async.call_count == expected_num_submitted for expected_output in expected_outputs: assert expected_output in caplog.text @@ -162,10 +163,7 @@ def dump_basic_invalid_options(): @pytest.mark.parametrize("test_command_option", dump_basic_invalid_options()) -@patch( - "event_sink_clickhouse.management.commands.dump_data_to_clickhouse.dump_data_to_clickhouse" -) -def test_dump_courses_options_invalid(mock_dump_data, test_command_option, caplog): +def test_dump_courses_options_invalid(test_command_option, caplog): option_combination, 
 
     assert DummySink.model in [cls.model for cls in ModelBaseSink.__subclasses__()]