Skip to content

Commit

Permalink
Fix cert filter handling, alter sync resume logic
Browse files Browse the repository at this point in the history
  • Loading branch information
bjester committed Aug 30, 2021
1 parent ae3abea commit 73bc252
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 26 deletions.
6 changes: 3 additions & 3 deletions kolibri/core/auth/management/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ def _sync(self, sync_session_client, **options): # noqa: C901

client_cert = sync_session_client.sync_session.client_certificate
register_sync_event_handlers(sync_session_client.controller)
sync_filter = get_sync_filter(client_cert)
client_cert_scope = client_cert.get_scope()

scope_params = json.loads(client_cert.scope_params)
dataset_id = scope_params.get("dataset_id")
Expand All @@ -503,14 +503,14 @@ def _sync(self, sync_session_client, **options): # noqa: C901
self._pull(
sync_session_client,
noninteractive,
sync_filter,
client_cert_scope.read_filter,
)
# and push our own data to server
if not no_push:
self._push(
sync_session_client,
noninteractive,
sync_filter,
client_cert_scope.write_filter,
)

if not no_provision:
Expand Down
20 changes: 13 additions & 7 deletions kolibri/core/discovery/utils/network/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from kolibri.core.device.utils import get_device_setting
from kolibri.core.discovery.models import DynamicNetworkLocation
from kolibri.core.public.utils import begin_request_soud_sync
from kolibri.core.public.utils import cleanup_server_soud_sync
from kolibri.core.public.utils import get_device_info
from kolibri.core.public.utils import stop_request_soud_sync

Expand Down Expand Up @@ -239,13 +240,18 @@ def remove_service(self, zeroconf, type, name):
if id in self.instances:
if not get_is_self(id):
instance = self.instances[id]
if get_device_setting(
"subset_of_users_device", False
) and not instance.get("subset_of_users_device", False):
for user in FacilityUser.objects.all().values("id"):
stop_request_soud_sync(
server=instance.get("base_url"), user=user["id"]
)
is_soud = instance.get("subset_of_users_device", False)

if get_device_setting("subset_of_users_device", False):
if not is_soud:
for user in FacilityUser.objects.all().values("id"):
stop_request_soud_sync(
server=instance.get("base_url"), user=user["id"]
)
elif is_soud:
# this means our device is not SoUD, and instance is a SoUD
cleanup_server_soud_sync(instance)

del self.instances[id]
except KeyError:
pass
Expand Down
88 changes: 72 additions & 16 deletions kolibri/core/public/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import hashlib
import json
import logging
import platform
import random
Expand All @@ -9,9 +10,13 @@
from django.core.urlresolvers import reverse
from django.utils import timezone
from morango.models import InstanceIDModel
from morango.models import SyncSession
from morango.models import TransferSession
from rest_framework import status

import kolibri
from kolibri.core.auth.constants.morango_sync import PROFILE_FACILITY_DATA
from kolibri.core.auth.constants.morango_sync import ScopeDefinitions
from kolibri.core.auth.models import FacilityUser
from kolibri.core.device.models import UserSyncStatus
from kolibri.core.device.utils import DeviceNotProvisioned
Expand Down Expand Up @@ -90,6 +95,45 @@ def get_device_info(version=DEVICE_INFO_VERSION):
return info


def find_soud_sync_sessions(**filters):
"""
:param filters: A dict of queryset filter
:return: A SyncSession queryset
"""
return SyncSession.objects.filter(
active=True,
connection_kind="network",
profile=PROFILE_FACILITY_DATA,
client_certificate__scope_definition_id=ScopeDefinitions.SINGLE_USER,
**filters
).order("-last_activity_timestamp")


def find_soud_sync_session_for_resume(user, base_url):
"""
Finds the most recently active sync session for a SoUD sync
:type user: FacilityUser
:type base_url: str
:rtype: SyncSession|None
"""
# SoUD requests sync with server, so for resume we filter by client and matching base url
sync_sessions = find_soud_sync_sessions(
is_server=False,
connection_path__startswith=base_url,
)

# ensure the certificate is for the user we're checking for
for sync_session in sync_sessions:
scope_params = json.loads(sync_session.client_certificate.scope_params)
dataset_id = scope_params.get("dataset_id")
user_id = scope_params.get("user_id", None)
if user_id == user.id and user.dataset_id == dataset_id:
return sync_session

return None


def peer_sync(**kwargs):
try:
call_command("sync", **kwargs)
Expand Down Expand Up @@ -135,16 +179,13 @@ def startpeerusersync(
)
job_data = None
# attempt to resume an existing session
try:
sync_session = UserSyncStatus.objects.get(user=user).sync_session
if sync_session and sync_session.active:
command = "resumesync"
# if resuming encounters an error, it should close the session to avoid a loop
job_data = prepare_soud_resume_sync_job(
server, sync_session.id, close_on_error=True, **common_job_args
)
except UserSyncStatus.DoesNotExist:
pass
sync_session = find_soud_sync_session_for_resume(user, server)
if sync_session is not None:
command = "resumesync"
# if resuming encounters an error, it should close the session to avoid a loop
job_data = prepare_soud_resume_sync_job(
server, sync_session.id, close_on_error=True, **common_job_args
)

# if not resuming, prepare normal job
if job_data is None:
Expand All @@ -160,12 +201,11 @@ def stoppeerusersync(server, user_id):
"""
Close the sync session with a server
"""
# skip if no sync status, no sync session, or sync session is inactive
try:
sync_session = UserSyncStatus.objects.get(user=user_id).sync_session
if not sync_session or not sync_session.active:
return
except UserSyncStatus.DoesNotExist:
user = FacilityUser.objects.get(pk=user_id)
sync_session = find_soud_sync_session_for_resume(user, server)

# skip if we couldn't find one for resume
if sync_session is None:
return

# hack: queue the resume job, without push or pull, and without keep_alive, so it should close
Expand Down Expand Up @@ -359,3 +399,19 @@ def schedule_new_sync(server, user, interval=OPTIONS["Deployment"]["SYNC_INTERVA
JOB_ID = hashlib.md5("{}:{}".format(server, user).encode()).hexdigest()
job = Job(request_soud_sync, server, user, job_id=JOB_ID)
scheduler.enqueue_in(dt, job)


def cleanup_server_soud_sync(device_info):
"""
:param device_info: Client's device info
"""
sync_sessions = find_soud_sync_sessions(is_server=True)
for sync_session in sync_sessions:
if (
TransferSession.objects.filter(
sync_session=sync_session, active=True
).count()
== 0
):
sync_session.active = False
sync_session.save()

0 comments on commit 73bc252

Please sign in to comment.