Skip to content

Commit

Permalink
Restore from snapshot in prepopulation step (#592)
Browse files Browse the repository at this point in the history
* Tests: some cleanup
* Tests: close only open indices in "clean_es"
* Tests: refactor snapshot step
* Test: fix behave tests
* Tests: fix behave tests again
* Tests: fix "use_snapshot" function
* Tests: fix caching in "use_snapshot"
* Tests: move ELASTICSEARCH_BACKUPS_PATH setting; fix use_snapshot again
* Tests: add ELASTICSEARCH_BACKUPS_PATH to default_settings %)
* Tests: use snapshot option for prepopulate endpoint
* Tests: fix "prepopulate.feature" by cleaning cache
* Tests: use db names from configs
* Tests: run snapshot functions for all mongo databases
  • Loading branch information
naspeh committed Sep 29, 2016
1 parent 2d76fb0 commit e5a9804
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 87 deletions.
19 changes: 10 additions & 9 deletions apps/prepopulate/app_prepopulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from superdesk.metadata.item import ITEM_STATE, CONTENT_STATE
from superdesk.resource import Resource
from superdesk.services import BaseService
from superdesk.tests import drop_elastic, drop_mongo
from superdesk.tests import clean_dbs, use_snapshot
from superdesk.utc import utcnow
from eve.utils import date_to_str
from flask import current_app as app
Expand Down Expand Up @@ -150,21 +150,22 @@ class PrepopulateResource(Resource):


class PrepopulateService(BaseService):
    def _create(self, docs):
        """Populate the databases from the profile fixtures in ``docs``.

        This is the expensive part of prepopulation; ``create`` routes it
        through ``use_snapshot`` so repeated runs with the same fixtures can
        be restored from a snapshot instead of re-imported.
        """
        for doc in docs:
            if doc.get('remove_first'):
                # Wipe mongo + elastic before importing (force bypasses
                # any snapshot caching inside clean_dbs).
                clean_dbs(superdesk.app, force=True)
            user = get_resource_service('users').find_one(
                username=get_default_user()['username'], req=None)
            if not user:
                # Make sure the default user exists before loading fixtures.
                get_resource_service('users').post([get_default_user()])
            prepopulate_data(doc.get('profile') + '.json', get_default_user())

    def create(self, docs, **kwargs):
        # Run the actual import through the snapshot cache keyed by
        # `self._create`; a repeated call restores databases from snapshot.
        use_snapshot(superdesk.app, 'prepopulate')(self._create)(docs)
        if app.config.get('SUPERDESK_TESTING'):
            # Search providers must be re-registered after a db reset.
            for provider in ['paimg', 'aapmm']:
                if provider not in allowed_search_providers:
                    register_search_provider(provider, provider)
        return ['OK']


Expand Down
2 changes: 1 addition & 1 deletion features/prepopulate.feature
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ Feature: Prepopulate


@auth
@dbauth
@dbauth @clean_snapshots
Scenario: Prepopulate with custom profile
Given empty "roles"
Given empty "desks"
Expand Down
2 changes: 1 addition & 1 deletion superdesk/factory/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def env(variable, fallback_value=None):
ELASTICSEARCH_INDEX = env('ELASTICSEARCH_INDEX', 'superdesk')
if env('ELASTIC_PORT'):
ELASTICSEARCH_URL = env('ELASTIC_PORT').replace('tcp:', 'http:')

ELASTICSEARCH_BACKUPS_PATH = env('ELASTICSEARCH_BACKUPS_PATH', '')
ELASTICSEARCH_SETTINGS = {
'settings': {
'analysis': {
Expand Down
204 changes: 142 additions & 62 deletions superdesk/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# AUTHORS and LICENSE files distributed with this source code, or
# at https://www.sourcefabric.org/superdesk/license


import functools
import logging
import os
import socket
Expand Down Expand Up @@ -68,7 +68,6 @@ def get_test_settings():
test_settings['CELERY_ALWAYS_EAGER'] = 'True'
test_settings['CONTENT_EXPIRY_MINUTES'] = 99
test_settings['VERSION'] = '_current_version'
test_settings['ELASTICSEARCH_BACKUPS_PATH'] = '/tmp/es-backups'

# limit mongodb connections
test_settings['MONGO_CONNECT'] = False
Expand All @@ -88,18 +87,34 @@ def drop_elastic(app):
es.indices.delete(index, ignore=[404])


def drop_mongo(app):
with app.app_context():
drop_mongo_db(app, 'MONGO', 'MONGO_DBNAME')
drop_mongo_db(app, 'ARCHIVED', 'ARCHIVED_DBNAME')
drop_mongo_db(app, 'LEGAL_ARCHIVE', 'LEGAL_ARCHIVE_DBNAME')

def foreach_mongo(fn):
    """
    Run the same action on every configured mongo database.

    This decorator adds two additional keyword parameters to the called
    function, `dbconn` and `dbname`, so it uses the proper connection and
    database name for each configured database. Databases whose config
    key is missing or empty are skipped.
    """
    @functools.wraps(fn)
    def inner(app, *a, **kw):
        # (pymongo prefix, config key holding the database name)
        pairs = (
            ('MONGO', 'MONGO_DBNAME'),
            ('ARCHIVED', 'ARCHIVED_DBNAME'),
            ('LEGAL_ARCHIVE', 'LEGAL_ARCHIVE_DBNAME'),
        )
        with app.app_context():
            for prefix, name in pairs:
                if not app.config.get(name):
                    # this database is not configured for the app
                    continue
                kw['dbname'] = app.config[name]
                kw['dbconn'] = app.data.mongo.pymongo(prefix=prefix).cx
                fn(app, *a, **kw)
    return inner


@foreach_mongo
def drop_mongo(app, dbconn, dbname):
    """Drop every configured mongo database.

    ``dbconn`` and ``dbname`` are injected by ``foreach_mongo``, which
    invokes this once per configured database; the connection is closed
    after the drop.
    """
    dbconn.drop_database(dbname)
    dbconn.close()


def setup_config(config):
Expand Down Expand Up @@ -128,51 +143,117 @@ def clean_dbs(app, force=False):
drop_mongo(app)


def retry(exc, count=1):
    """Decorator: retry the wrapped call up to ``count`` extra times on ``exc``.

    Each caught exception is logged. If every attempt fails, the final
    exception is swallowed and ``None`` is returned (matching the original
    best-effort intent).

    Fixes the original implementation, which declared ``global num`` inside
    the closure: ``num`` only existed in ``wrapper``'s scope, so reading it
    after an exception raised ``NameError``; and even with ``nonlocal`` the
    counter would never have reset between calls.
    """
    def wrapper(fn):
        @functools.wraps(fn)
        def inner(*a, **kw):
            # one initial attempt + up to `count` retries, counted per call
            for _attempt in range(count + 1):
                try:
                    return fn(*a, **kw)
                except exc as e:
                    logging.exception(e)
            return None
        return inner
    return wrapper


def _clean_es(app):
indices = '%s*' % app.config['ELASTICSEARCH_INDEX']
es = app.data.elastic.es
es.indices.delete(indices, ignore=[404])
app.data.init_elastic(app)


@retry(socket.timeout, 2)
def clean_es(app, force=False):
    """Reset elasticsearch to a clean state, via snapshot when possible.

    The first call runs ``_clean_es`` (drop + re-init indices) and a
    'clean' elasticsearch snapshot is created; later calls restore that
    snapshot instead, which is much faster. With ``force=True`` (or when
    ``ELASTICSEARCH_BACKUPS_PATH`` is unset/missing) the snapshot path is
    bypassed and ``_clean_es`` always runs — see ``use_snapshot``.
    Retried on ``socket.timeout`` to reduce flaky ES timeout failures.
    """
    use_snapshot(app, 'clean', [snapshot_es], force)(_clean_es)(app)


def snapshot(fn):
    """Turn a ``(create, restore)`` factory into an action dispatcher.

    The wrapped function must return a pair of callables
    ``(create, restore)``; the wrapper picks one according to the required
    ``action`` keyword ('create' or 'restore') and invokes it.
    """
    @functools.wraps(fn)
    def inner(app, name, action, **kw):
        assert action in ['create', 'restore']
        create, restore = fn(app, name, **kw)
        if action == 'create':
            create()
        else:
            restore()
    return inner


@snapshot
def snapshot_es(app, name):
    """Build ``(create, restore)`` callables for an elasticsearch snapshot.

    The snapshot lives in the ``backups`` repository and covers every index
    matching ``<ELASTICSEARCH_INDEX>*``; *name* is appended to the index
    prefix to form the snapshot id.
    """
    prefix = app.config['ELASTICSEARCH_INDEX']
    pattern = '%s*' % prefix
    repo_and_id = ('backups', '%s%s' % (prefix, name))
    es = app.data.elastic.es
    scope = {'indices': pattern, 'allow_no_indices': False}

    def create():
        # Replace any stale snapshot; indices must be open to be snapshotted.
        es.snapshot.delete(*repo_and_id, ignore=[404])
        es.indices.open(pattern, expand_wildcards='closed')
        es.snapshot.create(*repo_and_id, wait_for_completion=True,
                           body=dict(scope))

    def restore():
        # Indices must be closed before they can be restored over.
        es.indices.close(pattern, expand_wildcards='open')
        es.snapshot.restore(*repo_and_id, body=dict(scope),
                            wait_for_completion=True)

    return create, restore


@foreach_mongo
def snapshot_mongo(app, name, dbconn, dbname):
    """Build ``(create, restore)`` callables for a mongo database snapshot.

    A snapshot is a server-side copy of *dbname* under ``<dbname>_<name>``,
    made with the ``copydb`` admin command.
    NOTE(review): ``copydb`` was deprecated and later removed in newer
    MongoDB releases — confirm the server versions this suite targets.
    """
    # renamed from `snapshot` to avoid shadowing the decorator of that name
    snap_db = '%s_%s' % (dbname, name)

    def create():
        # Copy the live database aside, replacing any previous snapshot copy.
        dbconn.drop_database(snap_db)
        dbconn.admin.command('copydb', fromdb=dbname, todb=snap_db)

    def restore():
        # Replace the live database with the snapshot copy.
        dbconn.drop_database(dbname)
        dbconn.admin.command('copydb', fromdb=snap_db, todb=dbname)

    return create, restore


def use_snapshot(app, name, funcs=(snapshot_es, snapshot_mongo), force=False):
    """Decorator factory caching an expensive setup call behind db snapshots.

    The first run of the wrapped ``fn`` executes it for real, stores its
    return value in ``use_snapshot.cache`` keyed by ``fn``, and creates a
    snapshot (via each function in ``funcs``) named ``name``. Subsequent
    runs restore the snapshot instead of calling ``fn`` and return the
    cached value. Snapshots are only used when ``ELASTICSEARCH_BACKUPS_PATH``
    is set and exists; otherwise (or with ``force=True``) the cache entry is
    dropped and ``fn`` runs every time.
    """
    def snapshot(action):
        # run 'create' or 'restore' on every snapshot backend
        for f in funcs:
            f(app, name, action=action)

    def wrapper(fn):
        # evaluated once at decoration time; snapshots need a backups dir
        path = app.config.get('ELASTICSEARCH_BACKUPS_PATH')
        enabled = path and os.path.exists(path)

        @functools.wraps(fn)
        def inner(*a, **kw):
            if not enabled or force:
                logger.debug(
                    'Don\'t use snapshot for %s; enabled=%s; force=%s',
                    fn, enabled, force
                )
                # invalidate any stale cache entry, then run fn directly
                use_snapshot.cache.pop(fn, None)
                return fn(*a, **kw)

            if fn in use_snapshot.cache:
                # cheap path: restore databases, reuse fn's cached result
                snapshot('restore')
                logger.debug('Restore snapshot for %s', fn)
            else:
                # expensive path: run fn once, then snapshot the result state
                use_snapshot.cache[fn] = fn(*a, **kw)
                snapshot('create')
                logger.debug('Create snapshot for %s', fn)
            return use_snapshot.cache[fn]
        return inner
    return wrapper
# module-level cache: wrapped fn -> its first return value
use_snapshot.cache = {}


def setup(context=None, config=None, app_factory=get_app, reset=False):
Expand Down Expand Up @@ -253,11 +334,10 @@ def setup_ad_user(context, user):
"""
ad_user = user or test_user

'''
This is necessary as test_user is in Global scope and del doc['password'] removes the key from test_user and
for the next scenario, auth_data = json.dumps({'username': ad_user['username'], 'password': ad_user['password']})
will fail as password key is removed by del doc['password']
'''
# This is necessary as test_user is in Global scope and del doc['password']
# removes the key from test_user and for the next scenario,
# auth_data = json.dumps({'username': ad_user['username'], 'password': ad_user['password']})
# will fail as password key is removed by del doc['password']
ad_user = ad_user.copy()
ad_user['email'] = 'mock@mail.com.au'

Expand Down
3 changes: 3 additions & 0 deletions superdesk/tests/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ def setup_before_scenario(context, scenario, config, app_factory):
if 'alchemy' in scenario.tags and not context.app.config.get('KEYWORDS_KEY_API'):
scenario.mark_skipped()

if 'clean_snapshots' in scenario.tags:
tests.use_snapshot.cache.clear()

setup_search_provider(context.app)

if scenario.status != 'skipped' and 'auth' in scenario.tags:
Expand Down
2 changes: 1 addition & 1 deletion superdesk/tests/test_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def env(variable, fallback_value=None):
ELASTICSEARCH_INDEX = env('ELASTICSEARCH_INDEX', 'superdesk')
if env('ELASTIC_PORT'):
ELASTICSEARCH_URL = env('ELASTIC_PORT').replace('tcp:', 'http:')

ELASTICSEARCH_BACKUPS_PATH = env('ELASTICSEARCH_BACKUPS_PATH', '/tmp/es-backups')
ELASTICSEARCH_INDEXES = {
'archived': 'sptest_archived',
'archive': 'sptest_archive',
Expand Down
18 changes: 5 additions & 13 deletions tests/data_updates_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,18 @@ def setUp(self):
('MAIN_DATA_UPDATES_DIR', '/tmp/global_data_updates'),
)
for name, path in dirs:
# if folder exists, removes
shutil.rmtree(path, True)
# create new folder for tests
os.mkdir(path)
self.addCleanup(lambda path=path: shutil.rmtree(path))

def rm(path=path):
shutil.rmtree(path)
self.addCleanup(rm)

patcher = mock.patch.object(superdesk.commands.data_updates, name, path)
patcher = mock.patch('superdesk.commands.data_updates.%s' % name, path)
self.addCleanup(patcher.stop)
patcher.start()

# update the default implementation for `forwards` and `backwards` function
dirs = (
('DEFAULT_DATA_UPDATE_FW_IMPLEMENTATION', 'fw'),
('DEFAULT_DATA_UPDATE_BW_IMPLEMENTATION', 'bw'),
)
for m, p in dirs:
patcher = mock.patch('superdesk.commands.data_updates.%s' % m, 'pass')
for n in ('FW', 'BW'):
name = 'DEFAULT_DATA_UPDATE_%s_IMPLEMENTATION' % n
patcher = mock.patch('superdesk.commands.data_updates.%s' % name, 'pass')
self.addCleanup(patcher.stop)
patcher.start()

Expand Down

0 comments on commit e5a9804

Please sign in to comment.