Skip to content
This repository has been archived by the owner on Apr 3, 2024. It is now read-only.

Commit

Permalink
feat: Add event listener for course publish
Browse files Browse the repository at this point in the history
Creates the edx-platform plugin plumbing, adds some new requirements, maps the appropriate Django Signal to push course structure to ClickHouse.
  • Loading branch information
bmtcril committed May 2, 2023
1 parent b178733 commit bcb1391
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 24 deletions.
5 changes: 3 additions & 2 deletions event_sink_clickhouse/settings/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ def plugin_settings(settings):
settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG = {
# URL to a running ClickHouse server's HTTP interface. ex: https://foo.openedx.org:8443/ or
# http://foo.openedx.org:8123/ . Note that we only support the ClickHouse HTTP interface
# to avoid pulling in more dependencies to the platrform than necessary.
# to avoid pulling in more dependencies to the platform than necessary.
"url": "http://clickhouse:8123",
"username": "changeme",
"password": "changeme",
"clickhouse_database": "event_sink",
"database": "event_sink",
"timeout_secs": 3,
}
19 changes: 11 additions & 8 deletions event_sink_clickhouse/sinks/base_sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,18 @@ class BaseSink:
"""
Base class for ClickHouse event sink, allows overwriting of default settings
"""
connection_overrides = None
log = None

def __init__(self, connection_overrides, log):

self.log = log
self.ch_url = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["url"]
self.ch_auth = (settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["username"],
settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["password"])
self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["clickhouse_database"]
self.ch_timeout_secs = 3
self.connection_overrides = connection_overrides
self.log = log
self.ch_database = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["database"]
self.ch_timeout_secs = settings.EVENT_SINK_CLICKHOUSE_BACKEND_CONFIG["timeout_secs"]

# If any overrides to the ClickHouse connection
if connection_overrides:
self.ch_url = connection_overrides.get("url", self.ch_url)
self.ch_auth = (connection_overrides.get("username", self.ch_auth[0]),
connection_overrides.get("password", self.ch_auth[1]))
self.ch_database = connection_overrides.get("database", self.ch_database)
self.ch_timeout_secs = connection_overrides.get("timeout_secs", self.ch_timeout_secs)
45 changes: 31 additions & 14 deletions event_sink_clickhouse/sinks/course_published.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@ class CoursePublishedSink(BaseSink):
"""
Event sink for the COURSE_PUBLISHED signal
"""
@staticmethod
def _get_detached_xblock_types():
    """
    Return DETACHED_XBLOCK_TYPES, the block types not attached to the course tree.

    Deferred import: xmodule only exists inside a running edx-platform
    process, so importing it at module load time would break installation
    of this plugin outside the platform.
    """
    # pylint: disable=import-outside-toplevel,import-error
    from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES
    return DETACHED_XBLOCK_TYPES

@staticmethod
def _get_modulestore():
    """
    Return the edx-platform modulestore instance.

    Deferred import: keeps edx-platform code off this plugin's import path
    at project startup.
    """
    # Import is placed here to avoid model import at project startup.
    from xmodule.modulestore.django import modulestore  # pylint: disable=import-outside-toplevel,import-error
    return modulestore()

@staticmethod
def _get_course_overview_model():
    """
    Return the CourseOverview Django model class.

    Deferred import: the model cannot be imported before Django app
    registry setup, so it is resolved lazily at call time.
    """
    # Import is placed here to avoid model import at project startup.
    # pylint: disable=import-outside-toplevel,import-error
    from openedx.core.djangoapps.content.course_overviews.models import CourseOverview
    return CourseOverview

@staticmethod
def strip_branch_and_version(location):
"""
Expand Down Expand Up @@ -51,15 +70,14 @@ def get_course_last_published(course_key):
is sortable and similar to ISO 8601:
https://docs.python.org/3/library/datetime.html#datetime.date.__str__
"""
# Import is placed here to avoid model import at project startup.
# pylint: disable=import-outside-toplevel,import-error
from openedx.core.djangoapps.content.course_overviews.models import CourseOverview

CourseOverview = CoursePublishedSink._get_course_overview_model()
approx_last_published = CourseOverview.get_from_id(course_key).modified
print(approx_last_published)
raise Exception("foo")
return str(approx_last_published)

@staticmethod
def serialize_item(item, index):
def serialize_item(item, index, detached_xblock_types):
"""
Args:
item: an XBlock
Expand All @@ -70,9 +88,6 @@ def serialize_item(item, index):
block_type: the name of the XBlock's type (i.e. 'course'
or 'problem')
"""
# pylint: disable=import-outside-toplevel,import-error
from xmodule.modulestore.store_utilities import DETACHED_XBLOCK_TYPES

course_key = item.scope_ids.usage_id.course_key
block_type = item.scope_ids.block_type

Expand All @@ -84,9 +99,9 @@ def serialize_item(item, index):
'location': str(item.location),
'display_name': item.display_name_with_default.replace("'", "\'"),
'block_type': block_type,
'detached': 1 if block_type in DETACHED_XBLOCK_TYPES else 0,
'detached': 1 if block_type in detached_xblock_types else 0,
'edited_on': str(getattr(item, 'edited_on', '')),
'time_last_dumped': str(timezone.now()),
'time_last_dumped': str(timezone.now()),
'order': index,
}

Expand All @@ -103,19 +118,19 @@ def serialize_course(self, course_id):
nodes: a list of dicts representing xblocks for the course
relationships: a list of dicts representing relationships between nodes
"""
# Import is placed here to avoid model import at project startup.
from xmodule.modulestore.django import modulestore # pylint: disable=import-outside-toplevel,import-error
modulestore = CoursePublishedSink._get_modulestore()
detached_xblock_types = CoursePublishedSink._get_detached_xblock_types()

# create a location to node mapping we'll need later for
# writing relationships
location_to_node = {}
items = modulestore().get_items(course_id)
items = modulestore.get_items(course_id)

# create nodes
i = 0
for item in items:
i += 1
fields = self.serialize_item(item, i)
fields = self.serialize_item(item, i, detached_xblock_types)
location_to_node[self.strip_branch_and_version(item.location)] = fields

# create relationships
Expand All @@ -142,6 +157,7 @@ def dump(self, course_key):
Do the serialization and send to ClickHouse
"""
nodes, relationships = self.serialize_course(course_key)

self.log.info(
"Now dumping %s to ClickHouse: %d nodes and %d relationships",
course_key,
Expand Down Expand Up @@ -198,3 +214,4 @@ def dump(self, course_key):
"Error trying to dump course %s to ClickHouse!",
course_string
)
raise
157 changes: 157 additions & 0 deletions test_utils/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
import csv
import random
import string
from datetime import datetime
from io import StringIO
from unittest.mock import MagicMock, Mock

from opaque_keys.edx.keys import CourseKey
from opaque_keys.edx.locator import BlockUsageLocator

from event_sink_clickhouse.sinks.course_published import CoursePublishedSink

# Reference to the builtin __import__, kept so tests that monkeypatch the
# import machinery can restore or delegate to it.
# TODO(review): confirm this is actually used by a test module — it is not
# referenced in this file.
ORIG_IMPORT = __import__

# Components of the course id produced by the factories below.
ORG = "testorg"
COURSE = "testcourse"
COURSE_RUN = "2023_Fall"


class FakeXBlock:
    """
    Minimal stand-in for an XBlock, carrying exactly the attributes the
    course-published sink reads when serializing course structure.
    """

    def __init__(self, identifier, detached_block=False):
        # Detached fakes use a block type that appears in the detached set;
        # everything else pretends to be a plain vertical.
        if detached_block:
            self.block_type = "course_info"
        else:
            self.block_type = "vertical"
        self.scope_ids = Mock()
        self.scope_ids.block_type = self.block_type
        self.scope_ids.usage_id.course_key = course_key_factory()
        self.location = block_usage_locator_factory()
        self.display_name_with_default = f"Display name {identifier}"
        self.edited_on = datetime.now()
        self.children = []

    def get_children(self):
        """Return the child blocks, mirroring the XBlock API."""
        return self.children


def course_str_factory():
    """Return the canonical course id string used throughout these tests."""
    return f"course-v1:{ORG}+{COURSE}+{COURSE_RUN}"


def course_key_factory():
    """Return the test course id parsed into a CourseKey."""
    course_id = course_str_factory()
    return CourseKey.from_string(course_id)


def block_usage_locator_factory():
    """Return a BlockUsageLocator for the test course with a random block id."""
    random_block_id = ''.join(random.choices(string.ascii_letters, k=10))
    return BlockUsageLocator(
        course_key_factory(),
        block_type="category",
        block_id=random_block_id,
        deprecated=True,
    )


def mock_course_overview():
    """
    Return a MagicMock standing in for the CourseOverview model class.

    get_from_id() normally returns a CourseOverview row whose ``modified``
    field is read by the sink; the mock returns a datetime directly.
    """
    overview = MagicMock()
    overview.get_from_id = MagicMock(return_value=datetime.now())
    return overview


def mock_detached_xblock_types():
    """
    Return a stand-in for DETACHED_XBLOCK_TYPES.

    Snapshot of the real platform values as of 2023-05-01.
    """
    return {"about", "course_info", "static_tab"}


def get_clickhouse_http_params():
    """
    Return the (blocks, relationships) query-param dicts expected on the
    two ClickHouse insert POSTs.
    """
    common = {
        "input_format_allow_errors_num": 1,
        "input_format_allow_errors_ratio": 0.1,
    }
    blocks_params = dict(common, query="INSERT INTO cool_data.course_blocks FORMAT CSV")
    relationships_params = dict(common, query="INSERT INTO cool_data.course_relationships FORMAT CSV")
    return blocks_params, relationships_params


def course_factory():
    """
    Build a small fake course as a flat list of FakeXBlocks:
    one top block, three children under it, one grandchild under each of
    the second and third children, then three detached blocks at the end.
    The list order matters — the CSV matchers compare rows positionally.
    """
    top_block = FakeXBlock("top")
    course = [top_block]

    for index in range(3):
        child = FakeXBlock(f"Child {index}")
        course.append(child)
        top_block.children.append(child)

        # The first child stays a leaf; later children each get a grandchild.
        if index > 0:
            grandchild = FakeXBlock(f"Grandchild {index}")
            course.append(grandchild)
            child.children.append(grandchild)

    for index in range(3):
        course.append(FakeXBlock(f"Detached {index}", detached_block=True))

    return course


def check_block_csv_matcher(course):
    """
    Return a responses request matcher that validates the course-blocks CSV
    body against ``course`` (a list of FakeXBlocks), row by row.
    """
    def match(request):
        body = request.body
        sent_rows = body.split("\n")[:-1]

        # Quick length check before comparing individual rows.
        if len(sent_rows) != len(course):
            return False, f"Body has {len(sent_rows)} lines, course has {len(course)}"

        reader = csv.reader(StringIO(body))
        row_index = 0
        try:
            for row_index, row in enumerate(reader):
                block = course[row_index]
                assert row[0] == block.location.org
                assert row[1] == str(block.location.course_key)
                assert row[2] == block.location.course
                assert row[3] == block.location.run
                assert row[4] == str(block.location)
                assert row[5] == block.display_name_with_default
                assert row[6] == str(block.block_type)
        except AssertionError as err:
            return False, f"Mismatch in row {row_index}: {err}"
        return True, ""
    return match


def check_relationship_csv_matcher(course):
    """
    Return a responses request matcher that validates the parent/child
    relationships CSV body for ``course`` (a list of FakeXBlocks).

    Expected (course_key, parent, child) triples are precomputed once from
    the course structure; the matcher then compares rows positionally.
    """
    relationships = []
    for block in course:
        course_key = str(block.location.course_key)
        # Parent location is invariant across this block's children — hoisted
        # out of the inner loop.
        parent_node = str(CoursePublishedSink.strip_branch_and_version(block.location))
        for child in block.get_children():
            child_node = str(CoursePublishedSink.strip_branch_and_version(child.location))
            relationships.append((course_key, parent_node, child_node))

    def match(request):
        body = request.body
        lines = body.split("\n")[:-1]

        if len(lines) != len(relationships):
            return False, f"Body has {len(lines)} lines but there are {len(relationships)} relationships"

        reader = csv.reader(StringIO(body))
        i = 0
        try:
            # NOTE(review): removed stray debug print() calls that spammed
            # stdout on every row comparison.
            for i, row in enumerate(reader):
                expected = relationships[i]
                assert row[0] == expected[0]
                assert row[1] == expected[1]
                assert row[2] == expected[2]
        except AssertionError as e:
            return False, f"Mismatch in row {i}: {e}"
        return True, ""
    return match
50 changes: 50 additions & 0 deletions tests/test_course_published.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import logging
from io import StringIO
from unittest.mock import patch

import responses
from responses import matchers
from responses.registries import OrderedRegistry

from test_utils.helpers import (
check_block_csv_matcher,
check_relationship_csv_matcher,
course_factory,
course_str_factory,
get_clickhouse_http_params,
mock_detached_xblock_types,
)
from event_sink_clickhouse.tasks import dump_course_to_clickhouse


@responses.activate(registry=OrderedRegistry)
@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_detached_xblock_types")
@patch("event_sink_clickhouse.sinks.course_published.CoursePublishedSink._get_modulestore")
def test_course_publish_success(mock_modulestore, mock_detached, caplog):
    """Happy path: dumping a course issues the two expected ClickHouse POSTs."""
    caplog.set_level(logging.INFO, logger="edx.celery.task")

    # Fake course structure served by the mocked modulestore.
    fake_course = course_factory()
    mock_modulestore.return_value.get_items.return_value = fake_course
    mock_detached.return_value = mock_detached_xblock_types()

    blocks_params, relationships_params = get_clickhouse_http_params()

    # OrderedRegistry: the blocks insert must arrive before the
    # relationships insert.
    responses.post(
        "https://foo.bar/",
        match=[
            matchers.query_param_matcher(blocks_params),
            check_block_csv_matcher(fake_course),
        ],
    )
    responses.post(
        "https://foo.bar/",
        match=[
            matchers.query_param_matcher(relationships_params),
            check_relationship_csv_matcher(fake_course),
        ],
    )

    course_id = course_str_factory()
    dump_course_to_clickhouse(course_id)

    assert mock_modulestore.call_count == 1
    assert mock_detached.call_count == 1
Loading

0 comments on commit bcb1391

Please sign in to comment.