Skip to content

Commit

Permalink
Jk/npt 9386 prevent logging after stop (#602)
Browse files Browse the repository at this point in the history
  • Loading branch information
shnela committed Jun 21, 2021
1 parent 02565ea commit 85bdf17
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 7 deletions.
23 changes: 23 additions & 0 deletions neptune/new/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,29 @@ def __init__(self, run_uuid: uuid.UUID):
super().__init__("Run with UUID {} not found. Could be deleted.".format(run_uuid))


class InactiveRunException(NeptuneException):
def __init__(self, short_id: str):
message = """
{h1}
----InactiveRunException----------------------------------------
{end}
It seems you are trying to log (or fetch) metadata to a run that was stopped ({short_id}).
What should I do?
- Resume the run to continue logging to it:
https://docs.neptune.ai/how-to-guides/neptune-api/resume-run#how-to-resume-run
- Don't invoke `stop()` on a run that you want to access. If you want to stop monitoring only,
you can resume a run in read-only mode:
https://docs.neptune.ai/you-should-know/connection-modes#read-only
You may also want to check the following docs pages:
- https://docs.neptune.ai/api-reference/run#stop
- https://docs.neptune.ai/how-to-guides/neptune-api/resume-run#how-to-resume-run
- https://docs.neptune.ai/you-should-know/connection-modes
{correct}Need help?{end}-> https://docs.neptune.ai/getting-started/getting-help
"""
inputs = dict(list({'short_id': short_id}.items()) + list(STYLES.items()))
super().__init__(message.format(**inputs))


class NeptuneMissingProjectNameException(NeptuneException):
def __init__(self):
message = """
Expand Down
3 changes: 2 additions & 1 deletion neptune/new/integrations/python_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from neptune.new import Run
from neptune.new.internal.utils import verify_type
from neptune.new.logging import Logger
from neptune.new.run import RunState


class NeptuneHandler(logging.Handler):
Expand Down Expand Up @@ -64,7 +65,7 @@ def emit(self, record: logging.LogRecord) -> None:
if not hasattr(self._thread_local, "inside_write"):
self._thread_local.inside_write = False

if self._run._started and not self._thread_local.inside_write: # pylint: disable=protected-access
if self._run._state == RunState.STARTED and not self._thread_local.inside_write: # pylint: disable=protected-access
try:
self._thread_local.inside_write = True
message = self.format(record)
Expand Down
37 changes: 32 additions & 5 deletions neptune/new/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import uuid
from contextlib import AbstractContextManager
from datetime import datetime
from enum import Enum
from functools import wraps
from typing import Any, Dict, List, Optional, Union

import click
Expand All @@ -28,7 +30,7 @@
from neptune.new.attributes import attribute_type_to_atom
from neptune.new.attributes.attribute import Attribute
from neptune.new.attributes.namespace import NamespaceBuilder, Namespace as NamespaceAttr
from neptune.new.exceptions import MetadataInconsistency, NeptuneException
from neptune.new.exceptions import MetadataInconsistency, NeptuneException, InactiveRunException
from neptune.new.handler import Handler
from neptune.new.internal.backends.api_model import AttributeType
from neptune.new.internal.backends.neptune_backend import NeptuneBackend
Expand All @@ -49,6 +51,24 @@
from neptune.new.types.value import Value


class RunState(Enum):
CREATED = 'created'
STARTED = 'started'
STOPPING = 'stopping'
STOPPED = 'stopped'


def assure_run_not_stopped(fun):
@wraps(fun)
def inner_fun(self, *args, **kwargs):
# pylint: disable=protected-access
if self._state == RunState.STOPPED:
raise InactiveRunException(short_id=self._short_id)
return fun(self, *args, **kwargs)

return inner_fun


class Run(AbstractContextManager):
"""A Run in Neptune is a representation of all metadata that you log to Neptune.
Expand Down Expand Up @@ -110,7 +130,7 @@ def __init__(
self._bg_job = background_job
self._structure: RunStructure[Attribute, NamespaceAttr] = RunStructure(NamespaceBuilder(self))
self._lock = threading.RLock()
self._started = False
self._state = RunState.CREATED
self._workspace = workspace
self._project_name = project_name
self._short_id = short_id
Expand All @@ -122,15 +142,19 @@ def __exit__(self, exc_type, exc_val, exc_tb):
traceback.print_exception(exc_type, exc_val, exc_tb)
self.stop()

@assure_run_not_stopped
def __getitem__(self, path: str) -> 'Handler':
return Handler(self, path)

@assure_run_not_stopped
def __setitem__(self, key: str, value) -> None:
self.__getitem__(key).assign(value)

@assure_run_not_stopped
def __delitem__(self, path) -> None:
self.pop(path)

@assure_run_not_stopped
def assign(self, value, wait: bool = False) -> None:
"""Assign values to multiple fields from a dictionary.
You can use this method to quickly log all run's parameters.
Expand Down Expand Up @@ -165,6 +189,7 @@ def assign(self, value, wait: bool = False) -> None:
"""
self._get_root_handler().assign(value, wait)

@assure_run_not_stopped
def fetch(self) -> dict:
"""Fetch values of all non-File Atom fields as a dictionary.
The result will preserve the hierarchical structure of the run's metadata, but will contain only non-File Atom
Expand Down Expand Up @@ -198,7 +223,7 @@ def start(self):
atexit.register(self._shutdown_hook)
self._op_processor.start()
self._bg_job.start(self)
self._started = True
self._state = RunState.STARTED

def stop(self, seconds: Optional[Union[float, int]] = None) -> None:
"""Stops the tracked run and kills the synchronization thread.
Expand Down Expand Up @@ -258,9 +283,10 @@ def stop(self, seconds: Optional[Union[float, int]] = None) -> None:
https://docs.neptune.ai/api-reference/run#stop
"""
verify_type("seconds", seconds, (float, int, type(None)))
if not self._started:
if self._state != RunState.STARTED:
return
self._started = False

self._state = RunState.STOPPING
ts = time.time()
click.echo(f"Shutting down background jobs, please wait a moment...")
self._bg_job.stop()
Expand All @@ -269,6 +295,7 @@ def stop(self, seconds: Optional[Union[float, int]] = None) -> None:
with self._lock:
sec_left = None if seconds is None else seconds - (time.time() - ts)
self._op_processor.stop(sec_left)
self._state = RunState.STOPPED

def get_structure(self) -> Dict[str, Any]:
"""Returns a run's metadata structure in form of a dictionary.
Expand Down
15 changes: 14 additions & 1 deletion tests/neptune/new/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from neptune.new import ANONYMOUS, init
from neptune.new.envs import API_TOKEN_ENV_NAME, PROJECT_ENV_NAME
from neptune.new.exceptions import MetadataInconsistency
from neptune.new.exceptions import MetadataInconsistency, InactiveRunException
from neptune.new.types.atoms.float import Float
from neptune.new.types.atoms.string import String
from neptune.new.types.series import FloatSeries, StringSeries
Expand Down Expand Up @@ -113,3 +113,16 @@ def test_assign_false(self):
exp["params"] = {'predictor.cheat': False}

self.assertFalse(exp["params/predictor.cheat"].fetch())

def test_access_blocked_after_stop(self):
exp = init(mode="debug")
exp['attr1'] = 1

exp.stop()

with self.assertRaises(InactiveRunException):
exp['attr1'].fetch()
with self.assertRaises(InactiveRunException):
exp['attr2'] = 2
with self.assertRaises(InactiveRunException):
exp['series'].log(1)

0 comments on commit 85bdf17

Please sign in to comment.