diff --git a/event_sink_clickhouse/apps.py b/event_sink_clickhouse/apps.py
index 1501438..2d666d2 100644
--- a/event_sink_clickhouse/apps.py
+++ b/event_sink_clickhouse/apps.py
@@ -45,8 +45,8 @@ class EventSinkClickhouseConfig(AppConfig):
 
     def ready(self):
         """
-        Import our Celery tasks for initialization
+        Import our Celery tasks for initialization.
         """
         super().ready()
-        from . import tasks
+        from . import tasks  # pylint: disable=import-outside-toplevel, unused-import
diff --git a/event_sink_clickhouse/settings/production.py b/event_sink_clickhouse/settings/production.py
index 192e0dd..18fee66 100644
--- a/event_sink_clickhouse/settings/production.py
+++ b/event_sink_clickhouse/settings/production.py
@@ -1,4 +1,9 @@
-def plugin_settings(settings):  # pylint: disable=unused-argument
+"""
+Production settings for the openedx_event_sink_clickhouse app.
+"""
+
+
+def plugin_settings(settings):
     """
     Override the default app settings with production settings.
     """
diff --git a/event_sink_clickhouse/signals.py b/event_sink_clickhouse/signals.py
index 862d76f..360f975 100644
--- a/event_sink_clickhouse/signals.py
+++ b/event_sink_clickhouse/signals.py
@@ -5,12 +5,10 @@
 
 def receive_course_publish(sender, course_key, **kwargs):  # pylint: disable=unused-argument
     """
-    Receives publishing signal and performs publishing related workflows, such as
-    registering proctored exams, building up credit requirements, and performing
-    search indexing
+    Receives COURSE_PUBLISHED signal and queues the dump job.
     """
     # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
-    from .tasks import dump_course_to_clickhouse
+    from .tasks import dump_course_to_clickhouse  # pylint: disable=import-outside-toplevel
 
     course_key_str = str(course_key)
     dump_course_to_clickhouse.delay(course_key_str)
diff --git a/event_sink_clickhouse/sinks/__init__.py b/event_sink_clickhouse/sinks/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py
new file mode 100644
index 0000000..eb23a14
--- /dev/null
+++ b/event_sink_clickhouse/sinks/base_sink.py
@@ -0,0 +1,23 @@
+"""
+Base classes for event sinks
+"""
+
+from django.conf import settings
+
+
+class BaseSink:
+    """
+    Base class for ClickHouse event sink, allows overwriting of default settings
+    """
+    connection_overrides = None
+    log = None
+
+    def __init__(self, connection_overrides, log):
+
+        self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"]
+        self.ch_auth = (settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"],
+                        settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"])
+        self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["clickhouse_database"]
+        self.ch_timeout_secs = 3
+        self.connection_overrides = connection_overrides
+        self.log = log
diff --git a/event_sink_clickhouse/destination.py b/event_sink_clickhouse/sinks/course_published.py
similarity index 84%
rename from event_sink_clickhouse/destination.py
rename to event_sink_clickhouse/sinks/course_published.py
index 794c97a..7a85e73 100644
--- a/event_sink_clickhouse/destination.py
+++ b/event_sink_clickhouse/sinks/course_published.py
@@ -1,25 +1,28 @@
-import csv
-import io
-
+"""
+Handler for the CMS COURSE_PUBLISHED event
-from django.conf import settings
-from django.utils import timezone
-
+
+Does the following:
+- Pulls the course structure from modulestore
+- Serialize the xblocks and their parent/child relationships
+- Sends them to ClickHouse in CSV format
-import requests
-
-
+
+Note that the serialization format does not include all fields as there may be things like
+LTI passwords and other secrets. We just take the fields necessary for reporting at this time.
+"""
+import csv
+import io
-class ClickHouseDestination:
-    connection_overrides = None
-    log = None
+
+import requests
+from django.utils import timezone
-
-    def __init__(self, connection_overrides, log):
+
+from .base_sink import BaseSink
-
-        self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"]
-        self.ch_auth = (settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"],
-                        settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"])
-        self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["clickhouse_database"]
-        self.connection_overrides = connection_overrides
-        self.log = log
+
+
+class CoursePublishedSink(BaseSink):
+    """
+    Event sink for the COURSE_PUBLISHED signal
+    """
 
     @staticmethod
     def strip_branch_and_version(location):
         """
@@ -49,6 +52,7 @@ def get_course_last_published(course_key):
         https://docs.python.org/3/library/datetime.html#datetime.date.__str__
         """
         # Import is placed here to avoid model import at project startup.
+        # pylint: disable=import-outside-toplevel,import-error
         from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
 
         approx_last_published = CourseOverview.get_from_id(course_key).modified
@@ -66,6 +70,7 @@ def serialize_item(item, index):
             block_type: the name of the XBlock's type (i.e.
                 'course' or 'problem')
         """
+        # pylint: disable=import-outside-toplevel,import-error
        from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES
 
         course_key = item.scope_ids.usage_id.course_key
@@ -85,7 +90,7 @@ def serialize_item(item, index):
             'order': index,
         }
 
-        return rtn_fields, block_type
+        return rtn_fields
 
     def serialize_course(self, course_id):
         """
@@ -99,7 +104,7 @@ def serialize_course(self, course_id):
             relationships: a csv of relationships between nodes
         """
         # Import is placed here to avoid model import at project startup.
-        from xmodule.modulestore.django import modulestore
+        from xmodule.modulestore.django import modulestore  # pylint: disable=import-outside-toplevel,import-error
 
         # create a location to node mapping we'll need later for
         # writing relationships
@@ -110,7 +115,7 @@ def serialize_course(self, course_id):
         i = 0
         for item in items:
             i += 1
-            fields, block_type = self.serialize_item(item, i)
+            fields = self.serialize_item(item, i)
             location_to_node[self.strip_branch_and_version(item.location)] = fields
 
         # create relationships
@@ -133,6 +138,9 @@ def serialize_course(self, course_id):
         return nodes, relationships
 
     def dump(self, course_key):
+        """
+        Do the serialization and send to ClickHouse
+        """
         nodes, relationships = self.serialize_course(course_key)
         self.log.info(
             "Now dumping %s to ClickHouse: %d nodes and %d relationships",
@@ -161,7 +169,8 @@ def dump(self, course_key):
         for node in nodes:
             writer.writerow(node.values())
 
-        response = requests.post(self.ch_url, data=output.getvalue(), params=params, auth=self.ch_auth)
+        response = requests.post(self.ch_url, data=output.getvalue(), params=params, auth=self.ch_auth,
+                                 timeout=self.ch_timeout_secs)
         self.log.info(response.headers)
         self.log.info(response)
         self.log.info(response.text)
@@ -175,7 +184,8 @@ def dump(self, course_key):
         for relationship in relationships:
             writer.writerow(relationship.values())
 
-        response = requests.post(self.ch_url, data=output.getvalue(), params=params, auth=self.ch_auth)
+        response = requests.post(self.ch_url, data=output.getvalue(), params=params, auth=self.ch_auth,
+                                 timeout=self.ch_timeout_secs)
         self.log.info(response.headers)
         self.log.info(response)
         self.log.info(response.text)
diff --git a/event_sink_clickhouse/tasks.py b/event_sink_clickhouse/tasks.py
index 0e98da0..60d54e1 100644
--- a/event_sink_clickhouse/tasks.py
+++ b/event_sink_clickhouse/tasks.py
@@ -1,15 +1,14 @@
 """
-This file contains a management command for exporting course modulestore data to ClickHouse
+This file contains a management command for exporting course modulestore data to ClickHouse.
 """
 
 import logging
 
 from celery import shared_task
-from edx_django_utils.cache import RequestCache
 from edx_django_utils.monitoring import set_code_owner_attribute
 from opaque_keys.edx.keys import CourseKey
 
-from .destination import ClickHouseDestination
+from .sinks.course_published import CoursePublishedSink
 
 log = logging.getLogger(__name__)
 celery_log = logging.getLogger('edx.celery.task')
@@ -19,13 +18,13 @@
 @set_code_owner_attribute
 def dump_course_to_clickhouse(course_key_string, connection_overrides=None):
     """
-    Serializes a course and writes it to neo4j.
+    Serialize a course and writes it to ClickHouse.
 
     Arguments:
         course_key_string: course key for the course to be exported
         connection_overrides (dict): overrides to ClickHouse connection
-            parameters specified in `settings.COURSEGRAPH_CONNECTION`.
+            parameters specified in `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`.
     """
     course_key = CourseKey.from_string(course_key_string)
-    destination = ClickHouseDestination(connection_overrides=connection_overrides, log=celery_log)
-    destination.dump(course_key)
+    sink = CoursePublishedSink(connection_overrides=connection_overrides, log=celery_log)
+    sink.dump(course_key)