Skip to content

Commit

Permalink
Control caching of analysis results
Browse files Browse the repository at this point in the history
  • Loading branch information
lukacu committed Apr 24, 2024
1 parent a6caa78 commit 4851e5f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
5 changes: 5 additions & 0 deletions vot/analysis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ def axes(self) -> Axes:
""" Returns axes semantic description for the result grid """
raise NotImplementedError()

@property
def cached(self) -> bool:
    """Indicates if results of this analysis may be stored in the result cache.

    Returns:
        bool: True by default; subclasses can override to opt out of caching.
    """
    return True

def commit(self, experiment: Experiment, trackers: List[Tracker], sequences: List[Sequence]):
    """Schedules this analysis for execution on the default processor.

    Args:
        experiment: Experiment the analysis belongs to.
        trackers: Trackers to include in the analysis.
        sequences: Sequences to include in the analysis.

    Returns:
        Whatever the default processor's commit returns (typically a future/promise).
    """
    # Delegate to the thread-local default processor resolved by AnalysisProcessor.
    commit_on_default = AnalysisProcessor.commit_default
    return commit_on_default(self, experiment, trackers, sequences)
Expand Down
32 changes: 16 additions & 16 deletions vot/analysis/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,13 +491,8 @@ def __init__(self, key):
"""

super().__init__()
self._key = key

@property
def key(self):
"""Gets the key of the analysis."""
return self._key

self.key = key

def __repr__(self) -> str:
    """Returns a debug string representation of the future.

    Returns:
        str: Representation including the analysis key.
    """
    # Bug fix: __init__ now assigns ``self.key`` (the ``_key`` attribute and
    # its accessor property were removed in this commit), so reading
    # ``self._key`` here raised AttributeError on every repr call.
    return "<AnalysisFuture key={}>".format(self.key)
Expand Down Expand Up @@ -545,7 +540,7 @@ def commit(self, analysis: Analysis, experiment: Experiment,

promise = self._exists(key)

if not promise is None:
if not promise is None and analysis.cached:
return promise

promise = AnalysisFuture(key)
Expand All @@ -569,7 +564,7 @@ def select_dependencies(analysis: SeparableAnalysis, tracker: int, sequence: int
partkey = hashkey(analysis, experiment, unwrap(part.trackers), unwrap(part.sequences))

partpromise = self._exists(partkey)
if not partpromise is None:
if not partpromise is None and analysis.cached:
partpromises.append(partpromise)
continue

Expand All @@ -579,15 +574,18 @@ def select_dependencies(analysis: SeparableAnalysis, tracker: int, sequence: int
executorpromise = self._executor.submit(AnalysisPartTask(analysis, experiment, part.trackers, part.sequences), *dependencies,
mapping=partial(select_dependencies, analysis, part.tid, part.sid))
self._promises[partkey] = [partpromise]
executorpromise.cached = analysis.cached
self._pending[partkey] = executorpromise
executorpromise.add_done_callback(self._future_done)

executorpromise = self._executor.submit(AnalysisJoinTask(analysis, experiment, trackers, sequences),
*partpromises, mapping=lambda *x: [list(x)])
executorpromise.cached = analysis.cached
self._pending[key] = executorpromise
else:
task = AnalysisTask(analysis, experiment, trackers, sequences)
executorpromise = self._executor.submit(task, *dependencies, mapping=lambda *x: [list(x)])
executorpromise.cached = analysis.cached
self._pending[key] = executorpromise

self._promises[key] = [promise]
Expand Down Expand Up @@ -637,7 +635,8 @@ def _future_done(self, future: Future):

try:
result = future.result()
self._cache[key] = result
if self._cache is not None and getattr(future, "cached", False):
self._cache[key] = result
error = None
except (AnalysisError, RuntimeError) as e:
error = e
Expand Down Expand Up @@ -769,7 +768,7 @@ def default():
processor = getattr(AnalysisProcessor._context, 'analysis_processor', None)

if processor is None:
logger.warning("Default analysis processor not set for thread %s, using a simple one.", threading.current_thread().name)
logger.debug("Default analysis processor not set for thread %s, using a simple one.", threading.current_thread().name)
from vot.utilities import ThreadPoolExecutor
from cachetools import LRUCache
executor = ThreadPoolExecutor(1)
Expand Down Expand Up @@ -850,14 +849,14 @@ def insert(future: Future):
"""Inserts the result of a computation into a container."""
try:
container[key] = future.result()
except AnalysisError as e:
errors.append(e)
except Exception as e:
logger.exception(e)
errors.append(e)
with condition:
condition.notify()
return insert

if isinstance(trackers, Tracker): trackers = [trackers]

for experiment in workspace.stack:

logger.debug("Traversing experiment %s", experiment.identifier)
Expand Down Expand Up @@ -892,6 +891,7 @@ def insert(future: Future):

progress.absolute(processor.total - processor.pending)
if processor.pending == 0:
progress.absolute(processor.total)
break

with condition:
Expand All @@ -907,8 +907,8 @@ def insert(future: Future):
logger.info("Errors occured during analysis, incomplete.")
for e in errors:
logger.info("Failed task {}: {}".format(e.task, e.root_cause))
if logger.isEnabledFor(logging.DEBUG):
e.print(logger)
#if logger.isEnabledFor(logging.DEBUG):
# e.print(logger)
return None

return results

0 comments on commit 4851e5f

Please sign in to comment.