diff --git a/event_sink_clickhouse/settings/common.py b/event_sink_clickhouse/settings/common.py
index af40f90..1b6d117 100644
--- a/event_sink_clickhouse/settings/common.py
+++ b/event_sink_clickhouse/settings/common.py
@@ -10,9 +10,10 @@ def plugin_settings(settings):
     settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG = {
         # URL to a running ClickHouse server's HTTP interface. ex: https://foo.openedx.org:8443/ or
         # http://foo.openedx.org:8123/ . Note that we only support the ClickHouse HTTP interface
-        # to avoid pulling in more dependencies to the platrform than necessary.
+        # to avoid pulling in more dependencies to the platform than necessary.
         "url": "http://clickhouse:8123",
         "username": "changeme",
         "password": "changeme",
-        "clickhouse_database": "event_sink",
+        "database": "event_sink",
+        "timeout_secs": 3,
     }
diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py
index eb23a14..89ca440 100644
--- a/event_sink_clickhouse/sinks/base_sink.py
+++ b/event_sink_clickhouse/sinks/base_sink.py
@@ -9,15 +9,18 @@ class BaseSink:
     """
     Base class for ClickHouse event sink, allows overwriting of default settings
     """
-    connection_overrides = None
-    log = None
-
     def __init__(self, connection_overrides, log):
-
+        self.log = log
         self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"]
         self.ch_auth = (settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"],
                         settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"])
-        self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["clickhouse_database"]
-        self.ch_timeout_secs = 3
-        self.connection_overrides = connection_overrides
-        self.log = log
+        self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["database"]
+        self.ch_timeout_secs = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["timeout_secs"]
+
+        # Apply any per-call overrides of the ClickHouse connection settings.
+        if connection_overrides:
+            self.ch_url = connection_overrides.get("url", self.ch_url)
+            self.ch_auth = (connection_overrides.get("username", self.ch_auth[0]),
+                            connection_overrides.get("password", self.ch_auth[1]))
+            self.ch_database = connection_overrides.get("database", self.ch_database)
+            self.ch_timeout_secs = connection_overrides.get("timeout_secs", self.ch_timeout_secs)
diff --git a/event_sink_clickhouse/sinks/course_published.py b/event_sink_clickhouse/sinks/course_published.py
index faa2067..a62ee47 100644
--- a/event_sink_clickhouse/sinks/course_published.py
+++ b/event_sink_clickhouse/sinks/course_published.py
@@ -23,6 +23,25 @@ class CoursePublishedSink(BaseSink):
     """
     Event sink for the COURSE_PUBLISHED signal
     """
+    @staticmethod
+    def _get_detached_xblock_types():
+        # pylint: disable=import-outside-toplevel,import-error
+        from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES
+        return DETACHED_XBLOCK_TYPES
+
+    @staticmethod
+    def _get_modulestore():
+        # Import is placed here to avoid model import at project startup.
+        from xmodule.modulestore.django import modulestore  # pylint: disable=import-outside-toplevel,import-error
+        return modulestore()
+
+    @staticmethod
+    def _get_course_overview_model():
+        # Import is placed here to avoid model import at project startup.
+        # pylint: disable=import-outside-toplevel,import-error
+        from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
+        return CourseOverview
+
     @staticmethod
     def strip_branch_and_version(location):
         """
@@ -51,15 +70,12 @@ def get_course_last_published(course_key):
         is sortable and similar to ISO 8601:
         https://docs.python.org/3/library/datetime.html#datetime.date.__str__
         """
-        # Import is placed here to avoid model import at project startup.
-        # pylint: disable=import-outside-toplevel,import-error
-        from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
-
+        CourseOverview = CoursePublishedSink._get_course_overview_model()
         approx_last_published = CourseOverview.get_from_id(course_key).modified
         return str(approx_last_published)
 
     @staticmethod
-    def serialize_item(item, index):
+    def serialize_item(item, index, detached_xblock_types):
         """
         Args:
             item: an XBlock
@@ -70,9 +86,6 @@ def serialize_item(item, index):
             block_type: the name of the XBlock's type (i.e. 'course' or 'problem')
         """
-        # pylint: disable=import-outside-toplevel,import-error
-        from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES
-
         course_key = item.scope_ids.usage_id.course_key
         block_type = item.scope_ids.block_type
@@ -84,9 +97,9 @@ def serialize_item(item, index):
             'location': str(item.location),
             'display_name': item.display_name_with_default.replace("'", "\'"),
             'block_type': block_type,
-            'detached': 1 if block_type in DETACHED_XBLOCK_TYPES else 0,
+            'detached': 1 if block_type in detached_xblock_types else 0,
             'edited_on': str(getattr(item, 'edited_on', '')),
-            'time_last_dumped': str(timezone.now()), 
+            'time_last_dumped': str(timezone.now()),
             'order': index,
         }
@@ -103,19 +116,19 @@ def serialize_course(self, course_id):
             nodes: a list of dicts representing xblocks for the course
             relationships: a list of dicts representing relationships between nodes
         """
-        # Import is placed here to avoid model import at project startup.
-        from xmodule.modulestore.django import modulestore  # pylint: disable=import-outside-toplevel,import-error
+        modulestore = CoursePublishedSink._get_modulestore()
+        detached_xblock_types = CoursePublishedSink._get_detached_xblock_types()
 
         # create a location to node mapping we'll need later for
         # writing relationships
         location_to_node = {}
-        items = modulestore().get_items(course_id)
+        items = modulestore.get_items(course_id)
 
         # create nodes
         i = 0
         for item in items:
             i += 1
-            fields = self.serialize_item(item, i)
+            fields = self.serialize_item(item, i, detached_xblock_types)
             location_to_node[self.strip_branch_and_version(item.location)] = fields
 
         # create relationships
@@ -142,6 +155,7 @@
         Do the serialization and send to ClickHouse
         """
         nodes, relationships = self.serialize_course(course_key)
+
         self.log.info(
             "Now dumping %s to ClickHouse: %d nodes and %d relationships",
             course_key,
@@ -198,3 +212,4 @@
                 "Error trying to dump course %s to ClickHouse!",
                 course_string
             )
+            raise
diff --git a/test_utils/helpers.py b/test_utils/helpers.py
new file mode 100644
index 0000000..6c02a7d
--- /dev/null
+++ b/test_utils/helpers.py
@@ -0,0 +1,156 @@
+"""Shared helpers and fakes for the event sink test suite."""
+import csv
+import random
+import string
+from datetime import datetime
+from io import StringIO
+from unittest.mock import MagicMock, Mock
+
+from opaque_keys.edx.keys import CourseKey
+from opaque_keys.edx.locator import BlockUsageLocator
+
+from event_sink_clickhouse.sinks.course_published import CoursePublishedSink
+
+ORIG_IMPORT = __import__
+ORG = "testorg"
+COURSE = "testcourse"
+COURSE_RUN = "2023_Fall"
+
+
+class FakeXBlock:
+    def __init__(self, identifier, detached_block=False):
+        self.block_type = "course_info" if detached_block else "vertical"
+        self.scope_ids = Mock()
+        self.scope_ids.usage_id.course_key = course_key_factory()
+        self.scope_ids.block_type = self.block_type
+        self.location = block_usage_locator_factory()
+        self.display_name_with_default = f"Display name {identifier}"
+        self.edited_on = datetime.now()
+        self.children = []
+
+    def get_children(self):
+        return self.children
+
+
+def course_str_factory():
+    course_str = f"course-v1:{ORG}+{COURSE}+{COURSE_RUN}"
+    return course_str
+
+
+def course_key_factory():
+    return CourseKey.from_string(course_str_factory())
+
+
+def block_usage_locator_factory():
+    block_id = ''.join(random.choices(string.ascii_letters, k=10))
+    return BlockUsageLocator(course_key_factory(), block_type="category", block_id=block_id, deprecated=True)
+
+
+def mock_course_overview():
+    # The sink reads CourseOverview.get_from_id(course_key).modified, so the
+    # object returned by the mocked get_from_id needs a "modified" attribute.
+    mock_overview = MagicMock()
+    mock_overview.get_from_id.return_value.modified = datetime.now()
+    return mock_overview
+
+
+def mock_detached_xblock_types():
+    # Current values as of 2023-05-01
+    return {'static_tab', 'about', 'course_info'}
+
+
+def get_clickhouse_http_params():
+    blocks_params = {
+        "input_format_allow_errors_num": 1,
+        "input_format_allow_errors_ratio": 0.1,
+        "query": "INSERT INTO cool_data.course_blocks FORMAT CSV"
+    }
+    relationships_params = {
+        "input_format_allow_errors_num": 1,
+        "input_format_allow_errors_ratio": 0.1,
+        "query": "INSERT INTO cool_data.course_relationships FORMAT CSV"
+    }
+
+    return blocks_params, relationships_params
+
+
+def course_factory():
+    top_block = FakeXBlock("top")
+    course = [top_block, ]
+
+    for i in range(3):
+        block = FakeXBlock(f"Child {i}")
+        course.append(block)
+        top_block.children.append(block)
+
+        if i > 0:
+            sub_block = FakeXBlock(f"Grandchild {i}")
FakeXBlock(f"Grandchild {i}") + course.append(sub_block) + block.children.append(sub_block) + + for i in range(3): + course.append(FakeXBlock(f"Detached {i}", detached_block=True)) + + return course + + +def check_block_csv_matcher(course): + def match(request): + body = request.body + lines = body.split("\n")[:-1] + + if len(lines) != len(course): + return False, f"Body has {len(lines)} lines, course has {len(course)}" + + f = StringIO(body) + reader = csv.reader(f) + + i = 0 + try: + for row in reader: + block = course[i] + assert row[0] == block.location.org + assert row[1] == str(block.location.course_key) + assert row[2] == block.location.course + assert row[3] == block.location.run + assert row[4] == str(course[i].location) + assert row[5] == block.display_name_with_default + assert row[6] == str(block.block_type) + i += 1 + except AssertionError as e: + return False, f"Mismatch in row {i}: {e}" + return True, "" + return match + + +def check_relationship_csv_matcher(course): + relationships = [] + for block in course: + course_key = str(block.location.course_key) + for index, child in enumerate(block.get_children()): + parent_node = str(CoursePublishedSink.strip_branch_and_version(block.location)) + child_node = str(CoursePublishedSink.strip_branch_and_version(child.location)) + relationships.append((course_key, parent_node, child_node)) + + def match(request): + body = request.body + lines = body.split("\n")[:-1] + + if len(lines) != len(relationships): + return False, f"Body has {len(lines)} lines but there are {len(relationships)} relationships" + + f = StringIO(body) + reader = csv.reader(f) + + i = 0 + try: + for row in reader: + print(row) + print(relationships[i]) + relation = relationships[i] + assert row[0] == relation[0] + assert row[1] == relation[1] + assert row[2] == relation[2] + i += 1 + except AssertionError as e: + return False, f"Mismatch in row {i}: {e}" + return True, "" + return match diff --git a/tests/test_course_published.py b/tests/test_course_published.py new file mode 100644 index 0000000..834c00f --- /dev/null +++ b/tests/test_course_published.py @@ -0,0 +1,50 @@ +import logging +from io import StringIO +from unittest.mock import patch + +import responses +from responses import matchers +from responses.registries import OrderedRegistry + +from test_utils.helpers import ( + check_block_csv_matcher, + check_relationship_csv_matcher, + course_factory, + course_str_factory, + get_clickhouse_http_params, + mock_detached_xblock_types, +) +from event_sink_clickhouse.tasks import dump_course_to_clickhouse + + +@responses.activate(registry=OrderedRegistry) +@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_detached_xblock_types") +@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_modulestore") +def test_course_publish_success(mock_modulestore, mock_detached, caplog): + caplog.set_level(logging.INFO, logger="edx.celery.task") + course = course_factory() + mock_modulestore.return_value.get_items.return_value = course + mock_detached.return_value = mock_detached_xblock_types() + + blocks_params, relationships_params = get_clickhouse_http_params() + + responses.post( + "https://foo.bar/", + match=[ + matchers.query_param_matcher(blocks_params), + check_block_csv_matcher(course) + ], + ) + responses.post( + "https://foo.bar/", + match=[ + matchers.query_param_matcher(relationships_params), + check_relationship_csv_matcher(course) + ], + ) + + course = course_str_factory() + dump_course_to_clickhouse(course) + + 
+    assert mock_modulestore.call_count == 1
+    assert mock_detached.call_count == 1
diff --git a/tests/test_django_settings.py b/tests/test_django_settings.py
new file mode 100644
index 0000000..52ff506
--- /dev/null
+++ b/tests/test_django_settings.py
@@ -0,0 +1,55 @@
+"""
+Test plugin settings
+"""
+
+from django.conf import settings
+from django.test import TestCase
+
+from event_sink_clickhouse.settings import common as common_settings
+from event_sink_clickhouse.settings import production as production_settings
+
+
+class TestPluginSettings(TestCase):
+    """
+    Tests plugin settings
+    """
+
+    def test_common_settings(self):
+        """
+        Test common settings
+        """
+        common_settings.plugin_settings(settings)
+
+        for key in ("url", "username", "password", "database", "timeout_secs"):
+            assert key in settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG
+
+    def test_production_settings(self):
+        """
+        Test production settings
+        """
+        test_url = "https://foo.bar"
+        test_username = "bob"
+        test_password = "secret"
+        test_database = "cool_data"
+        test_timeout = 1
+
+        settings.ENV_TOKENS = {
+            'EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG': {
+                "url": test_url,
+                "username": test_username,
+                "password": test_password,
+                "database": test_database,
+                "timeout_secs": test_timeout
+            }
+        }
+        production_settings.plugin_settings(settings)
+
+        for key, val in (
+            ("url", test_url),
+            ("username", test_username),
+            ("password", test_password),
+            ("database", test_database),
+            ("timeout_secs", test_timeout),
+        ):
+            assert key in settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG
+            assert settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG[key] == val
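For reviewers, a minimal sketch of how the new connection_overrides plumbing behaves. This is not part of the diff; the BaseSink import path and override keys come from the changes above, while the logger setup and the override values are invented for illustration:

    import logging

    from event_sink_clickhouse.sinks.base_sink import BaseSink

    log = logging.getLogger(__name__)

    # With no overrides, url/username/password/database/timeout_secs all come
    # from settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG.
    sink = BaseSink(connection_overrides=None, log=log)

    # Any subset of keys can be overridden per call; omitted keys keep their
    # settings-derived defaults. The values shown here are hypothetical.
    sink = BaseSink(
        connection_overrides={"url": "http://localhost:8123", "timeout_secs": 10},
        log=log,
    )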