This repository has been archived by the owner on Aug 9, 2024. It is now read-only.

Celery performance improvements #78

Merged · 2 commits · merged on Aug 26, 2015
3 changes: 1 addition & 2 deletions ores/score_processors/__init__.py
@@ -1,4 +1,3 @@
from .score_processor import (ScoreProcessor, ScoreResult, SimpleScoreProcessor,
SimpleScoreResult)
from .score_processor import ScoreProcessor, SimpleScoreProcessor
from .timeout import Timeout
from .celery import Celery
113 changes: 84 additions & 29 deletions ores/score_processors/celery.py
@@ -1,23 +1,25 @@
import logging

import celery
from celery.signals import before_task_publish

from ..score_caches import ScoreCache
from .score_processor import ScoreResult
from .timeout import timeout as timeout_func
from .timeout import Timeout

logger = logging.getLogger("ores.score_processors.celery")

APPLICATIONS = []

class CeleryTimeoutResult(ScoreResult):
@before_task_publish.connect
def update_sent_state(sender=None, body=None, **kwargs):

def __init__(self, async_result, timeout):
self.async_result = async_result
self.timeout = timeout
for application in APPLICATIONS:
task = application.tasks.get(sender)
backend = task.backend if task else application.backend

def get(self):
return self.async_result.get(timeout=self.timeout)
logger.debug("Setting state to 'SENT' for {0}".format(body['id']))
backend.store_result(body['id'], result=None, status="SENT")


class Celery(Timeout):
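The signal handler added above is the heart of this change. Celery reports any unknown task id as PENDING, so there is normally no way to tell a task that has been queued (but not yet picked up by a worker) from one that was never submitted at all. Stamping a custom "SENT" state into the result backend at publish time lets _get_result() further down treat SENT, STARTED, and SUCCESS as "already in progress" and reuse that work. A minimal standalone sketch of the same pattern, assuming a Celery 3.x-style signal payload where the task id travels in body (newer Celery versions carry it in headers), with a placeholder broker/backend URL:

```python
import celery
from celery.signals import before_task_publish

# Placeholder broker/result backend, not the ORES configuration.
app = celery.Celery("example",
                    broker="redis://localhost:6379/0",
                    backend="redis://localhost:6379/0")


@before_task_publish.connect
def mark_sent(sender=None, headers=None, body=None, **kwargs):
    # `sender` is the task name; fall back to the app backend if the task
    # is not registered in this process.
    task = app.tasks.get(sender)
    backend = task.backend if task else app.backend
    task_id = (body or {}).get("id") or (headers or {}).get("id")
    backend.store_result(task_id, None, "SENT")
```

Without the handler, app.AsyncResult(some_id).state is PENDING for both "queued" and "never heard of it"; with it, a deterministic task id can be probed safely before deciding whether to enqueue new work.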
@@ -27,39 +29,93 @@ def __init__(self, *args, application, **kwargs):
self.application = application

@self.application.task
def _process(context, model, cache):
scoring_context = self[context]
score = scoring_context.score(model, cache)
return score
def _process_task(context, model, cache):
return Timeout._process(self, context, model, cache)

self._process = _process
@self.application.task
def _score_task(context, model, rev_id, cache=None):
return Timeout._score(self, context, model, rev_id, cache=cache)


APPLICATIONS.append(application)

self._process_task = _process_task

self._score_task = _score_task

def _score_in_celery(self, context, model, rev_ids, caches):
scores = {}
results = {}

if len(rev_ids) == 0:
return scores
if len(rev_ids) == 1: # Special case -- do everything in celery
rev_id = rev_ids.pop()
id_string = self._generate_id(context, model, rev_id)
cache = (caches or {}).get(rev_id, {})
result = self._score_task.apply_async(
args=(context, model, rev_id), kwargs={'cache': cache},
task_id=id_string
)
results[rev_id] = result
else: # Otherwise, try and batch
# Get the root datasources for the rest of the batch (IO)
root_ds_caches = self._get_root_ds(context, model, rev_ids,
caches=caches)

# Extract features and generate scores (CPU)
for rev_id, (error, cache) in root_ds_caches.items():
if error is not None:
scores[rev_id] = {
'error': {
'type': str(type(error)),
'message': str(error)
}
}
else:
id_string = self._generate_id(context, model, rev_id)
result = self._process_task.apply_async(
args=(context, model, cache),
task_id=id_string
)
results[rev_id] = result

# Process async results
for rev_id, result in results.items():
try:
score = result.get(self.timeout)
scores[rev_id] = score
self._store(context, model, rev_id, score)
except Exception as error:
scores[rev_id] = {
'error': {
'type': str(type(error)),
'message': str(error)
}
}

return scores

def _generate_id(self, context, model, rev_id):
scorer_model = self[context][model]
version = scorer_model.version

return ":".join(str(v) for v in [context, model, rev_id, version])

def process(self, context, model, rev_id, cache):
id_string = self._generate_id(context, model, rev_id)

result = self._process.apply_async(args=(context, model, cache),
task_id=id_string)
return CeleryTimeoutResult(result, self.timeout)

def score(self, context, model, rev_ids):
def score(self, context, model, rev_ids, caches=None):
rev_ids = set(rev_ids)

# Lookup scoring results that are currently in progress
results = self._lookup_inprogress_results(context, model, rev_ids)
missing_rev_ids = rev_ids - results.keys()
missing_ids = rev_ids - results.keys()

# Lookup scoring results that are in the cache
scores = self._lookup_cached_scores(context, model, missing_rev_ids)
missing_rev_ids = missing_rev_ids - scores.keys()
scores = self._lookup_cached_scores(context, model, missing_ids)
missing_ids = missing_ids - scores.keys()

# Generate scores for missing rev_ids
scores.update(self._score(context, model, missing_rev_ids))
scores.update(self._score_in_celery(context, model, missing_ids,
caches=caches))

# Gather results
for rev_id in results:
@@ -76,7 +132,6 @@ def score(self, context, model, rev_ids):
# Return scores
return scores


def _lookup_inprogress_results(self, context, model, rev_ids):
scorer_model = self[context][model]
version = scorer_model.version
@@ -94,10 +149,10 @@ def _lookup_inprogress_results(self, context, model, rev_ids):
def _get_result(self, id_string):

# Try to get an async_result for an in_progress task
logger.debug("Checking if {0} is already being processed"
.format(repr(id_string)))
result = self._process.AsyncResult(task_id=id_string)
if result.state not in ("STARTED", "SUCCESS"):
result = self._score_task.AsyncResult(task_id=id_string)
logger.debug("Checking if {0} is already being processed [{1}]"
.format(repr(id_string), result.state))
if result.state not in ("SENT", "STARTED", "SUCCESS"):
raise KeyError(id_string)
else:
logger.debug("Found AsyncResult for {0}".format(repr(id_string)))
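Two details in celery.py above carry most of the performance win. First, task ids are deterministic: _generate_id() encodes context:model:rev_id:version, and every apply_async() call passes that string as task_id, so a second request for the same revision can attach to the result that is already queued or running instead of scheduling duplicate work. Second, single-revision requests are special-cased to push both the IO and the CPU work into one celery task (_score_task), while batches keep the batched IO on the requesting process and only farm out the CPU-bound scoring (_process_task). A hedged, self-contained sketch of the deterministic-id idea; the task body, id layout, and broker URL are placeholders rather than the ORES code itself:

```python
import celery

app = celery.Celery("example",
                    broker="redis://localhost:6379/0",
                    backend="redis://localhost:6379/0")


@app.task
def score_revision(context, model, rev_id):
    # Placeholder for feature extraction + model application.
    return {"prediction": rev_id % 2 == 0}


def task_id_for(context, model, rev_id, version):
    # Same layout as _generate_id() above.
    return ":".join(str(v) for v in (context, model, rev_id, version))


def submit_or_attach(context, model, rev_id, version, timeout=None):
    tid = task_id_for(context, model, rev_id, version)
    result = score_revision.AsyncResult(tid)
    if result.state in ("SENT", "STARTED", "SUCCESS"):
        return result.get(timeout=timeout)      # re-attach to in-flight work
    result = score_revision.apply_async(args=(context, model, rev_id),
                                        task_id=tid)
    return result.get(timeout=timeout)
```

The "SENT" check only works because of the before_task_publish handler sketched earlier; without it, a queued-but-unstarted task would look identical to an unknown id.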
144 changes: 67 additions & 77 deletions ores/score_processors/score_processor.py
@@ -8,38 +8,55 @@
logger = logging.getLogger("ores.score_processors.score_processor")


class ScoreResult():
def get(self, *args, **kwargs):
raise NotImplementedError()

class ScoreProcessor(dict):

def __init__(self, scoring_contexts, score_cache=None):
super().__init__()
self.update(scoring_contexts)
self.score_cache = score_cache or Empty()

def score(self, context, model, rev_ids, caches=None):
# Remove dupes and prepares for set differences
def _get_root_ds(self, context, model, rev_ids, caches=None):
"""
Pure IO. Batch extract root datasources for a set of features that the
model needs.
"""
rev_ids = set(rev_ids)
scoring_context = self[context]
return scoring_context.extract_roots(model, rev_ids, caches=caches)

# Lookup cached scores
scores = self._lookup_cached_scores(context, model, rev_ids)
missing_rev_ids = rev_ids - scores.keys()
def _process(self, context, model, cache):
"""
Pure CPU. Extract features from datasources in the cache and apply the
model to arrive at a score.
"""
scoring_context = self[context]
score = scoring_context.score(model, cache)
return score

# Generate scores for the rest
scores.update(self._score(context, model, missing_rev_ids,
caches=caches))
def _score(self, context, model, rev_id, cache=None):
"""
Both IO and CPU. Generates a single score or an error.
"""
error, process_cache = self._get_root_ds(context, model, [rev_id],
caches={rev_id: cache})[rev_id]

return scores
if error is not None:
raise error

def _lookup_cached_scores(self, context, model, rev_ids):
scores = {}
return self._process(context, model, process_cache)

def _store(self, context, model, rev_id, score):
scorer_model = self[context][model]
version = scorer_model.version

self.score_cache.store(context, model, rev_id, score, version=version)

# Lookup scores that are in the cache

def _lookup_cached_scores(self, context, model, rev_ids):
scorer_model = self[context][model]
version = scorer_model.version

scores = {}
for rev_id in rev_ids:
try:
score = self.score_cache.lookup(context, model, rev_id,
@@ -54,45 +71,6 @@ def _lookup_cached_scores(self, context, model, rev_ids):

return scores

def _score(self, context, model, rev_ids, caches=None):
scores = {}

# Batch extract root datasources for features of the missing ids
scoring_context = self[context]
root_ds_caches = scoring_context.extract_roots(model, rev_ids,
caches=caches)

# Process scores for each revision using the cached data
results = {}
for rev_id in rev_ids:
error, cache = root_ds_caches[rev_id]

if error is None:
results[rev_id] = self.process(context, model, rev_id, cache)
else:
scores[rev_id] = {
'error': {
'type': str(type(error)),
'message': str(error)
}
}

for rev_id in results:
try:
scores[rev_id] = results[rev_id].get()
except Exception as error:
scores[rev_id] = {
'error': {
'type': str(type(error)),
'message': str(error)
}
}

return scores

def process(self, context, model, rev_id, cache):
raise NotImplementedError()

@classmethod
def from_config(cls, config, name, section_key="score_processors"):
logger.info("Loading ScoreProcessor '{0}' from config.".format(name))
@@ -106,25 +84,37 @@ def from_config(cls, config, name, section_key="score_processors"):

class SimpleScoreProcessor(ScoreProcessor):

def _process(self, context, model, rev_id, cache):
scoring_context = self[context]
return scoring_context.score(model, cache)

def process(self, context, model, rev_id, cache):
try:
score = self._process(context, model, rev_id, cache)
return SimpleScoreResult(score=score)
except Exception as e:
return SimpleScoreResult(error=e)

class SimpleScoreResult(ScoreResult):

def __init__(self, *, score=None, error=None):
self.score = score
self.error = error

def get(self):
if self.error is not None:
raise self.error
else:
return self.score
def score(self, context, model, rev_ids, caches=None):
rev_ids = set(rev_ids)

# Look in the cache
scores = self._lookup_cached_scores(context, model, rev_ids)
missing_ids = rev_ids - scores.keys()

# Get the root datasources for the rest of the batch (IO)
root_ds_caches = self._get_root_ds(context, model, missing_ids,
caches=caches)

# Extract features and generate scores (CPU)
for rev_id, (error, cache) in root_ds_caches.items():
if error is not None:
scores[rev_id] = {
'error': {
'type': str(type(error)),
'message': str(error)
}
}
else:
try:
score = self._process(context, model, cache)
scores[rev_id] = score
self._store(context, model, rev_id, score)
except Exception as error:
scores[rev_id] = {
'error': {
'type': str(type(error)),
'message': str(error)
}
}

return scores
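The refactor above splits scoring into three clearly scoped steps: _get_root_ds() is pure IO (one batched fetch of the root datasources a model needs), _process() is pure CPU (feature extraction plus model application against that cache), and _score() composes both for a single revision. That separation is what the Celery processor exploits: the batched IO happens once on the requesting process, and only the CPU-bound part is serialized out to workers. A hedged sketch of the same split with stand-in functions (fetch_root_data and apply_model are placeholders, not the real ScoringContext API):

```python
def fetch_root_data(rev_id, seed=None):
    # Placeholder for the batched API/database fetch (pure IO in the real code).
    return dict(seed or {}, rev_id=rev_id)


def apply_model(cache):
    # Placeholder for feature extraction + model application (pure CPU).
    return {"prediction": cache["rev_id"] % 2 == 0}


def get_root_ds(rev_ids, caches=None):
    """Pure IO: gather everything the model will need, in one batch."""
    caches = caches or {}
    return {rev_id: fetch_root_data(rev_id, seed=caches.get(rev_id))
            for rev_id in set(rev_ids)}


def score_batch(rev_ids):
    root = get_root_ds(rev_ids)            # IO once, on the requesting process
    return {rev_id: apply_model(cache)     # CPU per revision, safe to farm out
            for rev_id, cache in root.items()}
```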
13 changes: 1 addition & 12 deletions ores/score_processors/tests/test_score_processor.py
@@ -1,14 +1,3 @@
from nose.tools import eq_, raises

from ..score_processor import SimpleScoreResult


def test_simple_score_result():
ssr = SimpleScoreResult(score=5)
eq_(ssr.get(), 5)


@raises(RuntimeError)
def test_simple_score_error():
ssr = SimpleScoreResult(error=RuntimeError())
eq_(ssr.get(), 5)
from ..score_processor import ScoreProcessor, SimpleScoreProcessor
6 changes: 3 additions & 3 deletions ores/score_processors/timeout.py
@@ -4,7 +4,7 @@

import stopit

from .score_processor import SimpleScoreProcessor, SimpleScoreResult
from .score_processor import SimpleScoreProcessor

logger = logging.getLogger("ores.score_processors.timeout")

@@ -15,8 +15,8 @@ def __init__(self, *args, timeout=None, **kwargs):
super().__init__(*args, **kwargs)
self.timeout = float(timeout) if timeout is not None else None

def _process(self, context, model, rev_id, cache):
return timeout(super()._process, context, model, rev_id, cache,
def _process(self, context, model, cache):
return timeout(super()._process, context, model, cache,
seconds=self.timeout)


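Apart from the trimmed import, the change in timeout.py is the _process() signature: the rev_id argument is gone now that _process() works purely from the datasource cache, matching the split above. For reference, a hedged sketch of what a stopit-based helper like the timeout() call used in _process() can look like; the exact helper in ores may differ in naming and error handling:

```python
import stopit


def run_with_timeout(func, *args, seconds=None, **kwargs):
    """Run func(*args, **kwargs), raising if it exceeds `seconds` (sketch only)."""
    if seconds is None:
        return func(*args, **kwargs)
    result = None
    with stopit.ThreadingTimeout(seconds) as ctx:   # swallows its own interrupt
        result = func(*args, **kwargs)
    if ctx.state == ctx.TIMED_OUT:
        raise RuntimeError("Timed out after {0} seconds".format(seconds))
    return result
```

Because the Celery tasks delegate to Timeout._process() and Timeout._score(), the CPU-bound part of a score is subject to the same timeout whether it runs inline in a SimpleScoreProcessor or inside a worker.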