Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jk/npt 9386 prevent logging after stop #602

Merged
merged 4 commits into from
Jun 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 23 additions & 0 deletions neptune/new/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,29 @@ def __init__(self, run_uuid: uuid.UUID):
super().__init__("Run with UUID {} not found. Could be deleted.".format(run_uuid))


class InactiveRunException(NeptuneException):
    """Error reported when a stopped run is used for logging or fetching metadata."""

    def __init__(self, short_id: str):
        """Compose the user-facing help message for the stopped run.

        Args:
            short_id: Short identifier of the stopped run; it is interpolated
                into the message so the user can tell which run is affected.
        """
        # NOTE: the template lines deliberately start at column 0 -- they are
        # part of the runtime string, not of the code layout.
        message = """
{h1}
----InactiveRunException----------------------------------------
{end}
It seems you are trying to log (or fetch) metadata to a run that was stopped ({short_id}).
What should I do?
- Resume the run to continue logging to it:
https://docs.neptune.ai/how-to-guides/neptune-api/resume-run#how-to-resume-run
- Don't invoke `stop()` on a run that you want to access. If you want to stop monitoring only,
you can resume a run in read-only mode:
https://docs.neptune.ai/you-should-know/connection-modes#read-only
You may also want to check the following docs pages:
- https://docs.neptune.ai/api-reference/run#stop
- https://docs.neptune.ai/how-to-guides/neptune-api/resume-run#how-to-resume-run
- https://docs.neptune.ai/you-should-know/connection-modes
{correct}Need help?{end}-> https://docs.neptune.ai/getting-started/getting-help
"""
        # The remaining placeholders ({h1}, {end}, {correct}) are presumably
        # supplied by STYLES (defined elsewhere in this module -- confirm).
        # STYLES is unpacked last so that, on a key collision, its entries win.
        inputs = {'short_id': short_id, **STYLES}
        super().__init__(message.format(**inputs))


class NeptuneMissingProjectNameException(NeptuneException):
def __init__(self):
message = """
Expand Down
3 changes: 2 additions & 1 deletion neptune/new/integrations/python_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from neptune.new import Run
from neptune.new.internal.utils import verify_type
from neptune.new.logging import Logger
from neptune.new.run import RunState


class NeptuneHandler(logging.Handler):
Expand Down Expand Up @@ -64,7 +65,7 @@ def emit(self, record: logging.LogRecord) -> None:
if not hasattr(self._thread_local, "inside_write"):
self._thread_local.inside_write = False

if self._run._started and not self._thread_local.inside_write: # pylint: disable=protected-access
if self._run._state == RunState.STARTED and not self._thread_local.inside_write: # pylint: disable=protected-access
try:
self._thread_local.inside_write = True
message = self.format(record)
Expand Down
37 changes: 32 additions & 5 deletions neptune/new/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import uuid
from contextlib import AbstractContextManager
from datetime import datetime
from enum import Enum
from functools import wraps
from typing import Any, Dict, List, Optional, Union

import click
Expand All @@ -28,7 +30,7 @@
from neptune.new.attributes import attribute_type_to_atom
from neptune.new.attributes.attribute import Attribute
from neptune.new.attributes.namespace import NamespaceBuilder, Namespace as NamespaceAttr
from neptune.new.exceptions import MetadataInconsistency, NeptuneException
from neptune.new.exceptions import MetadataInconsistency, NeptuneException, InactiveRunException
from neptune.new.handler import Handler
from neptune.new.internal.backends.api_model import AttributeType
from neptune.new.internal.backends.neptune_backend import NeptuneBackend
Expand All @@ -49,6 +51,24 @@
from neptune.new.types.value import Value


class RunState(Enum):
    """Lifecycle phases a Run object moves through."""

    # Initial state assigned when the Run object is constructed.
    CREATED = 'created'
    # Set at the end of Run.start(), after the background machinery is started.
    STARTED = 'started'
    # Set by Run.stop() while background jobs and the operation processor shut down.
    STOPPING = 'stopping'
    # Set once Run.stop() has finished stopping the operation processor.
    STOPPED = 'stopped'


def assure_run_not_stopped(fun):
    """Decorate a method so that it refuses to run on a stopped run.

    The wrapped method is invoked unchanged unless the instance's ``_state``
    equals ``RunState.STOPPED``; in that case ``InactiveRunException`` is
    raised with the instance's ``_short_id`` and the method is not called.
    """

    @wraps(fun)
    def guarded(self, *args, **kwargs):
        # pylint: disable=protected-access
        if self._state != RunState.STOPPED:
            return fun(self, *args, **kwargs)
        raise InactiveRunException(short_id=self._short_id)

    return guarded


class Run(AbstractContextManager):
"""A Run in Neptune is a representation of all metadata that you log to Neptune.

Expand Down Expand Up @@ -110,7 +130,7 @@ def __init__(
self._bg_job = background_job
self._structure: RunStructure[Attribute, NamespaceAttr] = RunStructure(NamespaceBuilder(self))
self._lock = threading.RLock()
self._started = False
self._state = RunState.CREATED
self._workspace = workspace
self._project_name = project_name
self._short_id = short_id
Expand All @@ -122,15 +142,19 @@ def __exit__(self, exc_type, exc_val, exc_tb):
traceback.print_exception(exc_type, exc_val, exc_tb)
self.stop()

@assure_run_not_stopped
def __getitem__(self, path: str) -> 'Handler':
return Handler(self, path)

@assure_run_not_stopped
def __setitem__(self, key: str, value) -> None:
self.__getitem__(key).assign(value)

@assure_run_not_stopped
def __delitem__(self, path) -> None:
self.pop(path)

@assure_run_not_stopped
def assign(self, value, wait: bool = False) -> None:
"""Assign values to multiple fields from a dictionary.
You can use this method to quickly log all run's parameters.
Expand Down Expand Up @@ -165,6 +189,7 @@ def assign(self, value, wait: bool = False) -> None:
"""
self._get_root_handler().assign(value, wait)

@assure_run_not_stopped
def fetch(self) -> dict:
"""Fetch values of all non-File Atom fields as a dictionary.
The result will preserve the hierarchical structure of the run's metadata, but will contain only non-File Atom
Expand Down Expand Up @@ -198,7 +223,7 @@ def start(self):
atexit.register(self._shutdown_hook)
self._op_processor.start()
self._bg_job.start(self)
self._started = True
self._state = RunState.STARTED

def stop(self, seconds: Optional[Union[float, int]] = None) -> None:
"""Stops the tracked run and kills the synchronization thread.
Expand Down Expand Up @@ -258,9 +283,10 @@ def stop(self, seconds: Optional[Union[float, int]] = None) -> None:
https://docs.neptune.ai/api-reference/run#stop
"""
verify_type("seconds", seconds, (float, int, type(None)))
if not self._started:
if self._state != RunState.STARTED:
return
self._started = False

self._state = RunState.STOPPING
ts = time.time()
click.echo(f"Shutting down background jobs, please wait a moment...")
self._bg_job.stop()
Expand All @@ -269,6 +295,7 @@ def stop(self, seconds: Optional[Union[float, int]] = None) -> None:
with self._lock:
sec_left = None if seconds is None else seconds - (time.time() - ts)
self._op_processor.stop(sec_left)
self._state = RunState.STOPPED

def get_structure(self) -> Dict[str, Any]:
"""Returns a run's metadata structure in form of a dictionary.
Expand Down
15 changes: 14 additions & 1 deletion tests/neptune/new/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from neptune.new import ANONYMOUS, init
from neptune.new.envs import API_TOKEN_ENV_NAME, PROJECT_ENV_NAME
from neptune.new.exceptions import MetadataInconsistency
from neptune.new.exceptions import MetadataInconsistency, InactiveRunException
from neptune.new.types.atoms.float import Float
from neptune.new.types.atoms.string import String
from neptune.new.types.series import FloatSeries, StringSeries
Expand Down Expand Up @@ -113,3 +113,16 @@ def test_assign_false(self):
exp["params"] = {'predictor.cheat': False}

self.assertFalse(exp["params/predictor.cheat"].fetch())

def test_access_blocked_after_stop(self):
    """Item access on a run must raise InactiveRunException once it is stopped."""
    run = init(mode="debug")
    # Writing before stop() must still succeed.
    run['attr1'] = 1

    run.stop()

    # Reading an existing field.
    with self.assertRaises(InactiveRunException):
        run['attr1'].fetch()
    # Assigning a new field.
    with self.assertRaises(InactiveRunException):
        run['attr2'] = 2
    # Logging to a series.
    with self.assertRaises(InactiveRunException):
        run['series'].log(1)