This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

feat: Add event listener for course publish
Creates the edx-platform plugin plumbing, adds some new requirements, and maps the appropriate Django Signal to push course structure to ClickHouse.
bmtcril committed May 1, 2023
1 parent aa4123d commit dee8304
Showing 7 changed files with 69 additions and 34 deletions.
4 changes: 2 additions & 2 deletions event_sink_clickhouse/apps.py
@@ -45,8 +45,8 @@ class EventSinkClickhouseConfig(AppConfig):

     def ready(self):
         """
-        Import our Celery tasks for initialization
+        Import our Celery tasks for initialization.
         """
         super().ready()

-        from . import tasks
+        from . import tasks  # pylint: disable=import-outside-toplevel, unused-import
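
The hunk above only covers ready(); the signal-to-receiver mapping itself lives in the plugin_app configuration, which is collapsed in this view. As a rough sketch of how such a mapping typically looks in an edx-platform plugin (the key names and signal path below follow common plugin conventions and are assumptions, not lines from this commit):

from django.apps import AppConfig


class EventSinkClickhouseConfig(AppConfig):
    """
    Illustrative sketch only: route the CMS publish signal to receive_course_publish.
    """
    name = "event_sink_clickhouse"

    plugin_app = {
        "settings_config": {
            "cms.djangoapp": {
                "production": {"relative_path": "settings.production"},
            },
        },
        "signals_config": {
            "cms.djangoapp": {
                "relative_path": "signals",
                "receivers": [
                    {
                        "receiver_func_name": "receive_course_publish",
                        "signal_path": "xmodule.modulestore.django.COURSE_PUBLISHED",
                    },
                ],
            },
        },
    }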
7 changes: 6 additions & 1 deletion event_sink_clickhouse/settings/production.py
@@ -1,4 +1,9 @@
-def plugin_settings(settings):  # pylint: disable=unused-argument
+"""
+Production settings for the openedx_event_sink_clickhouse app.
+"""
+
+
+def plugin_settings(settings):
     """
     Override the default app settings with production settings.
     """
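
The body of plugin_settings is collapsed above. Purely as a sketch of a common pattern for this kind of override (the commit's actual function body is not visible in this diff), a production settings hook often reads the ClickHouse config from the deployment's ENV_TOKENS:

def plugin_settings(settings):
    """
    Sketch: replace the development default with the value configured for this deployment.
    """
    settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG = settings.ENV_TOKENS.get(
        "EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG",
        settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG,
    )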
6 changes: 2 additions & 4 deletions event_sink_clickhouse/signals.py
@@ -5,12 +5,10 @@

 def receive_course_publish(sender, course_key, **kwargs):  # pylint: disable=unused-argument
     """
-    Receives publishing signal and performs publishing related workflows, such as
-    registering proctored exams, building up credit requirements, and performing
-    search indexing
+    Receives COURSE_PUBLISHED signal and queues the dump job.
     """
     # import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
-    from .tasks import dump_course_to_clickhouse
+    from .tasks import dump_course_to_clickhouse  # pylint: disable=import-outside-toplevel

     course_key_str = str(course_key)
     dump_course_to_clickhouse.delay(course_key_str)
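
For a quick manual check, the receiver can be invoked directly with a course key; the course id below is only an example, and in normal operation the plugin wiring calls it when Studio publishes a course:

from opaque_keys.edx.keys import CourseKey

from event_sink_clickhouse.signals import receive_course_publish

# Simulate the publish signal firing for one course; this queues the Celery
# task that dumps the course structure to ClickHouse.
receive_course_publish(
    sender=None,
    course_key=CourseKey.from_string("course-v1:edX+DemoX+Demo_Course"),
)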
Empty file.
23 changes: 23 additions & 0 deletions event_sink_clickhouse/sinks/base_sink.py
@@ -0,0 +1,23 @@
"""
Base classes for event sinks
"""

from django.conf import settings


class BaseSink:
    """
    Base class for ClickHouse event sink, allows overwriting of default settings
    """
    connection_overrides = None
    log = None

    def __init__(self, connection_overrides, log):

        self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"]
        self.ch_auth = (settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"],
                        settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"])
        self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["clickhouse_database"]
        self.ch_timeout_secs = 3
        self.connection_overrides = connection_overrides
        self.log = log
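
BaseSink expects settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG to provide the four keys read above. A minimal illustrative value, with placeholder host, credentials, and database name rather than anything taken from the commit:

EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG = {
    "url": "http://clickhouse:8123",      # ClickHouse HTTP interface (placeholder)
    "username": "ch_admin",               # placeholder credentials
    "password": "secret",
    "clickhouse_database": "event_sink",  # placeholder database name
}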
event_sink_clickhouse/destination.py → event_sink_clickhouse/sinks/course_published.py
@@ -1,25 +1,28 @@
-import csv
-import io
-
-from django.conf import settings
-from django.utils import timezone
-
-import requests
-
-
-class ClickHouseDestination:
-    connection_overrides = None
-    log = None
-
-    def __init__(self, connection_overrides, log):
-
-        self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"]
-        self.ch_auth = (settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"],
-                        settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"])
-        self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["clickhouse_database"]
-        self.connection_overrides = connection_overrides
-        self.log = log
-
+"""
+Handler for the CMS COURSE_PUBLISHED event
+
+Does the following:
+- Pulls the course structure from modulestore
+- Serialize the xblocks and their parent/child relationships
+- Sends them to ClickHouse in CSV format
+
+Note that the serialization format does not include all fields as there may be things like
+LTI passwords and other secrets. We just take the fields necessary for reporting at this time.
+"""
+
+import csv
+import io
+
+import requests
+from django.utils import timezone
+
+from .base_sink import BaseSink
+
+
+class CoursePublishedSink(BaseSink):
+    """
+    Event sink for the COURSE_PUBLISHED signal
+    """
     @staticmethod
     def strip_branch_and_version(location):
         """
@@ -49,6 +52,7 @@ def get_course_last_published(course_key):
         https://docs.python.org/3/library/datetime.html#datetime.date.__str__
         """
         # Import is placed here to avoid model import at project startup.
+        # pylint: disable=import-outside-toplevel,import-error
         from openedx.core.djangoapps.content.course_overviews.models import CourseOverview

         approx_last_published = CourseOverview.get_from_id(course_key).modified
@@ -66,6 +70,7 @@ def serialize_item(item, index):
            block_type: the name of the XBlock's type (i.e. 'course'
            or 'problem')
         """
+        # pylint: disable=import-outside-toplevel,import-error
         from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES

         course_key = item.scope_ids.usage_id.course_key
@@ -85,7 +90,7 @@
             'order': index,
         }

-        return rtn_fields, block_type
+        return rtn_fields

     def serialize_course(self, course_id):
         """
@@ -99,7 +104,7 @@
            relationships: a csv of relationships between nodes
         """
         # Import is placed here to avoid model import at project startup.
-        from xmodule.modulestore.django import modulestore
+        from xmodule.modulestore.django import modulestore  # pylint: disable=import-outside-toplevel,import-error

         # create a location to node mapping we'll need later for
         # writing relationships
@@ -110,7 +115,7 @@
         i = 0
         for item in items:
             i += 1
-            fields, block_type = self.serialize_item(item, i)
+            fields = self.serialize_item(item, i)
             location_to_node[self.strip_branch_and_version(item.location)] = fields

         # create relationships
@@ -133,6 +138,9 @@
         return nodes, relationships

     def dump(self, course_key):
+        """
+        Do the serialization and send to ClickHouse
+        """
         nodes, relationships = self.serialize_course(course_key)
         self.log.info(
             "Now dumping %s to ClickHouse: %d nodes and %d relationships",
@@ -161,7 +169,8 @@ def dump(self, course_key):
         for node in nodes:
             writer.writerow(node.values())

-        response = requests.post(self.ch_url, data=output.getvalue(), params=params, auth=self.ch_auth)
+        response = requests.post(self.ch_url, data=output.getvalue(), params=params, auth=self.ch_auth,
+                                 timeout=self.ch_timeout_secs)
         self.log.info(response.headers)
         self.log.info(response)
         self.log.info(response.text)
@@ -175,7 +184,8 @@ def dump(self, course_key):
         for relationship in relationships:
             writer.writerow(relationship.values())

-        response = requests.post(self.ch_url, data=output.getvalue(), params=params, auth=self.ch_auth)
+        response = requests.post(self.ch_url, data=output.getvalue(), params=params, auth=self.ch_auth,
+                                 timeout=self.ch_timeout_secs)
         self.log.info(response.headers)
         self.log.info(response)
         self.log.info(response.text)
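
The params passed to these POSTs are built in a collapsed part of the file. For orientation only, an insert over ClickHouse's HTTP interface looks roughly like the sketch below; the endpoint, credentials, database, table name, and CSV row are placeholders, not values from this commit:

import requests

# CSV rows travel in the request body; the INSERT statement and target database
# are passed as query-string parameters to the ClickHouse HTTP interface.
csv_rows = "block-v1:edX+DemoX+Demo_Course+type@course+block@course,course,1\n"
params = {
    "database": "event_sink",
    "query": "INSERT INTO course_blocks FORMAT CSV",
}
response = requests.post("http://clickhouse:8123", data=csv_rows, params=params,
                         auth=("ch_admin", "secret"), timeout=3)
response.raise_for_status()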
13 changes: 6 additions & 7 deletions event_sink_clickhouse/tasks.py
@@ -1,15 +1,14 @@
 """
-This file contains a management command for exporting course modulestore data to ClickHouse
+This file contains a management command for exporting course modulestore data to ClickHouse.
 """

 import logging

 from celery import shared_task
-from edx_django_utils.cache import RequestCache
 from edx_django_utils.monitoring import set_code_owner_attribute
 from opaque_keys.edx.keys import CourseKey

-from .destination import ClickHouseDestination
+from .sinks.course_published import CoursePublishedSink

 log = logging.getLogger(__name__)
 celery_log = logging.getLogger('edx.celery.task')
@@ -19,13 +18,13 @@
 @set_code_owner_attribute
 def dump_course_to_clickhouse(course_key_string, connection_overrides=None):
     """
-    Serializes a course and writes it to neo4j.
+    Serialize a course and writes it to ClickHouse.

     Arguments:
         course_key_string: course key for the course to be exported
         connection_overrides (dict): overrides to ClickHouse connection
-            parameters specified in `settings.COURSEGRAPH_CONNECTION`.
+            parameters specified in `settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG`.
     """
     course_key = CourseKey.from_string(course_key_string)
-    destination = ClickHouseDestination(connection_overrides=connection_overrides, log=celery_log)
-    destination.dump(course_key)
+    sink = CoursePublishedSink(connection_overrides=connection_overrides, log=celery_log)
+    sink.dump(course_key)
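
Usage sketch (the course id is just an example): queue the dump the same way signals.py does.

from event_sink_clickhouse.tasks import dump_course_to_clickhouse

# Queue an asynchronous dump of one course's structure to ClickHouse.
dump_course_to_clickhouse.delay("course-v1:edX+DemoX+Demo_Course")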
