Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pythontest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
# Label used to access the service container
redis:
# Docker Hub image
image: redis:4.0.9
image: redis:6.0.9
# Set health checks to wait until redis has started
options: >-
--health-cmd "redis-cli ping"
Expand Down
14 changes: 14 additions & 0 deletions contentcuration/contentcuration/asgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter
from channels.routing import URLRouter

from contentcuration.viewsets.websockets.routing import websocket_urlpatterns

application = ProtocolTypeRouter({
"websocket":
AuthMiddlewareStack(
URLRouter(
websocket_urlpatterns
)
),
})
45 changes: 36 additions & 9 deletions contentcuration/contentcuration/frontend/shared/data/serverSync.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
import debounce from 'lodash/debounce';
import findLastIndex from 'lodash/findLastIndex';
import get from 'lodash/get';
import pick from 'lodash/pick';
import orderBy from 'lodash/orderBy';
import pick from 'lodash/pick';
import uniq from 'lodash/uniq';
import applyChanges from './applyRemoteChanges';
import { hasActiveLocks, cleanupLocks } from './changes';
import {
ACTIVE_CHANNELS,
CHANGES_TABLE,
CHANGE_LOCKS_TABLE,
CHANGE_TYPES,
CHANGES_TABLE,
IGNORED_SOURCE,
CHANNEL_SYNC_KEEP_ALIVE_INTERVAL,
ACTIVE_CHANNELS,
IGNORED_SOURCE,
MAX_REV_KEY,
} from './constants';
import { Channel, Session, Task } from './resources';
import { cleanupLocks, hasActiveLocks } from './changes';

import { INDEXEDDB_RESOURCES } from './registry';
import applyChanges from './applyRemoteChanges';
import db from './db';
import mergeAllChanges from './mergeChanges';
import { INDEXEDDB_RESOURCES } from './registry';
import { Channel, Session, Task } from './resources';
import client from 'shared/client';
import urls from 'shared/urls';

Expand All @@ -29,7 +30,7 @@ const SYNC_IF_NO_CHANGES_FOR = 2;
// When this many seconds pass, repoll the backend to
// check for any updates to active channels, or the user and sync any current changes
const SYNC_POLL_INTERVAL = 5;

let socket;
// Flag to check if a sync is currently active.
let syncActive = false;

Expand Down Expand Up @@ -274,7 +275,19 @@ async function syncChanges() {
// "errors": [],
// "successes": [],
// }
if (requestPayload.changes.length != 0) {
socket.send(
JSON.stringify({
payload: requestPayload,
})
);
}
const response = await client.post(urls['sync'](), requestPayload);
socket.onmessage = function(e) {
const data = JSON.parse(e.data);
console.log(data);
};

try {
await Promise.all([
handleDisallowed(response),
Expand Down Expand Up @@ -342,7 +355,21 @@ export function startSyncing() {
cleanupLocks();
// Initiate a sync immediately in case any data
// is left over in the database.

const websocketUrl = new URL(
`/ws/sync_socket/${window.CHANNEL_EDIT_GLOBAL.channel_id}/`,
window.location.href
);
websocketUrl.protocol = window.location.protocol == 'https:' ? 'wss:' : 'ws:';
socket = new WebSocket(websocketUrl);

// Connection opened
socket.addEventListener('open', () => {
console.log('Websocket connected');
});

debouncedSyncChanges();

// Start the sync interval
intervalTimer = setInterval(debouncedSyncChanges, SYNC_POLL_INTERVAL * 1000);
db.on('changes', handleChanges);
Expand Down
12 changes: 12 additions & 0 deletions contentcuration/contentcuration/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
# Application definition

INSTALLED_APPS = (
'channels',
'contentcuration.apps.ContentConfig',
'django.contrib.auth',
'django.contrib.contenttypes',
Expand All @@ -86,6 +87,7 @@
'webpack_loader',
'django_filters',
'mathfilters',

)

SESSION_ENGINE = "django.contrib.sessions.backends.cached_db"
Expand Down Expand Up @@ -201,7 +203,17 @@
]

WSGI_APPLICATION = 'contentcuration.wsgi.application'
ASGI_APPLICATION = 'contentcuration.asgi.application'


CHANNEL_LAYERS = {
'default': {
'BACKEND': 'channels_redis.core.RedisChannelLayer',
'CONFIG': {
"hosts": [('127.0.0.1', 6379)],
},
},
}

# Database
# https://docs.djangoproject.com/en/1.8/ref/settings/#databases
Expand Down
102 changes: 102 additions & 0 deletions contentcuration/contentcuration/tests/test_websocket_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import os

import pytest
from channels.layers import get_channel_layer
from channels.testing import WebsocketCommunicator
from django.core.management import call_command
from django.test import override_settings
from django.test import TransactionTestCase

from contentcuration.asgi import application
from contentcuration.tests import testdata

os.environ["DJANGO_ALLOW_ASYNC_UNSAFE"] = "true"


class WebsocketTestCase(TransactionTestCase):

def setUp(self):
call_command("loadconstants")
self.user = testdata.user("mrtest@testy.com")

def tearDown(self):
self.user.delete()

@pytest.mark.asyncio
async def test_authenticated_user_websocket_connection(self):
self.client.force_login(self.user)
headers = [(b'cookie', self.client.cookies.output(attrs=["value"], header='', sep='; ').encode())]
communicator = WebsocketCommunicator(application, 'ws/sync_socket/12312312312123/', headers)
connected = await communicator.connect()
assert connected
await communicator.disconnect()

@pytest.mark.asyncio
async def test_unauthenticated_user_websocket_connection(self):
headers = [(b'cookie', self.client.cookies.output(attrs=["value"], header='', sep='; ').encode())]
communicator = WebsocketCommunicator(application, 'ws/sync_socket/12312312312123/', headers)
connected, _ = await communicator.connect()
assert connected is False
await communicator.disconnect()

@pytest.mark.asyncio
async def test_disconnect_websockets(self):
self.client.force_login(self.user)
headers = [(b'cookie', self.client.cookies.output(attrs=["value"], header='', sep='; ').encode())]
channel_layers_setting = {
"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}
}
with override_settings(CHANNEL_LAYERS=channel_layers_setting):
communicator = WebsocketCommunicator(application, 'ws/sync_socket/12312312312123/', headers)
connected, _ = await communicator.connect()
channel_layer = get_channel_layer()
assert connected
await communicator.disconnect()
assert channel_layer.groups == {}

@pytest.mark.asyncio
async def test_send_payload_websockets(self):
self.client.force_login(self.user)
headers = [(b'cookie', self.client.cookies.output(attrs=["value"], header='', sep='; ').encode())]
communicator = WebsocketCommunicator(application, 'ws/sync_socket/12312312312123/', headers)
connected = await communicator.connect()
assert connected
await communicator.send_json_to({
"payload": {
"changes": [
{
"type": 2,
"key": "7ae83505f20a4642a004fadde7f151ed",
"table": "channel",
"rev": 253,
"channel_id": "7ae83505f20a4642a004fadde7f151ed",
"mods": {
"name": "test"
}
}
],
"channel_revs": {
"7ae83505f20a4642a004fadde7f151ed": 51
},
"user_rev": 0
}})
response = await communicator.receive_json_from()
assert response["response_payload"]
await communicator.disconnect()

@pytest.mark.asyncio
async def test_channels_groups(self):
self.client.force_login(self.user)
headers = [(b'cookie', self.client.cookies.output(attrs=["value"], header='', sep='; ').encode())]
channel_layers_setting = {
"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}
}
with override_settings(CHANNEL_LAYERS=channel_layers_setting):
communicator = WebsocketCommunicator(application, 'ws/sync_socket/12312312312123/', headers)
connected, _ = await communicator.connect()
channel_layer = get_channel_layer()
# check the grou for channel exist
assert channel_layer.groups['12312312312123']
assert channel_layer.groups[f"{self.user.id}"]
assert connected
await communicator.disconnect()
130 changes: 130 additions & 0 deletions contentcuration/contentcuration/viewsets/websockets/consumers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import json
import logging as logger

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer

from contentcuration.models import Change
from contentcuration.models import Channel
from contentcuration.tasks import get_or_create_async_task
from contentcuration.viewsets.sync.constants import CHANNEL
from contentcuration.viewsets.sync.constants import CREATED


logging = logger.getLogger(__name__)


class SyncConsumer(WebsocketConsumer):
# Initial reset
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.room_group_name = None
self.indiviual_room_group_name = None

@property
def user(self):
return self.scope["user"]

# Checks permissions
def check_authentication(self):
return self.user.is_authenticated

def connect(self):
"""
Executes when a user tries to make a websocket connection.
- Creates and joins a group for indiviual user
- Joins a public group based on channel_id provided in url
"""
# Extract the channel_id from url
self.room_group_name = self.scope['url_route']['kwargs']['channel_id']

logging.debug("Connected to channel_id: " + self.room_group_name)

self.indiviual_room_group_name = str(self.user.id)

logging.debug("Connected to user " + str(self.user))

if self.check_authentication():
# Join room group based on channel_id
async_to_sync(self.channel_layer.group_add)(
self.room_group_name,
self.channel_name
)

# Join private room group for indiviual user
async_to_sync(self.channel_layer.group_add)(
self.indiviual_room_group_name,
self.channel_name
)

self.accept()

else:
self.close()

def disconnect(self, close_code):
"""
Executed to leave indiviual-user and channel group
"""
# Leave channel_id room group
async_to_sync(self.channel_layer.group_discard)(
self.room_group_name,
self.channel_name
)

# Leave indiviual room group
async_to_sync(self.channel_layer.group_discard)(
self.indiviual_room_group_name,
self.channel_name
)

def receive(self, text_data):
"""
Executes when data is received from websocket
"""
response_payload = {
"disallowed": [],
"allowed": [],
}
user_id = self.user.id
session_key = self.scope['cookies']['kolibri_studio_sessionid']
text_data_json = json.loads(text_data)
changes = text_data_json["payload"]["changes"]

change_channel_ids = set(x.get("channel_id") for x in changes if x.get("channel_id"))
# Channels that have been created on the client side won't exist on the server yet, so we need to add a special exception for them.
created_channel_ids = set(x.get("channel_id") for x in changes if x.get("channel_id") and x.get("table") == CHANNEL and x.get("type") == CREATED)
# However, this would also give people a mechanism to edit existing channels on the server side by adding a channel create event for an
# already existing channel, so we have to filter out the channel ids that are already created on the server side, regardless of whether
# the user making the requests has permissions for those channels.
created_channel_ids = created_channel_ids.difference(
set(Channel.objects.filter(id__in=created_channel_ids).values_list("id", flat=True).distinct())
)
allowed_ids = set(
Channel.filter_edit_queryset(Channel.objects.filter(id__in=change_channel_ids), self.user).values_list("id", flat=True).distinct()
).union(created_channel_ids)
# Allow changes that are either:
# Not related to a channel and instead related to the user if the user is the current user.
user_only_changes = []
# Related to a channel that the user is an editor for.
channel_changes = []
# Changes that cannot be made
disallowed_changes = []
for c in changes:
if c.get("channel_id") is None and c.get("user_id") == user_id:
user_only_changes.append(c)
elif c.get("channel_id") in allowed_ids:
channel_changes.append(c)
else:
disallowed_changes.append(c)
change_models = Change.create_changes(user_only_changes + channel_changes, created_by_id=user_id, session_key=session_key)
if user_only_changes:
get_or_create_async_task("apply_user_changes", self.user, user_id=user_id)
for channel_id in allowed_ids:
get_or_create_async_task("apply_channel_changes", self.user, channel_id=channel_id)
allowed_changes = [{"rev": c.client_rev, "server_rev": c.server_rev} for c in change_models]
response_payload.update({"disallowed": disallowed_changes, "allowed": allowed_changes})

self.send(json.dumps({
'response_payload': response_payload
}))
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from django.urls import re_path

from . import consumers

websocket_urlpatterns = [
re_path(r'ws/sync_socket/(?P<channel_id>\w+)/$', consumers.SyncConsumer.as_asgi()),
]
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ services:
- pgdata:/var/lib/postgresql/data/pgdata

redis:
image: redis:4.0.9
image: redis:6.0.9

cloudprober:
<<: *studio-worker
Expand Down
Loading