Skip to content
This repository has been archived by the owner on Mar 23, 2021. It is now read-only.

Commit

Permalink
Merge pull request #10 from praekeltfoundation/feature/GPE-351-migrat…
Browse files Browse the repository at this point in the history
…e-subscriptions

Add subscription migration logic
  • Loading branch information
rudigiesler committed Aug 25, 2017
2 parents 2861088 + e668517 commit 2112ab3
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 6 deletions.
15 changes: 15 additions & 0 deletions mapper/sequence_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals


class SequenceMapper(object):
"""
Provides the logic for mapping the sequence number from one message set to
another.
"""
def map(self, from_messageset, to_messageset, sequence):
# Default to no mapping
return sequence


map_sequence = SequenceMapper().map
72 changes: 69 additions & 3 deletions mapper/tasks.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals

from celery.task import Task
from celery.utils.log import get_task_logger
from django.conf import settings
from django.db import connections, transaction
from django.db.models import F
from django.utils import timezone
from logging import INFO, ERROR
from logging import INFO, ERROR, WARNING
from seed_services_client.stage_based_messaging import (
StageBasedMessagingApiClient)
from uuid import uuid4

from .models import LogEvent, MigrateSubscription
from .sequence_mapper import map_sequence


class MigrateSubscriptionsTask(Task):
Expand Down Expand Up @@ -65,12 +71,72 @@ def fetch_identities(self, migrate):
for row in chunk:
yield row[0]

def _get_stage_based_messaging_client(self):
"""
Get the cached stage based messaging client, or create and cache on the
instance.
"""
if getattr(self, 'sbm_client', None) is None:
self.sbm_client = StageBasedMessagingApiClient(
settings.STAGE_BASED_MESSAGING_TOKEN,
settings.STAGE_BASED_MESSAGING_URL)
return self.sbm_client

def get_messageset(self, messageset_id):
if getattr(self, 'messagesets', None) is None:
self.messagesets = {}
if messageset_id not in self.messagesets:
client = self._get_stage_based_messaging_client()
self.messagesets[messageset_id] = client.get_messageset(
messageset_id)
return self.messagesets[messageset_id]

def migrate_identity(self, migrate, identity):
"""
Migrates an identity from one messageset to another.
"""
# TODO: Actually migrate the identity's subscription
pass
client = self._get_stage_based_messaging_client()
existing_subs = list(client.get_subscriptions({
'identity': identity,
'messageset': migrate.from_messageset,
})['results'])
if len(existing_subs) == 0:
self.log(
migrate, ERROR,
"Identity {identity} has no existing subscriptions to {ms}. "
"Not migrating identity.".format(
identity=identity, ms=self.get_messageset(
migrate.from_messageset)['short_name'])
)
return
elif len(existing_subs) > 1:
self.log(
migrate, WARNING,
"Identity {identity} has {num} subscriptions to {messageset}. "
"All will be cancelled.".format(
identity=identity, num=len(existing_subs),
messageset=self.get_messageset(
migrate.from_messageset)['short_name'])
)

for sub in existing_subs:
client.update_subscription(sub['id'], data={'active': False})

sequence_number = map_sequence(
self.get_messageset(migrate.from_messageset)['short_name'],
self.get_messageset(migrate.to_messageset)['short_name'],
sub['next_sequence_number'],
)

client.create_subscription({
'identity': identity,
'messageset': migrate.to_messageset,
'initial_sequence_number': sequence_number,
'next_sequence_number': sequence_number,
'lang': sub['lang'],
'schedule': self.get_messageset(
migrate.to_messageset)['default_schedule'],
})

def run(self, migrate_subscription_id, **kwargs):
migrate = MigrateSubscription.objects.get(pk=migrate_subscription_id)
Expand Down
178 changes: 175 additions & 3 deletions mapper/tests.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from __future__ import absolute_import, unicode_literals

from django.db import connections
from django.conf import settings
Expand All @@ -19,6 +19,7 @@
import unittest.mock as mock

from .models import LogEvent, MigrateSubscription
from .sequence_mapper import map_sequence
from .tasks import migrate_subscriptions


Expand Down Expand Up @@ -66,6 +67,57 @@ def mock_get_messagesets(messagesets):
})


def mock_get_messageset(messageset_id, messageset_details):
responses.add(
responses.GET,
'{url}/messageset/{messageset_id}/'.format(
url=settings.STAGE_BASED_MESSAGING_URL,
messageset_id=messageset_id),
json=messageset_details,
)


def mock_get_subscriptions(subscriptions, querystring=''):
responses.add(
responses.GET,
'{url}/subscriptions/{querystring}'.format(
url=settings.STAGE_BASED_MESSAGING_URL,
querystring=querystring),
json={
"count": len(subscriptions),
"next": None,
"previous": None,
"results": subscriptions,
})


def mock_update_subscription(subscription_id):
responses.add(
responses.PATCH,
'{url}/subscriptions/{subscription_id}/'.format(
url=settings.STAGE_BASED_MESSAGING_URL,
subscription_id=subscription_id),
json={}
)


def mock_create_subscription():
responses.add(
responses.POST,
'{}/subscriptions/'.format(settings.STAGE_BASED_MESSAGING_URL),
json={}
)


def get_calls_to_url(url):
"""
Filters out responses calls to just the ones to the specified url.
"""
for r in responses.calls:
if r.request.url == url:
yield r


class MigrationSubscriptionsListViewTests(TestCase):
@responses.activate
def test_login_required(self):
Expand Down Expand Up @@ -196,8 +248,8 @@ def test_retry_button(self):
@responses.activate
def test_cancel_button(self):
"""
If a listed migration is in the starting or running state, then there
should be a cancel button.
If a listed migration can be cancelled, then there should be a cancel
button.
"""
m = MigrateSubscription.objects.create(
from_messageset=1, to_messageset=2,
Expand Down Expand Up @@ -781,6 +833,117 @@ def cancel_migration(*args):
self.assertEqual(migrate.current, 1)
self.assertEqual(LogEvent.objects.last().message, "Stopping task run")

@responses.activate
def test_migrate_identity_no_existing_subs(self):
"""
If an identity doesn't have any existing subscriptions to the specified
from messageset, then it should be skipped, and the appropriate log
message should be created.
"""
migrate = MigrateSubscription.objects.create(
from_messageset=1, to_messageset=2,
table_name='table', column_name='column')
mock_get_subscriptions([], '?messageset=1&identity=test-identity')
# To get the messageset name for the log entry
mock_get_messageset(1, {'short_name': 'from_messageset'})

migrate_subscriptions.migrate_identity(migrate, 'test-identity')

log = LogEvent.objects.last()
self.assertEqual(log.log_level, logging.ERROR)
self.assertEqual(
log.message,
'Identity test-identity has no existing subscriptions to '
'from_messageset. Not migrating identity.')

@responses.activate
def test_migrate_identity_multiple_subscriptions(self):
"""
If an identity has multiple subscriptions to the specified from
messageset, then a warning should be logged, and all of those
messagesets should be disabled.
"""
migrate = MigrateSubscription.objects.create(
from_messageset=1, to_messageset=2,
table_name='table', column_name='column')
mock_get_subscriptions(
[{'id': 1, 'next_sequence_number': 1, 'lang': 'afr'},
{'id': 2, 'next_sequence_number': 2, 'lang': 'eng'}],
'?messageset=1&identity=test-identity')
mock_update_subscription(1)
mock_update_subscription(2)
# To get the messageset name for the log entry + mapping
mock_get_messageset(1, {'short_name': 'from_messageset'})
mock_get_messageset(2, {
'short_name': 'to_messageset', 'default_schedule': 4})
mock_create_subscription()

migrate_subscriptions.migrate_identity(migrate, 'test-identity')

log = LogEvent.objects.last()
self.assertEqual(log.log_level, logging.WARNING)
self.assertEqual(
log.message, 'Identity test-identity has 2 subscriptions to '
'from_messageset. All will be cancelled.')

[cancel_sub1] = list(get_calls_to_url(
'{url}/subscriptions/{subscription_id}/'.format(
url=settings.STAGE_BASED_MESSAGING_URL, subscription_id=1)))
self.assertEqual(cancel_sub1.request.method, responses.PATCH)
self.assertEqual(
json.loads(cancel_sub1.request.body), {'active': False})

[cancel_sub2] = list(get_calls_to_url(
'{url}/subscriptions/{subscription_id}/'.format(
url=settings.STAGE_BASED_MESSAGING_URL, subscription_id=2)))
self.assertEqual(cancel_sub2.request.method, responses.PATCH)
self.assertEqual(
json.loads(cancel_sub2.request.body), {'active': False})

@responses.activate
def test_migrate_identity_single_subscription(self):
"""
If the identity has a single subscription to the from messageset, then
that should be cancelled, and a new subscription should be created
for the to messageset.
"""
migrate = MigrateSubscription.objects.create(
from_messageset=1, to_messageset=2,
table_name='table', column_name='column')
mock_get_subscriptions(
[{'id': 1, 'next_sequence_number': 5, 'lang': 'eng'}],
'?messageset=1&identity=test-identity')
mock_update_subscription(1)
# To get the messageset name for the log entry + mapping
mock_get_messageset(1, {'short_name': 'from_messageset'})
mock_get_messageset(2, {
'short_name': 'to_messageset', 'default_schedule': 4})
mock_create_subscription()

migrate_subscriptions.migrate_identity(migrate, 'test-identity')

self.assertEqual(LogEvent.objects.count(), 0)

[cancel_sub1] = list(get_calls_to_url(
'{url}/subscriptions/{subscription_id}/'.format(
url=settings.STAGE_BASED_MESSAGING_URL, subscription_id=1)))
self.assertEqual(cancel_sub1.request.method, responses.PATCH)
self.assertEqual(
json.loads(cancel_sub1.request.body), {'active': False})

[create_sub] = list(get_calls_to_url(
'{url}/subscriptions/'.format(
url=settings.STAGE_BASED_MESSAGING_URL, subscription_id=1)))
self.assertEqual(create_sub.request.method, responses.POST)
self.assertEqual(json.loads(create_sub.request.body), {
'identity': 'test-identity',
'initial_sequence_number': 5,
'next_sequence_number': 5,
'lang': 'eng',
'messageset': 2,
'schedule': 4,
})


class LogEventModelTests(TestCase):
def test_log_event_display(self):
Expand Down Expand Up @@ -945,3 +1108,12 @@ def test_successful_cancel(self):
self.assertEqual(log.migrate_subscription, migrate)
self.assertEqual(log.log_level, logging.INFO)
self.assertEqual(log.message, "Cancelling task")


class MapSubscriptionsTest(TestCase):
def test_default_mapping(self):
"""
By default, the output sequence should equal the input sequence.
"""
self.assertEqual(map_sequence(None, None, 1), 1)
self.assertEqual(map_sequence(None, None, 100), 100)
3 changes: 3 additions & 0 deletions mapper/urls.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals

from django.conf.urls import url

from .views import (
Expand Down

0 comments on commit 2112ab3

Please sign in to comment.