Skip to content
This repository has been archived by the owner on Jul 16, 2021. It is now read-only.

Commit

Permalink
Merge pull request #389 from opentargets/mk-refactor
Browse files Browse the repository at this point in the history
Fix formatting, cleanup globalstats, fix evs writer performance
  • Loading branch information
afaulconbridge committed Dec 5, 2018
2 parents 6b0e969 + 268a545 commit 9f327c6
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 48 deletions.
2 changes: 1 addition & 1 deletion mrtarget/CommandLine.py
Expand Up @@ -292,7 +292,7 @@ def main():
input_files = [x.rstrip() for x in f.readlines()]

num_workers = Config.WORKERS_NUMBER
num_writers = min(1, max(16, Config.WORKERS_NUMBER))
num_writers = max(1, min(16, Config.WORKERS_NUMBER))
process_evidences_pipeline(filenames=input_files,
first_n=args.first_n,
es_client=connectors.es,
Expand Down
46 changes: 9 additions & 37 deletions mrtarget/common/EvidencesHelpers.py
Expand Up @@ -105,14 +105,15 @@ def put(self, line, **kwargs):
(left, right) = line
if right is not None:
self.kwargs.es_loader.put(body=right['line'], ID=right['hash'],
index_name=self.kwargs.index_name_validated,
doc_type=self.kwargs.doc_type_validated,
create_index=True)
index_name=self.kwargs.index_name_validated,
doc_type=self.kwargs.doc_type_validated,
create_index=True, auto_optimise=True)
elif left is not None:
self.kwargs.es_loader.put(body=serialise_object_to_json(left), ID=left['id'],
index_name=self.kwargs.index_name_invalidated,
doc_type=self.kwargs.doc_type_invalidated,
create_index=True)
index_name=self.kwargs.index_name_invalidated,
doc_type=self.kwargs.doc_type_invalidated,
create_index=True,
auto_optimise=True)

def __del__(self):
self.close()
Expand All @@ -125,7 +126,7 @@ def close(self):
pass


def open_writers_on_start(enable_output_to_es=False, output_folder='.', dry_run=False):
def open_writers_on_start(enable_output_to_es, output_folder, dry_run):
"""construct the processcontext to write lines to the files. we have to sets,
the good validated ones and the failed ones.
"""
Expand All @@ -137,6 +138,7 @@ def open_writers_on_start(enable_output_to_es=False, output_folder='.', dry_run=
else:
pc = ProcessContextFileWriter(output_folder=output_folder)

pc.logger.debug("called open_writers on_start from %s", str(os.getpid()))
return pc


Expand All @@ -160,36 +162,6 @@ def reduce_tuple_with_sum(iterable):
return functools.reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]), iterable, (0, 0))


def emit_global_stats_from_evidence(ev):
    """Generate a list of tuples containing the required information to map-reduce.

    Reference ids are collected from ``ev.evidence`` in this order:

    * for every entry under ``literature -> references`` (when both keys are
      present), the last ``/``-separated segment of its ``lit_id``;
    * the ``unique_experiment_reference`` value, when that key is present.

    Returns a list of 4-tuples ``(evidence id, reference id, entity id, 1)``:
    first one tuple per reference id carrying the target id, then one tuple
    per reference id carrying the disease id. The list is empty when no
    reference id was collected.
    """
    internal_ev = ev.evidence
    ev_id = internal_ev['id']
    ev_t = internal_ev['target']['id']
    ev_d = internal_ev['disease']['id']

    ids = []
    if 'literature' in internal_ev and 'references' in internal_ev['literature']:
        # keep only the trailing path segment of each literature id
        ids.extend(ref['lit_id'].split('/')[-1]
                   for ref in internal_ev['literature']['references'])

    if 'unique_experiment_reference' in internal_ev:
        ids.append(internal_ev['unique_experiment_reference'])

    # Plain comprehensions instead of itertools.imap/izip over cycle():
    # same rows in the same order, and valid on both Python 2 and Python 3.
    target_rows = [(ev_id, ref_id, ev_t, 1) for ref_id in ids]
    disease_rows = [(ev_id, ref_id, ev_d, 1) for ref_id in ids]
    return target_rows + disease_rows


def make_validated_evs_obj(filename, hash, line, line_n, is_valid=False, explanation_type='', explanation_str='',
target_id=None, efo_id=None, data_type=None, id=None):
return addict.Dict(is_valid=is_valid, explanation_type=explanation_type, explanation_str=explanation_str,
Expand Down
8 changes: 2 additions & 6 deletions mrtarget/modules/Evidences.py
@@ -1,6 +1,6 @@
import hashlib
import logging
import sys
import datetime
import os
import json
import pypeln.process as pr
Expand All @@ -17,8 +17,7 @@
from mrtarget.common.EvidenceJsonUtils import DatatStructureFlattener
from mrtarget.common.EvidencesHelpers import (ProcessContext, make_lookup_data,
make_validated_evs_obj, open_writers_on_start,
close_writers_on_done, reduce_tuple_with_sum,
emit_global_stats_from_evidence)
close_writers_on_done, reduce_tuple_with_sum)
from mrtarget.common.connection import new_redis_client, new_es_client, PipelineConnectors
from mrtarget.modules.EvidenceString import EvidenceManager, Evidence

Expand All @@ -32,7 +31,6 @@ def fix_and_score_evidence(validated_evs, process_context):

(fixed_ev, _) = process_context.kwargs.evidence_manager.fix_evidence(ev)

# TODO global stats is disabled at the moment
(is_valid, problem_str) = \
process_context.kwargs.evidence_manager.check_is_valid_evs(fixed_ev, datasource=fixed_ev.datasource)
if is_valid:
Expand All @@ -43,8 +41,6 @@ def fix_and_score_evidence(validated_evs, process_context):
fixed_ev_ext = process_context.kwargs.evidence_manager.get_extended_evidence(fixed_ev)
process_context.kwargs.evidence_manager.inject_loci(fixed_ev_ext)
validated_evs.is_valid = True
# TODO emit data for global stats
# validated_evs.global_stats = emit_global_stats_from_evidence(fixed_ev_ext)
validated_evs.line = fixed_ev_ext.to_json()
right = validated_evs

Expand Down
5 changes: 1 addition & 4 deletions requirements.txt
Expand Up @@ -16,13 +16,10 @@ SPARQLWrapper>=1.7.6
jsonpickle
simplejson

opentargets-validator>=0.3.0
opentargets-ontologyutils

#when installing from GitHub, a specific commit must be used for consistency
#and to ensure dependency caching works as intended
#git+https://github.com/opentargets/validator.git@711c098df9c82e66ff9b95412f4561c28a63b572#egg=opentargets_validator
opentargets-validator>=0.3.0
opentargets-validator>=0.4.0
opentargets-ontologyutils

numpy
Expand Down

0 comments on commit 9f327c6

Please sign in to comment.