Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
feat: Add event listener for course publish
Browse files Browse the repository at this point in the history
Creates the edx-platform plugin plumbing, adds some new requirements, maps the appropriate Django Signal to push course structure to ClickHouse.
  • Loading branch information
bmtcril committed Apr 24, 2023
1 parent 2a55abd commit 0d07506
Show file tree
Hide file tree
Showing 14 changed files with 807 additions and 11 deletions.
39 changes: 39 additions & 0 deletions event_sink_clickhouse/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

from django.apps import AppConfig
from edx_django_utils.plugins import PluginSettings, PluginSignals


class EventSinkClickhouseConfig(AppConfig):
Expand All @@ -11,3 +12,41 @@ class EventSinkClickhouseConfig(AppConfig):
"""

name = 'event_sink_clickhouse'
verbose_name = "Event Sink ClickHouse"

plugin_app = {
PluginSettings.CONFIG: {
'lms.djangoapp': {
'production': {PluginSettings.RELATIVE_PATH: 'settings.production'},
'common': {PluginSettings.RELATIVE_PATH: 'settings.common'},
'devstack': {PluginSettings.RELATIVE_PATH: 'settings.devstack'},
},
'cms.djangoapp': {
'production': {PluginSettings.RELATIVE_PATH: 'settings.production'},
'common': {PluginSettings.RELATIVE_PATH: 'settings.common'},
'devstack': {PluginSettings.RELATIVE_PATH: 'settings.devstack'},
}
},
# Configuration setting for Plugin Signals for this app.
PluginSignals.CONFIG: {
# Configure the Plugin Signals for each Project Type, as needed.
'cms.djangoapp': {
# List of all plugin Signal receivers for this app and project type.
PluginSignals.RECEIVERS: [{
# The name of the app's signal receiver function.
PluginSignals.RECEIVER_FUNC_NAME: 'listen_for_course_publish',

# The full path to the module where the signal is defined.
PluginSignals.SIGNAL_PATH: 'xmodule.modulestore.django.SignalHandler.course_published',
}],
}
},
}

def ready(self):
"""
Import our Celery tasks for initialization
"""
super().ready()

from . import tasks
190 changes: 190 additions & 0 deletions event_sink_clickhouse/destination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import csv
import io

from django.conf import settings
from django.utils import timezone

import requests


class ClickHouseDestination:
connection_overrides = None
log = None

def __init__(self, connection_overrides, log):

self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"]
self.ch_auth = (settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["user"],
settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"])

self.connection_overrides = connection_overrides
self.log = log

@staticmethod
def strip_branch_and_version(location):
"""
Removes the branch and version information from a location.
Args:
location: an xblock's location.
Returns: that xblock's location without branch and version information.
"""
return location.for_branch(None)

@staticmethod
def get_course_last_published(course_key):
"""
Approximately when was a course last published?
We use the 'modified' column in the CourseOverview table as a quick and easy
(although perhaps inexact) way of determining when a course was last
published. This works because CourseOverview rows are re-written upon
course publish.
Args:
course_key: a CourseKey
Returns: The datetime the course was last published at, stringified.
Uses Python's default str(...) implementation for datetimes, which
is sortable and similar to ISO 8601:
https://docs.python.org/3/library/datetime.html#datetime.date.__str__
"""
# Import is placed here to avoid model import at project startup.
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview

approx_last_published = CourseOverview.get_from_id(course_key).modified
return str(approx_last_published)

@staticmethod
def serialize_item(item, index):
"""
Args:
item: an XBlock
index: a number indicating where the item falls in the course hierarchy
Returns:
fields: a *limited* dictionary of an XBlock's field names and values
block_type: the name of the XBlock's type (i.e. 'course'
or 'problem')
"""
from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES

course_key = item.scope_ids.usage_id.course_key
block_type = item.scope_ids.block_type

rtn_fields = {
'org': course_key.org,
'course_key': str(course_key),
'course': course_key.course,
'run': course_key.run,
'location': str(item.location),
'display_name': item.display_name_with_default.replace("'", "\'"),
'block_type': block_type,
'detached': 1 if block_type in DETACHED_XBLOCK_TYPES else 0,
'edited_on': str(getattr(item, 'edited_on', '')),
'time_last_dumped': str(timezone.now()),
'order': index,
}

return rtn_fields, block_type

def serialize_course(self, course_id):
"""
Serializes a course into a CSV of nodes and relationships.
Args:
course_id: CourseKey of the course we want to serialize
Returns:
nodes: a csv of nodes for the course
relationships: a csv of relationships between nodes
"""
# Import is placed here to avoid model import at project startup.
from xmodule.modulestore.django import modulestore

# create a location to node mapping we'll need later for
# writing relationships
location_to_node = {}
items = modulestore().get_items(course_id)

# create nodes
i = 0
for item in items:
i += 1
fields, block_type = self.serialize_item(item, i)
location_to_node[self.strip_branch_and_version(item.location)] = fields

# create relationships
relationships = []
for item in items:
for index, child in enumerate(item.get_children()):
parent_node = location_to_node.get(self.strip_branch_and_version(item.location))
child_node = location_to_node.get(self.strip_branch_and_version(child.location))

if parent_node is not None and child_node is not None:
relationship = {
'course_key': str(course_id),
'parent_location': str(parent_node["location"]),
'child_location': str(child_node["location"]),
'order': index
}
relationships.append(relationship)

nodes = list(location_to_node.values())
return nodes, relationships

def dump(self, course_key):
nodes, relationships = self.serialize_course(course_key)
self.log.info(
"Now dumping %s to ClickHouse: %d nodes and %d relationships",
course_key,
len(nodes),
len(relationships),
)

course_string = str(course_key)

try:
# Params that begin with "param_" will be used in the query replacement
# all others are ClickHouse settings.
params = {
# Fail early on bulk inserts
"input_format_allow_errors_num": 1,
"input_format_allow_errors_ratio": 0.1,
}

# "query" is a special param for the query, it's the best way to get the FORMAT CSV in there.
params["query"] = "INSERT INTO coursegraph.coursegraph_nodes FORMAT CSV"

output = io.StringIO()
writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC)

for node in nodes:
writer.writerow(node.values())

response = requests.post(self.ch_url, data=output.getvalue(), params=params, auth=self.ch_auth)
self.log.info(response.headers)
self.log.info(response)
self.log.info(response.text)
response.raise_for_status()

# Just overwriting the previous query
params["query"] = "INSERT INTO coursegraph.coursegraph_relationships FORMAT CSV"
output = io.StringIO()
writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC)

for relationship in relationships:
writer.writerow(relationship.values())

response = requests.post(self.ch_url, data=output.getvalue(), params=params, auth=self.ch_auth)
self.log.info(response.headers)
self.log.info(response)
self.log.info(response.text)
response.raise_for_status()

self.log.info("Completed dumping %s to ClickHouse", course_key)

except Exception: # pylint: disable=broad-except
self.log.exception(
"Error trying to dump course %s to ClickHouse!",
course_string
)
Empty file.
16 changes: 16 additions & 0 deletions event_sink_clickhouse/settings/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
Default settings for the openedx_event_sink_clickhouse app.
"""


def plugin_settings(settings):
"""
Adds default settings
"""
settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG = {
# URL to a running ClickHouse server's HTTP interface. ex: https://foo.openedx.org:8443/ or
# http://foo.openedx.org:8123/
"url": "http://clickhouse:8123",
"username": "ch_admin",
"password": "dazimUehjqKjWdpDNhRrGOfp"
}
Empty file.
16 changes: 16 additions & 0 deletions event_sink_clickhouse/signals/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""
Signal handler functions, mapped to specific signals in apps.py.
"""


def receive_course_publish(sender, course_key, **kwargs): # pylint: disable=unused-argument
"""
Receives publishing signal and performs publishing related workflows, such as
registering proctored exams, building up credit requirements, and performing
search indexing
"""
# import here, because signal is registered at startup, but items in tasks are not yet able to be loaded
from ..tasks import dump_course_to_clickhouse

course_key_str = str(course_key)
dump_course_to_clickhouse.delay(course_key_str)
31 changes: 31 additions & 0 deletions event_sink_clickhouse/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""
This file contains a management command for exporting course modulestore data to ClickHouse
"""

import logging

from celery import shared_task
from edx_django_utils.cache import RequestCache
from edx_django_utils.monitoring import set_code_owner_attribute
from opaque_keys.edx.keys import CourseKey

from .destination import ClickHouseDestination

log = logging.getLogger(__name__)
celery_log = logging.getLogger('edx.celery.task')


@shared_task
@set_code_owner_attribute
def dump_course_to_clickhouse(course_key_string, connection_overrides=None):
"""
Serializes a course and writes it to neo4j.
Arguments:
course_key_string: course key for the course to be exported
connection_overrides (dict): overrides to ClickHouse connection
parameters specified in `settings.COURSEGRAPH_CONNECTION`.
"""
course_key = CourseKey.from_string(course_key_string)
destination = ClickHouseDestination(connection_overrides=connection_overrides, log=celery_log)
destination.dump(course_key)
5 changes: 4 additions & 1 deletion requirements/base.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Core requirements for using this application
-c constraints.txt

celery # Asynchronous task execution library
Django # Web application framework

requests # HTTP request library
edx-django-utils # Django utilities, we use caching and monitoring
edx-opaque-keys # Parsing library for course and usage keys
Loading

0 comments on commit 0d07506

Please sign in to comment.