Skip to content

Commit

Permalink
Merge branch 'develop' into feature/issue-942-remove-unnecessary-mess…
Browse files Browse the repository at this point in the history
…age-store-indexes
  • Loading branch information
jerith committed May 27, 2015
2 parents c2e8e45 + 2e86dbb commit 44eb8d1
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 18 deletions.
42 changes: 29 additions & 13 deletions vumi/components/message_store_migrators.py
Expand Up @@ -53,17 +53,21 @@ def migrate_from_1(self, mdata):
mdata.set_value('batches', mdata.old_data.get('batches', []))
mdata.copy_values('message')
mdata.copy_indexes('message_bin')
mdata.copy_indexes('message_with_status_bin')

return mdata

def reverse_from_2(self, mdata):
# We copy the `batches` field even though the older model version
# doesn't know about it. This lets us migrate v2 -> v1 -> v2 without
# losing data.
# We copy the `batches` field and related indexes even though the older
# model version doesn't know about them. This lets us migrate
# v2 -> v1 -> v2 without losing data.
mdata.set_value('$VERSION', 1)
self._copy_msg_field('event', mdata)
mdata.copy_values('message', 'batches')
mdata.copy_indexes('message_bin')
mdata.copy_indexes('message_with_status_bin')
mdata.copy_indexes('batches_bin')
mdata.copy_indexes('batches_with_statuses_reverse_bin')

return mdata

Expand All @@ -83,7 +87,7 @@ def migrate_from_1(self, mdata):
mdata.set_value('$VERSION', 2)
self._copy_msg_field('msg', mdata)
mdata.copy_values('batches')
mdata.copy_indexes('batches')
mdata.copy_indexes('batches_bin')

return mdata

Expand All @@ -93,7 +97,8 @@ def migrate_from_2(self, mdata):
mdata.set_value('$VERSION', 3)
self._copy_msg_field('msg', mdata)
mdata.copy_values('batches')
mdata.copy_indexes('batches')
mdata.copy_indexes('batches_bin')
mdata.copy_indexes('batches_with_timestamps_bin')

return mdata

Expand All @@ -104,7 +109,8 @@ def reverse_from_3(self, mdata):
mdata.set_value('$VERSION', 2)
self._copy_msg_field('msg', mdata)
mdata.copy_values('batches')
mdata.copy_indexes('batches')
mdata.copy_indexes('batches_bin')
mdata.copy_indexes('batches_with_timestamps_bin')

return mdata

Expand All @@ -114,7 +120,9 @@ def migrate_from_3(self, mdata):
mdata.set_value('$VERSION', 4)
self._copy_msg_field('msg', mdata)
mdata.copy_values('batches')
mdata.copy_indexes('batches')
mdata.copy_indexes('batches_bin')
mdata.copy_indexes('batches_with_timestamps_bin')
mdata.copy_indexes('batches_with_addresses_bin')

return mdata

Expand All @@ -125,7 +133,9 @@ def reverse_from_4(self, mdata):
mdata.set_value('$VERSION', 3)
self._copy_msg_field('msg', mdata)
mdata.copy_values('batches')
mdata.copy_indexes('batches')
mdata.copy_indexes('batches_bin')
mdata.copy_indexes('batches_with_timestamps_bin')
mdata.copy_indexes('batches_with_addresses_bin')

return mdata

Expand All @@ -145,7 +155,7 @@ def migrate_from_1(self, mdata):
mdata.set_value('$VERSION', 2)
self._copy_msg_field('msg', mdata)
mdata.copy_values('batches')
mdata.copy_indexes('batches')
mdata.copy_indexes('batches_bin')

return mdata

Expand All @@ -155,7 +165,8 @@ def migrate_from_2(self, mdata):
mdata.set_value('$VERSION', 3)
self._copy_msg_field('msg', mdata)
mdata.copy_values('batches')
mdata.copy_indexes('batches')
mdata.copy_indexes('batches_bin')
mdata.copy_indexes('batches_with_timestamps_bin')

return mdata

Expand All @@ -166,7 +177,8 @@ def reverse_from_3(self, mdata):
mdata.set_value('$VERSION', 2)
self._copy_msg_field('msg', mdata)
mdata.copy_values('batches')
mdata.copy_indexes('batches')
mdata.copy_indexes('batches_bin')
mdata.copy_indexes('batches_with_timestamps_bin')

return mdata

Expand All @@ -176,7 +188,9 @@ def migrate_from_3(self, mdata):
mdata.set_value('$VERSION', 4)
self._copy_msg_field('msg', mdata)
mdata.copy_values('batches')
mdata.copy_indexes('batches')
mdata.copy_indexes('batches_bin')
mdata.copy_indexes('batches_with_timestamps_bin')
mdata.copy_indexes('batches_with_addresses_bin')

return mdata

Expand All @@ -187,6 +201,8 @@ def reverse_from_4(self, mdata):
mdata.set_value('$VERSION', 3)
self._copy_msg_field('msg', mdata)
mdata.copy_values('batches')
mdata.copy_indexes('batches')
mdata.copy_indexes('batches_bin')
mdata.copy_indexes('batches_with_timestamps_bin')
mdata.copy_indexes('batches_with_addresses_bin')

return mdata
4 changes: 3 additions & 1 deletion vumi/persist/model.py
Expand Up @@ -139,7 +139,9 @@ def __init__(self, riak_object):

def get_riak_object(self):
self.riak_object.set_data(self.new_data)
# Note: This keeps old indexes.
# We need to explicitly remove old indexes before adding new ones.
for field in self.old_index:
self.riak_object.remove_index(field)
for field, values in self.new_index.iteritems():
for value in values:
self.riak_object.add_index(field, value)
Expand Down
87 changes: 83 additions & 4 deletions vumi/persist/tests/test_model.py
Expand Up @@ -133,6 +133,43 @@ def reverse_from_3(self, migration_data):
migration_data.set_value('text', migration_data.old_data['text'])
return migration_data

def migrate_from_3(self, migration_data):
# Migrator assertions
assert self.data_version == 3
assert self.model_class is IndexRemovedVersionedModel
assert isinstance(self.manager, Manager)

# Data assertions
assert set(migration_data.old_data.keys()) == set(
['$VERSION', 'c', 'text'])
assert migration_data.old_data['$VERSION'] == 3
assert migration_data.old_index == {"text_bin": ["hi"]}

# Actual migration
migration_data.set_value('$VERSION', 4)
migration_data.copy_values('c')
migration_data.set_value('text', migration_data.old_data['text'])
return migration_data

def reverse_from_4(self, migration_data):
# Migrator assertions
assert self.data_version == 4
assert self.model_class is IndexRemovedVersionedModel
assert isinstance(self.manager, Manager)

# Data assertions
assert set(migration_data.old_data.keys()) == set(
['$VERSION', 'c', 'text'])
assert migration_data.old_data['$VERSION'] == 4
assert migration_data.old_index == {}

# Actual migration
migration_data.set_value('$VERSION', 3)
migration_data.copy_values('c')
migration_data.set_value(
'text', migration_data.old_data['text'], index='text_bin')
return migration_data


class UnversionedModel(Model):
bucket = 'versionedmodel'
Expand Down Expand Up @@ -160,8 +197,16 @@ class IndexedVersionedModel(Model):
text = Unicode(null=True, index=True)


class UnknownVersionedModel(Model):
class IndexRemovedVersionedModel(Model):
VERSION = 4
MIGRATOR = VersionedModelMigrator
bucket = 'versionedmodel'
c = Integer()
text = Unicode(null=True)


class UnknownVersionedModel(Model):
VERSION = 5
bucket = 'versionedmodel'
d = Integer()

Expand Down Expand Up @@ -827,8 +872,8 @@ def test_version_reverse_migration_new_index(self):
foo_old = yield old_model.load("foo")
self.assertEqual(foo_old.c, 1)
self.assertEqual(foo_old.text, "hi")
# Old indexes are kept across migrations.
self.assertEqual(self.get_model_indexes(foo_old), {"text_bin": ["hi"]})
# Old indexes are no longer kept across migrations.
self.assertEqual(self.get_model_indexes(foo_old), {})
self.assertEqual(foo_old.was_migrated, False)

@Manager.calls_manager
Expand Down Expand Up @@ -857,6 +902,40 @@ def test_version_migration_new_index_None(self):
self.assertEqual(foo_new.text, None)
self.assertEqual(self.get_model_indexes(foo_new), {})

@Manager.calls_manager
def test_version_migration_remove_index(self):
"""
An index can be removed in a migration.
"""
old_model = self.manager.proxy(IndexedVersionedModel)
new_model = self.manager.proxy(IndexRemovedVersionedModel)
foo_old = old_model("foo", c=1, text=u"hi")
yield foo_old.save()

foo_new = yield new_model.load("foo")
self.assertEqual(foo_new.c, 1)
self.assertEqual(foo_new.text, "hi")
self.assertEqual(self.get_model_indexes(foo_new), {})
self.assertEqual(foo_new.was_migrated, True)

@Manager.calls_manager
def test_version_reverse_migration_remove_index(self):
"""
A removed index can be restored in a reverse migration.
"""
old_model = self.manager.proxy(IndexedVersionedModel)
new_model = self.manager.proxy(IndexRemovedVersionedModel)
foo_new = new_model("foo", c=1, text=u"hi")
model_name = "%s.%s" % (
VersionedModel.__module__, IndexRemovedVersionedModel.__name__)
self.manager.store_versions[model_name] = IndexedVersionedModel.VERSION
yield foo_new.save()

foo_old = yield old_model.load("foo")
self.assertEqual(foo_old.c, 1)
self.assertEqual(foo_old.text, "hi")
self.assertEqual(self.get_model_indexes(foo_new), {"text_bin": ["hi"]})

@Manager.calls_manager
def test_version_migration_failure(self):
odd_model = self.manager.proxy(UnknownVersionedModel)
Expand All @@ -869,7 +948,7 @@ def test_version_migration_failure(self):
self.fail('Expected ModelMigrationError.')
except ModelMigrationError, e:
self.assertEqual(
e.args[0], 'No migrators defined for VersionedModel version 4')
e.args[0], 'No migrators defined for VersionedModel version 5')

@Manager.calls_manager
def test_dynamic_field_migration(self):
Expand Down

0 comments on commit 44eb8d1

Please sign in to comment.