From 3e2c2557a1566cb95e817ddf7248d54a5264f07d Mon Sep 17 00:00:00 2001 From: Cristhian Garcia Date: Thu, 17 Aug 2023 12:19:04 -0500 Subject: [PATCH] feat: allow to extend event sink chore: install rest framework feat: use django serializer for sink serialization chore: reformat all files chore: bump version to 0.2.0 chore: reformat all files test: fix tests settings chore: fix quality checks test: use test settings for tox feat: allow to enable sink from settings or waffle flags chore: fix quality checks chore: update courseoverview sink to new abstraction chore: reformat all files chore: add documentation for base model sink fix: add missing graded xblock attribute chore: remove unused serializers chore: add documentation for base model sink chore: remove old course published event sink fix: remove nested_sink validation feat: add generic backfill command fix: bring back the should dump course logic test: fix broken tests chore: rename course_key for unique_key test: add unit test for utils test: add test for main class methods (cherry picked from commit 6d64950dcf0bb547feb17cb688334ae991f453fc) chore: install ddt as dev dependency chore: refactor get_courseware and get_detached_xblocks test: mock the item serializer test: Fix test issues test: decode relationships using utf-8 chore: refactor for quality checks test: improve test coverage for base sink test: add tests for dump data to clickhouse test: add more tests test: add tests for signals and tasks chore: upgrade python requirements test: Remove coverage from unnecessary code paths test: Remove coverage from unnecessary code paths --- Makefile | 4 + event_sink_clickhouse/__init__.py | 2 +- event_sink_clickhouse/apps.py | 39 +- .../commands/dump_courses_to_clickhouse.py | 65 ++- .../commands/dump_data_to_clickhouse.py | 207 +++++++ event_sink_clickhouse/serializers.py | 111 ++++ event_sink_clickhouse/settings/common.py | 17 +- event_sink_clickhouse/settings/production.py | 4 +- event_sink_clickhouse/signals.py | 21 +- event_sink_clickhouse/sinks/base_sink.py | 319 ++++++++++- .../sinks/course_published.py | 531 ++++++------------ .../sinks/user_profile_sink.py | 16 + event_sink_clickhouse/tasks.py | 51 +- event_sink_clickhouse/utils.py | 59 ++ event_sink_clickhouse/waffle.py | 4 + requirements/base.in | 2 + requirements/base.txt | 40 +- requirements/ci.txt | 4 +- requirements/dev.txt | 26 +- requirements/doc.txt | 25 +- requirements/quality.txt | 22 +- requirements/test.in | 1 + requirements/test.txt | 43 +- setup.cfg | 1 + setup.py | 1 + test_utils/helpers.py | 10 +- test_utils/test_settings.py | 44 ++ test_utils/test_utils.py | 73 +++ tests/commands/test_dump_courses_command.py | 12 +- .../commands/test_dump_data_to_clickhouse.py | 176 ++++++ tests/test_base_sink.py | 301 ++++++++++ tests/test_course_published.py | 128 +++-- tests/test_signals.py | 36 ++ tests/test_tasks.py | 59 ++ tox.ini | 6 +- 35 files changed, 1953 insertions(+), 507 deletions(-) create mode 100644 event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py create mode 100644 event_sink_clickhouse/serializers.py create mode 100644 event_sink_clickhouse/sinks/user_profile_sink.py create mode 100644 event_sink_clickhouse/utils.py create mode 100644 event_sink_clickhouse/waffle.py create mode 100644 test_utils/test_settings.py create mode 100644 test_utils/test_utils.py create mode 100644 tests/commands/test_dump_data_to_clickhouse.py create mode 100644 tests/test_base_sink.py create mode 100644 tests/test_signals.py create mode 100644 tests/test_tasks.py 
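Note: the heart of this change is the new ModelBaseSink abstraction in event_sink_clickhouse/sinks/base_sink.py, which generalizes the hand-rolled CSV plumbing of the old CoursePublishedSink so that any Django model can be mirrored into ClickHouse. A minimal sketch of the intended extension pattern follows; MyModel, "my_model", and "my_table" are hypothetical names, and the sketch assumes a matching entry in EVENT_SINK_CLICKHOUSE_MODEL_CONFIG and a corresponding table in ClickHouse:

    from rest_framework import serializers

    from event_sink_clickhouse.serializers import BaseSinkSerializer
    from event_sink_clickhouse.sinks.base_sink import ModelBaseSink
    from event_sink_clickhouse.utils import get_model


    class MyModelSerializer(BaseSinkSerializer, serializers.ModelSerializer):
        """Flatten a (hypothetical) MyModel row into the ClickHouse table columns."""

        class Meta:
            # Resolved through EVENT_SINK_CLICKHOUSE_MODEL_CONFIG, like "user_profile" below.
            model = get_model("my_model")
            # dump_id and time_last_dumped come from BaseSinkSerializer.
            fields = ["id", "name", "dump_id", "time_last_dumped"]


    class MyModelSink(ModelBaseSink):  # pylint: disable=abstract-method
        """Mirror rows of the hypothetical MyModel into ClickHouse."""

        model = "my_model"  # key in EVENT_SINK_CLICKHOUSE_MODEL_CONFIG
        unique_key = "id"  # column used to look up the last-dumped timestamp
        clickhouse_table_name = "my_table"  # destination table in ClickHouse
        timestamp_field = "time_last_dumped"  # column holding the dump timestamp
        name = "My Model"
        serializer_class = MyModelSerializer

Subclasses are discovered through ModelBaseSink.__subclasses__() by the new dump_data_to_clickhouse management command, and each sink is gated by is_enabled(), that is, by the EVENT_SINK_CLICKHOUSE_MY_MODEL_ENABLED setting or the event_sink_clickhouse.my_model.enabled waffle flag.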
diff --git a/Makefile b/Makefile index 7c5fbc8..a678c02 100644 --- a/Makefile +++ b/Makefile @@ -53,6 +53,10 @@ upgrade: ## update the requirements/*.txt files with the latest packages satisfy quality: ## check coding style with pycodestyle and pylint tox -e quality +format: ## format the files with black + black event_sink_clickhouse + isort event_sink_clickhouse + pii_check: ## check for PII annotations on all Django models tox -e pii_check diff --git a/event_sink_clickhouse/__init__.py b/event_sink_clickhouse/__init__.py index aeea954..2abe147 100644 --- a/event_sink_clickhouse/__init__.py +++ b/event_sink_clickhouse/__init__.py @@ -2,4 +2,4 @@ A sink for Open edX events to send them to ClickHouse. """ -__version__ = '0.1.2' +__version__ = "0.2.0" diff --git a/event_sink_clickhouse/apps.py b/event_sink_clickhouse/apps.py index 85c287e..67dafa6 100644 --- a/event_sink_clickhouse/apps.py +++ b/event_sink_clickhouse/apps.py @@ -11,32 +11,33 @@ class EventSinkClickhouseConfig(AppConfig): Configuration for the event_sink_clickhouse Django application. """ - name = 'event_sink_clickhouse' + name = "event_sink_clickhouse" verbose_name = "Event Sink ClickHouse" plugin_app = { PluginSettings.CONFIG: { - 'lms.djangoapp': { - 'production': {PluginSettings.RELATIVE_PATH: 'settings.production'}, - 'common': {PluginSettings.RELATIVE_PATH: 'settings.common'}, + "lms.djangoapp": { + "production": {PluginSettings.RELATIVE_PATH: "settings.production"}, + "common": {PluginSettings.RELATIVE_PATH: "settings.common"}, + }, + "cms.djangoapp": { + "production": {PluginSettings.RELATIVE_PATH: "settings.production"}, + "common": {PluginSettings.RELATIVE_PATH: "settings.common"}, }, - 'cms.djangoapp': { - 'production': {PluginSettings.RELATIVE_PATH: 'settings.production'}, - 'common': {PluginSettings.RELATIVE_PATH: 'settings.common'}, - } }, # Configuration setting for Plugin Signals for this app. PluginSignals.CONFIG: { # Configure the Plugin Signals for each Project Type, as needed. - 'cms.djangoapp': { + "cms.djangoapp": { # List of all plugin Signal receivers for this app and project type. - PluginSignals.RECEIVERS: [{ - # The name of the app's signal receiver function. - PluginSignals.RECEIVER_FUNC_NAME: 'receive_course_publish', - - # The full path to the module where the signal is defined. - PluginSignals.SIGNAL_PATH: 'xmodule.modulestore.django.COURSE_PUBLISHED', - }], + PluginSignals.RECEIVERS: [ + { + # The name of the app's signal receiver function. + PluginSignals.RECEIVER_FUNC_NAME: "receive_course_publish", + # The full path to the module where the signal is defined. + PluginSignals.SIGNAL_PATH: "xmodule.modulestore.django.COURSE_PUBLISHED", + } + ], } }, } @@ -47,4 +48,8 @@ def ready(self): """ super().ready() - from . 
import tasks # pylint: disable=import-outside-toplevel, unused-import + from event_sink_clickhouse import ( # pylint: disable=import-outside-toplevel, unused-import + signals, + sinks, + tasks, + ) diff --git a/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py index 0bc05e1..2c66efa 100644 --- a/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py +++ b/event_sink_clickhouse/management/commands/dump_courses_to_clickhouse.py @@ -25,7 +25,7 @@ from django.core.management.base import BaseCommand, CommandError from edx_django_utils.cache import RequestCache -from event_sink_clickhouse.sinks.course_published import CoursePublishedSink +from event_sink_clickhouse.sinks.course_published import CourseOverviewSink from event_sink_clickhouse.tasks import dump_course_to_clickhouse log = logging.getLogger(__name__) @@ -36,7 +36,7 @@ def dump_target_courses_to_clickhouse( course_keys=None, courses_to_skip=None, force=None, - limit=None + limit=None, ): """ Iterates through a list of courses in a modulestore, serializes them to csv, @@ -49,19 +49,23 @@ Returns: two lists--one of the courses that had dump jobs queued for them and one of courses that did not. """ - sink = CoursePublishedSink(connection_overrides, log) + sink = CourseOverviewSink(connection_overrides, log) submitted_courses = [] skipped_courses = [] index = 0 - for course_key, should_be_dumped, reason in sink.fetch_target_courses(course_keys, courses_to_skip, force): + for course_key, should_be_dumped, reason in sink.fetch_target_items( + course_keys, courses_to_skip, force + ): log.info(f"Iteration {index}: {course_key}") index += 1 if not should_be_dumped: skipped_courses.append(course_key) - log.info(f"Course {index}: Skipping course {course_key}, reason: '{reason}'") + log.info( + f"Course {index}: Skipping course {course_key}, reason: '{reason}'" + ) else: # RequestCache is a local memory cache used in modulestore for performance reasons. # Normally it is cleared at the end of every request, but in this command it will @@ -83,7 +87,9 @@ dump_target_courses_to_clickhouse( submitted_courses.append(str(course_key)) if limit and len(submitted_courses) == limit: - log.info(f"Limit of {limit} eligible course has been reached, quitting!") + log.info( + f"Limit of {limit} eligible courses has been reached, quitting!" + ) break return submitted_courses, skipped_courses @@ -93,55 +99,56 @@ class Command(BaseCommand): """ Dump course block and relationship data to a ClickHouse instance.
""" + help = dedent(__doc__).strip() def add_arguments(self, parser): parser.add_argument( - '--url', + "--url", type=str, help="the URL of the ClickHouse server", ) parser.add_argument( - '--username', + "--username", type=str, help="the username of the ClickHouse user", ) parser.add_argument( - '--password', + "--password", type=str, help="the password of the ClickHouse user", ) parser.add_argument( - '--database', + "--database", type=str, help="the database in ClickHouse to connect to", ) parser.add_argument( - '--timeout_secs', + "--timeout_secs", type=int, help="timeout for ClickHouse requests, in seconds", ) parser.add_argument( - '--courses', - metavar='KEY', + "--courses", + metavar="KEY", type=str, - nargs='*', + nargs="*", help="keys of courses to serialize; if omitted all courses in system are serialized", ) parser.add_argument( - '--courses_to_skip', - metavar='KEY', + "--courses_to_skip", + metavar="KEY", type=str, - nargs='*', + nargs="*", help="keys of courses to NOT to serialize", ) parser.add_argument( - '--force', - action='store_true', + "--force", + action="store_true", help="dump all courses regardless of when they were last published", ) parser.add_argument( - '--limit', + "--limit", type=int, help="maximum number of courses to dump, cannot be used with '--courses' or '--force'", ) @@ -158,7 +165,9 @@ def handle(self, *args, **options): } courses = options["courses"] if options["courses"] else [] - courses_to_skip = options["courses_to_skip"] if options["courses_to_skip"] else [] + courses_to_skip = ( + options["courses_to_skip"] if options["courses_to_skip"] else [] + ) if options["limit"] is not None and int(options["limit"]) < 1: message = "'limit' must be greater than 0!" @@ -166,8 +175,10 @@ def handle(self, *args, **options): raise CommandError(message) if options["limit"] and options["force"]: - message = "The 'limit' option cannot be used with 'force' as running the " \ - "command repeatedly will result in the same courses being dumped every time." + message = ( + "The 'limit' option cannot be used with 'force' as running the " + "command repeatedly will result in the same courses being dumped every time." + ) log.error(message) raise CommandError(message) @@ -175,8 +186,8 @@ def handle(self, *args, **options): connection_overrides, [course_key.strip() for course_key in courses], [course_key.strip() for course_key in courses_to_skip], - options['force'], - options['limit'] + options["force"], + options["limit"], ) log.info( @@ -189,6 +200,6 @@ def handle(self, *args, **options): log.info("No courses submitted for export to ClickHouse at all!") else: log.info( # pylint: disable=logging-not-lazy - "These courses were submitted for dump to ClickHouse successfully:\n\t" + - "\n\t".join(submitted_courses) + "These courses were submitted for dump to ClickHouse successfully:\n\t" + + "\n\t".join(submitted_courses) ) diff --git a/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py new file mode 100644 index 0000000..25605c7 --- /dev/null +++ b/event_sink_clickhouse/management/commands/dump_data_to_clickhouse.py @@ -0,0 +1,207 @@ +""" +Management command for exporting Django models to ClickHouse. + +Example usages (see usage for more options): + + # Dump all objects published since last dump.
+ # Use connection parameters from `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`: + python manage.py cms dump_data_to_clickhouse --object user_profile + + # Specify certain objects instead of dumping all of them. + # Use connection parameters from `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`. + python manage.py cms dump_data_to_clickhouse --object user_profile --ids 123 124 125 + + # Dump a limited number of objects to prevent stress on production systems + python manage.py cms dump_data_to_clickhouse --object user_profile --limit 1000 +""" +import logging +from textwrap import dedent + +from django.core.management.base import BaseCommand, CommandError + +from event_sink_clickhouse.sinks.base_sink import ModelBaseSink +from event_sink_clickhouse.tasks import dump_data_to_clickhouse + +log = logging.getLogger(__name__) + + +def dump_target_objects_to_clickhouse( + connection_overrides=None, + sink=None, + object_ids=None, + objects_to_skip=None, + force=None, + limit=None, +): + """ + Iterates through a list of objects in the ORM, serializes them to csv, + then submits tasks to post them to ClickHouse. + + Arguments: + force: serialize the objects even if they've been recently + serialized + + Returns: two lists--one of the objects that had dump jobs queued for them + and one of objects that did not. + """ + + submitted_objects = [] + skipped_objects = [] + + index = 0 + for object_id, should_be_dumped, reason in sink.fetch_target_items( + object_ids, objects_to_skip, force + ): + log.info(f"Iteration {index}: {object_id}") + index += 1 + + if not should_be_dumped: + skipped_objects.append(object_id) + log.info( + f"{sink.model} {index}: Skipping object {object_id}, reason: '{reason}'" + ) + else: + log.info( + f"{sink.model} {index}: Submitting {object_id} for dump to ClickHouse, reason '{reason}'." + ) + + dump_data_to_clickhouse.apply_async( + kwargs={ + "sink_module": sink.__module__, + "sink_name": sink.__class__.__name__, + "object_id": str(object_id), + "connection_overrides": connection_overrides, + } + ) + + submitted_objects.append(str(object_id)) + + if limit and len(submitted_objects) == limit: + log.info( + f"Limit of {limit} eligible objects has been reached, quitting!" + ) + break + + return submitted_objects, skipped_objects + + +class Command(BaseCommand): + """ + Dump objects to a ClickHouse instance.
+ """ + + help = dedent(__doc__).strip() + + def add_arguments(self, parser): + parser.add_argument( + "--url", + type=str, + help="the URL of the ClickHouse server", + ) + parser.add_argument( + "--username", + type=str, + help="the username of the ClickHouse user", + ) + parser.add_argument( + "--password", + type=str, + help="the password of the ClickHouse user", + ) + parser.add_argument( + "--database", + type=str, + help="the database in ClickHouse to connect to", + ) + parser.add_argument( + "--timeout_secs", + type=int, + help="timeout for ClickHouse requests, in seconds", + ) + parser.add_argument( + "--object", + type=str, + help="the type of object to dump", + ) + parser.add_argument( + "--ids", + metavar="KEY", + type=str, + nargs="*", + help="keys of objects to serialize; if omitted all objects in system are serialized", + ) + parser.add_argument( + "--ids_to_skip", + metavar="KEY", + type=str, + nargs="*", + help="keys of objects NOT to serialize", + ) + parser.add_argument( + "--force", + action="store_true", + help="dump all objects regardless of when they were last published", + ) + parser.add_argument( + "--limit", + type=int, + help="maximum number of objects to dump, cannot be used with '--ids' or '--force'", + ) + + def handle(self, *args, **options): + """ + Iterates through each object, serializes it, and saves it to ClickHouse. + """ + connection_overrides = { + key: options[key] + for key in ["url", "username", "password", "database", "timeout_secs"] + if options[key] + } + + ids = options["ids"] if options["ids"] else [] + ids_to_skip = options["ids_to_skip"] if options["ids_to_skip"] else [] + + if options["limit"] is not None and int(options["limit"]) < 1: + message = "'limit' must be greater than 0!" + log.error(message) + raise CommandError(message) + + if options["limit"] and options["force"]: + message = ( + "The 'limit' option cannot be used with 'force' as running the " + "command repeatedly will result in the same objects being dumped every time." + ) + log.error(message) + raise CommandError(message) + + if options["object"] is None: + message = "You must specify an object type to dump!" + log.error(message) + raise CommandError(message) + + for cls in ModelBaseSink.__subclasses__(): # pragma: no cover + if cls.model == options["object"]: + sink = cls(connection_overrides, log) + submitted_objects, skipped_objects = dump_target_objects_to_clickhouse( + connection_overrides, + sink, + [object_id.strip() for object_id in ids], + [object_id.strip() for object_id in ids_to_skip], + options["force"], + options["limit"], + ) + + log.info( + "%d objects submitted for export to ClickHouse.
%d objects skipped.", + len(submitted_objects), + len(skipped_objects), + ) + + if not submitted_objects: + log.info("No objects submitted for export to ClickHouse at all!") + else: + log.info( # pylint: disable=logging-not-lazy + "These objects were submitted for dump to ClickHouse successfully:\n\t" + + "\n\t".join(submitted_objects) + ) + break diff --git a/event_sink_clickhouse/serializers.py b/event_sink_clickhouse/serializers.py new file mode 100644 index 0000000..aa70252 --- /dev/null +++ b/event_sink_clickhouse/serializers.py @@ -0,0 +1,111 @@ +"""Django serializers for the event_sink_clickhouse app.""" +import json +import uuid + +from django.utils import timezone +from rest_framework import serializers + +from event_sink_clickhouse.utils import get_model + + +class BaseSinkSerializer(serializers.Serializer): # pylint: disable=abstract-method + """Base sink serializer for ClickHouse.""" + + dump_id = serializers.SerializerMethodField() + time_last_dumped = serializers.SerializerMethodField() + + class Meta: + """Meta class for base sink serializer.""" + + fields = [ + "dump_id", + "time_last_dumped", + ] + + def get_dump_id(self, instance): # pylint: disable=unused-argument + """Return a unique ID for the dump.""" + return uuid.uuid4() + + def get_time_last_dumped(self, instance): # pylint: disable=unused-argument + """Return the timestamp for the dump.""" + return timezone.now() + + +class UserProfileSerializer(BaseSinkSerializer, serializers.ModelSerializer): + """Serializer for user profile events.""" + + class Meta: + """Meta class for user profile serializer.""" + + model = get_model("user_profile") + fields = [ + "id", + "user_id", + "name", + "meta", + "courseware", + "language", + "location", + "year_of_birth", + "gender", + "level_of_education", + "mailing_address", + "city", + "country", + "state", + "goals", + "bio", + "profile_image_uploaded_at", + "phone_number", + "dump_id", + "time_last_dumped", + ] + + +class CourseOverviewSerializer(BaseSinkSerializer, serializers.ModelSerializer): + """Serializer for course overview events.""" + + course_data_json = serializers.SerializerMethodField() + course_key = serializers.SerializerMethodField() + course_start = serializers.CharField(source="start") + course_end = serializers.CharField(source="end") + + class Meta: + """Meta class for course overview serializer.""" + + model = get_model("course_overviews") + fields = [ + "org", + "course_key", + "display_name", + "course_start", + "course_end", + "enrollment_start", + "enrollment_end", + "self_paced", + "course_data_json", + "created", + "modified", + "dump_id", + "time_last_dumped", + ] + + def get_course_data_json(self, overview): + """Return the course data as a JSON string.""" + json_fields = { + "advertised_start": overview.advertised_start, + "announcement": overview.announcement, + "lowest_passing_grade": float(overview.lowest_passing_grade), + "invitation_only": overview.invitation_only, + "max_student_enrollments_allowed": overview.max_student_enrollments_allowed, + "effort": overview.effort, + "enable_proctored_exams": overview.enable_proctored_exams, + "entrance_exam_enabled": overview.entrance_exam_enabled, + "external_id": overview.external_id, + "language": overview.language, + } + return json.dumps(json_fields) + + def get_course_key(self, overview): + """Return the course key as a string.""" + return str(overview.id) diff --git a/event_sink_clickhouse/settings/common.py b/event_sink_clickhouse/settings/common.py index 1b6d117..f538a8d 100644 ---
a/event_sink_clickhouse/settings/common.py +++ b/event_sink_clickhouse/settings/common.py @@ -12,8 +12,19 @@ def plugin_settings(settings): # http://foo.openedx.org:8123/ . Note that we only support the ClickHouse HTTP interface # to avoid pulling in more dependencies to the platform than necessary. "url": "http://clickhouse:8123", - "username": "changeme", - "password": "changeme", + "username": "ch_cms", + "password": "TYreGozgtDG3vkoWPUHVVM6q", "database": "event_sink", - "timeout_secs": 3, + "timeout_secs": 5, + } + + settings.EVENT_SINK_CLICKHOUSE_MODEL_CONFIG = { + "user_profile": { + "module": "common.djangoapps.student.models", + "model": "UserProfile", + }, + "course_overviews": { + "module": "openedx.core.djangoapps.content.course_overviews.models", + "model": "CourseOverview", + }, } diff --git a/event_sink_clickhouse/settings/production.py b/event_sink_clickhouse/settings/production.py index 18fee66..204cafb 100644 --- a/event_sink_clickhouse/settings/production.py +++ b/event_sink_clickhouse/settings/production.py @@ -8,6 +8,6 @@ def plugin_settings(settings): Override the default app settings with production settings. """ settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG = settings.ENV_TOKENS.get( - 'EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG', - settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG + "EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG", + settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG, ) diff --git a/event_sink_clickhouse/signals.py b/event_sink_clickhouse/signals.py index 25890a9..f6bf2ce 100644 --- a/event_sink_clickhouse/signals.py +++ b/event_sink_clickhouse/signals.py @@ -1,9 +1,15 @@ """ Signal handler functions, mapped to specific signals in apps.py. """ +from django.db.models.signals import post_save +from django.dispatch import receiver +from event_sink_clickhouse.utils import get_model -def receive_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument # pragma: no cover + +def receive_course_publish( # pylint: disable=unused-argument # pragma: no cover + sender, course_key, **kwargs +): """ Receives COURSE_PUBLISHED signal and queues the dump job. """ @@ -11,3 +17,16 @@ def receive_course_publish(sender, course_key, **kwargs): # pylint: disable=unu from event_sink_clickhouse.tasks import dump_course_to_clickhouse # pylint: disable=import-outside-toplevel dump_course_to_clickhouse.delay(str(course_key)) + + +@receiver(post_save, sender=get_model("user_profile")) +def on_user_profile_updated( # pylint: disable=unused-argument # pragma: no cover + sender, instance, **kwargs +): + """ + Receives post save signal and queues the dump job. 
+ """ + # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded + from event_sink_clickhouse.tasks import dump_user_profile_to_clickhouse # pylint: disable=import-outside-toplevel + + dump_user_profile_to_clickhouse.delay(instance.id) diff --git a/event_sink_clickhouse/sinks/base_sink.py b/event_sink_clickhouse/sinks/base_sink.py index 987c484..0e8465f 100644 --- a/event_sink_clickhouse/sinks/base_sink.py +++ b/event_sink_clickhouse/sinks/base_sink.py @@ -1,11 +1,18 @@ """ Base classes for event sinks """ +import csv +import datetime +import io import json from collections import namedtuple import requests from django.conf import settings +from edx_toggles.toggles import WaffleFlag + +from event_sink_clickhouse.utils import get_model +from event_sink_clickhouse.waffle import WAFFLE_FLAG_NAMESPACE ClickHouseAuth = namedtuple("ClickHouseAuth", ["username", "password"]) @@ -14,21 +21,36 @@ class BaseSink: """ Base class for ClickHouse event sink, allows overwriting of default settings """ + + CLICKHOUSE_BULK_INSERT_PARAMS = { + "input_format_allow_errors_num": 1, + "input_format_allow_errors_ratio": 0.1, + } + def __init__(self, connection_overrides, log): + self.connection_overrides = connection_overrides self.log = log self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"] - self.ch_auth = ClickHouseAuth(settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"], - settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"]) + self.ch_auth = ClickHouseAuth( + settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"], + settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"], + ) self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["database"] - self.ch_timeout_secs = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["timeout_secs"] + self.ch_timeout_secs = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG[ + "timeout_secs" + ] # If any overrides to the ClickHouse connection if connection_overrides: self.ch_url = connection_overrides.get("url", self.ch_url) - self.ch_auth = ClickHouseAuth(connection_overrides.get("username", self.ch_auth.username), - connection_overrides.get("password", self.ch_auth.password)) + self.ch_auth = ClickHouseAuth( + connection_overrides.get("username", self.ch_auth.username), + connection_overrides.get("password", self.ch_auth.password), + ) self.ch_database = connection_overrides.get("database", self.ch_database) - self.ch_timeout_secs = connection_overrides.get("timeout_secs", self.ch_timeout_secs) + self.ch_timeout_secs = connection_overrides.get( + "timeout_secs", self.ch_timeout_secs + ) def _send_clickhouse_request(self, request, expected_insert_rows=None): """ @@ -62,3 +84,288 @@ def _send_clickhouse_request(self, request, expected_insert_rows=None): # ClickHouse can be configured not to return the metadata / summary we check above for # performance reasons. It's not critical, so we eat those here. return response + + +class ModelBaseSink(BaseSink): + """ + Base class for ClickHouse event sink, allows overwriting of default settings + + This class is used for the model based event sink, which uses the Django ORM to write + events to ClickHouse. + """ + + unique_key = None + """ + str: A unique identifier key used to distinguish between different instances of the sink. + It can be used to specify the uniqueness constraint when writing events to ClickHouse. + """ + + clickhouse_table_name = None + """ + str: The name of the ClickHouse table where the events will be written. 
This should be set to the desired table name for the specific event type. + """ + + queryset = None + """ + QuerySet: A Django QuerySet that represents the initial set of data to be processed by the sink. + It can be used to filter and select specific data for writing to ClickHouse. + """ + + name = None + """ + str: A human-readable name for the sink instance. This can be used for logging and identification purposes. + """ + + timestamp_field = None + """ + str: The name of the field in the model representing the timestamp of the event. + It is used to extract the timestamp from the event data for writing to ClickHouse. + """ + + serializer_class = None + """ + Serializer: The serializer class responsible for converting event data into a format suitable for storage. + This serializer should be compatible with Django's serialization framework. + """ + + model = None + """ + Model: The Django model class representing the structure of the event data. + This is used to validate and organize the data before writing it to ClickHouse. + """ + + nested_sinks = [] + """ + list: A list of nested sink instances that can be used to further process or route the event data. + Nested sinks allow chaining multiple sinks together for more complex event processing pipelines. + """ + + def __init__(self, connection_overrides, log): + super().__init__(connection_overrides, log) + + required_fields = [ + self.clickhouse_table_name, + self.timestamp_field, + self.unique_key, + self.name, + ] + + if not all(required_fields): + raise NotImplementedError( + "ModelBaseSink needs to be subclassed with clickhouse_table_name, " + "timestamp_field, unique_key, and name" + ) + + self._nested_sinks = [ + sink(connection_overrides, log) for sink in self.nested_sinks + ] + + def get_model(self): + """ + Return the model to be used for the insert + """ + return get_model(self.model) + + def get_queryset(self): + """ + Return the queryset to be used for the insert + """ + return self.get_model().objects.all() + + def dump(self, item_id, many=False, initial=None): + """ + Do the serialization and send to ClickHouse + """ + if many: + # If we're dumping many items, we expect to get a list of items + serialized_item = self.serialize_item(item_id, many=many, initial=initial) + self.log.info( + f"Now dumping {len(serialized_item)} {self.name} to ClickHouse", + ) + self.send_item_and_log(item_id, serialized_item, many) + self.log.info( + f"Completed dumping {len(serialized_item)} {self.name} to ClickHouse" + ) + + for item in serialized_item: + for nested_sink in self._nested_sinks: + nested_sink.dump_related( + item, item["dump_id"], item["time_last_dumped"] + ) + else: + item = self.get_object(item_id) + serialized_item = self.serialize_item(item, many=many, initial=initial) + self.log.info( + f"Now dumping {self.name} {item_id} to ClickHouse", + ) + self.send_item_and_log(item_id, serialized_item, many) + self.log.info(f"Completed dumping {self.name} {item_id} to ClickHouse") + + for nested_sink in self._nested_sinks: + nested_sink.dump_related( + serialized_item, + serialized_item["dump_id"], + serialized_item["time_last_dumped"], + ) + + def send_item_and_log( + self, + item_id, + serialized_item, + many, + ): + """Send the item to ClickHouse and log any errors""" + try: + self.send_item(serialized_item, many=many) + except Exception: + self.log.exception( + f"Error trying to dump {self.name} {str(item_id)} to ClickHouse!", + ) + raise + + def get_object(self, item_id): + """ + Return the object to be dumped to ClickHouse +
""" + return self.get_model().objects.get(id=item_id) + + def dump_related(self, serialized_item, dump_id, time_last_dumped): + """ + Dump related items to ClickHouse + """ + raise NotImplementedError( + "dump_related needs to be implemented in the subclass " + f"{self.__class__.__name__}!" + ) + + def serialize_item(self, item, many=False, initial=None): + """ + Serialize the data to be sent to ClickHouse + """ + Serializer = self.get_serializer() + serializer = Serializer( # pylint: disable=not-callable + item, many=many, initial=initial + ) + return serializer.data + + def get_serializer(self): + """ + Return the serializer to be used for the insert + """ + return self.serializer_class + + def send_item(self, serialized_item, many=False): + """ + Create the insert query and CSV to send the serialized item to ClickHouse. + + We still use a CSV here even though there's only 1 row because it handles + type serialization for us and keeps the pattern consistent. + """ + params = self.CLICKHOUSE_BULK_INSERT_PARAMS.copy() + + # "query" is a special param for the query, it's the best way to get the FORMAT CSV in there. + params[ + "query" + ] = f"INSERT INTO {self.ch_database}.{self.clickhouse_table_name} FORMAT CSV" + + output = io.StringIO() + writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC) + + if many: + for node in serialized_item: + writer.writerow(node.values()) + else: + writer.writerow(serialized_item.values()) + + request = requests.Request( + "POST", + self.ch_url, + data=output.getvalue().encode("utf-8"), + params=params, + auth=self.ch_auth, + ) + + self._send_clickhouse_request( + request, expected_insert_rows=len(serialized_item) if many else 1 + ) + + def fetch_target_items(self, ids=None, skip_ids=None, force_dump=False): + """ + Fetch the items that should be dumped to ClickHouse + """ + if ids: + item_keys = [self.convert_id(item_id) for item_id in ids] + else: + item_keys = [self.convert_id(item.id) for item in self.get_queryset()] + + skip_ids = ( + [self.convert_id(item_id) for item_id in skip_ids] if skip_ids else [] + ) + + for item_key in item_keys: + if item_key in skip_ids: + yield item_key, False, f"{self.name} is explicitly skipped" + elif force_dump: + yield item_key, True, "Force is set" + else: + should_be_dumped, reason = self.should_dump_item(item_key) + yield item_key, should_be_dumped, reason + + def convert_id(self, item_id): + """ + Convert the id to the correct type for the model + """ + return item_id + + def should_dump_item(self, unique_key): # pylint: disable=unused-argument + """ + Return True if the item should be dumped to ClickHouse, False otherwise + """ + return True, "No reason" + + def get_last_dumped_timestamp(self, item_id): + """ + Return the last timestamp that was dumped to ClickHouse + """ + params = { + "query": f"SELECT max({self.timestamp_field}) as time_last_dumped " + f"FROM {self.ch_database}.{self.clickhouse_table_name} " + f"WHERE {self.unique_key} = '{item_id}'" + } + + request = requests.Request("GET", self.ch_url, params=params, auth=self.ch_auth) + + response = self._send_clickhouse_request(request) + response.raise_for_status() + if response.text.strip(): + # ClickHouse returns timestamps in the format: "2023-05-03 15:47:39.331024+00:00" + # Our internal comparisons use the str() of a datetime object, this handles that + # transformation so that downstream comparisons will work.
+ return str(datetime.datetime.fromisoformat(response.text.strip())) + + # Item has never been dumped, return None + return None + + @classmethod + def is_enabled(cls): + """ + Return True if the sink is enabled, False otherwise + """ + enabled = getattr( + settings, + f"{WAFFLE_FLAG_NAMESPACE.upper()}_{cls.model.upper()}_ENABLED", + False, + ) + # .. toggle_name: event_sink_clickhouse.model.enabled + # .. toggle_implementation: WaffleFlag + # .. toggle_default: False + # .. toggle_description: Waffle flag to enable sink + # .. toggle_use_cases: open_edx + # .. toggle_creation_date: 2022-08-17 + waffle_flag = WaffleFlag( + f"{WAFFLE_FLAG_NAMESPACE}.{cls.model}.enabled", + __name__, + ) + + return enabled or waffle_flag.is_enabled() diff --git a/event_sink_clickhouse/sinks/course_published.py b/event_sink_clickhouse/sinks/course_published.py index 65f15c1..30705d7 100644 --- a/event_sink_clickhouse/sinks/course_published.py +++ b/event_sink_clickhouse/sinks/course_published.py @@ -9,18 +9,14 @@ Note that the serialization format does not include all fields as there may be things like LTI passwords and other secrets. We just take the fields necessary for reporting at this time. """ - -import csv import datetime -import io import json -import uuid -import requests -from django.utils import timezone from opaque_keys.edx.keys import CourseKey -from event_sink_clickhouse.sinks.base_sink import BaseSink +from event_sink_clickhouse.serializers import CourseOverviewSerializer +from event_sink_clickhouse.sinks.base_sink import ModelBaseSink +from event_sink_clickhouse.utils import get_detached_xblock_types, get_modulestore # Defaults we want to ensure we fail early on bulk inserts CLICKHOUSE_BULK_INSERT_PARAMS = { @@ -29,420 +25,188 @@ } -class CoursePublishedSink(BaseSink): +class XBlockRelationshipSink(ModelBaseSink): """ - Event sink for the COURSE_PUBLISHED signal + Sink for XBlock relationships """ - @staticmethod - def _get_detached_xblock_types(): # pragma: no cover - """ - Import and return DETACHED_XBLOCK_TYPES. - Placed here to avoid model import at startup and to facilitate mocking them in testing. - """ - # pylint: disable=import-outside-toplevel,import-error - from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES - return DETACHED_XBLOCK_TYPES - - @staticmethod - def _get_modulestore(): # pragma: no cover - """ - Import and return modulestore. - Placed here to avoid model import at startup and to facilitate mocking them in testing. - """ - # pylint: disable=import-outside-toplevel,import-error - from xmodule.modulestore.django import modulestore - return modulestore() - - @staticmethod - def _get_course_overview_model(): # pragma: no cover - """ - Import and return CourseOverview. - Placed here to avoid model import at startup and to facilitate mocking them in testing. - """ - # pylint: disable=import-outside-toplevel,import-error - from openedx.core.djangoapps.content.course_overviews.models import CourseOverview - return CourseOverview - - @staticmethod - def strip_branch_and_version(location): - """ - Removes the branch and version information from a location. - Args: - location: an xblock's location. - Returns: that xblock's location without branch and version information. 
- """ - return location.for_branch(None) - @staticmethod - def serialize_xblock(item, index, detached_xblock_types, dump_id, dump_timestamp): - """ - Args: - item: an XBlock - index: a number indicating where the item falls in the course hierarchy + clickhouse_table_name = "course_relationships" + name = "XBlock Relationships" + timestamp_field = "time_last_dumped" + unique_key = "parent_location" - Returns: - fields: a *limited* dictionary of an XBlock's field names and values - block_type: the name of the XBlock's type (i.e. 'course' - or 'problem') - - Schema of the destination table, as defined in tutor-contrib-oars: - org String NOT NULL, - course_key String NOT NULL, - location String NOT NULL, - display_name String NOT NULL, - xblock_data_json String NOT NULL, - order Int32 default 0, - edited_on String NOT NULL, - dump_id UUID NOT NULL, - time_last_dumped String NOT NULL - """ - course_key = item.scope_ids.usage_id.course_key - block_type = item.scope_ids.block_type - - # Extra data not needed for the table to function, things can be - # added here without needing to rebuild the whole table. - json_data = { - 'course': course_key.course, - 'run': course_key.run, - 'block_type': block_type, - 'detached': 1 if block_type in detached_xblock_types else 0, - 'graded': 1 if getattr(item, 'graded', False) else 0, - } + def dump_related(self, serialized_item, dump_id, time_last_dumped): + self.dump( + serialized_item, + many=True, + initial={"dump_id": dump_id, "time_last_dumped": time_last_dumped}, + ) - # Core table data, if things change here it's a big deal. - serialized_block = { - 'org': course_key.org, - 'course_key': str(course_key), - 'location': str(item.location), - 'display_name': item.display_name_with_default.replace("'", "\'"), - 'xblock_data_json': json.dumps(json_data), - 'order': index, - 'edited_on': str(getattr(item, 'edited_on', '')), - 'dump_id': dump_id, - 'time_last_dumped': dump_timestamp, - } + def serialize_item(self, item, many=False, initial=None): + return item - return serialized_block - @staticmethod - def serialize_course_overview(overview, dump_id, time_last_dumped): - """ - Return a dict representing a subset of CourseOverview fields. 
- - Schema of the downstream table as defined in tutor-contrib-oars: - org String NOT NULL, - course_key String NOT NULL, - display_name String NOT NULL, - course_start String NOT NULL, - course_end String NOT NULL, - enrollment_start String NOT NULL, - enrollment_end String NOT NULL, - self_paced BOOL NOT NULL, - course_data_json String NOT NULL, - created String NOT NULL, - modified String NOT NULL - dump_id UUID NOT NULL, - time_last_dumped String NOT NULL - """ - json_fields = { - "advertised_start": str(overview.advertised_start), - "announcement": str(overview.announcement), - "lowest_passing_grade": str(overview.lowest_passing_grade), - "invitation_only": overview.invitation_only, - "max_student_enrollments_allowed": overview.max_student_enrollments_allowed, - "effort": overview.effort, - "enable_proctored_exams": overview.enable_proctored_exams, - "entrance_exam_enabled": overview.entrance_exam_enabled, - "external_id": overview.external_id, - "language": overview.language, - } +class XBlockSink(ModelBaseSink): + """ + Sink for XBlock model + """ - return { - "org": overview.org, - "course_key": str(overview.id), - "display_name": overview.display_name, - "course_start": overview.start, - "course_end": overview.end, - "enrollment_start": overview.enrollment_start, - "enrollment_end": overview.enrollment_end, - "self_paced": overview.self_paced, - "course_data_json": json.dumps(json_fields), - "created": overview.created, - "modified": overview.modified, - "dump_id": dump_id, - "time_last_dumped": time_last_dumped - } + unique_key = "location" + clickhouse_table_name = "course_blocks" + timestamp_field = "time_last_dumped" + name = "XBlock" + nested_sinks = [] + + def dump_related(self, serialized_item, dump_id, time_last_dumped): + """Dump all XBlocks for a course""" + self.dump( + serialized_item, + many=True, + initial={"dump_id": dump_id, "time_last_dumped": time_last_dumped}, + ) - def serialize_course(self, course_id): + def serialize_item(self, item, many=False, initial=None): """ - Serializes a course into a CSV of nodes and relationships. 
- - Args: - course_id: CourseKey of the course we want to serialize - - Returns: - nodes: a list of dicts representing xblocks for the course - relationships: a list of dicts representing relationships between nodes + Serialize an XBlock into a dict """ - modulestore = CoursePublishedSink._get_modulestore() - detached_xblock_types = CoursePublishedSink._get_detached_xblock_types() - - dump_id = str(uuid.uuid4()) - dump_timestamp = str(timezone.now()) + course_key = CourseKey.from_string(item["course_key"]) + modulestore = get_modulestore() + detached_xblock_types = get_detached_xblock_types() - courseoverview_model = self._get_course_overview_model() - course_overview = courseoverview_model.get_from_id(course_id) - serialized_course_overview = self.serialize_course_overview(course_overview, dump_id, dump_timestamp) - - # Create a location to node mapping as a lookup for writing relationships later location_to_node = {} - items = modulestore.get_items(course_id) + items = modulestore.get_items(course_key) # Serialize the XBlocks to dicts and map them with their location as keys the # whole map needs to be completed before we can define relationships index = 0 - for item in items: + for block in items: index += 1 - fields = self.serialize_xblock(item, index, detached_xblock_types, dump_id, dump_timestamp) - location_to_node[self.strip_branch_and_version(item.location)] = fields + fields = self.serialize_xblock( + block, + index, + detached_xblock_types, + initial["dump_id"], + initial["time_last_dumped"], + ) + location_to_node[ + XBlockSink.strip_branch_and_version(block.location) + ] = fields + + nodes = list(location_to_node.values()) - # Create a list of relationships between blocks, using their locations as identifiers + self.serialize_relationships( + items, + location_to_node, + course_key, + initial["dump_id"], + initial["time_last_dumped"], + ) + + return nodes + + def serialize_relationships( + self, items, location_to_node, course_id, dump_id, dump_timestamp + ): + """Serialize the relationships between XBlocks""" relationships = [] for item in items: for index, child in enumerate(item.get_children()): - parent_node = location_to_node.get(self.strip_branch_and_version(item.location)) - child_node = location_to_node.get(self.strip_branch_and_version(child.location)) - - if parent_node is not None and child_node is not None: + parent_node = location_to_node.get( + XBlockSink.strip_branch_and_version(item.location) + ) + child_node = location_to_node.get( + XBlockSink.strip_branch_and_version(child.location) + ) + + if parent_node is not None and child_node is not None: # pragma: no cover relationship = { - 'course_key': str(course_id), - 'parent_location': str(parent_node["location"]), - 'child_location': str(child_node["location"]), - 'order': index, - 'dump_id': dump_id, - 'time_last_dumped': dump_timestamp, + "course_key": str(course_id), + "parent_location": str(parent_node["location"]), + "child_location": str(child_node["location"]), + "order": index, + "dump_id": dump_id, + "time_last_dumped": dump_timestamp, } relationships.append(relationship) - - nodes = list(location_to_node.values()) - return serialized_course_overview, nodes, relationships - - def _send_course_overview(self, serialized_overview): - """ - Create the insert query and CSV to send the serialized CourseOverview to ClickHouse. - - We still use a CSV here even though there's only 1 row because it affords handles - type serialization for us and keeps the pattern consistent. 
- """ - params = CLICKHOUSE_BULK_INSERT_PARAMS.copy() - - # "query" is a special param for the query, it's the best way to get the FORMAT CSV in there. - params["query"] = f"INSERT INTO {self.ch_database}.course_overviews FORMAT CSV" - - output = io.StringIO() - writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC) - writer.writerow(serialized_overview.values()) - - request = requests.Request( - 'POST', - self.ch_url, - data=output.getvalue().encode("utf-8"), - params=params, - auth=self.ch_auth - ) - - self._send_clickhouse_request(request, expected_insert_rows=1) - - def _send_xblocks(self, serialized_xblocks): - """ - Create the insert query and CSV to send the serialized XBlocks to ClickHouse. - """ - params = CLICKHOUSE_BULK_INSERT_PARAMS.copy() - - # "query" is a special param for the query, it's the best way to get the FORMAT CSV in there. - params["query"] = f"INSERT INTO {self.ch_database}.course_blocks FORMAT CSV" - - output = io.StringIO() - writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC) - - for node in serialized_xblocks: - writer.writerow(node.values()) - - request = requests.Request( - 'POST', - self.ch_url, - data=output.getvalue().encode("utf-8"), - params=params, - auth=self.ch_auth - ) - - self._send_clickhouse_request(request, expected_insert_rows=len(serialized_xblocks)) - - def _send_relationships(self, relationships): - """ - Create the insert query and CSV to send the serialized relationships to ClickHouse. - """ - params = CLICKHOUSE_BULK_INSERT_PARAMS.copy() - - # "query" is a special param for the query, it's the best way to get the FORMAT CSV in there. - params["query"] = f"INSERT INTO {self.ch_database}.course_relationships FORMAT CSV" - output = io.StringIO() - writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC) - - for relationship in relationships: - writer.writerow(relationship.values()) - - request = requests.Request( - 'POST', - self.ch_url, - data=output.getvalue(), - params=params, - auth=self.ch_auth + XBlockRelationshipSink(self.connection_overrides, self.log).dump_related( + relationships, dump_id, dump_timestamp ) - self._send_clickhouse_request(request, expected_insert_rows=len(relationships)) + def serialize_xblock( + self, item, index, detached_xblock_types, dump_id, time_last_dumped + ): + """Serialize an XBlock instance into a dict""" + course_key = item.scope_ids.usage_id.course_key + block_type = item.scope_ids.block_type - def dump(self, course_key): - """ - Do the serialization and send to ClickHouse - """ - serialized_courseoverview, serialized_blocks, relationships = self.serialize_course(course_key) + # Extra data not needed for the table to function, things can be + # added here without needing to rebuild the whole table. + json_data = { + "course": course_key.course, + "run": course_key.run, + "block_type": block_type, + "detached": 1 if block_type in detached_xblock_types else 0, + "graded": 1 if getattr(item, "graded", False) else 0, + } - self.log.info( - "Now dumping %s to ClickHouse: %d serialized_blocks and %d relationships", - course_key, - len(serialized_blocks), - len(relationships), - ) + # Core table data, if things change here it's a big deal. 
+ serialized_block = { + "org": course_key.org, + "course_key": str(course_key), + "location": str(item.location), + "display_name": item.display_name_with_default.replace("'", "'"), + "xblock_data_json": json.dumps(json_data), + "order": index, + "edited_on": str(getattr(item, "edited_on", "")), + "dump_id": dump_id, + "time_last_dumped": time_last_dumped, + } - course_string = str(course_key) - - try: - self._send_course_overview(serialized_courseoverview) - self._send_xblocks(serialized_blocks) - self._send_relationships(relationships) - self.log.info("Completed dumping %s to ClickHouse", course_key) - except Exception: - self.log.exception( - "Error trying to dump course %s to ClickHouse!", - course_string - ) - raise + return serialized_block @staticmethod - def get_course_last_published(course_key): - """ - Get approximate last publish date for the given course. - - We use the 'modified' column in the CourseOverview table as a quick and easy - (although perhaps inexact) way of determining when a course was last - published. This works because CourseOverview rows are re-written upon - course publish. - - Args: - course_key: a CourseKey - - Returns: The datetime the course was last published at, stringified. - Uses Python's default str(...) implementation for datetimes, which - is sortable and similar to ISO 8601: - https://docs.python.org/3/library/datetime.html#datetime.date.__str__ - """ - CourseOverview = CoursePublishedSink._get_course_overview_model() - approx_last_published = CourseOverview.get_from_id(course_key).modified - if approx_last_published: - return str(approx_last_published) - - return None - - def fetch_target_courses(self, courses=None, skipped_courses=None, force=None): - """ - Yield a set of courses meeting the given criteria. - - If no parameters are given, loads all course_keys from the - modulestore. Filters out course_keys in the `skip` parameter, - if provided. - - Args: - courses: A list of string serializations of course keys. - For example, ["course-v1:org+course+run"]. - skipped_courses: A list of string serializations of course keys to - be ignored. - force: Include all courses except those explicitly skipped via - skipped_courses - """ - modulestore = CoursePublishedSink._get_modulestore() - - if courses: - course_keys = [CourseKey.from_string(course) for course in courses] - else: - course_keys = [ - course.id for course in modulestore.get_course_summaries() - ] - - for course_key in course_keys: - if course_key in skipped_courses: - yield course_key, False, "Course is explicitly skipped" - elif force: - yield course_key, True, "Force is set" - else: - should_be_dumped, reason = self.should_dump_course(course_key) - yield course_key, should_be_dumped, reason - - def get_course_last_dump_time(self, course_key): + def strip_branch_and_version(location): """ - Get the most recent dump time for this course from ClickHouse - + Removes the branch and version information from a location. Args: - course_key: a CourseKey - - Returns: The datetime that the command was last run, converted into - text, or None, if there's no record of this command last being run. + location: an xblock's location. + Returns: that xblock's location without branch and version information. 
""" - params = { - "query": f"SELECT max(time_last_dumped) as time_last_dumped " - f"FROM {self.ch_database}.course_blocks " - f"WHERE course_key = '{course_key}'" - } + return location.for_branch(None) - request = requests.Request( - 'GET', - self.ch_url, - params=params, - auth=self.ch_auth - ) - response = self._send_clickhouse_request(request) - response.raise_for_status() - if response.text.strip(): - # ClickHouse returns timestamps in the format: "2023-05-03 15:47:39.331024+00:00" - # Our internal comparisons use the str() of a datetime object, this handles that - # transformation so that downstream comparisons will work. - return str(datetime.datetime.fromisoformat(response.text.strip())) +class CourseOverviewSink(ModelBaseSink): # pylint: disable=abstract-method + """ + Sink for CourseOverview model + """ - # Course has never been dumped, return None - return None + model = "course_overviews" + unique_key = "course_key" + clickhouse_table_name = "course_overviews" + timestamp_field = "time_last_dumped" + name = "Course Overview" + serializer_class = CourseOverviewSerializer + nested_sinks = [XBlockSink] - def should_dump_course(self, course_key): + def should_dump_item(self, unique_key): """ Only dump the course if it's been changed since the last time it's been dumped. - Args: course_key: a CourseKey object. - Returns: - whether this course should be dumped (bool) - reason why course needs, or does not need, to be dumped (string) """ - course_last_dump_time = self.get_course_last_dump_time(course_key) + course_last_dump_time = self.get_last_dumped_timestamp(unique_key) # If we don't have a record of the last time this command was run, # we should serialize the course and dump it if course_last_dump_time is None: return True, "Course is not present in ClickHouse" - course_last_published_date = self.get_course_last_published(course_key) + course_last_published_date = self.get_course_last_published(unique_key) # If we've somehow dumped this course but there is no publish date # skip it @@ -450,10 +214,11 @@ def should_dump_course(self, course_key): return False, "No last modified date in CourseOverview" # Otherwise, dump it if it is newer - course_last_dump_time = datetime.datetime.strptime(course_last_dump_time, "%Y-%m-%d %H:%M:%S.%f+00:00") + course_last_dump_time = datetime.datetime.strptime( + course_last_dump_time, "%Y-%m-%d %H:%M:%S.%f+00:00" + ) course_last_published_date = datetime.datetime.strptime( - course_last_published_date, - "%Y-%m-%d %H:%M:%S.%f+00:00" + course_last_published_date, "%Y-%m-%d %H:%M:%S.%f+00:00" ) needs_dump = course_last_dump_time < course_last_published_date @@ -468,3 +233,31 @@ def should_dump_course(self, course_key): f"last dumped {course_last_dump_time} >= last published {str(course_last_published_date)}" ) return needs_dump, reason + + def get_course_last_published(self, course_key): + """ + Get approximate last publish date for the given course. + We use the 'modified' column in the CourseOverview table as a quick and easy + (although perhaps inexact) way of determining when a course was last + published. This works because CourseOverview rows are re-written upon + course publish. + Args: + course_key: a CourseKey + Returns: The datetime the course was last published at, stringified. + Uses Python's default str(...) 
implementation for datetimes, which + is sortable and similar to ISO 8601: + https://docs.python.org/3/library/datetime.html#datetime.date.__str__ + """ + CourseOverview = self.get_model() + approx_last_published = CourseOverview.get_from_id(course_key).modified + if approx_last_published: + return str(approx_last_published) + + return None + + def convert_id(self, item_id): + return CourseKey.from_string(item_id) + + def get_queryset(self): + modulestore = get_modulestore() + return modulestore.get_course_summaries() diff --git a/event_sink_clickhouse/sinks/user_profile_sink.py b/event_sink_clickhouse/sinks/user_profile_sink.py new file mode 100644 index 0000000..ff652f6 --- /dev/null +++ b/event_sink_clickhouse/sinks/user_profile_sink.py @@ -0,0 +1,16 @@ +"""User profile sink""" +from event_sink_clickhouse.serializers import UserProfileSerializer +from event_sink_clickhouse.sinks.base_sink import ModelBaseSink + + +class UserProfileSink(ModelBaseSink): # pylint: disable=abstract-method + """ + Sink for user profile events + """ + + model = "user_profile" + unique_key = "id" + clickhouse_table_name = "user_profile" + timestamp_field = "time_last_dumped" + name = "User Profile" + serializer_class = UserProfileSerializer diff --git a/event_sink_clickhouse/tasks.py b/event_sink_clickhouse/tasks.py index 9089bab..ba31c27 100644 --- a/event_sink_clickhouse/tasks.py +++ b/event_sink_clickhouse/tasks.py @@ -3,15 +3,17 @@ """ import logging +from importlib import import_module from celery import shared_task from edx_django_utils.monitoring import set_code_owner_attribute from opaque_keys.edx.keys import CourseKey -from event_sink_clickhouse.sinks.course_published import CoursePublishedSink +from event_sink_clickhouse.sinks.course_published import CourseOverviewSink +from event_sink_clickhouse.sinks.user_profile_sink import UserProfileSink log = logging.getLogger(__name__) -celery_log = logging.getLogger('edx.celery.task') +celery_log = logging.getLogger("edx.celery.task") @@ -25,6 +27,45 @@ def dump_course_to_clickhouse(course_key_string, connection_overrides=None): connection_overrides (dict): overrides to ClickHouse connection parameters specified in `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`. """ - course_key = CourseKey.from_string(course_key_string) - sink = CoursePublishedSink(connection_overrides=connection_overrides, log=celery_log) - sink.dump(course_key) + if CourseOverviewSink.is_enabled(): # pragma: no cover + course_key = CourseKey.from_string(course_key_string) + sink = CourseOverviewSink(connection_overrides=connection_overrides, log=celery_log) + sink.dump(course_key) + + +@shared_task +@set_code_owner_attribute +def dump_user_profile_to_clickhouse(user_profile_id, connection_overrides=None): + """ + Serialize a user profile and write it to ClickHouse. + + Arguments: + user_profile_id: user profile id for the user profile to be exported + connection_overrides (dict): overrides to ClickHouse connection + parameters specified in `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`. + """ + if UserProfileSink.is_enabled(): # pragma: no cover + sink = UserProfileSink( + connection_overrides=connection_overrides, log=celery_log + ) + sink.dump(user_profile_id) + + +@shared_task +@set_code_owner_attribute +def dump_data_to_clickhouse( + sink_module, sink_name, object_id, connection_overrides=None +): + """ + Serialize an object and write it to ClickHouse.
+
+    Arguments:
+        sink_module: module path of the sink
+        sink_name: name of the sink class
+        object_id: id of the object to dump
+        connection_overrides (dict): overrides to ClickHouse connection
+    """
+    Sink = getattr(import_module(sink_module), sink_name)
+
+    sink = Sink(connection_overrides=connection_overrides, log=celery_log)
+    sink.dump(object_id)
diff --git a/event_sink_clickhouse/utils.py b/event_sink_clickhouse/utils.py
new file mode 100644
index 0000000..28745c3
--- /dev/null
+++ b/event_sink_clickhouse/utils.py
@@ -0,0 +1,59 @@
+"""Utility functions for event_sink_clickhouse."""
+import logging
+from importlib import import_module
+
+from django.conf import settings
+
+log = logging.getLogger(__name__)
+
+
+def get_model(model_setting):
+    """Load a model from a setting."""
+    MODEL_CONFIG = getattr(settings, "EVENT_SINK_CLICKHOUSE_MODEL_CONFIG", {})
+
+    model_config = MODEL_CONFIG.get(model_setting)
+    if not model_config:
+        log.error("Unable to find model config for %s", model_setting)
+        return None
+
+    module = model_config.get("module")
+    if not module:
+        log.error("Module was not specified in %s", model_setting)
+        return None
+
+    model_name = model_config.get("model")
+    if not model_name:
+        log.error("Model was not specified in %s", model_setting)
+        return None
+
+    try:
+        model = getattr(import_module(module), model_name)
+        return model
+    except (ImportError, AttributeError, ModuleNotFoundError):
+        log.error("Unable to load model %s.%s", module, model_name)
+
+    return None
+
+
+def get_modulestore():  # pragma: no cover
+    """
+    Import and return modulestore.
+
+    Placed here to avoid imports at startup and to facilitate mocking in tests.
+    """
+    # pylint: disable=import-outside-toplevel,import-error
+    from xmodule.modulestore.django import modulestore
+
+    return modulestore()
+
+
+def get_detached_xblock_types():  # pragma: no cover
+    """
+    Import and return DETACHED_XBLOCK_TYPES.
+
+    Placed here to avoid imports at startup and to facilitate mocking in tests.
+    """
+    # pylint: disable=import-outside-toplevel,import-error
+    from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES
+
+    return DETACHED_XBLOCK_TYPES
diff --git a/event_sink_clickhouse/waffle.py b/event_sink_clickhouse/waffle.py
new file mode 100644
index 0000000..4ebba5e
--- /dev/null
+++ b/event_sink_clickhouse/waffle.py
@@ -0,0 +1,4 @@
+"""
+Waffle flag configuration for the ClickHouse event sink.
+""" +WAFFLE_FLAG_NAMESPACE = "event_sink_clickhouse" diff --git a/requirements/base.in b/requirements/base.in index c08ac0a..81d1381 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -6,3 +6,5 @@ Django # Web application framework requests # HTTP request library edx-django-utils # Django utilities, we use caching and monitoring edx-opaque-keys # Parsing library for course and usage keys +django-rest-framework # REST API framework +edx-toggles diff --git a/requirements/base.txt b/requirements/base.txt index 5c01227..2a454d9 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -28,6 +28,7 @@ click==8.1.7 # click-didyoumean # click-plugins # click-repl + # code-annotations # edx-django-utils click-didyoumean==0.3.0 # via celery @@ -35,26 +36,46 @@ click-plugins==1.1.1 # via celery click-repl==0.3.0 # via celery +code-annotations==1.5.0 + # via edx-toggles django==3.2.20 # via # -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt # -r requirements/base.in # django-crum # django-waffle + # djangorestframework # edx-django-utils + # edx-toggles django-crum==0.7.9 - # via edx-django-utils + # via + # edx-django-utils + # edx-toggles +django-rest-framework==0.1.0 + # via -r requirements/base.in django-waffle==4.0.0 - # via edx-django-utils + # via + # edx-django-utils + # edx-toggles +djangorestframework==3.14.0 + # via django-rest-framework edx-django-utils==5.7.0 - # via -r requirements/base.in + # via + # -r requirements/base.in + # edx-toggles edx-opaque-keys==2.5.0 # via -r requirements/base.in +edx-toggles==5.1.0 + # via -r requirements/base.in idna==3.4 # via requests +jinja2==3.1.2 + # via code-annotations kombu==5.3.1 # via celery -newrelic==8.11.0 +markupsafe==2.1.3 + # via jinja2 +newrelic==9.0.0 # via edx-django-utils pbr==5.11.1 # via stevedore @@ -70,8 +91,14 @@ pynacl==1.5.0 # via edx-django-utils python-dateutil==2.8.2 # via celery +python-slugify==8.0.1 + # via code-annotations pytz==2023.3 - # via django + # via + # django + # djangorestframework +pyyaml==6.0.1 + # via code-annotations requests==2.31.0 # via -r requirements/base.in six==1.16.0 @@ -80,8 +107,11 @@ sqlparse==0.4.4 # via django stevedore==5.1.0 # via + # code-annotations # edx-django-utils # edx-opaque-keys +text-unidecode==1.3 + # via python-slugify typing-extensions==4.7.1 # via # asgiref diff --git a/requirements/ci.txt b/requirements/ci.txt index 4223b02..431a6fe 100644 --- a/requirements/ci.txt +++ b/requirements/ci.txt @@ -6,7 +6,7 @@ # distlib==0.3.7 # via virtualenv -filelock==3.12.2 +filelock==3.12.3 # via # tox # virtualenv @@ -29,5 +29,7 @@ tox==3.28.0 # tox-battery tox-battery==0.6.2 # via -r requirements/ci.in +typing-extensions==4.7.1 + # via filelock virtualenv==20.24.3 # via tox diff --git a/requirements/dev.txt b/requirements/dev.txt index b7650e7..c9dac5b 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -79,10 +79,13 @@ code-annotations==1.5.0 # via # -r requirements/quality.txt # edx-lint + # edx-toggles coverage[toml]==7.3.0 # via # -r requirements/quality.txt # pytest-cov +ddt==1.6.0 + # via -r requirements/quality.txt diff-cover==7.7.0 # via -r requirements/dev.in dill==0.3.7 @@ -99,29 +102,43 @@ django==3.2.20 # -r requirements/quality.txt # django-crum # django-waffle + # djangorestframework # edx-django-utils # edx-i18n-tools + # edx-toggles django-crum==0.7.9 # via # -r requirements/quality.txt # edx-django-utils + # edx-toggles +django-rest-framework==0.1.0 + # via -r requirements/quality.txt 
django-waffle==4.0.0 # via # -r requirements/quality.txt # edx-django-utils + # edx-toggles +djangorestframework==3.14.0 + # via + # -r requirements/quality.txt + # django-rest-framework edx-django-utils==5.7.0 - # via -r requirements/quality.txt + # via + # -r requirements/quality.txt + # edx-toggles edx-i18n-tools==1.1.0 # via -r requirements/dev.in edx-lint==5.3.4 # via -r requirements/quality.txt edx-opaque-keys==2.5.0 # via -r requirements/quality.txt +edx-toggles==5.1.0 + # via -r requirements/quality.txt exceptiongroup==1.1.3 # via # -r requirements/quality.txt # pytest -filelock==3.12.2 +filelock==3.12.3 # via # -r requirements/ci.txt # tox @@ -159,7 +176,7 @@ mccabe==0.7.0 # via # -r requirements/quality.txt # pylint -newrelic==8.11.0 +newrelic==9.0.0 # via # -r requirements/quality.txt # edx-django-utils @@ -269,6 +286,7 @@ pytz==2023.3 # via # -r requirements/quality.txt # django + # djangorestframework pyyaml==6.0.1 # via # -r requirements/quality.txt @@ -335,10 +353,12 @@ types-pyyaml==6.0.12.11 # responses typing-extensions==4.7.1 # via + # -r requirements/ci.txt # -r requirements/quality.txt # asgiref # astroid # edx-opaque-keys + # filelock # kombu # pylint tzdata==2023.3 diff --git a/requirements/doc.txt b/requirements/doc.txt index a9d09da..5c0e58c 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -72,28 +72,42 @@ click-repl==0.3.0 # -r requirements/test.txt # celery code-annotations==1.5.0 - # via -r requirements/test.txt + # via + # -r requirements/test.txt + # edx-toggles coverage[toml]==7.3.0 # via # -r requirements/test.txt # pytest-cov cryptography==41.0.3 # via secretstorage +ddt==1.6.0 + # via -r requirements/test.txt django==3.2.20 # via # -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt # -r requirements/test.txt # django-crum # django-waffle + # djangorestframework # edx-django-utils + # edx-toggles django-crum==0.7.9 # via # -r requirements/test.txt # edx-django-utils + # edx-toggles +django-rest-framework==0.1.0 + # via -r requirements/test.txt django-waffle==4.0.0 # via # -r requirements/test.txt # edx-django-utils + # edx-toggles +djangorestframework==3.14.0 + # via + # -r requirements/test.txt + # django-rest-framework doc8==1.1.1 # via -r requirements/doc.in docutils==0.19 @@ -104,9 +118,13 @@ docutils==0.19 # restructuredtext-lint # sphinx edx-django-utils==5.7.0 - # via -r requirements/test.txt + # via + # -r requirements/test.txt + # edx-toggles edx-opaque-keys==2.5.0 # via -r requirements/test.txt +edx-toggles==5.1.0 + # via -r requirements/test.txt exceptiongroup==1.1.3 # via # -r requirements/test.txt @@ -155,7 +173,7 @@ mdurl==0.1.2 # via markdown-it-py more-itertools==10.1.0 # via jaraco-classes -newrelic==8.11.0 +newrelic==9.0.0 # via # -r requirements/test.txt # edx-django-utils @@ -230,6 +248,7 @@ pytz==2023.3 # -r requirements/test.txt # babel # django + # djangorestframework pyyaml==6.0.1 # via # -r requirements/test.txt diff --git a/requirements/quality.txt b/requirements/quality.txt index d4cb6bc..4383630 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -68,10 +68,13 @@ code-annotations==1.5.0 # via # -r requirements/test.txt # edx-lint + # edx-toggles coverage[toml]==7.3.0 # via # -r requirements/test.txt # pytest-cov +ddt==1.6.0 + # via -r requirements/test.txt dill==0.3.7 # via pylint django==3.2.20 @@ -80,21 +83,35 @@ django==3.2.20 # -r requirements/test.txt # django-crum # django-waffle + # djangorestframework # edx-django-utils + # edx-toggles 
 django-crum==0.7.9
     # via
     #   -r requirements/test.txt
     #   edx-django-utils
+    #   edx-toggles
+django-rest-framework==0.1.0
+    # via -r requirements/test.txt
 django-waffle==4.0.0
     # via
     #   -r requirements/test.txt
     #   edx-django-utils
+    #   edx-toggles
+djangorestframework==3.14.0
+    # via
+    #   -r requirements/test.txt
+    #   django-rest-framework
 edx-django-utils==5.7.0
-    # via -r requirements/test.txt
+    # via
+    #   -r requirements/test.txt
+    #   edx-toggles
 edx-lint==5.3.4
     # via -r requirements/quality.in
 edx-opaque-keys==2.5.0
     # via -r requirements/test.txt
+edx-toggles==5.1.0
+    # via -r requirements/test.txt
 exceptiongroup==1.1.3
     # via
     #   -r requirements/test.txt
@@ -127,7 +144,7 @@ markupsafe==2.1.3
     #   jinja2
 mccabe==0.7.0
     # via pylint
-newrelic==8.11.0
+newrelic==9.0.0
     # via
     #   -r requirements/test.txt
     #   edx-django-utils
@@ -204,6 +221,7 @@ pytz==2023.3
     # via
     #   -r requirements/test.txt
     #   django
+    #   djangorestframework
 pyyaml==6.0.1
     # via
     #   -r requirements/test.txt
diff --git a/requirements/test.in b/requirements/test.in
index bb78ea0..7f7c862 100644
--- a/requirements/test.in
+++ b/requirements/test.in
@@ -7,3 +7,4 @@ pytest-cov             # pytest extension for code coverage statistics
 pytest-django          # pytest extension for better Django support
 code-annotations       # provides commands used by the pii_check make target.
 responses              # mocks for the requests library
+ddt
diff --git a/requirements/test.txt b/requirements/test.txt
index a243a0e..36e58eb 100644
--- a/requirements/test.txt
+++ b/requirements/test.txt
@@ -57,27 +57,46 @@ click-repl==0.3.0
     #   -r requirements/base.txt
     #   celery
 code-annotations==1.5.0
-    # via -r requirements/test.in
+    # via
+    #   -r requirements/base.txt
+    #   -r requirements/test.in
+    #   edx-toggles
 coverage[toml]==7.3.0
     # via pytest-cov
+ddt==1.6.0
+    # via -r requirements/test.in
 django==3.2.20
     # via
     #   -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt
     #   -r requirements/base.txt
     #   django-crum
     #   django-waffle
+    #   djangorestframework
     #   edx-django-utils
+    #   edx-toggles
 django-crum==0.7.9
     # via
     #   -r requirements/base.txt
     #   edx-django-utils
+    #   edx-toggles
+django-rest-framework==0.1.0
+    # via -r requirements/base.txt
 django-waffle==4.0.0
     # via
     #   -r requirements/base.txt
     #   edx-django-utils
+    #   edx-toggles
+djangorestframework==3.14.0
+    # via
+    #   -r requirements/base.txt
+    #   django-rest-framework
 edx-django-utils==5.7.0
-    # via -r requirements/base.txt
+    # via
+    #   -r requirements/base.txt
+    #   edx-toggles
 edx-opaque-keys==2.5.0
     # via -r requirements/base.txt
+edx-toggles==5.1.0
+    # via -r requirements/base.txt
 exceptiongroup==1.1.3
     # via pytest
 idna==3.4
@@ -87,14 +106,18 @@ idna==3.4
 iniconfig==2.0.0
     # via pytest
 jinja2==3.1.2
-    # via code-annotations
+    # via
+    #   -r requirements/base.txt
+    #   code-annotations
 kombu==5.3.1
     # via
     #   -r requirements/base.txt
     #   celery
 markupsafe==2.1.3
-    # via jinja2
-newrelic==8.11.0
+    # via
+    #   -r requirements/base.txt
+    #   jinja2
+newrelic==9.0.0
     # via
     #   -r requirements/base.txt
     #   edx-django-utils
@@ -139,13 +162,17 @@ python-dateutil==2.8.2
     #   -r requirements/base.txt
     #   celery
 python-slugify==8.0.1
-    # via code-annotations
+    # via
+    #   -r requirements/base.txt
+    #   code-annotations
 pytz==2023.3
     # via
     #   -r requirements/base.txt
     #   django
+    #   djangorestframework
 pyyaml==6.0.1
     # via
+    #   -r requirements/base.txt
     #   code-annotations
     #   responses
 requests==2.31.0
@@ -169,7 +196,9 @@ stevedore==5.1.0
     #   edx-django-utils
     #   edx-opaque-keys
 text-unidecode==1.3
-    # via python-slugify
+    # via
+    #   -r requirements/base.txt
+    #   python-slugify
 tomli==2.0.1
     # via
     #   coverage
diff --git a/setup.cfg b/setup.cfg
index d782599..c1422d2 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -5,6 +5,7 @@ line_length = 120
 multi_line_output = 3
 skip=
     migrations
+profile = black
 
 [wheel]
 universal = 1
diff --git a/setup.py b/setup.py
index 057db61..f3b99f9 100755
--- a/setup.py
+++ b/setup.py
@@ -115,6 +115,7 @@ def is_requirement(line):
     ),
     entry_points={
         "lms.djangoapp": [
+            "event-sink-clickhouse = event_sink_clickhouse.apps:EventSinkClickhouseConfig",
         ],
         "cms.djangoapp": [
             "event-sink-clickhouse = event_sink_clickhouse.apps:EventSinkClickhouseConfig",
diff --git a/test_utils/helpers.py b/test_utils/helpers.py
index 9ff40f3..8ea02b6 100644
--- a/test_utils/helpers.py
+++ b/test_utils/helpers.py
@@ -14,7 +14,7 @@
 from opaque_keys.edx.keys import CourseKey
 from opaque_keys.edx.locator import BlockUsageLocator
 
-from event_sink_clickhouse.sinks.course_published import CoursePublishedSink
+from event_sink_clickhouse.sinks.course_published import XBlockSink
 
 ORIG_IMPORT = __import__
 ORG = "testorg"
@@ -229,7 +229,7 @@ def match(request):
 
     assert dumped_json["advertised_start"] == str(course_overview.advertised_start)
     assert dumped_json["announcement"] == str(course_overview.announcement)
-    assert dumped_json["lowest_passing_grade"] == str(course_overview.lowest_passing_grade)
+    assert dumped_json["lowest_passing_grade"] == float(course_overview.lowest_passing_grade)
     assert dumped_json["invitation_only"] == course_overview.invitation_only
     assert dumped_json["max_student_enrollments_allowed"] == course_overview.max_student_enrollments_allowed
     assert dumped_json["effort"] == course_overview.effort
@@ -307,12 +307,12 @@ def check_relationship_csv_matcher(course):
     for block in course:
         course_key = str(block.location.course_key)
         for _, child in enumerate(block.get_children()):
-            parent_node = str(CoursePublishedSink.strip_branch_and_version(block.location))
-            child_node = str(CoursePublishedSink.strip_branch_and_version(child.location))
+            parent_node = str(XBlockSink.strip_branch_and_version(block.location))
+            child_node = str(XBlockSink.strip_branch_and_version(child.location))
             relationships.append((course_key, parent_node, child_node))
 
     def match(request):
-        body = request.body
+        body = request.body.decode("utf-8")
         lines = body.split("\n")[:-1]
 
         # The relationships CSV should have the same number of relationships as our test
diff --git a/test_utils/test_settings.py b/test_utils/test_settings.py
new file mode 100644
index 0000000..16e3cdb
--- /dev/null
+++ b/test_utils/test_settings.py
@@ -0,0 +1,44 @@
+
+"""
+These settings are here for use during tests, because Django requires them.
+
+In a real-world use case, apps in this project are installed into other
+Django applications, so these settings will not be used.
+""" + +DATABASES = { + "default": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": "default.db", + "USER": "", + "PASSWORD": "", + "HOST": "", + "PORT": "", + }, + "read_replica": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": "read_replica.db", + "USER": "", + "PASSWORD": "", + "HOST": "", + "PORT": "", + }, +} + + +INSTALLED_APPS = ( + "event_sink_clickhouse", +) + +EVENT_SINK_CLICKHOUSE_MODEL_CONFIG = { + "user_profile": { + "module": "common.djangoapps.student.models", + "model": "UserProfile", + }, + "course_overviews": { + "module": "openedx.core.djangoapps.content.course_overviews.models", + "model": "CourseOverview", + } +} + +EVENT_SINK_CLICKHOUSE_COURSE_OVERVIEWS_ENABLED = True diff --git a/test_utils/test_utils.py b/test_utils/test_utils.py new file mode 100644 index 0000000..8dd2f4d --- /dev/null +++ b/test_utils/test_utils.py @@ -0,0 +1,73 @@ +""" +Test utils. +""" +import unittest +from unittest.mock import Mock, patch + +from django.conf import settings + +from event_sink_clickhouse.utils import get_model + + +class TestUtils(unittest.TestCase): + """ + Test utils + """ + + @patch("event_sink_clickhouse.utils.import_module") + @patch.object( + settings, + "EVENT_SINK_CLICKHOUSE_MODEL_CONFIG", + {"my_model": {"module": "myapp.models", "model": "MyModel"}}, + ) + @patch("event_sink_clickhouse.utils.log") + def test_get_model_success(self, mock_log, mock_import_module): + mock_model = Mock(__name__="MyModel") + mock_import_module.return_value = Mock(MyModel=mock_model) + + model = get_model("my_model") + + mock_import_module.assert_called_once_with("myapp.models") + self.assertIsNotNone(model) + self.assertEqual(model.__name__, "MyModel") + mock_log.assert_not_called() + + @patch.object( + settings, + "EVENT_SINK_CLICKHOUSE_MODEL_CONFIG", + {"my_model": {"module": "myapp.models", "model": "NonExistentModel"}}, + ) + def test_get_model_non_existent_model(self): + model = get_model("my_model") + self.assertIsNone(model) + + @patch.object( + settings, + "EVENT_SINK_CLICKHOUSE_MODEL_CONFIG", + {"my_model": {"module": "non_existent_module", "model": "MyModel"}}, + ) + def test_get_model_non_existent_module(self): + model = get_model("my_model") + + self.assertIsNone(model) + + @patch.object( + settings, "EVENT_SINK_CLICKHOUSE_MODEL_CONFIG", {"my_model": {"module": ""}} + ) + def test_get_model_missing_module_and_model(self): + model = get_model("my_model") + self.assertIsNone(model) + + @patch.object(settings, "EVENT_SINK_CLICKHOUSE_MODEL_CONFIG", {}) + def test_get_model_missing_module_and_model_2(self): + model = get_model("my_model") + self.assertIsNone(model) + + @patch.object( + settings, + "EVENT_SINK_CLICKHOUSE_MODEL_CONFIG", + {"my_model": {"module": "myapp.models"}}, + ) + def test_get_model_missing_model_config(self): + model = get_model("my_model") + self.assertIsNone(model) diff --git a/tests/commands/test_dump_courses_command.py b/tests/commands/test_dump_courses_command.py index c07b088..56946ba 100644 --- a/tests/commands/test_dump_courses_command.py +++ b/tests/commands/test_dump_courses_command.py @@ -19,9 +19,9 @@ def mock_common_calls(): """ command_path = "event_sink_clickhouse.management.commands.dump_courses_to_clickhouse" with patch(command_path+".dump_course_to_clickhouse") as mock_dump_course: - with patch(command_path+".CoursePublishedSink._get_course_overview_model") as mock_get_course_overview_model: - with patch(command_path+".CoursePublishedSink._get_modulestore") as mock_modulestore: - with 
patch(command_path+".CoursePublishedSink.get_course_last_dump_time") as mock_last_dump_time: + with patch(command_path+".CourseOverviewSink.get_model") as mock_get_course_overview_model: + with patch("event_sink_clickhouse.sinks.course_published.get_modulestore") as mock_modulestore: + with patch(command_path+".CourseOverviewSink.get_last_dumped_timestamp") as mock_last_dump_time: # Set a reasonable default last dump time a year in the past mock_last_dump_time.return_value = \ (datetime.now() - timedelta(days=365)).strftime("%Y-%m-%d %H:%M:%S.%f+00:00") @@ -92,7 +92,7 @@ def dump_command_basic_options(): expected_num_submitted=0, expected_logs=[ "0 courses submitted for export to ClickHouse. 1 courses skipped.", - "Course is explicitly skipped" + "Course Overview is explicitly skipped" ] ), CommandOptions( @@ -142,7 +142,9 @@ def test_dump_courses_options( ) # Make sure that our mocks were called as expected - assert mock_modulestore.call_count == 1 + if "courses" not in option_combination: + # Modulestore will only be called here if we're not passing in a list of courses + assert mock_modulestore.call_count == 1 assert mock_dump_course.apply_async.call_count == expected_num_submitted for expected_output in expected_outputs: assert expected_output in caplog.text diff --git a/tests/commands/test_dump_data_to_clickhouse.py b/tests/commands/test_dump_data_to_clickhouse.py new file mode 100644 index 0000000..db124a6 --- /dev/null +++ b/tests/commands/test_dump_data_to_clickhouse.py @@ -0,0 +1,176 @@ +""" +Tests for the dump_data_to_clickhouse management command. +""" + +from collections import namedtuple +from datetime import datetime, timedelta +from unittest.mock import patch + +import django.core.management.base +import pytest +from django.core.management import call_command + +from event_sink_clickhouse.sinks.base_sink import ModelBaseSink + +CommandOptions = namedtuple( + "TestCommandOptions", ["options", "expected_num_submitted", "expected_logs"] +) + + +def dummy_model_factory(): + """ + Create a dummy model for testing. + """ + + class DummyModel: + """ + Dummy model for testing. + """ + + def __init__(self, id): + self.id = id + self.created = datetime.now() + + return DummyModel + + +def dummy_serializer_factory(): + """ + Create a dummy serializer for testing. + """ + + class DummySerializer: + """ + Dummy serializer for testing. + """ + + def __init__(self, model): + self.model = model + + @property + def data(self): + return {"id": self.model.id, "created": self.model.created} + + return DummySerializer + + +class DummySink(ModelBaseSink): + """ + Dummy sink for testing. + """ + + name = "Dummy" + model = "dummy" + unique_key = "id" + serializer_class = dummy_serializer_factory() + timestamp_field = "created" + clickhouse_table_name = "dummy_table" + + def get_queryset(self): + return [dummy_model_factory()(id) for id in range(1, 5)] + + def convert_id(self, item_id): + return int(item_id) + + def should_dump_item(self, unique_key): + if unique_key % 2 == 0: + return True, "Even number" + else: + return False, "Odd number" + + +def dump_command_basic_options(): + """ + Pytest params for all the different non-ClickHouse command options. 
+    """
+    options = [
+        CommandOptions(
+            options={"object": "dummy", "ids_to_skip": ["1", "2", "3", "4"]},
+            expected_num_submitted=0,
+            expected_logs=[
+                "submitted for export to ClickHouse",
+            ],
+        ),
+        CommandOptions(
+            options={"object": "dummy", "limit": 1},
+            expected_num_submitted=1,
+            expected_logs=["Limit of 1 eligible objects has been reached, quitting!"],
+        ),
+        CommandOptions(
+            options={"object": "dummy", "ids": ["1", "2", "3", "4"]},
+            expected_num_submitted=2,
+            expected_logs=[
+                "These objects were submitted for dump to ClickHouse successfully",
+            ],
+        ),
+        CommandOptions(
+            options={"object": "dummy", "force": True},
+            expected_num_submitted=4,
+            expected_logs=["Force is set"],
+        ),
+    ]
+
+    for option in options:
+        yield option
+
+
+@pytest.mark.parametrize("test_command_option", dump_command_basic_options())
+@patch(
+    "event_sink_clickhouse.management.commands.dump_data_to_clickhouse.dump_data_to_clickhouse"
+)
+def test_dump_data_options(mock_dump_data, test_command_option, caplog):
+    option_combination, expected_num_submitted, expected_outputs = test_command_option
+
+    assert DummySink.model in [cls.model for cls in ModelBaseSink.__subclasses__()]
+
+    call_command("dump_data_to_clickhouse", **option_combination)
+
+    assert mock_dump_data.apply_async.call_count == expected_num_submitted
+    for expected_output in expected_outputs:
+        assert expected_output in caplog.text
+
+
+def dump_basic_invalid_options():
+    """
+    Pytest params for invalid combinations of command options.
+    """
+    options = [
+        CommandOptions(
+            options={"object": "dummy", "limit": 1, "force": True},
+            expected_num_submitted=1,
+            expected_logs=[],
+        ),
+        CommandOptions(
+            options={"object": "dummy", "limit": 1, "force": True},
+            expected_num_submitted=1,
+            expected_logs=[],
+        ),
+        CommandOptions(
+            options={"object": "dummy", "limit": 0, "force": True},
+            expected_num_submitted=1,
+            expected_logs=[],
+        ),
+        CommandOptions(
+            options={},
+            expected_num_submitted=1,
+            expected_logs=[],
+        ),
+    ]
+
+    for option in options:
+        yield option
+
+
+@pytest.mark.parametrize("test_command_option", dump_basic_invalid_options())
+@patch(
+    "event_sink_clickhouse.management.commands.dump_data_to_clickhouse.dump_data_to_clickhouse"
+)
+def test_dump_data_options_invalid(mock_dump_data, test_command_option, caplog):
+    option_combination, expected_num_submitted, expected_outputs = test_command_option
+    assert DummySink.model in [cls.model for cls in ModelBaseSink.__subclasses__()]
+
+    with pytest.raises(django.core.management.base.CommandError):
+        call_command("dump_data_to_clickhouse", **option_combination)
+    # assert mock_dump_data.apply_async.call_count == expected_num_submitted
+    for expected_output in expected_outputs:
+        assert expected_output in caplog.text
diff --git a/tests/test_base_sink.py b/tests/test_base_sink.py
new file mode 100644
index 0000000..f7d8a57
--- /dev/null
+++ b/tests/test_base_sink.py
@@ -0,0 +1,301 @@
+"""
+Tests for the base sinks.
+"""
+import json
+import logging
+from unittest.mock import MagicMock, Mock, patch
+
+import ddt
+from django.test import TestCase
+from django.test.utils import override_settings
+
+from event_sink_clickhouse.sinks.base_sink import BaseSink, ModelBaseSink
+
+
+class ChildSink(ModelBaseSink):  # pylint: disable=abstract-method
+    """
+    Demo child sink.
+ """ + + nested_sinks = [MagicMock()] + model = "child_model" + unique_key = "id" + clickhouse_table_name = "child_model_table" + timestamp_field = "time_last_dumped" + name = "Child Model" + serializer_class = Mock() + + +@override_settings( + EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG={ + # URL to a running ClickHouse server's HTTP interface. ex: https://foo.openedx.org:8443/ or + # http://foo.openedx.org:8123/ . Note that we only support the ClickHouse HTTP interface + # to avoid pulling in more dependencies to the platform than necessary. + "url": "http://clickhouse:8123", + "username": "ch_cms", + "password": "TYreGozgtDG3vkoWPUHVVM6q", + "database": "event_sink", + "timeout_secs": 5, + }, + EVENT_SINK_CLICKHOUSE_MODEL_CONFIG={}, +) +@ddt.ddt +class TestModelBaseSink(TestCase): + """ + Tests for the ModelBaseSink. + """ + + def setUp(self): + """ + Set up the test suite. + """ + self.child_sink = ChildSink(connection_overrides={}, log=logging.getLogger()) + + @ddt.data( + (1, {"dump_id": 1, "time_last_dumped": "2020-01-01 00:00:00"}, False), + ( + [1, 2], + [ + {"dump_id": 1, "time_last_dumped": "2020-01-01 00:00:00"}, + {"dump_id": 2, "time_last_dumped": "2020-01-01 00:00:00"}, + ], + True, + ), + ) + @ddt.unpack + def test_dump(self, items_id, serialized_items, many): + """ + Test that the serialization/send logic is called correctly with many=True and many=False. + """ + self.child_sink.send_item_and_log = Mock() + self.child_sink.serialize_item = Mock(return_value=serialized_items) + self.child_sink.get_object = Mock(return_value=items_id) + + self.child_sink.dump(items_id, many=many) + + self.child_sink.serialize_item.assert_called_once_with( + items_id, many=many, initial=None + ) + self.child_sink.send_item_and_log.assert_called_once_with( + items_id, self.child_sink.serialize_item.return_value, many + ) + + def test_send_item_and_log(self): + """ + Test that send_item is called correctly. + """ + item = Mock(id=1) + self.child_sink.send_item = Mock() + serialized_item = {"dump_id": 1, "time_last_dumped": "2020-01-01 00:00:00"} + + self.child_sink.send_item_and_log(item.id, serialized_item, many=False) + + self.child_sink.send_item.assert_called_once_with(serialized_item, many=False) + + def test_serialize_item(self): + """ + Test that serialize_item() returns the correct serialized data. + """ + item = Mock(id=1) + serialized_item = {"dump_id": 1, "time_last_dumped": "2020-01-01 00:00:00"} + self.child_sink.get_serializer = Mock(data=serialized_item) + self.child_sink.send_item_and_log = Mock() + + serialized_item = self.child_sink.serialize_item(item, many=False, initial=None) + + self.child_sink.get_serializer.return_value.assert_called_once_with( + item, + many=False, + initial=None, + ) + self.assertEqual( + serialized_item, + self.child_sink.get_serializer.return_value.return_value.data, + ) + + @patch("event_sink_clickhouse.sinks.base_sink.io") + @patch("event_sink_clickhouse.sinks.base_sink.requests") + @ddt.data( + ({"dump_id": 1, "time_last_dumped": "2020-01-01 00:00:00"}, False), + ( + [ + {"dump_id": 1, "time_last_dumped": "2020-01-01 00:00:00"}, + {"dump_id": 2, "time_last_dumped": "2020-01-01 00:00:00"}, + ], + True, + ), + ) + @ddt.unpack + def test_send_items(self, serialized_items, many, mock_requests, mock_io): + """ + Test that send_item() calls the correct requests. 
+        """
+        params = self.child_sink.CLICKHOUSE_BULK_INSERT_PARAMS.copy()
+        params["query"] = "INSERT INTO event_sink.child_model_table FORMAT CSV"
+        self.child_sink._send_clickhouse_request = (  # pylint: disable=protected-access
+            Mock()
+        )
+        data = "1,2020-01-01 00:00:00\n2,2020-01-01 00:00:00\n"
+        mock_io.StringIO.return_value.getvalue.return_value.encode.return_value = data
+
+        self.child_sink.send_item(serialized_items, many=many)
+
+        mock_requests.Request.assert_called_once_with(
+            "POST",
+            self.child_sink.ch_url,
+            data=data,
+            params=params,
+            auth=self.child_sink.ch_auth,
+        )
+        self.child_sink._send_clickhouse_request(  # pylint: disable=protected-access
+            mock_requests.Request.return_value,
+            expected_insert_rows=len(serialized_items),
+        )
+
+    def test_init(self):
+        # Mock the required fields
+        connection_overrides = {}
+        log = MagicMock()
+
+        # Test without all required fields
+        with self.assertRaises(NotImplementedError):
+            sink = ModelBaseSink(connection_overrides, log)
+            self.assertIsInstance(sink, ModelBaseSink)
+
+    def fetch_target_items(self):
+        """
+        Test that fetch_target_items() returns the correct data.
+        """
+
+    def test_get_last_dumped_timestamp(self):
+        """
+        Test that get_last_dumped_timestamp() returns the correct data.
+        """
+
+    @override_settings(
+        EVENT_SINK_CLICKHOUSE_MODEL_CONFIG={
+            "child_model": {
+                "module": "dummy.module",
+                "model": "dummy",
+            },
+        }
+    )
+    @patch("event_sink_clickhouse.sinks.base_sink.get_model")
+    def test_get_model(self, mock_get_model):
+        """
+        Test that get_model() loads the configured model.
+        """
+        self.child_sink.get_model()
+        mock_get_model.assert_called_once_with("child_model")
+
+    def test_get_queryset(self):
+        """
+        Test that get_queryset() returns a query set.
+        """
+        self.child_sink.get_model = Mock()
+        self.child_sink.get_queryset()
+        self.child_sink.get_model.return_value.objects.all.assert_called_once()
+
+    def test_nested_sink_dump_related(self):
+        """
+        Test that dump_related() calls the correct methods.
+        """
+        self.child_sink.dump = Mock()
+        with self.assertRaises(NotImplementedError):
+            self.child_sink.dump_related("foo", "bar", "baz")
+
+    def test_get_serializer(self):
+        """
+        Test that get_serializer() returns the correct serializer.
+        """
+        serializer = self.child_sink.get_serializer()
+        self.assertEqual(serializer, self.child_sink.serializer_class)
+
+    def test_convert_id(self):
+        """
+        Test that convert_id() returns the correct data.
+        """
+        self.assertEqual(self.child_sink.convert_id(1), 1)
+
+    def test_should_dump_item(self):
+        """
+        Test that should_dump_item() returns the correct data.
+        """
+        self.assertEqual(self.child_sink.should_dump_item(1), (True, "No reason"))
+
+    @patch("event_sink_clickhouse.sinks.base_sink.WaffleFlag.is_enabled")
+    def test_is_not_enabled_waffle(self, mock_waffle_flag_is_enabled):
+        """
+        Test that is_enabled() returns False when the waffle flag is disabled.
+        """
+        mock_waffle_flag_is_enabled.return_value = False
+        self.assertEqual(self.child_sink.__class__.is_enabled(), False)
+
+    @patch("event_sink_clickhouse.sinks.base_sink.WaffleFlag.is_enabled")
+    def test_is_enabled_waffle(self, mock_waffle_flag_is_enabled):
+        """
+        Test that is_enabled() returns True when the waffle flag is enabled.
+        """
+        mock_waffle_flag_is_enabled.return_value = True
+        self.assertEqual(self.child_sink.__class__.is_enabled(), True)
+
+    @override_settings(EVENT_SINK_CLICKHOUSE_CHILD_MODEL_ENABLED=True)
+    def test_is_enabled(self):
+        """
+        Test that is_enabled() returns True when enabled via settings.
+ """ + self.assertEqual(self.child_sink.is_enabled(), True) + + @patch("requests.Session.send") + def test_expected_insert_rows_mismatch(self, mock_send): + mock_response = Mock() + mock_response.headers = { + "X-ClickHouse-Summary": json.dumps({"written_rows": 5}) + } + mock_send.return_value = mock_response + + connection_overrides = { + "url": "mocked_url", + "username": "mocked_username", + "password": "mocked_password", + "database": "mocked_database", + "timeout_secs": 5, + } + + mock_log = Mock(error=Mock()) + + base_sink_instance = BaseSink(connection_overrides, log=mock_log) + request = Mock() + request.prepare.return_value = Mock() + + base_sink_instance._send_clickhouse_request( # pylint: disable=protected-access + request, expected_insert_rows=10 + ) + mock_log.error.assert_called_once() + + @patch("requests.Session.send") + def test_expected_insert_rows_match(self, mock_send): + mock_response = Mock() + mock_response.headers = { + "X-ClickHouse-Summary": json.dumps({"written_rows": 8}) + } + mock_send.return_value = mock_response + + connection_overrides = { + "url": "mocked_url", + "username": "mocked_username", + "password": "mocked_password", + "database": "mocked_database", + "timeout_secs": 5, + } + + mock_log = Mock() + + base_sink_instance = BaseSink(connection_overrides, log=mock_log) + request = Mock() + request.prepare.return_value = Mock() + + response = base_sink_instance._send_clickhouse_request( # pylint: disable=protected-access + request, expected_insert_rows=8 + ) + self.assertEqual(response, mock_response) diff --git a/tests/test_course_published.py b/tests/test_course_published.py index c93c902..66b8952 100644 --- a/tests/test_course_published.py +++ b/tests/test_course_published.py @@ -3,17 +3,17 @@ """ import json import logging -import uuid from datetime import datetime from unittest.mock import patch import pytest import requests import responses +from django.test.utils import override_settings from responses import matchers from responses.registries import OrderedRegistry -from event_sink_clickhouse.sinks.course_published import CoursePublishedSink +from event_sink_clickhouse.sinks.course_published import CourseOverviewSink from event_sink_clickhouse.tasks import dump_course_to_clickhouse from test_utils.helpers import ( check_block_csv_matcher, @@ -29,10 +29,12 @@ @responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter -@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_course_overview_model") -@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_detached_xblock_types") -@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_modulestore") -def test_course_publish_success(mock_modulestore, mock_detached, mock_overview): +@override_settings(EVENT_SINK_CLICKHOUSE_COURSE_OVERVIEW_ENABLED=True) +@patch("event_sink_clickhouse.sinks.course_published.CourseOverviewSink.serialize_item") +@patch("event_sink_clickhouse.sinks.course_published.CourseOverviewSink.get_model") +@patch("event_sink_clickhouse.sinks.course_published.get_detached_xblock_types") +@patch("event_sink_clickhouse.sinks.course_published.get_modulestore") +def test_course_publish_success(mock_modulestore, mock_detached, mock_overview, mock_serialize_item): """ Test of a successful end-to-end run. 
""" @@ -41,6 +43,35 @@ def test_course_publish_success(mock_modulestore, mock_detached, mock_overview): course_overview = fake_course_overview_factory(modified=datetime.now()) mock_modulestore.return_value.get_items.return_value = course + json_fields = { + "advertised_start": str(course_overview.advertised_start), + "announcement": str(course_overview.announcement), + "lowest_passing_grade": float(course_overview.lowest_passing_grade), + "invitation_only": course_overview.invitation_only, + "max_student_enrollments_allowed": course_overview.max_student_enrollments_allowed, + "effort": course_overview.effort, + "enable_proctored_exams": course_overview.enable_proctored_exams, + "entrance_exam_enabled": course_overview.entrance_exam_enabled, + "external_id": course_overview.external_id, + "language": course_overview.language, + } + + mock_serialize_item.return_value = { + "org": course_overview.org, + "course_key": str(course_overview.id), + "display_name": course_overview.display_name, + "course_start": course_overview.start, + "course_end": course_overview.end, + "enrollment_start": course_overview.enrollment_start, + "enrollment_end": course_overview.enrollment_end, + "self_paced": course_overview.self_paced, + "course_data_json": json.dumps(json_fields), + "created": course_overview.created, + "modified": course_overview.modified, + "dump_id": "", + "time_last_dumped": "", + } + # Fake the "detached types" list since we can't import it here mock_detached.return_value = mock_detached_xblock_types() @@ -61,15 +92,15 @@ def test_course_publish_success(mock_modulestore, mock_detached, mock_overview): responses.post( "https://foo.bar/", match=[ - matchers.query_param_matcher(blocks_params), - check_block_csv_matcher(course) + matchers.query_param_matcher(relationships_params), + check_relationship_csv_matcher(course) ], ) responses.post( "https://foo.bar/", match=[ - matchers.query_param_matcher(relationships_params), - check_relationship_csv_matcher(course) + matchers.query_param_matcher(blocks_params), + check_block_csv_matcher(course) ], ) @@ -82,11 +113,12 @@ def test_course_publish_success(mock_modulestore, mock_detached, mock_overview): @responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter -@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_course_overview_model") -@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_detached_xblock_types") -@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_modulestore") +@patch("event_sink_clickhouse.sinks.course_published.CourseOverviewSink.serialize_item") +@patch("event_sink_clickhouse.sinks.course_published.CourseOverviewSink.get_model") +@patch("event_sink_clickhouse.sinks.course_published.get_detached_xblock_types") +@patch("event_sink_clickhouse.sinks.course_published.get_modulestore") # pytest:disable=unused-argument -def test_course_publish_clickhouse_error(mock_modulestore, mock_detached, mock_overview, caplog): +def test_course_publish_clickhouse_error(mock_modulestore, mock_detached, mock_overview, mock_serialize_item, caplog): """ Test the case where a ClickHouse POST fails. 
""" @@ -97,6 +129,35 @@ def test_course_publish_clickhouse_error(mock_modulestore, mock_detached, mock_o course_overview = fake_course_overview_factory(modified=datetime.now()) mock_overview.return_value.get_from_id.return_value = course_overview + json_fields = { + "advertised_start": str(course_overview.advertised_start), + "announcement": str(course_overview.announcement), + "lowest_passing_grade": float(course_overview.lowest_passing_grade), + "invitation_only": course_overview.invitation_only, + "max_student_enrollments_allowed": course_overview.max_student_enrollments_allowed, + "effort": course_overview.effort, + "enable_proctored_exams": course_overview.enable_proctored_exams, + "entrance_exam_enabled": course_overview.entrance_exam_enabled, + "external_id": course_overview.external_id, + "language": course_overview.language, + } + + mock_serialize_item.return_value = { + "org": course_overview.org, + "course_key": str(course_overview.id), + "display_name": course_overview.display_name, + "course_start": course_overview.start, + "course_end": course_overview.end, + "enrollment_start": course_overview.enrollment_start, + "enrollment_end": course_overview.enrollment_end, + "self_paced": course_overview.self_paced, + "course_data_json": json.dumps(json_fields), + "created": course_overview.created, + "modified": course_overview.modified, + "dump_id": "", + "time_last_dumped": "", + } + # This will raise an exception when we try to post to ClickHouse responses.post( "https://foo.bar/", @@ -109,16 +170,12 @@ def test_course_publish_clickhouse_error(mock_modulestore, mock_detached, mock_o with pytest.raises(requests.exceptions.RequestException): dump_course_to_clickhouse(course) - # Make sure everything was called as we expect - assert mock_modulestore.call_count == 1 - assert mock_detached.call_count == 1 - # Make sure our log messages went through. assert "Test Bad Request error" in caplog.text - assert f"Error trying to dump course {course} to ClickHouse!" in caplog.text + assert f"Error trying to dump Course Overview {course} to ClickHouse!" in caplog.text -@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_course_overview_model") +@patch("event_sink_clickhouse.sinks.course_published.CourseOverviewSink.get_model") def test_get_course_last_published(mock_overview): """ Make sure we get a valid date back from this in the expected format. @@ -131,13 +188,13 @@ def test_get_course_last_published(mock_overview): course_key = course_str_factory() # Confirm that the string date we get back is a valid date - last_published_date = CoursePublishedSink.get_course_last_published(course_key) + last_published_date = CourseOverviewSink(None, None).get_course_last_published(course_key) dt = datetime.strptime(last_published_date, "%Y-%m-%d %H:%M:%S.%f") assert dt @responses.activate(registry=OrderedRegistry) # pylint: disable=unexpected-keyword-arg,no-value-for-parameter -@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_course_overview_model") +@patch("event_sink_clickhouse.sinks.course_published.CourseOverviewSink.get_model") def test_no_last_published_date(mock_overview): """ Test that we get a None value back for courses that don't have a modified date. 
@@ -162,8 +219,8 @@ def test_no_last_published_date(mock_overview): ) # Confirm that the string date we get back is a valid date - sink = CoursePublishedSink(connection_overrides={}, log=logging.getLogger()) - should_dump_course, reason = sink.should_dump_course(course_key) + sink = CourseOverviewSink(connection_overrides={}, log=logging.getLogger()) + should_dump_course, reason = sink.should_dump_item(course_key) assert should_dump_course is False assert reason == "No last modified date in CourseOverview" @@ -183,8 +240,8 @@ def test_course_not_present_in_clickhouse(): ) # Confirm that the string date we get back is a valid date - sink = CoursePublishedSink(connection_overrides={}, log=logging.getLogger()) - last_published_date = sink.get_course_last_dump_time(course_key) + sink = CourseOverviewSink(connection_overrides={}, log=logging.getLogger()) + last_published_date = sink.get_last_dumped_timestamp(course_key) assert last_published_date is None @@ -204,22 +261,7 @@ def test_get_last_dump_time(): ) # Confirm that the string date we get back is a valid date - sink = CoursePublishedSink(connection_overrides={}, log=logging.getLogger()) - last_published_date = sink.get_course_last_dump_time(course_key) + sink = CourseOverviewSink(connection_overrides={}, log=logging.getLogger()) + last_published_date = sink.get_last_dumped_timestamp(course_key) dt = datetime.strptime(last_published_date, "%Y-%m-%d %H:%M:%S.%f+00:00") assert dt - - -def test_xblock_json_seralization(): - course = course_factory() - - for index, item in enumerate(course): - block = CoursePublishedSink.serialize_xblock( - item, - index, - mock_detached_xblock_types(), - str(uuid.uuid4()), - str(datetime.now()), - ) - block_json = json.loads(block['xblock_data_json']) - assert bool(int(block_json['graded'])) == item.graded diff --git a/tests/test_signals.py b/tests/test_signals.py new file mode 100644 index 0000000..081c0c2 --- /dev/null +++ b/tests/test_signals.py @@ -0,0 +1,36 @@ +""" +Tests for signal handlers. +""" +from unittest.mock import Mock, patch + +from django.test import TestCase + +from event_sink_clickhouse.signals import on_user_profile_updated, receive_course_publish + + +class SignalHandlersTestCase(TestCase): + """ + Test cases for signal handlers. + """ + + @patch("event_sink_clickhouse.tasks.dump_course_to_clickhouse") + def test_receive_course_publish(self, mock_dump_task): + """ + Test that receive_course_publish calls dump_course_to_clickhouse. + """ + sender = Mock() + course_key = "sample_key" + receive_course_publish(sender, course_key) + + mock_dump_task.delay.assert_called_once_with(course_key) + + @patch("event_sink_clickhouse.tasks.dump_user_profile_to_clickhouse") + def test_on_user_profile_updated(self, mock_dump_task): + """ + Test that on_user_profile_updated calls dump_user_profile_to_clickhouse. + """ + instance = Mock() + sender = Mock() + on_user_profile_updated(sender, instance) + + mock_dump_task.delay.assert_called_once_with(instance.id) diff --git a/tests/test_tasks.py b/tests/test_tasks.py new file mode 100644 index 0000000..776e179 --- /dev/null +++ b/tests/test_tasks.py @@ -0,0 +1,59 @@ +""" +Tests for the tasks module. +""" +import unittest +from unittest.mock import MagicMock, patch + +from event_sink_clickhouse.tasks import dump_data_to_clickhouse, dump_user_profile_to_clickhouse + + +class TestTasks(unittest.TestCase): + """ + Test cases for tasks. 
+    """
+
+    @patch("event_sink_clickhouse.tasks.UserProfileSink.is_enabled", return_value=True)
+    @patch("event_sink_clickhouse.tasks.UserProfileSink")
+    @patch("event_sink_clickhouse.tasks.celery_log")
+    def test_dump_user_profile_to_clickhouse(
+        self, mock_celery_log, mock_UserProfileSink, mock_is_enabled
+    ):
+        # Mock the required objects and methods
+        mock_sink_instance = mock_UserProfileSink.return_value
+        mock_sink_instance.dump.return_value = None
+
+        # Call the function
+        dump_user_profile_to_clickhouse(
+            "user_profile_id", connection_overrides={"param": "value"}
+        )
+
+        # Assertions
+        mock_is_enabled.assert_called_once()
+        mock_UserProfileSink.assert_called_once_with(
+            connection_overrides={"param": "value"}, log=mock_celery_log
+        )
+        mock_sink_instance.dump.assert_called_once_with("user_profile_id")
+
+    @patch("event_sink_clickhouse.tasks.import_module")
+    @patch("event_sink_clickhouse.tasks.celery_log")
+    def test_dump_data_to_clickhouse(self, mock_celery_log, mock_import_module):
+        # Mock the required objects and methods
+        mock_Sink_class = MagicMock()
+        mock_Sink_instance = mock_Sink_class.return_value
+        mock_Sink_instance.dump.return_value = None
+        mock_import_module.return_value = MagicMock(**{"sink_name": mock_Sink_class})
+
+        # Call the function
+        dump_data_to_clickhouse(
+            "sink_module",
+            "sink_name",
+            "object_id",
+            connection_overrides={"param": "value"},
+        )
+
+        # Assertions
+        mock_import_module.assert_called_once_with("sink_module")
+        mock_Sink_class.assert_called_once_with(
+            connection_overrides={"param": "value"}, log=mock_celery_log
+        )
+        mock_Sink_instance.dump.assert_called_once_with("object_id")
diff --git a/tox.ini b/tox.ini
index 10048eb..4842bb2 100644
--- a/tox.ini
+++ b/tox.ini
@@ -30,11 +30,13 @@ ignore = D101,D200,D203,D212,D215,D404,D405,D406,D407,D408,D409,D410,D411,D412,D
 match-dir = (?!migrations)
 
 [pytest]
-DJANGO_SETTINGS_MODULE = test_settings
+DJANGO_SETTINGS_MODULE = test_utils.test_settings
 addopts = --cov event_sink_clickhouse --cov-report term-missing --cov-report xml --log-level=INFO
 norecursedirs = .* docs requirements site-packages
 
 [testenv]
+setenv =
+    DJANGO_SETTINGS_MODULE = test_utils.test_settings
 deps =
     django32: Django>=3.2,<4.0
     django40: Django>=4.0,<4.1
@@ -45,7 +47,7 @@ commands =
 
 [testenv:docs]
 setenv =
-    DJANGO_SETTINGS_MODULE = test_settings
+    DJANGO_SETTINGS_MODULE = test_utils.test_settings
     PYTHONPATH = {toxinidir}
    # Adding the option here instead of as a default in the docs Makefile because that Makefile is generated by sphinx.
    SPHINXOPTS = -W
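---

A minimal sketch of how a plugin would extend the `ModelBaseSink` abstraction this patch introduces. The class attributes, settings keys, and task/command names below come from this changeset; everything named `Example*` or `myapp*` is a hypothetical placeholder, and the serializer is assumed to be a plain djangorestframework serializer, matching the new dependency in requirements/base.in.

# Hypothetical plugin code, not part of the diff above.
from rest_framework import serializers

from event_sink_clickhouse.sinks.base_sink import ModelBaseSink


class ExampleCertificateSerializer(serializers.Serializer):
    """Row shape for the ClickHouse table (placeholder fields)."""

    id = serializers.IntegerField()
    created = serializers.DateTimeField()


class ExampleCertificateSink(ModelBaseSink):  # pylint: disable=abstract-method
    """Hypothetical sink, wired up the same way as UserProfileSink."""

    model = "example_certificate"  # key into EVENT_SINK_CLICKHOUSE_MODEL_CONFIG
    unique_key = "id"
    clickhouse_table_name = "example_certificate"
    timestamp_field = "time_last_dumped"
    name = "Example Certificate"
    serializer_class = ExampleCertificateSerializer


# In settings, get_model() resolves the Django model from this config, and the
# *_ENABLED setting (derived from `model`, as exercised by the
# EVENT_SINK_CLICKHOUSE_CHILD_MODEL_ENABLED override in tests/test_base_sink.py)
# enables the sink without a waffle flag:
#
#   EVENT_SINK_CLICKHOUSE_MODEL_CONFIG = {
#       "example_certificate": {
#           "module": "myapp.models",       # placeholder module path
#           "model": "ExampleCertificate",  # placeholder model name
#       },
#   }
#   EVENT_SINK_CLICKHOUSE_EXAMPLE_CERTIFICATE_ENABLED = True

Individual rows can then be pushed through the new generic task, e.g. `dump_data_to_clickhouse.delay("myapp.sinks", "ExampleCertificateSink", "42")`, and historical data backfilled with the `dump_data_to_clickhouse` management command; the `object`, `ids`, `limit`, and `force` option names are assumed from the keyword arguments used in tests/commands/test_dump_data_to_clickhouse.py.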