Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
Merge pull request #28 from openedx/cag/abstract-event-sink
Browse files Browse the repository at this point in the history
feat: allow to extend event sink
  • Loading branch information
bmtcril committed Aug 29, 2023
2 parents 7be9b2a + 3e2c255 commit 3283e79
Show file tree
Hide file tree
Showing 35 changed files with 1,953 additions and 507 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ upgrade: ## update the requirements/*.txt files with the latest packages satisfy
quality: ## check coding style with pycodestyle and pylint
tox -e quality

format: ## format the files with black
black event_sink_clickhouse
isort event_sink_clickhouse

pii_check: ## check for PII annotations on all Django models
tox -e pii_check

Expand Down
2 changes: 1 addition & 1 deletion event_sink_clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
A sink for Open edX events to send them to ClickHouse.
"""

__version__ = '0.1.2'
__version__ = "0.2.0"
39 changes: 22 additions & 17 deletions event_sink_clickhouse/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,33 @@ class EventSinkClickhouseConfig(AppConfig):
Configuration for the event_sink_clickhouse Django application.
"""

name = 'event_sink_clickhouse'
name = "event_sink_clickhouse"
verbose_name = "Event Sink ClickHouse"

plugin_app = {
PluginSettings.CONFIG: {
'lms.djangoapp': {
'production': {PluginSettings.RELATIVE_PATH: 'settings.production'},
'common': {PluginSettings.RELATIVE_PATH: 'settings.common'},
"lms.djangoapp": {
"production": {PluginSettings.RELATIVE_PATH: "settings.production"},
"common": {PluginSettings.RELATIVE_PATH: "settings.common"},
},
"cms.djangoapp": {
"production": {PluginSettings.RELATIVE_PATH: "settings.production"},
"common": {PluginSettings.RELATIVE_PATH: "settings.common"},
},
'cms.djangoapp': {
'production': {PluginSettings.RELATIVE_PATH: 'settings.production'},
'common': {PluginSettings.RELATIVE_PATH: 'settings.common'},
}
},
# Configuration setting for Plugin Signals for this app.
PluginSignals.CONFIG: {
# Configure the Plugin Signals for each Project Type, as needed.
'cms.djangoapp': {
"cms.djangoapp": {
# List of all plugin Signal receivers for this app and project type.
PluginSignals.RECEIVERS: [{
# The name of the app's signal receiver function.
PluginSignals.RECEIVER_FUNC_NAME: 'receive_course_publish',

# The full path to the module where the signal is defined.
PluginSignals.SIGNAL_PATH: 'xmodule.modulestore.django.COURSE_PUBLISHED',
}],
PluginSignals.RECEIVERS: [
{
# The name of the app's signal receiver function.
PluginSignals.RECEIVER_FUNC_NAME: "receive_course_publish",
# The full path to the module where the signal is defined.
PluginSignals.SIGNAL_PATH: "xmodule.modulestore.django.COURSE_PUBLISHED",
}
],
}
},
}
Expand All @@ -47,4 +48,8 @@ def ready(self):
"""
super().ready()

from . import tasks # pylint: disable=import-outside-toplevel, unused-import
from event_sink_clickhouse import ( # pylint: disable=import-outside-toplevel, unused-import
signals,
sinks,
tasks,
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from django.core.management.base import BaseCommand, CommandError
from edx_django_utils.cache import RequestCache

from event_sink_clickhouse.sinks.course_published import CoursePublishedSink
from event_sink_clickhouse.sinks.course_published import CourseOverviewSink
from event_sink_clickhouse.tasks import dump_course_to_clickhouse

log = logging.getLogger(__name__)
Expand All @@ -36,7 +36,7 @@ def dump_target_courses_to_clickhouse(
course_keys=None,
courses_to_skip=None,
force=None,
limit=None
limit=None,
):
"""
Iterates through a list of courses in a modulestore, serializes them to csv,
Expand All @@ -49,19 +49,23 @@ def dump_target_courses_to_clickhouse(
Returns: two lists--one of the courses that had dump jobs queued for them
and one of courses that did not.
"""
sink = CoursePublishedSink(connection_overrides, log)
sink = CourseOverviewSink(connection_overrides, log)

submitted_courses = []
skipped_courses = []

index = 0
for course_key, should_be_dumped, reason in sink.fetch_target_courses(course_keys, courses_to_skip, force):
for course_key, should_be_dumped, reason in sink.fetch_target_items(
course_keys, courses_to_skip, force
):
log.info(f"Iteration {index}: {course_key}")
index += 1

if not should_be_dumped:
skipped_courses.append(course_key)
log.info(f"Course {index}: Skipping course {course_key}, reason: '{reason}'")
log.info(
f"Course {index}: Skipping course {course_key}, reason: '{reason}'"
)
else:
# RequestCache is a local memory cache used in modulestore for performance reasons.
# Normally it is cleared at the end of every request, but in this command it will
Expand All @@ -83,7 +87,9 @@ def dump_target_courses_to_clickhouse(
submitted_courses.append(str(course_key))

if limit and len(submitted_courses) == limit:
log.info(f"Limit of {limit} eligible course has been reached, quitting!")
log.info(
f"Limit of {limit} eligible course has been reached, quitting!"
)
break

return submitted_courses, skipped_courses
Expand All @@ -93,55 +99,56 @@ class Command(BaseCommand):
"""
Dump course block and relationship data to a ClickHouse instance.
"""

help = dedent(__doc__).strip()

def add_arguments(self, parser):
parser.add_argument(
'--url',
"--url",
type=str,
help="the URL of the ClickHouse server",
)
parser.add_argument(
'--username',
"--username",
type=str,
help="the username of the ClickHouse user",
)
parser.add_argument(
'--password',
"--password",
type=str,
help="the password of the ClickHouse user",
)
parser.add_argument(
'--database',
"--database",
type=str,
help="the database in ClickHouse to connect to",
)
parser.add_argument(
'--timeout_secs',
"--timeout_secs",
type=int,
help="timeout for ClickHouse requests, in seconds",
)
parser.add_argument(
'--courses',
metavar='KEY',
"--courses",
metavar="KEY",
type=str,
nargs='*',
nargs="*",
help="keys of courses to serialize; if omitted all courses in system are serialized",
)
parser.add_argument(
'--courses_to_skip',
metavar='KEY',
"--courses_to_skip",
metavar="KEY",
type=str,
nargs='*',
nargs="*",
help="keys of courses to NOT to serialize",
)
parser.add_argument(
'--force',
action='store_true',
"--force",
action="store_true",
help="dump all courses regardless of when they were last published",
)
parser.add_argument(
'--limit',
"--limit",
type=int,
help="maximum number of courses to dump, cannot be used with '--courses' or '--force'",
)
Expand All @@ -158,25 +165,29 @@ def handle(self, *args, **options):
}

courses = options["courses"] if options["courses"] else []
courses_to_skip = options["courses_to_skip"] if options["courses_to_skip"] else []
courses_to_skip = (
options["courses_to_skip"] if options["courses_to_skip"] else []
)

if options["limit"] is not None and int(options["limit"]) < 1:
message = "'limit' must be greater than 0!"
log.error(message)
raise CommandError(message)

if options["limit"] and options["force"]:
message = "The 'limit' option cannot be used with 'force' as running the " \
"command repeatedly will result in the same courses being dumped every time."
message = (
"The 'limit' option cannot be used with 'force' as running the "
"command repeatedly will result in the same courses being dumped every time."
)
log.error(message)
raise CommandError(message)

submitted_courses, skipped_courses = dump_target_courses_to_clickhouse(
connection_overrides,
[course_key.strip() for course_key in courses],
[course_key.strip() for course_key in courses_to_skip],
options['force'],
options['limit']
options["force"],
options["limit"],
)

log.info(
Expand All @@ -189,6 +200,6 @@ def handle(self, *args, **options):
log.info("No courses submitted for export to ClickHouse at all!")
else:
log.info( # pylint: disable=logging-not-lazy
"These courses were submitted for dump to ClickHouse successfully:\n\t" +
"\n\t".join(submitted_courses)
"These courses were submitted for dump to ClickHouse successfully:\n\t"
+ "\n\t".join(submitted_courses)
)
Loading

0 comments on commit 3283e79

Please sign in to comment.