diff --git a/neptune/new/exceptions.py b/neptune/new/exceptions.py index d3775a49b..18828ed6c 100644 --- a/neptune/new/exceptions.py +++ b/neptune/new/exceptions.py @@ -155,6 +155,29 @@ def __init__(self, run_uuid: uuid.UUID): super().__init__("Run with UUID {} not found. Could be deleted.".format(run_uuid)) +class InactiveRunException(NeptuneException): + def __init__(self, short_id: str): + message = """ +{h1} +----InactiveRunException---------------------------------------- +{end} +It seems you are trying to log (or fetch) metadata to a run that was stopped ({short_id}). +What should I do? + - Resume the run to continue logging to it: + https://docs.neptune.ai/how-to-guides/neptune-api/resume-run#how-to-resume-run + - Don't invoke `stop()` on a run that you want to access. If you want to stop monitoring only, + you can resume a run in read-only mode: + https://docs.neptune.ai/you-should-know/connection-modes#read-only +You may also want to check the following docs pages: + - https://docs.neptune.ai/api-reference/run#stop + - https://docs.neptune.ai/how-to-guides/neptune-api/resume-run#how-to-resume-run + - https://docs.neptune.ai/you-should-know/connection-modes +{correct}Need help?{end}-> https://docs.neptune.ai/getting-started/getting-help +""" + inputs = dict(list({'short_id': short_id}.items()) + list(STYLES.items())) + super().__init__(message.format(**inputs)) + + class NeptuneMissingProjectNameException(NeptuneException): def __init__(self): message = """ diff --git a/neptune/new/integrations/python_logger.py b/neptune/new/integrations/python_logger.py index be9e496ad..e2eaae957 100644 --- a/neptune/new/integrations/python_logger.py +++ b/neptune/new/integrations/python_logger.py @@ -19,6 +19,7 @@ from neptune.new import Run from neptune.new.internal.utils import verify_type from neptune.new.logging import Logger +from neptune.new.run import RunState class NeptuneHandler(logging.Handler): @@ -64,7 +65,7 @@ def emit(self, record: logging.LogRecord) -> None: if not hasattr(self._thread_local, "inside_write"): self._thread_local.inside_write = False - if self._run._started and not self._thread_local.inside_write: # pylint: disable=protected-access + if self._run._state == RunState.STARTED and not self._thread_local.inside_write: # pylint: disable=protected-access try: self._thread_local.inside_write = True message = self.format(record) diff --git a/neptune/new/run.py b/neptune/new/run.py index 513a4bc82..36458ee23 100644 --- a/neptune/new/run.py +++ b/neptune/new/run.py @@ -20,6 +20,8 @@ import uuid from contextlib import AbstractContextManager from datetime import datetime +from enum import Enum +from functools import wraps from typing import Any, Dict, List, Optional, Union import click @@ -28,7 +30,7 @@ from neptune.new.attributes import attribute_type_to_atom from neptune.new.attributes.attribute import Attribute from neptune.new.attributes.namespace import NamespaceBuilder, Namespace as NamespaceAttr -from neptune.new.exceptions import MetadataInconsistency, NeptuneException +from neptune.new.exceptions import MetadataInconsistency, NeptuneException, InactiveRunException from neptune.new.handler import Handler from neptune.new.internal.backends.api_model import AttributeType from neptune.new.internal.backends.neptune_backend import NeptuneBackend @@ -49,6 +51,24 @@ from neptune.new.types.value import Value +class RunState(Enum): + CREATED = 'created' + STARTED = 'started' + STOPPING = 'stopping' + STOPPED = 'stopped' + + +def assure_run_not_stopped(fun): + @wraps(fun) + def inner_fun(self, *args, **kwargs): + # pylint: disable=protected-access + if self._state == RunState.STOPPED: + raise InactiveRunException(short_id=self._short_id) + return fun(self, *args, **kwargs) + + return inner_fun + + class Run(AbstractContextManager): """A Run in Neptune is a representation of all metadata that you log to Neptune. @@ -110,7 +130,7 @@ def __init__( self._bg_job = background_job self._structure: RunStructure[Attribute, NamespaceAttr] = RunStructure(NamespaceBuilder(self)) self._lock = threading.RLock() - self._started = False + self._state = RunState.CREATED self._workspace = workspace self._project_name = project_name self._short_id = short_id @@ -122,15 +142,19 @@ def __exit__(self, exc_type, exc_val, exc_tb): traceback.print_exception(exc_type, exc_val, exc_tb) self.stop() + @assure_run_not_stopped def __getitem__(self, path: str) -> 'Handler': return Handler(self, path) + @assure_run_not_stopped def __setitem__(self, key: str, value) -> None: self.__getitem__(key).assign(value) + @assure_run_not_stopped def __delitem__(self, path) -> None: self.pop(path) + @assure_run_not_stopped def assign(self, value, wait: bool = False) -> None: """Assign values to multiple fields from a dictionary. You can use this method to quickly log all run's parameters. @@ -165,6 +189,7 @@ def assign(self, value, wait: bool = False) -> None: """ self._get_root_handler().assign(value, wait) + @assure_run_not_stopped def fetch(self) -> dict: """Fetch values of all non-File Atom fields as a dictionary. The result will preserve the hierarchical structure of the run's metadata, but will contain only non-File Atom @@ -198,7 +223,7 @@ def start(self): atexit.register(self._shutdown_hook) self._op_processor.start() self._bg_job.start(self) - self._started = True + self._state = RunState.STARTED def stop(self, seconds: Optional[Union[float, int]] = None) -> None: """Stops the tracked run and kills the synchronization thread. @@ -258,9 +283,10 @@ def stop(self, seconds: Optional[Union[float, int]] = None) -> None: https://docs.neptune.ai/api-reference/run#stop """ verify_type("seconds", seconds, (float, int, type(None))) - if not self._started: + if self._state != RunState.STARTED: return - self._started = False + + self._state = RunState.STOPPING ts = time.time() click.echo(f"Shutting down background jobs, please wait a moment...") self._bg_job.stop() @@ -269,6 +295,7 @@ def stop(self, seconds: Optional[Union[float, int]] = None) -> None: with self._lock: sec_left = None if seconds is None else seconds - (time.time() - ts) self._op_processor.stop(sec_left) + self._state = RunState.STOPPED def get_structure(self) -> Dict[str, Any]: """Returns a run's metadata structure in form of a dictionary. diff --git a/tests/neptune/new/test_run.py b/tests/neptune/new/test_run.py index 4e3e88a4f..b046b45a5 100644 --- a/tests/neptune/new/test_run.py +++ b/tests/neptune/new/test_run.py @@ -19,7 +19,7 @@ from neptune.new import ANONYMOUS, init from neptune.new.envs import API_TOKEN_ENV_NAME, PROJECT_ENV_NAME -from neptune.new.exceptions import MetadataInconsistency +from neptune.new.exceptions import MetadataInconsistency, InactiveRunException from neptune.new.types.atoms.float import Float from neptune.new.types.atoms.string import String from neptune.new.types.series import FloatSeries, StringSeries @@ -113,3 +113,16 @@ def test_assign_false(self): exp["params"] = {'predictor.cheat': False} self.assertFalse(exp["params/predictor.cheat"].fetch()) + + def test_access_blocked_after_stop(self): + exp = init(mode="debug") + exp['attr1'] = 1 + + exp.stop() + + with self.assertRaises(InactiveRunException): + exp['attr1'].fetch() + with self.assertRaises(InactiveRunException): + exp['attr2'] = 2 + with self.assertRaises(InactiveRunException): + exp['series'].log(1)