From e8321631084524bb4002a5d9134f21fd4ff02dd7 Mon Sep 17 00:00:00 2001 From: krassowski Date: Mon, 10 Jun 2019 17:05:38 +0100 Subject: [PATCH] Fix celery worker and enable back improved tests --- website/celery_worker.py | 2 + website/example_config.py | 2 +- website/helpers/pickle.py | 10 ++++ website/run_tests.py | 2 +- website/search/filters.py | 56 ++++++++++++++++++ website/search/task.py | 47 +++++++++++++++ website/tests/views/test_search.py | 38 +++++++++++- website/views/search.py | 94 ++---------------------------- 8 files changed, 158 insertions(+), 93 deletions(-) create mode 100644 website/helpers/pickle.py create mode 100644 website/search/filters.py create mode 100644 website/search/task.py diff --git a/website/celery_worker.py b/website/celery_worker.py index 67675f626..f6606620a 100644 --- a/website/celery_worker.py +++ b/website/celery_worker.py @@ -2,4 +2,6 @@ from app import create_app app = create_app(config_override={'HDB_READONLY': True, 'LOAD_VIEWS': False}) +from search.task import search_task + celery diff --git a/website/example_config.py b/website/example_config.py index 4aa09bea0..236021f24 100644 --- a/website/example_config.py +++ b/website/example_config.py @@ -81,7 +81,7 @@ CELERY_BROKER_URL = 'amqp://guest@localhost//' CELERY_RESULT_BACKEND = 'redis://' CELERY_TASK_SERIALIZER = 'auth' -CELERY_RESULT_SERIALIZER = 'pickle' +CELERY_RESULT_SERIALIZER = 'auth' CELERY_ACCEPT_CONTENT = ['auth'] CELERY_IGNORE_RESULT = False CELERY_SECURITY_KEY = '../celery/worker.key' diff --git a/website/helpers/pickle.py b/website/helpers/pickle.py new file mode 100644 index 000000000..26f60dda0 --- /dev/null +++ b/website/helpers/pickle.py @@ -0,0 +1,10 @@ +import codecs +import pickle + + +def pickle_as_str(obj): + return codecs.encode(pickle.dumps(obj, protocol=0), 'base64').decode() + + +def unpickle_str(text): + return pickle.loads(codecs.decode(text.encode(), 'base64')) diff --git a/website/run_tests.py b/website/run_tests.py index dfc4619e4..339a68ab7 100755 --- a/website/run_tests.py +++ b/website/run_tests.py @@ -1,6 +1,6 @@ if [ $# -eq 0 ] then - match="not test_import_results and not test_data and not celery" + match="not test_import_results and not test_data" else match="$1" fi diff --git a/website/search/filters.py b/website/search/filters.py new file mode 100644 index 000000000..4ae8509de --- /dev/null +++ b/website/search/filters.py @@ -0,0 +1,56 @@ +from flask import request + +from helpers.filters import FilterManager, Filter +from models import Mutation, Protein +from search.gene import search_feature_engines + +search_features = [engine.name for engine in search_feature_engines] + + +class Feature: + """Target class for feature filtering""" + pass + + +class Search: + pass + + +class SearchViewFilters(FilterManager): + + def __init__(self, **kwargs): + + available_features = search_features + active_features = set(available_features) - {'summary'} + + filters = [ + # Why default = False? Due to used widget: checkbox. + # It is not possible to distinguish between user not asking for + # all mutations (so sending nothing in post, since un-checking it + # will cause it to be skipped in the form) or user doing nothing + + # Why or? Take a look on table: + # is_ptm show all muts (by default only ptm) include? + # 0 0 0 + # 0 1 1 + # 1 0 1 + # 1 1 1 + Filter( + Mutation, 'is_ptm', comparators=['or'], + default=False + ), + Filter( + Protein, 'has_ptm_mutations', comparators=['eq'], + as_sqlalchemy=True + ), + Filter( + Feature, 'name', comparators=['in'], + default=list(active_features), + choices=available_features, + ), + Filter( + Search, 'query', comparators=['eq'], + ), + ] + super().__init__(filters) + self.update_from_request(request) diff --git a/website/search/task.py b/website/search/task.py new file mode 100644 index 000000000..dbe2bd827 --- /dev/null +++ b/website/search/task.py @@ -0,0 +1,47 @@ +import pickle +from typing import Dict + +from app import celery +from helpers.pickle import pickle_as_str, unpickle_str +from search.mutation import MutationSearch + +from search.filters import SearchViewFilters + + +class SearchTask: + + def __init__(self, vcf_file, textarea_query: str, filter_manager: SearchViewFilters, dataset_uri=None): + self.vcf_file = vcf_file + self.textarea_query = textarea_query + self.filter_manager = filter_manager + self.dataset_uri = dataset_uri + + def serialize(self) -> Dict: + return { + 'vcf_file': self.vcf_file, + 'textarea_query': self.textarea_query, + 'filter_manager': pickle_as_str(self.filter_manager), + 'dataset_uri': self.dataset_uri + } + + @classmethod + def from_serialized(cls, vcf_file, textarea_query, filter_manager, dataset_uri): + filter_manager = unpickle_str(filter_manager) + + if not isinstance(filter_manager, SearchViewFilters): + # print('This weird issue again... Retrying') + filter_manager = pickle.loads(filter_manager) + + return cls( + vcf_file, + textarea_query, + filter_manager, + dataset_uri + ) + + +@celery.task +def search_task(task_data): + task = SearchTask.from_serialized(**task_data) + mutation_search = MutationSearch(task.vcf_file, task.textarea_query, task.filter_manager) + return pickle_as_str(mutation_search), task.dataset_uri diff --git a/website/tests/views/test_search.py b/website/tests/views/test_search.py index 3047c0cee..964808587 100644 --- a/website/tests/views/test_search.py +++ b/website/tests/views/test_search.py @@ -1,5 +1,6 @@ import re from io import BytesIO +from time import sleep from view_testing import ViewTest, relative_location from miscellaneous import mock_proteins_and_genes @@ -57,6 +58,15 @@ def entries_with_type(response, type_name): class TestSearchView(ViewTest): + def test_search_task_serialization(self): + from search.task import SearchTask + from search.filters import SearchViewFilters + filters = SearchViewFilters() + task = SearchTask(vcf_file='', textarea_query='', filter_manager=filters) + serialized = task.serialize() + task_recreated = SearchTask.from_serialized(**serialized) + assert isinstance(task_recreated.filter_manager, SearchViewFilters) + def view_module(self): from website.views import search return search @@ -401,6 +411,9 @@ def test_save_search(self): assert unauthorized_save_response.status_code == 200 def test_save_search_with_celery(self): + """Assumes that fully configured celery worker is available and running, + + together with a broker and results backends.""" self.app.config['USE_CELERY'] = True self.login('user@domain.org', 'password', create=True) @@ -421,9 +434,28 @@ def test_save_search_with_celery(self): assert raw_response.json['status'] == 'PENDING' assert raw_response.json['progress'] == 0 - # TODO: this test needs to include loading and final redirection - # attempts made so far failed, but pytest celery_session_worker - # and CELERY_TASK_ALWAYS_EAGER are the most promising ideas + remaining_trials = 100 + + last_status = None + last_progress = None + + while remaining_trials != 0: + raw_response = self.client.get( + location.replace('/progress/', '/raw_progress/'), + follow_redirects=True + ) + + last_status = raw_response.json['status'] + last_progress = raw_response.json['progress'] + + if last_status == 'SUCCESS': + break + + sleep(0.1) + remaining_trials -= 1 + + assert last_status == 'SUCCESS' + assert last_progress == 10000 self.app.config['USE_CELERY'] = False diff --git a/website/views/search.py b/website/views/search.py index 362988a8a..29589c8c5 100644 --- a/website/views/search.py +++ b/website/views/search.py @@ -1,7 +1,5 @@ -import codecs import pickle from collections import defaultdict -from typing import Dict from urllib.parse import unquote from flask import make_response, redirect, abort @@ -24,22 +22,21 @@ OrderedDict, List, ) +from search.filters import SearchViewFilters from search.mutation import MutationSearch from models import Gene from models import Mutation from models import UsersMutationsDataset from sqlalchemy import exists, or_, text -from helpers.filters.manager import quote_if_needed, FilterManager -from helpers.filters import Filter +from helpers.filters.manager import quote_if_needed from helpers.widgets import FilterWidget from search.mutation_result import SearchResult +from search.task import SearchTask, search_task from views.gene import prepare_subqueries from search.protein_mutations import get_protein_muts from database import db, levenshtein_sorted, bdb from search.gene import GeneMatch, search_feature_engines -search_features = [engine.name for engine in search_feature_engines] - def create_engines(options=None): engines = OrderedDict() @@ -122,55 +119,6 @@ def search_proteins( return results[:limit] -class Feature: - """Target class for feature filtering""" - pass - - -class Search: - pass - - -class SearchViewFilters(FilterManager): - - def __init__(self, **kwargs): - - available_features = search_features - active_features = set(available_features) - {'summary'} - - filters = [ - # Why default = False? Due to used widget: checkbox. - # It is not possible to distinguish between user not asking for - # all mutations (so sending nothing in post, since un-checking it - # will cause it to be skipped in the form) or user doing nothing - - # Why or? Take a look on table: - # is_ptm show all muts (by default only ptm) include? - # 0 0 0 - # 0 1 1 - # 1 0 1 - # 1 1 1 - Filter( - Mutation, 'is_ptm', comparators=['or'], - default=False - ), - Filter( - Protein, 'has_ptm_mutations', comparators=['eq'], - as_sqlalchemy=True - ), - Filter( - Feature, 'name', comparators=['in'], - default=list(active_features), - choices=available_features, - ), - Filter( - Search, 'query', comparators=['eq'], - ), - ] - super().__init__(filters) - self.update_from_request(request) - - def make_widgets(filter_manager): return { 'proteins': { @@ -202,39 +150,6 @@ def make_widgets(filter_manager): } -class SearchTask: - - def __init__(self, vcf_file, textarea_query: str, filter_manager: SearchViewFilters, dataset_uri=None): - self.vcf_file = vcf_file - self.textarea_query = textarea_query - self.filter_manager = filter_manager - self.dataset_uri = dataset_uri - - def serialize(self) -> Dict: - return { - 'vcf_file': self.vcf_file, - 'textarea_query': self.textarea_query, - 'filter_manager': codecs.encode(pickle.dumps(self.filter_manager), 'base64').decode(), - 'dataset_uri': self.dataset_uri - } - - @classmethod - def from_serialized(cls, vcf_file, textarea_query, filter_manager, dataset_uri): - return cls( - vcf_file, - textarea_query, - pickle.loads(codecs.decode(filter_manager.encode(), 'base64')), - dataset_uri - ) - - -@celery.task -def search_task(task_data): - task = SearchTask.from_serialized(**task_data) - mutation_search = MutationSearch(task.vcf_file, task.textarea_query, task.filter_manager) - return mutation_search, task.dataset_uri - - class SearchView(FlaskView): """Enables searching in any of registered database models.""" @@ -408,6 +323,9 @@ def mutations(self): return redirect(url_for('SearchView:mutations')) mutation_search, dataset_uri = celery_task.result + from helpers.pickle import unpickle_str + mutation_search = unpickle_str(mutation_search) + if dataset_uri: url = url_for( 'SearchView:user_mutations',