Skip to content

Commit

Permalink
Fix celery worker and enable back improved tests
Browse files Browse the repository at this point in the history
  • Loading branch information
krassowski committed Jun 10, 2019
1 parent 1444c00 commit e832163
Show file tree
Hide file tree
Showing 8 changed files with 158 additions and 93 deletions.
2 changes: 2 additions & 0 deletions website/celery_worker.py
Expand Up @@ -2,4 +2,6 @@
from app import create_app

app = create_app(config_override={'HDB_READONLY': True, 'LOAD_VIEWS': False})
from search.task import search_task

celery
2 changes: 1 addition & 1 deletion website/example_config.py
Expand Up @@ -81,7 +81,7 @@
CELERY_BROKER_URL = 'amqp://guest@localhost//'
CELERY_RESULT_BACKEND = 'redis://'
CELERY_TASK_SERIALIZER = 'auth'
CELERY_RESULT_SERIALIZER = 'pickle'
CELERY_RESULT_SERIALIZER = 'auth'
CELERY_ACCEPT_CONTENT = ['auth']
CELERY_IGNORE_RESULT = False
CELERY_SECURITY_KEY = '../celery/worker.key'
Expand Down
10 changes: 10 additions & 0 deletions website/helpers/pickle.py
@@ -0,0 +1,10 @@
import codecs
import pickle


def pickle_as_str(obj):
return codecs.encode(pickle.dumps(obj, protocol=0), 'base64').decode()


def unpickle_str(text):
return pickle.loads(codecs.decode(text.encode(), 'base64'))
2 changes: 1 addition & 1 deletion website/run_tests.py
@@ -1,6 +1,6 @@
if [ $# -eq 0 ]
then
match="not test_import_results and not test_data and not celery"
match="not test_import_results and not test_data"
else
match="$1"
fi
Expand Down
56 changes: 56 additions & 0 deletions website/search/filters.py
@@ -0,0 +1,56 @@
from flask import request

from helpers.filters import FilterManager, Filter
from models import Mutation, Protein
from search.gene import search_feature_engines

search_features = [engine.name for engine in search_feature_engines]


class Feature:
"""Target class for feature filtering"""
pass


class Search:
pass


class SearchViewFilters(FilterManager):

def __init__(self, **kwargs):

available_features = search_features
active_features = set(available_features) - {'summary'}

filters = [
# Why default = False? Due to used widget: checkbox.
# It is not possible to distinguish between user not asking for
# all mutations (so sending nothing in post, since un-checking it
# will cause it to be skipped in the form) or user doing nothing

# Why or? Take a look on table:
# is_ptm show all muts (by default only ptm) include?
# 0 0 0
# 0 1 1
# 1 0 1
# 1 1 1
Filter(
Mutation, 'is_ptm', comparators=['or'],
default=False
),
Filter(
Protein, 'has_ptm_mutations', comparators=['eq'],
as_sqlalchemy=True
),
Filter(
Feature, 'name', comparators=['in'],
default=list(active_features),
choices=available_features,
),
Filter(
Search, 'query', comparators=['eq'],
),
]
super().__init__(filters)
self.update_from_request(request)
47 changes: 47 additions & 0 deletions website/search/task.py
@@ -0,0 +1,47 @@
import pickle
from typing import Dict

from app import celery
from helpers.pickle import pickle_as_str, unpickle_str
from search.mutation import MutationSearch

from search.filters import SearchViewFilters


class SearchTask:

def __init__(self, vcf_file, textarea_query: str, filter_manager: SearchViewFilters, dataset_uri=None):
self.vcf_file = vcf_file
self.textarea_query = textarea_query
self.filter_manager = filter_manager
self.dataset_uri = dataset_uri

def serialize(self) -> Dict:
return {
'vcf_file': self.vcf_file,
'textarea_query': self.textarea_query,
'filter_manager': pickle_as_str(self.filter_manager),
'dataset_uri': self.dataset_uri
}

@classmethod
def from_serialized(cls, vcf_file, textarea_query, filter_manager, dataset_uri):
filter_manager = unpickle_str(filter_manager)

if not isinstance(filter_manager, SearchViewFilters):
# print('This weird issue again... Retrying')
filter_manager = pickle.loads(filter_manager)

return cls(
vcf_file,
textarea_query,
filter_manager,
dataset_uri
)


@celery.task
def search_task(task_data):
task = SearchTask.from_serialized(**task_data)
mutation_search = MutationSearch(task.vcf_file, task.textarea_query, task.filter_manager)
return pickle_as_str(mutation_search), task.dataset_uri
38 changes: 35 additions & 3 deletions website/tests/views/test_search.py
@@ -1,5 +1,6 @@
import re
from io import BytesIO
from time import sleep

from view_testing import ViewTest, relative_location
from miscellaneous import mock_proteins_and_genes
Expand Down Expand Up @@ -57,6 +58,15 @@ def entries_with_type(response, type_name):

class TestSearchView(ViewTest):

def test_search_task_serialization(self):
from search.task import SearchTask
from search.filters import SearchViewFilters
filters = SearchViewFilters()
task = SearchTask(vcf_file='', textarea_query='', filter_manager=filters)
serialized = task.serialize()
task_recreated = SearchTask.from_serialized(**serialized)
assert isinstance(task_recreated.filter_manager, SearchViewFilters)

def view_module(self):
from website.views import search
return search
Expand Down Expand Up @@ -401,6 +411,9 @@ def test_save_search(self):
assert unauthorized_save_response.status_code == 200

def test_save_search_with_celery(self):
"""Assumes that fully configured celery worker is available and running,
together with a broker and results backends."""
self.app.config['USE_CELERY'] = True
self.login('user@domain.org', 'password', create=True)

Expand All @@ -421,9 +434,28 @@ def test_save_search_with_celery(self):
assert raw_response.json['status'] == 'PENDING'
assert raw_response.json['progress'] == 0

# TODO: this test needs to include loading and final redirection
# attempts made so far failed, but pytest celery_session_worker
# and CELERY_TASK_ALWAYS_EAGER are the most promising ideas
remaining_trials = 100

last_status = None
last_progress = None

while remaining_trials != 0:
raw_response = self.client.get(
location.replace('/progress/', '/raw_progress/'),
follow_redirects=True
)

last_status = raw_response.json['status']
last_progress = raw_response.json['progress']

if last_status == 'SUCCESS':
break

sleep(0.1)
remaining_trials -= 1

assert last_status == 'SUCCESS'
assert last_progress == 10000

self.app.config['USE_CELERY'] = False

Expand Down
94 changes: 6 additions & 88 deletions website/views/search.py
@@ -1,7 +1,5 @@
import codecs
import pickle
from collections import defaultdict
from typing import Dict
from urllib.parse import unquote

from flask import make_response, redirect, abort
Expand All @@ -24,22 +22,21 @@
OrderedDict,
List,
)
from search.filters import SearchViewFilters
from search.mutation import MutationSearch
from models import Gene
from models import Mutation
from models import UsersMutationsDataset
from sqlalchemy import exists, or_, text
from helpers.filters.manager import quote_if_needed, FilterManager
from helpers.filters import Filter
from helpers.filters.manager import quote_if_needed
from helpers.widgets import FilterWidget
from search.mutation_result import SearchResult
from search.task import SearchTask, search_task
from views.gene import prepare_subqueries
from search.protein_mutations import get_protein_muts
from database import db, levenshtein_sorted, bdb
from search.gene import GeneMatch, search_feature_engines

search_features = [engine.name for engine in search_feature_engines]


def create_engines(options=None):
engines = OrderedDict()
Expand Down Expand Up @@ -122,55 +119,6 @@ def search_proteins(
return results[:limit]


class Feature:
"""Target class for feature filtering"""
pass


class Search:
pass


class SearchViewFilters(FilterManager):

def __init__(self, **kwargs):

available_features = search_features
active_features = set(available_features) - {'summary'}

filters = [
# Why default = False? Due to used widget: checkbox.
# It is not possible to distinguish between user not asking for
# all mutations (so sending nothing in post, since un-checking it
# will cause it to be skipped in the form) or user doing nothing

# Why or? Take a look on table:
# is_ptm show all muts (by default only ptm) include?
# 0 0 0
# 0 1 1
# 1 0 1
# 1 1 1
Filter(
Mutation, 'is_ptm', comparators=['or'],
default=False
),
Filter(
Protein, 'has_ptm_mutations', comparators=['eq'],
as_sqlalchemy=True
),
Filter(
Feature, 'name', comparators=['in'],
default=list(active_features),
choices=available_features,
),
Filter(
Search, 'query', comparators=['eq'],
),
]
super().__init__(filters)
self.update_from_request(request)


def make_widgets(filter_manager):
return {
'proteins': {
Expand Down Expand Up @@ -202,39 +150,6 @@ def make_widgets(filter_manager):
}


class SearchTask:

def __init__(self, vcf_file, textarea_query: str, filter_manager: SearchViewFilters, dataset_uri=None):
self.vcf_file = vcf_file
self.textarea_query = textarea_query
self.filter_manager = filter_manager
self.dataset_uri = dataset_uri

def serialize(self) -> Dict:
return {
'vcf_file': self.vcf_file,
'textarea_query': self.textarea_query,
'filter_manager': codecs.encode(pickle.dumps(self.filter_manager), 'base64').decode(),
'dataset_uri': self.dataset_uri
}

@classmethod
def from_serialized(cls, vcf_file, textarea_query, filter_manager, dataset_uri):
return cls(
vcf_file,
textarea_query,
pickle.loads(codecs.decode(filter_manager.encode(), 'base64')),
dataset_uri
)


@celery.task
def search_task(task_data):
task = SearchTask.from_serialized(**task_data)
mutation_search = MutationSearch(task.vcf_file, task.textarea_query, task.filter_manager)
return mutation_search, task.dataset_uri


class SearchView(FlaskView):
"""Enables searching in any of registered database models."""

Expand Down Expand Up @@ -408,6 +323,9 @@ def mutations(self):
return redirect(url_for('SearchView:mutations'))
mutation_search, dataset_uri = celery_task.result

from helpers.pickle import unpickle_str
mutation_search = unpickle_str(mutation_search)

if dataset_uri:
url = url_for(
'SearchView:user_mutations',
Expand Down

0 comments on commit e832163

Please sign in to comment.