Skip to content

Commit

Permalink
Merge pull request #8342 from rtibbles/sync_queue_updates
Browse files Browse the repository at this point in the history
Sync queue updates
  • Loading branch information
rtibbles committed Aug 24, 2021
2 parents 44f4ca9 + 2fd8a3e commit 4309fdf
Show file tree
Hide file tree
Showing 7 changed files with 460 additions and 106 deletions.
11 changes: 0 additions & 11 deletions kolibri/core/auth/management/commands/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
from kolibri.core.auth.models import dataset_cache
from kolibri.core.auth.sync_event_hook_utils import register_sync_event_handlers
from kolibri.core.logger.utils.data import bytes_for_humans
from kolibri.core.public.utils import schedule_new_sync
from kolibri.core.tasks.exceptions import UserCancelledError
from kolibri.core.tasks.management.commands.base import AsyncCommand
from kolibri.core.utils.lock import db_lock
Expand Down Expand Up @@ -82,12 +81,6 @@ def add_arguments(self, parser):
action="store_true",
help="do not create a facility and temporary superuser",
)
parser.add_argument(
"--resync-interval",
type=int,
default=None,
help="Seconds to schedule a new sync",
)
# parser.add_argument("--scope-id", type=str, default=FULL_FACILITY)

def handle_async(self, *args, **options): # noqa C901
Expand All @@ -103,7 +96,6 @@ def handle_async(self, *args, **options): # noqa C901
no_pull,
noninteractive,
no_provision,
resync_interval,
) = (
options["baseurl"],
options["facility"],
Expand All @@ -115,7 +107,6 @@ def handle_async(self, *args, **options): # noqa C901
options["no_pull"],
options["noninteractive"],
options["no_provision"],
options["resync_interval"],
)

PORTAL_SYNC = baseurl == DATA_PORTAL_SYNCING_BASE_URL
Expand Down Expand Up @@ -278,8 +269,6 @@ def handle_async(self, *args, **options): # noqa C901
self.job.save_meta()

dataset_cache.deactivate()
if user_id and resync_interval:
schedule_new_sync(baseurl, user_id, resync_interval)
logger.info("Syncing has been completed.")

@contextmanager
Expand Down
24 changes: 24 additions & 0 deletions kolibri/core/device/migrations/0014_syncqueue_instance_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.29 on 2021-08-24 17:37
from __future__ import unicode_literals

from uuid import uuid4

import morango.models.fields.uuids
from django.db import migrations


class Migration(migrations.Migration):
    # Adds the ``instance_id`` column to SyncQueue so each queue entry is
    # keyed to a specific (user, instance) pair rather than to a user alone.

    dependencies = [
        ("device", "0013_usersyncstatus"),
    ]

    operations = [
        migrations.AddField(
            model_name="syncqueue",
            name="instance_id",
            # uuid4 is only used to backfill existing rows during this
            # migration; preserve_default=False keeps it out of model state.
            field=morango.models.fields.uuids.UUIDField(default=uuid4),
            preserve_default=False,
        ),
    ]
7 changes: 4 additions & 3 deletions kolibri/core/device/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from django.conf import settings
from django.db import models
from django.db.models import F
from django.db.models import QuerySet
from morango.models import UUIDField
from morango.models.core import SyncSession
Expand Down Expand Up @@ -211,19 +212,19 @@ class SyncQueue(models.Model):

id = UUIDField(primary_key=True, default=uuid4)
user = models.ForeignKey(FacilityUser, on_delete=models.CASCADE, null=False)
instance_id = UUIDField(blank=False, null=False)
datetime = models.DateTimeField(auto_now_add=True)
updated = models.FloatField(default=time.time)
# polling interval is 5 seconds by default
keep_alive = models.FloatField(default=5.0)

@classmethod
def clean_stale(cls, expire=180.0):
def clean_stale(cls):
"""
This method will delete all the devices from the queue
with the expire time (in seconds) exhausted
"""
staled_time = time.time() - expire
cls.objects.filter(updated__lte=staled_time).delete()
cls.objects.filter(updated__lte=time.time() - F("keep_alive") * 2).delete()


class UserSyncStatus(models.Model):
Expand Down
36 changes: 26 additions & 10 deletions kolibri/core/device/test/test_syncqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from uuid import uuid4

import mock
import pytest
from django.test import TestCase

from kolibri.core.auth.models import Facility
Expand All @@ -22,7 +21,8 @@ def setUp(self):
def test_create_queue_element(self):
previous_time = time.time()
element, _ = SyncQueue.objects.get_or_create(
user=FacilityUser.objects.create(username="test", facility=self.facility)
user=FacilityUser.objects.create(username="test", facility=self.facility),
instance_id=uuid4(),
)
assert element.keep_alive == 5.0
current_time = time.time()
Expand All @@ -36,25 +36,42 @@ def test_queue_cleaning(self):
SyncQueue.objects.create(
user=FacilityUser.objects.create(
username="test{}".format(i), facility=self.facility
)
),
instance_id=uuid4(),
)
for i in range(3, 5):
item = SyncQueue.objects.create(
user=FacilityUser.objects.create(
username="test{}".format(i), facility=self.facility
)
),
instance_id=uuid4(),
)
item.updated = item.updated - 200
item.save()

assert SyncQueue.objects.count() == 5
SyncQueue.clean_stale() # default expiry time = 180 seconds
SyncQueue.clean_stale() # expiry time is 2 * keep_alive value
assert SyncQueue.objects.count() == 3

def test_dynamic_queue_cleaning(self):
for i in range(5):
item = SyncQueue.objects.create(
user=FacilityUser.objects.create(
username="test{}".format(i), facility=self.facility
),
instance_id=uuid4(),
)
item.updated = item.updated - 20
if i % 2 == 0:
item.keep_alive = 30
item.save()

assert SyncQueue.objects.count() == 5
SyncQueue.clean_stale() # expiry time is 2 * keep_alive value
assert SyncQueue.objects.count() == 3


@pytest.mark.django_db
class TestRequestSoUDSync(object):
@pytest.fixture()
class TestRequestSoUDSync(TestCase):
def setUp(self):
self.facility = Facility.objects.create(name="Test")
self.test_user = FacilityUser.objects.create(
Expand All @@ -66,7 +83,7 @@ def setUp(self):
"kolibri.core.public.utils.get_device_setting",
return_value=True,
)
def test_begin_request_soud_sync(self, mock_device_info, queue, setUp):
def test_begin_request_soud_sync(self, mock_device_info, queue):
begin_request_soud_sync("whatever_server", self.test_user.id)
queue.enqueue.assert_called_with(
request_soud_sync, "whatever_server", self.test_user.id
Expand All @@ -84,7 +101,6 @@ def test_request_soud_sync(
MorangoProfileController,
requests_mock,
scheduler,
setUp,
):

get_client_and_server_certs.return_value = None
Expand Down
113 changes: 68 additions & 45 deletions kolibri/core/public/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,7 @@ def list(self, request):
content = {"I'm a Subset of users device": "Nothing to do here"}
# would love to use HTTP 418, but it's not fully usable in browsers
return Response(content, status=status.HTTP_400_BAD_REQUEST)
SyncQueue.clean_stale(
expire=STALE_QUEUE_TIME
) # first, ensure not expired devices are in the queue
SyncQueue.clean_stale() # first, ensure not expired devices are in the queue
facilities = Facility.objects.all()
queue = {}
for facility in facilities:
Expand All @@ -188,6 +186,54 @@ def list(self, request):
).count()
return Response(queue)

    def get_response_data(self, user, instance, pos, sync_interval, queue_object):
        """
        Build the check_queue response payload for one client.

        :param user: id of the FacilityUser requesting a sync
        :param instance: id of the client device/instance
        :param pos: zero-based position of this client in the queue
        :param sync_interval: interval (seconds) to report if the client may sync
        :param queue_object: existing SyncQueue row for this client, or None
        :return: dict with ``action`` (SYNC or QUEUED), plus ``sync_interval``
            or ``keep_alive``, and the queue entry ``id``
        """
        # Count in-flight transfer sessions: active ones, plus sessions with
        # recent activity that may still be mid-handshake; errored ones are
        # excluded since they no longer consume a sync slot.
        current_transfers = (
            TransferSession.objects.filter(
                Q(active=True)
                | Q(
                    last_activity_timestamp__gte=timezone.now()
                    - datetime.timedelta(seconds=HANDSHAKING_TIME)
                )
            )
            .exclude(transfer_stage_status=transfer_statuses.ERRORED)
            .count()
        )
        if MAX_CONCURRENT_SYNCS - current_transfers > pos:
            # A sync slot is free for this position: tell the client to sync
            # now and drop its queue entry.
            data = {"action": SYNC, "sync_interval": sync_interval}
            if queue_object is not None:
                queue_object.delete()
        else:
            # polling time at least HANDSHAKING_TIME seconds per position in the queue to
            # be greater than the time needed for the handshake part of the ssl protocol
            # we add one to the zero based position, as if the position is zero and it
            # got to here, it means the sync queue is currently full, so we need to wait.
            # we make sure that it is never less than half of the stale queue time, as the keep alive
            # that we set here will be used to determine after what interval we should be expiring
            # the queue item as stale - the keep_alive is doubled in order to achieve this, so
            # by setting half the STALE_QUEUE_TIME to keep_alive, we are indirectly enforcing
            # a stale queue time via the keep_alive.
            polling = min(HANDSHAKING_TIME * (pos + 1), STALE_QUEUE_TIME / 2)
            data = {
                "action": QUEUED,
                "keep_alive": polling,
            }
            if queue_object is not None:
                # If the queue object exists, update it here.
                queue_object.updated = time.time()
                queue_object.keep_alive = polling
                queue_object.save()
            else:
                # If no queue object, either because there was no pk
                # or the pk was stale, generate a new object here.
                queue_object = SyncQueue.objects.create(
                    user_id=user,
                    instance_id=instance,
                    keep_alive=polling,
                )

        # Return the entry id so the client can reference its place in the
        # queue on subsequent polls.
        data["id"] = queue_object.id
        return data

def check_queue(self, request, pk=None):
is_SoUD = get_device_setting("subset_of_users_device", False)
if is_SoUD:
Expand All @@ -200,12 +246,17 @@ def check_queue(self, request, pk=None):
content = "Missing parameter: user is required"
return Response(content, status=status.HTTP_412_PRECONDITION_FAILED)

instance = request.data.get("instance") or request.query_params.get("instance")
if instance is None:
content = "Missing parameter: instance is required"
return Response(content, status=status.HTTP_412_PRECONDITION_FAILED)

if not FacilityUser.objects.filter(id=user).exists():
content = "This user is not registered in any of this server facilities"
return Response(content, status=status.HTTP_404_NOT_FOUND)

# first, ensure no expired devices are in the queue
SyncQueue.clean_stale(expire=STALE_QUEUE_TIME)
SyncQueue.clean_stale()

# Calculate the total size of the queue to scale things
total_queue_size = SyncQueue.objects.count()
Expand All @@ -218,8 +269,13 @@ def check_queue(self, request, pk=None):
OPTIONS["Deployment"]["SYNC_INTERVAL"] * (total_queue_size + 1),
DELAYED_SYNC / 2,
)
last_activity = timezone.now() - datetime.timedelta(minutes=5)
queue_object = SyncQueue.objects.filter(id=pk).first()

if pk is not None:
queue_object = SyncQueue.objects.filter(id=pk).first()
else:
queue_object = SyncQueue.objects.filter(
user_id=user, instance_id=instance
).first()

# Default the position to the total queue size, so that
# if the id does not exist, send them to the back of the queue
Expand All @@ -230,53 +286,20 @@ def check_queue(self, request, pk=None):
"Queue did not match user in request",
status=status.HTTP_400_BAD_REQUEST,
)
if queue_object.instance_id != instance:
return Response(
"Queue did not match instance in request",
status=status.HTTP_400_BAD_REQUEST,
)
# To work out the position in the queue, find all queued sync requests
# that were made before this request. If pk is None or the queue
# has expired (3 minutes), we will set the position to the length of the
# queue.
queue_object = SyncQueue.objects.get(pk=pk)
before_client = SyncQueue.objects.filter(datetime__lt=queue_object.datetime)
pos = before_client.count()

current_transfers = (
TransferSession.objects.filter(
active=True, last_activity_timestamp__gte=last_activity
)
.exclude(transfer_stage_status=transfer_statuses.ERRORED)
.count()
)
if MAX_CONCURRENT_SYNCS - current_transfers > pos:
data = {"action": SYNC, "sync_interval": sync_interval}
if queue_object is not None:
queue_object.delete()
else:
# polling time at least HANDSHAKING_TIME seconds per position in the queue to
# be greater than the time needed for the handshake part of the ssl protocol
# we add one to the zero based position, as if the position is zero and it
# got to here, it means the sync queue is currently full, so we need to wait
# we make sure that it is never less than half of the stale queue time, to
# prevent a malignant loop whereby very long queues cause clients to lose
# their place in the queue by only polling again after their queue has
# already expired!
polling = min(HANDSHAKING_TIME * (pos + 1), STALE_QUEUE_TIME / 2)
data = {
"action": QUEUED,
"keep_alive": polling,
}
if queue_object is not None:
# If the queue object exists, update it here.
queue_object.updated = time.time()
queue_object.keep_alive = polling
queue_object.save()
else:
# If no queue object, either because there was no pk
# or the pk was stale, generate a new object here.
queue_object = SyncQueue.objects.create(
user_id=user,
keep_alive=polling,
)
data = self.get_response_data(user, instance, pos, sync_interval, queue_object)

data["id"] = queue_object.id
UserSyncStatus.objects.update_or_create(
user_id=user, defaults={"queued": data["action"] == QUEUED}
)
Expand Down

0 comments on commit 4309fdf

Please sign in to comment.