diff --git a/CHANGELOG.md b/CHANGELOG.md index b5ccd8b39..e31617d56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## [UNRELEASED] neptune 1.1.0 + +### Features +- `Run`, `Model`, `ModelVersion`, and `Project` can now be created via their constructors, in addition to the `init_*` functions ([#1246](https://github.com/neptune-ai/neptune-client/pull/1246)) + ## neptune 1.0.2 ### Fixes diff --git a/src/neptune/__init__.py b/src/neptune/__init__.py index 062f393fd..7c2b713a2 100644 --- a/src/neptune/__init__.py +++ b/src/neptune/__init__.py @@ -20,20 +20,27 @@ "init_project", "init_run", "Run", + "Model", + "ModelVersion", + "Project", "__version__", ] from neptune.common.patches import apply_patches from neptune.constants import ANONYMOUS_API_TOKEN -from neptune.internal.init import ( - init_model, - init_model_version, - init_project, - init_run, +from neptune.metadata_containers import ( + Model, + ModelVersion, + Project, + Run, ) -from neptune.metadata_containers import Run from neptune.version import __version__ # Apply patches of external libraries apply_patches() + +init_run = Run +init_model = Model +init_model_version = ModelVersion +init_project = Project diff --git a/src/neptune/internal/backends/neptune_backend_mock.py b/src/neptune/internal/backends/neptune_backend_mock.py index f08fa81c6..d3b39ff79 100644 --- a/src/neptune/internal/backends/neptune_backend_mock.py +++ b/src/neptune/internal/backends/neptune_backend_mock.py @@ -111,7 +111,6 @@ from neptune.internal.utils import base64_decode from neptune.internal.utils.generic_attribute_mapper import NoValue from neptune.internal.utils.paths import path_to_str -from neptune.metadata_containers import Model from neptune.types import ( Boolean, Integer, @@ -268,7 +267,7 @@ def get_metadata_container( elif expected_container_type == ContainerType.MODEL: return ApiExperiment( id=UniqueId(str(uuid.uuid4())), - type=Model.container_type, + type=ContainerType.MODEL, sys_id=SysId(container_id.rsplit("/", 1)[-1]), workspace=self.WORKSPACE_NAME, project_name=self.PROJECT_NAME, diff --git a/src/neptune/internal/init/__init__.py b/src/neptune/internal/init/__init__.py index 6556a9d1a..b5e585d90 100644 --- a/src/neptune/internal/init/__init__.py +++ b/src/neptune/internal/init/__init__.py @@ -13,20 +13,3 @@ # See the License for the specific language governing permissions and # limitations under the License. # -__all__ = [ - "init_model", - "init_model_version", - "init_project", - "init_run", - "Mode", - "RunMode", -] - - -from neptune.internal.init.model import init_model -from neptune.internal.init.model_version import init_model_version -from neptune.internal.init.project import init_project -from neptune.internal.init.run import init_run -from neptune.types.mode import Mode - -RunMode = Mode diff --git a/src/neptune/internal/init/model.py b/src/neptune/internal/init/model.py deleted file mode 100644 index 325fc4eac..000000000 --- a/src/neptune/internal/init/model.py +++ /dev/null @@ -1,146 +0,0 @@ -# -# Copyright (c) 2022, Neptune Labs Sp. z o.o. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and -# limitations under the License. -# -__all__ = ["init_model"] - -import os -import threading -from typing import Optional - -from neptune.attributes import constants as attr_consts -from neptune.common.exceptions import NeptuneException -from neptune.envs import CONNECTION_MODE -from neptune.exceptions import ( - NeedExistingModelForReadOnlyMode, - NeptuneMissingRequiredInitParameter, - NeptuneModelKeyAlreadyExistsError, - NeptuneObjectCreationConflict, -) -from neptune.internal import id_formats -from neptune.internal.backends.factory import get_backend -from neptune.internal.backends.project_name_lookup import project_name_lookup -from neptune.internal.backgroud_job_list import BackgroundJobList -from neptune.internal.id_formats import QualifiedName -from neptune.internal.init.parameters import ( - DEFAULT_FLUSH_PERIOD, - DEFAULT_NAME, - OFFLINE_PROJECT_QUALIFIED_NAME, -) -from neptune.internal.operation_processors.factory import get_operation_processor -from neptune.internal.utils import verify_type -from neptune.internal.utils.ping_background_job import PingBackgroundJob -from neptune.metadata_containers import Model -from neptune.types.mode import Mode - - -def init_model( - with_id: Optional[str] = None, - *, - name: Optional[str] = None, - key: Optional[str] = None, - project: Optional[str] = None, - api_token: Optional[str] = None, - mode: Optional[str] = None, - flush_period: float = DEFAULT_FLUSH_PERIOD, - proxies: Optional[dict] = None, -) -> Model: - verify_type("with_id", with_id, (str, type(None))) - verify_type("name", name, (str, type(None))) - verify_type("key", key, (str, type(None))) - verify_type("project", project, (str, type(None))) - verify_type("api_token", api_token, (str, type(None))) - verify_type("mode", mode, (str, type(None))) - verify_type("flush_period", flush_period, (int, float)) - verify_type("proxies", proxies, (dict, type(None))) - - # make mode proper Enum instead of string - mode = Mode(mode or os.getenv(CONNECTION_MODE) or Mode.ASYNC.value) - - if mode == Mode.OFFLINE: - raise NeptuneException("Model can't be initialized in OFFLINE mode") - - name = DEFAULT_NAME if with_id is None and name is None else name - - backend = get_backend(mode=mode, api_token=api_token, proxies=proxies) - - if mode == Mode.OFFLINE or mode == Mode.DEBUG: - project = OFFLINE_PROJECT_QUALIFIED_NAME - - project = id_formats.conform_optional(project, QualifiedName) - project_obj = project_name_lookup(backend=backend, name=project) - project = f"{project_obj.workspace}/{project_obj.name}" - - if with_id is not None: - # with_id (resume existing model) has priority over key (creating a new model) - # additional creation parameters (e.g. 
name) are simply ignored in this scenario - model_id = QualifiedName(project + "/" + with_id) - api_model = backend.get_metadata_container(container_id=model_id, expected_container_type=Model.container_type) - elif key is not None: - if mode == Mode.READ_ONLY: - raise NeedExistingModelForReadOnlyMode() - - try: - api_model = backend.create_model(project_id=project_obj.id, key=key) - except NeptuneObjectCreationConflict as e: - base_url = backend.get_display_address() - raise NeptuneModelKeyAlreadyExistsError( - model_key=key, - models_tab_url=f"{base_url}/{project_obj.workspace}/{project_obj.name}/models", - ) from e - - else: - raise NeptuneMissingRequiredInitParameter( - parameter_name="key", - called_function="init_model", - ) - - model_lock = threading.RLock() - - operation_processor = get_operation_processor( - mode=mode, - container_id=api_model.id, - container_type=Model.container_type, - backend=backend, - lock=model_lock, - flush_period=flush_period, - ) - - background_jobs = [] - if mode != Mode.READ_ONLY: - background_jobs.append(PingBackgroundJob()) - - _model = Model( - id_=api_model.id, - mode=mode, - backend=backend, - op_processor=operation_processor, - background_job=BackgroundJobList(background_jobs), - lock=model_lock, - workspace=api_model.workspace, - project_name=api_model.project_name, - sys_id=api_model.sys_id, - project_id=project_obj.id, - ) - - if mode != Mode.OFFLINE: - _model.sync(wait=False) - - if mode != Mode.READ_ONLY: - if name is not None: - _model[attr_consts.SYSTEM_NAME_ATTRIBUTE_PATH] = name - - _model._startup(debug_mode=mode == Mode.DEBUG) - - return _model diff --git a/src/neptune/internal/init/model_version.py b/src/neptune/internal/init/model_version.py deleted file mode 100644 index 619d7a3f9..000000000 --- a/src/neptune/internal/init/model_version.py +++ /dev/null @@ -1,141 +0,0 @@ -# -# Copyright (c) 2022, Neptune Labs Sp. z o.o. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -__all__ = ["init_model_version"] - -import os -import threading -from typing import Optional - -from neptune.attributes import constants as attr_consts -from neptune.common.exceptions import NeptuneException -from neptune.envs import CONNECTION_MODE -from neptune.exceptions import ( - NeedExistingModelVersionForReadOnlyMode, - NeptuneMissingRequiredInitParameter, -) -from neptune.internal import id_formats -from neptune.internal.backends.factory import get_backend -from neptune.internal.backends.project_name_lookup import project_name_lookup -from neptune.internal.backgroud_job_list import BackgroundJobList -from neptune.internal.id_formats import QualifiedName -from neptune.internal.init.parameters import ( - DEFAULT_FLUSH_PERIOD, - DEFAULT_NAME, - OFFLINE_PROJECT_QUALIFIED_NAME, -) -from neptune.internal.operation_processors.factory import get_operation_processor -from neptune.internal.utils import verify_type -from neptune.internal.utils.ping_background_job import PingBackgroundJob -from neptune.metadata_containers import ( - Model, - ModelVersion, -) -from neptune.types.mode import Mode - - -def init_model_version( - with_id: Optional[str] = None, - *, - name: Optional[str] = None, - model: Optional[str] = None, - project: Optional[str] = None, - api_token: Optional[str] = None, - mode: Optional[str] = None, - flush_period: float = DEFAULT_FLUSH_PERIOD, - proxies: Optional[dict] = None, -) -> ModelVersion: - verify_type("with_id", with_id, (str, type(None))) - verify_type("name", name, (str, type(None))) - verify_type("model", model, (str, type(None))) - verify_type("project", project, (str, type(None))) - verify_type("api_token", api_token, (str, type(None))) - verify_type("mode", mode, (str, type(None))) - verify_type("flush_period", flush_period, (int, float)) - verify_type("proxies", proxies, (dict, type(None))) - - # make mode proper Enum instead of string - mode = Mode(mode or os.getenv(CONNECTION_MODE) or Mode.ASYNC.value) - - if mode == Mode.OFFLINE: - raise NeptuneException("Model can't be initialized in OFFLINE mode") - - name = DEFAULT_NAME if model is None and name is None else name - - backend = get_backend(mode=mode, api_token=api_token, proxies=proxies) - - if mode == Mode.OFFLINE or mode == Mode.DEBUG: - project = OFFLINE_PROJECT_QUALIFIED_NAME - - project = id_formats.conform_optional(project, QualifiedName) - project_obj = project_name_lookup(backend, project) - project = f"{project_obj.workspace}/{project_obj.name}" - - if with_id is not None: - # with_id (resume existing model_version) has priority over model (creating a new model_version) - version_id = QualifiedName(project + "/" + with_id) - api_model_version = backend.get_metadata_container( - container_id=version_id, expected_container_type=ModelVersion.container_type - ) - elif model is not None: - if mode == Mode.READ_ONLY: - raise NeedExistingModelVersionForReadOnlyMode() - - model_id = QualifiedName(project + "/" + model) - api_model = backend.get_metadata_container(container_id=model_id, expected_container_type=Model.container_type) - api_model_version = backend.create_model_version(project_id=project_obj.id, model_id=api_model.id) - else: - raise NeptuneMissingRequiredInitParameter( - parameter_name="model", - called_function="init_model_version", - ) - - model_lock = threading.RLock() - - operation_processor = get_operation_processor( - mode=mode, - container_id=api_model_version.id, - container_type=ModelVersion.container_type, - backend=backend, - lock=model_lock, - flush_period=flush_period, - ) - 
- background_jobs = [] - if mode != Mode.READ_ONLY: - background_jobs.append(PingBackgroundJob()) - - _model_version = ModelVersion( - id_=api_model_version.id, - mode=mode, - backend=backend, - op_processor=operation_processor, - background_job=BackgroundJobList(background_jobs), - lock=model_lock, - workspace=api_model_version.workspace, - project_name=api_model_version.project_name, - sys_id=api_model_version.sys_id, - project_id=project_obj.id, - ) - if mode != Mode.OFFLINE: - _model_version.sync(wait=False) - - if mode != Mode.READ_ONLY: - if name is not None: - _model_version[attr_consts.SYSTEM_NAME_ATTRIBUTE_PATH] = name - - _model_version._startup(debug_mode=mode == Mode.DEBUG) - - return _model_version diff --git a/src/neptune/internal/init/project.py b/src/neptune/internal/init/project.py deleted file mode 100644 index b50fe024f..000000000 --- a/src/neptune/internal/init/project.py +++ /dev/null @@ -1,89 +0,0 @@ -# -# Copyright (c) 2022, Neptune Labs Sp. z o.o. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -__all__ = ["init_project"] - -import os -import threading -from typing import Optional - -from neptune.common.exceptions import NeptuneException -from neptune.envs import CONNECTION_MODE -from neptune.internal import id_formats -from neptune.internal.backends.factory import get_backend -from neptune.internal.backends.project_name_lookup import project_name_lookup -from neptune.internal.backgroud_job_list import BackgroundJobList -from neptune.internal.id_formats import QualifiedName -from neptune.internal.init.parameters import DEFAULT_FLUSH_PERIOD -from neptune.internal.operation_processors.factory import get_operation_processor -from neptune.internal.utils import verify_type -from neptune.metadata_containers import Project -from neptune.types.mode import Mode - - -def init_project( - project: Optional[str] = None, - *, - api_token: Optional[str] = None, - mode: Optional[str] = None, - flush_period: float = DEFAULT_FLUSH_PERIOD, - proxies: Optional[dict] = None, -) -> Project: - verify_type("project", project, (str, type(None))) - verify_type("api_token", api_token, (str, type(None))) - verify_type("mode", mode, (str, type(None))) - verify_type("flush_period", flush_period, (int, float)) - verify_type("proxies", proxies, (dict, type(None))) - - # make mode proper Enum instead of string - mode = Mode(mode or os.getenv(CONNECTION_MODE) or Mode.ASYNC.value) - - if mode == Mode.OFFLINE: - raise NeptuneException("Project can't be initialized in OFFLINE mode") - - project = id_formats.conform_optional(project, QualifiedName) - backend = get_backend(mode=mode, api_token=api_token, proxies=proxies) - project_obj = project_name_lookup(backend=backend, name=project) - - project_lock = threading.RLock() - - operation_processor = get_operation_processor( - mode=mode, - container_id=project_obj.id, - container_type=Project.container_type, - backend=backend, - lock=project_lock, - flush_period=flush_period, - ) - - background_jobs = [] - - npt_project = Project( - id_=project_obj.id, - 
mode=mode, - backend=backend, - op_processor=operation_processor, - background_job=BackgroundJobList(background_jobs), - lock=project_lock, - workspace=project_obj.workspace, - project_name=project_obj.name, - sys_id=project_obj.sys_id, - ) - - if mode != Mode.OFFLINE: - npt_project.sync(wait=False) - - npt_project._startup(debug_mode=mode == Mode.DEBUG) - return npt_project diff --git a/src/neptune/internal/init/run.py b/src/neptune/internal/init/run.py deleted file mode 100644 index c98c29810..000000000 --- a/src/neptune/internal/init/run.py +++ /dev/null @@ -1,427 +0,0 @@ -# -# Copyright (c) 2022, Neptune Labs Sp. z o.o. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -__all__ = ["init_run"] - -import os -import threading -import typing -from platform import node as get_hostname -from typing import ( - List, - Optional, - Union, -) - -from neptune.attributes import constants as attr_consts -from neptune.common.warnings import ( - NeptuneWarning, - warn_once, -) -from neptune.envs import ( - CONNECTION_MODE, - CUSTOM_RUN_ID_ENV_NAME, - MONITORING_NAMESPACE, - NEPTUNE_NOTEBOOK_ID, - NEPTUNE_NOTEBOOK_PATH, -) -from neptune.exceptions import ( - NeedExistingRunForReadOnlyMode, - NeptunePossibleLegacyUsageException, - NeptuneRunResumeAndCustomIdCollision, -) -from neptune.internal import id_formats -from neptune.internal.backends.factory import get_backend -from neptune.internal.backends.neptune_backend import NeptuneBackend -from neptune.internal.backends.project_name_lookup import project_name_lookup -from neptune.internal.backgroud_job_list import BackgroundJobList -from neptune.internal.hardware.hardware_metric_reporting_job import HardwareMetricReportingJob -from neptune.internal.id_formats import QualifiedName -from neptune.internal.init.parameters import ( - DEFAULT_FLUSH_PERIOD, - DEFAULT_NAME, - OFFLINE_PROJECT_QUALIFIED_NAME, -) -from neptune.internal.notebooks.notebooks import create_checkpoint -from neptune.internal.operation_processors.factory import get_operation_processor -from neptune.internal.streams.std_capture_background_job import ( - StderrCaptureBackgroundJob, - StdoutCaptureBackgroundJob, -) -from neptune.internal.utils import ( - verify_collection_type, - verify_type, -) -from neptune.internal.utils.git import ( - discover_git_repo_location, - get_git_info, -) -from neptune.internal.utils.hashing import generate_hash -from neptune.internal.utils.limits import custom_run_id_exceeds_length -from neptune.internal.utils.ping_background_job import PingBackgroundJob -from neptune.internal.utils.runningmode import ( - in_interactive, - in_notebook, -) -from neptune.internal.utils.source_code import upload_source_code -from neptune.internal.utils.traceback_job import TracebackJob -from neptune.internal.websockets.websocket_signals_background_job import WebsocketSignalsBackgroundJob -from neptune.metadata_containers import Run -from neptune.types.mode import Mode -from neptune.types.series.string_series import StringSeries - -LEGACY_KWARGS = ("project_qualified_name", 
"backend") - - -def _check_for_extra_kwargs(caller_name, kwargs: dict): - for name in LEGACY_KWARGS: - if name in kwargs: - raise NeptunePossibleLegacyUsageException() - if kwargs: - first_key = next(iter(kwargs.keys())) - raise TypeError(f"{caller_name}() got an unexpected keyword argument '{first_key}'") - - -def init_run( - with_id: Optional[str] = None, - *, - project: Optional[str] = None, - api_token: Optional[str] = None, - custom_run_id: Optional[str] = None, - mode: Optional[str] = None, - name: Optional[str] = None, - description: Optional[str] = None, - tags: Optional[Union[List[str], str]] = None, - source_files: Optional[Union[List[str], str]] = None, - capture_stdout: Optional[bool] = None, - capture_stderr: Optional[bool] = None, - capture_hardware_metrics: Optional[bool] = None, - fail_on_exception: bool = True, - monitoring_namespace: Optional[str] = None, - flush_period: float = DEFAULT_FLUSH_PERIOD, - proxies: Optional[dict] = None, - capture_traceback: bool = True, - **kwargs, -) -> Run: - """Starts a new tracked run and adds it to the top of the runs table. - - If you provide the ID of an existing run, that run is resumed and no new run is created. - - Args: - project: Name of the project where the run should go, in the form "workspace-name/project_name". - If None (default), the value of the NEPTUNE_PROJECT environment variable is used. - api_token: User's API token. Defaults to None. - If None (default), the value of the NEPTUNE_API_TOKEN environment variable is used. - Note: To keep your API token secure, save it to the NEPTUNE_API_TOKEN environment variable rather than - placing it in plain text in the source code. - with_id: If you want to resume a run, the identifier of the existing run. - For example, 'SAN-1'. A run with such an ID must exist. - If None (default) is passed, starts a new tracked run. - custom_run_id: A unique identifier to be used when running Neptune in pipelines. - Make sure to use the same identifier throughout the whole pipeline execution. - mode: Connection mode in which the tracking will work. - If None (default), the value of the NEPTUNE_MODE environment variable is used. - If no value was set for the environment variable, 'async' is used by default. - Possible values are 'async', 'sync', 'offline', 'read-only', and 'debug'. - name: Editable name of the run. Defaults to 'Untitled'. - The name is displayed in the run details and as a column in the runs table. - description: Editable description of the run. Defaults to `''`. - The description is displayed in the run details and can be added to the runs table as a column. - tags: Tags of the run as a list of strings. Defaults to `[]`. - Tags are displayed in the run details and in the runs table as a column. - You can edit the tags after the run is created, either through the app or the API. - source_files: List of source files to be uploaded. - Uploaded source files are displayed in the 'Source code' tab of the run view. - To not upload anything, pass an empty list (`[]`). - Unix style pathname pattern expansion is supported. For example, you can pass `*.py` to upload - all Python files from the current directory. - If None is passed, the Python file from which the run was created will be uploaded. - capture_stdout: Whether to log the stdout of the run. Defaults to True. - The data is logged under the monitoring namespace (see the 'monitoring_namespace' parameter). - capture_stderr: Whether to log the stderr of the run. Defaults to True. 
- The data is logged under the monitoring namespace (see the 'monitoring_namespace' parameter). - capture_hardware_metrics: Whether to send hardware monitoring logs (CPU, GPU, and memory utilization). - Defaults to True. - The data is logged under the monitoring namespace (see the 'monitoring_namespace' parameter). - fail_on_exception: Whether to register an uncaught exception handler to this process and, - in case of an exception, set the 'sys/failed' field of the run to True. - An exception is always logged. - monitoring_namespace: Namespace inside which all hardware monitoring logs are stored. - Defaults to 'monitoring/<hash>', where the hash is generated based on environment information, - to ensure that it's unique for each process. - flush_period: In the asynchronous (default) connection mode, how often disk flushing is triggered. - Defaults to 5 (every 5 seconds). - proxies: Argument passed to HTTP calls made via the Requests library, as dictionary of strings. - For more information, see the 'Proxies' section in the Requests documentation. - capture_traceback: Whether to log the traceback of the run in case of an exception. - Defaults to True. - The tracked metadata is stored in the '<monitoring_namespace>/traceback' namespace (see the - 'monitoring_namespace' parameter). - - Returns: - Run object that is used to manage the tracked run and log metadata to it. - - Examples: - - Creating a new run: - - >>> import neptune - - >>> # Minimal invoke - ... # (creates a run in the project specified by the NEPTUNE_PROJECT environment variable) - ... run = neptune.init_run() - - >>> # Create a tracked run with a name and description, and no sources files uploaded - >>> run = neptune.init_run( - ... name="neural-net-mnist", - ... description="neural net trained on MNIST", - ... source_files=[], - ... ) - - >>> # Log all .py files from all subdirectories, excluding hidden files - ... run = neptune.init_run(source_files="**/*.py") - - >>> # Log all files and directories in the current working directory, excluding hidden files - ... run = neptune.init_run(source_files="*") - - >>> # Larger example - ... run = neptune.init_run( - ... project="ml-team/classification", - ... name="first-pytorch-ever", - ... description="Longer description of the run goes here", - ... tags=["tags", "go-here", "as-list-of-strings"], - ... source_files=["training_with_pytorch.py", "net.py"], - ... monitoring_namespace="system_metrics", - ... capture_stderr=False, - ... ) - - Connecting to an existing run: - - >>> # Resume logging to an existing run with the ID "SAN-3" - ... run = neptune.init_run(with_id="SAN-3") - ... run["parameters/lr"] = 0.1 # modify or add metadata - - >>> # Initialize an existing run in read-only mode (logging new data is not possible, only fetching) - ... run = neptune.init_run(with_id="SAN-4", mode="read-only") - ...
learning_rate = run["parameters/lr"].fetch() - - For more, see the API reference: - https://docs.neptune.ai/api/neptune#init_run - """ - _check_for_extra_kwargs(init_run.__name__, kwargs) - - verify_type("project", project, (str, type(None))) - verify_type("api_token", api_token, (str, type(None))) - verify_type("with_id", with_id, (str, type(None))) - verify_type("custom_run_id", custom_run_id, (str, type(None))) - verify_type("mode", mode, (str, type(None))) - verify_type("name", name, (str, type(None))) - verify_type("description", description, (str, type(None))) - verify_type("capture_stdout", capture_stdout, (bool, type(None))) - verify_type("capture_stderr", capture_stderr, (bool, type(None))) - verify_type("capture_hardware_metrics", capture_hardware_metrics, (bool, type(None))) - verify_type("monitoring_namespace", monitoring_namespace, (str, type(None))) - verify_type("flush_period", flush_period, (int, float)) - verify_type("proxies", proxies, (dict, type(None))) - verify_type("capture_traceback", capture_traceback, bool) - if tags is not None: - if isinstance(tags, str): - tags = [tags] - else: - verify_collection_type("tags", tags, str) - if source_files is not None: - if isinstance(source_files, str): - source_files = [source_files] - else: - verify_collection_type("source_files", source_files, str) - - # for backward compatibility imports - mode = Mode(mode or os.getenv(CONNECTION_MODE) or Mode.ASYNC.value) - name = DEFAULT_NAME if with_id is None and name is None else name - description = "" if with_id is None and description is None else description - custom_run_id = custom_run_id or os.getenv(CUSTOM_RUN_ID_ENV_NAME) - - hostname = get_hostname() - pid = os.getpid() - tid = threading.get_ident() - - monitoring_namespace = ( - monitoring_namespace or os.getenv(MONITORING_NAMESPACE) or generate_monitoring_namespace(hostname, pid, tid) - ) - - if capture_stdout is None: - capture_stdout = capture_only_if_non_interactive(mode=mode) - - if capture_stderr is None: - capture_stderr = capture_only_if_non_interactive(mode=mode) - - if capture_hardware_metrics is None: - capture_hardware_metrics = capture_only_if_non_interactive(mode=mode) - - if with_id and custom_run_id: - raise NeptuneRunResumeAndCustomIdCollision() - - backend = get_backend(mode=mode, api_token=api_token, proxies=proxies) - - if mode == Mode.OFFLINE or mode == Mode.DEBUG: - project = OFFLINE_PROJECT_QUALIFIED_NAME - - project = id_formats.conform_optional(project, QualifiedName) - project_obj = project_name_lookup(backend, project) - project = f"{project_obj.workspace}/{project_obj.name}" - - if with_id: - api_run = backend.get_metadata_container( - container_id=QualifiedName(project + "/" + with_id), - expected_container_type=Run.container_type, - ) - else: - if mode == Mode.READ_ONLY: - raise NeedExistingRunForReadOnlyMode() - git_ref = get_git_info(discover_git_repo_location()) - if custom_run_id_exceeds_length(custom_run_id): - custom_run_id = None - - notebook_id, checkpoint_id = _create_notebook_checkpoint(backend) - - api_run = backend.create_run( - project_id=project_obj.id, - git_ref=git_ref, - custom_run_id=custom_run_id, - notebook_id=notebook_id, - checkpoint_id=checkpoint_id, - ) - - run_lock = threading.RLock() - - operation_processor = get_operation_processor( - mode=mode, - container_id=api_run.id, - container_type=Run.container_type, - backend=backend, - lock=run_lock, - flush_period=flush_period, - ) - - stdout_path = "{}/stdout".format(monitoring_namespace) - stderr_path = 
"{}/stderr".format(monitoring_namespace) - traceback_path = "{}/traceback".format(monitoring_namespace) - - background_jobs = [] - if mode != Mode.READ_ONLY: - if capture_stdout: - background_jobs.append(StdoutCaptureBackgroundJob(attribute_name=stdout_path)) - - if capture_stderr: - background_jobs.append(StderrCaptureBackgroundJob(attribute_name=stderr_path)) - - if capture_hardware_metrics: - background_jobs.append(HardwareMetricReportingJob(attribute_namespace=monitoring_namespace)) - - if capture_traceback: - background_jobs.append(TracebackJob(traceback_path, fail_on_exception)) - - websockets_factory = backend.websockets_factory(project_obj.id, api_run.id) - if websockets_factory: - background_jobs.append(WebsocketSignalsBackgroundJob(websockets_factory)) - - background_jobs.append(PingBackgroundJob()) - - _run = Run( - id_=api_run.id, - mode=mode, - backend=backend, - op_processor=operation_processor, - background_job=BackgroundJobList(background_jobs), - lock=run_lock, - workspace=api_run.workspace, - project_name=api_run.project_name, - sys_id=api_run.sys_id, - project_id=project_obj.id, - monitoring_namespace=monitoring_namespace, - ) - if mode != Mode.OFFLINE: - _run.sync(wait=False) - - if mode != Mode.READ_ONLY: - if name is not None: - _run[attr_consts.SYSTEM_NAME_ATTRIBUTE_PATH] = name - - if description is not None: - _run[attr_consts.SYSTEM_DESCRIPTION_ATTRIBUTE_PATH] = description - - if hostname is not None: - _run[f"{monitoring_namespace}/hostname"] = hostname - if with_id is None: - _run[attr_consts.SYSTEM_HOSTNAME_ATTRIBUTE_PATH] = hostname - - if pid is not None: - _run[f"{monitoring_namespace}/pid"] = str(pid) - - if tid is not None: - _run[f"{monitoring_namespace}/tid"] = str(tid) - - if tags is not None: - _run[attr_consts.SYSTEM_TAGS_ATTRIBUTE_PATH].add(tags) - - if with_id is None: - _run[attr_consts.SYSTEM_FAILED_ATTRIBUTE_PATH] = False - - if capture_stdout and not _run.exists(stdout_path): - _run.define(stdout_path, StringSeries([])) - if capture_stderr and not _run.exists(stderr_path): - _run.define(stderr_path, StringSeries([])) - - if with_id is None or source_files is not None: - # upload default sources ONLY if creating a new run - upload_source_code(source_files=source_files, run=_run) - - _run._startup(debug_mode=mode == Mode.DEBUG) - - return _run - - -def _create_notebook_checkpoint( - backend: NeptuneBackend, -) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]: - notebook_id = None - if os.getenv(NEPTUNE_NOTEBOOK_ID, None) is not None: - notebook_id = os.environ[NEPTUNE_NOTEBOOK_ID] - - notebook_path = None - if os.getenv(NEPTUNE_NOTEBOOK_PATH, None) is not None: - notebook_path = os.environ[NEPTUNE_NOTEBOOK_PATH] - - checkpoint_id = None - if notebook_id is not None and notebook_path is not None: - checkpoint_id = create_checkpoint(backend=backend, notebook_id=notebook_id, notebook_path=notebook_path) - return notebook_id, checkpoint_id - - -def capture_only_if_non_interactive(mode) -> bool: - if in_interactive() or in_notebook(): - if mode in {Mode.OFFLINE, Mode.SYNC, Mode.ASYNC}: - warn_once( - "To avoid unintended consumption of logging hours during interactive sessions, the" - " following monitoring options are disabled unless set to 'True' when initializing" - " the run: 'capture_stdout', 'capture_stderr', and 'capture_hardware_metrics'.", - exception=NeptuneWarning, - ) - return False - return True - - -def generate_monitoring_namespace(*descriptors): - return f"monitoring/{generate_hash(*descriptors, length=8)}" diff --git 
a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index d0d5c577b..31f981be1 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -43,17 +43,27 @@ NeptunePossibleLegacyUsageException, ) from neptune.handler import Handler -from neptune.internal.backends.api_model import AttributeType +from neptune.internal.backends.api_model import ( + ApiExperiment, + AttributeType, + Project, +) +from neptune.internal.backends.factory import get_backend from neptune.internal.backends.neptune_backend import NeptuneBackend from neptune.internal.backends.nql import NQLQuery -from neptune.internal.background_job import BackgroundJob +from neptune.internal.backends.project_name_lookup import project_name_lookup +from neptune.internal.backgroud_job_list import BackgroundJobList from neptune.internal.container_structure import ContainerStructure from neptune.internal.container_type import ContainerType from neptune.internal.id_formats import ( + QualifiedName, SysId, UniqueId, + conform_optional, ) +from neptune.internal.init.parameters import DEFAULT_FLUSH_PERIOD from neptune.internal.operation import DeleteAttribute +from neptune.internal.operation_processors.factory import get_operation_processor from neptune.internal.operation_processors.operation_processor import OperationProcessor from neptune.internal.state import ContainerState from neptune.internal.utils import verify_type @@ -83,29 +93,69 @@ class MetadataContainer(AbstractContextManager): def __init__( self, *, - id_: UniqueId, - mode: Mode, - backend: NeptuneBackend, - op_processor: OperationProcessor, - background_job: BackgroundJob, - lock: threading.RLock, - project_id: UniqueId, - project_name: str, - workspace: str, - sys_id: SysId, + project: Optional[str] = None, + api_token: Optional[str] = None, + mode: Mode = Mode.ASYNC, + flush_period: float = DEFAULT_FLUSH_PERIOD, + proxies: Optional[dict] = None, ): - self._id = id_ - self._mode = mode - self._project_id = project_id - self._project_name = project_name - self._workspace = workspace - self._backend = backend - self._op_processor = op_processor - self._bg_job = background_job + verify_type("project", project, (str, type(None))) + verify_type("api_token", api_token, (str, type(None))) + verify_type("mode", mode, Mode) + verify_type("flush_period", flush_period, (int, float)) + verify_type("proxies", proxies, (dict, type(None))) + + self._mode: Mode = mode + self._lock: threading.RLock = threading.RLock() + self._state: ContainerState = ContainerState.CREATED + + self._backend: NeptuneBackend = get_backend(mode=mode, api_token=api_token, proxies=proxies) + + self._project_qualified_name: Optional[str] = conform_optional(project, QualifiedName) + self._project_api_object: Project = project_name_lookup( + backend=self._backend, name=self._project_qualified_name + ) + self._project_id: UniqueId = self._project_api_object.id + + self._api_object: ApiExperiment = self._get_or_create_api_object() + self._id: UniqueId = self._api_object.id + self._sys_id: SysId = self._api_object.sys_id + self._workspace: str = self._api_object.workspace + self._project_name: str = self._api_object.project_name + + self._op_processor: OperationProcessor = get_operation_processor( + mode=mode, + container_id=self._id, + container_type=self.container_type, + backend=self._backend, + lock=self._lock, + flush_period=flush_period, + ) + self._bg_job: BackgroundJobList = 
self._prepare_background_jobs_if_non_read_only() self._structure: ContainerStructure[Attribute, NamespaceAttr] = ContainerStructure(NamespaceBuilder(self)) - self._lock = lock - self._state = ContainerState.CREATED - self._sys_id = sys_id + + if self._mode != Mode.OFFLINE: + self.sync(wait=False) + + if self._mode != Mode.READ_ONLY: + self._write_initial_attributes() + + self._startup(debug_mode=mode == Mode.DEBUG) + + def _prepare_background_jobs_if_non_read_only(self) -> BackgroundJobList: + if self._mode != Mode.READ_ONLY: + return self._prepare_background_jobs() + return BackgroundJobList([]) + + @abc.abstractmethod + def _get_or_create_api_object(self) -> ApiExperiment: + raise NotImplementedError + + def _prepare_background_jobs(self) -> BackgroundJobList: + return BackgroundJobList([]) + + def _write_initial_attributes(self): + pass def __exit__(self, exc_type, exc_val, exc_tb): if exc_tb is not None: diff --git a/src/neptune/metadata_containers/model.py b/src/neptune/metadata_containers/model.py index 90c51a9cc..c4870725d 100644 --- a/src/neptune/metadata_containers/model.py +++ b/src/neptune/metadata_containers/model.py @@ -15,12 +15,23 @@ # __all__ = ["Model"] +import os from typing import ( Iterable, Optional, ) -from neptune.exceptions import InactiveModelException +from neptune.attributes.constants import SYSTEM_NAME_ATTRIBUTE_PATH +from neptune.common.exceptions import NeptuneException +from neptune.envs import CONNECTION_MODE +from neptune.exceptions import ( + InactiveModelException, + NeedExistingModelForReadOnlyMode, + NeptuneMissingRequiredInitParameter, + NeptuneModelKeyAlreadyExistsError, + NeptuneObjectCreationConflict, +) +from neptune.internal.backends.api_model import ApiExperiment from neptune.internal.backends.nql import ( NQLAggregator, NQLAttributeOperator, @@ -28,10 +39,20 @@ NQLQueryAggregate, NQLQueryAttribute, ) +from neptune.internal.backgroud_job_list import BackgroundJobList from neptune.internal.container_type import ContainerType +from neptune.internal.id_formats import QualifiedName +from neptune.internal.init.parameters import ( + DEFAULT_FLUSH_PERIOD, + DEFAULT_NAME, + OFFLINE_PROJECT_QUALIFIED_NAME, +) from neptune.internal.state import ContainerState +from neptune.internal.utils import verify_type +from neptune.internal.utils.ping_background_job import PingBackgroundJob from neptune.metadata_containers import MetadataContainer from neptune.metadata_containers.metadata_containers_table import Table +from neptune.types.mode import Mode class Model(MetadataContainer): @@ -45,6 +66,76 @@ class Model(MetadataContainer): container_type = ContainerType.MODEL + def __init__( + self, + with_id: Optional[str] = None, + *, + name: Optional[str] = None, + key: Optional[str] = None, + project: Optional[str] = None, + api_token: Optional[str] = None, + mode: Optional[str] = None, + flush_period: float = DEFAULT_FLUSH_PERIOD, + proxies: Optional[dict] = None, + ): + verify_type("with_id", with_id, (str, type(None))) + verify_type("name", name, (str, type(None))) + verify_type("key", key, (str, type(None))) + verify_type("project", project, (str, type(None))) + verify_type("mode", mode, (str, type(None))) + + self._key: Optional[str] = key + self._with_id: Optional[str] = with_id + self._name: Optional[str] = DEFAULT_NAME if with_id is None and name is None else name + + # make mode proper Enum instead of string + mode = Mode(mode or os.getenv(CONNECTION_MODE) or Mode.ASYNC.value) + + if mode == Mode.OFFLINE: + raise NeptuneException("Model can't be initialized 
in OFFLINE mode") + + if mode == Mode.DEBUG: + project = OFFLINE_PROJECT_QUALIFIED_NAME + + super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies) + + def _get_or_create_api_object(self) -> ApiExperiment: + project_workspace = self._project_api_object.workspace + project_name = self._project_api_object.name + project_qualified_name = f"{project_workspace}/{project_name}" + + if self._with_id is not None: + # with_id (resume existing model) has priority over key (creating a new model) + # additional creation parameters (e.g. name) are simply ignored in this scenario + return self._backend.get_metadata_container( + container_id=QualifiedName(project_qualified_name + "/" + self._with_id), + expected_container_type=self.container_type, + ) + elif self._key is not None: + if self._mode == Mode.READ_ONLY: + raise NeedExistingModelForReadOnlyMode() + + try: + return self._backend.create_model(project_id=self._project_api_object.id, key=self._key) + except NeptuneObjectCreationConflict as e: + base_url = self._backend.get_display_address() + raise NeptuneModelKeyAlreadyExistsError( + model_key=self._key, + models_tab_url=f"{base_url}/{project_workspace}/{project_name}/models", + ) from e + else: + raise NeptuneMissingRequiredInitParameter( + parameter_name="key", + called_function="init_model", + ) + + def _prepare_background_jobs(self) -> BackgroundJobList: + return BackgroundJobList([PingBackgroundJob()]) + + def _write_initial_attributes(self): + if self._name is not None: + self[SYSTEM_NAME_ATTRIBUTE_PATH] = self._name + def _raise_if_stopped(self): if self._state == ContainerState.STOPPED: raise InactiveModelException(label=self._sys_id) diff --git a/src/neptune/metadata_containers/model_version.py b/src/neptune/metadata_containers/model_version.py index c07c20752..b8944b7f5 100644 --- a/src/neptune/metadata_containers/model_version.py +++ b/src/neptune/metadata_containers/model_version.py @@ -15,15 +15,36 @@ # __all__ = ["ModelVersion"] -from neptune.attributes.constants import SYSTEM_STAGE_ATTRIBUTE_PATH +import os +from typing import Optional + +from neptune.attributes.constants import ( + SYSTEM_NAME_ATTRIBUTE_PATH, + SYSTEM_STAGE_ATTRIBUTE_PATH, +) +from neptune.common.exceptions import NeptuneException +from neptune.envs import CONNECTION_MODE from neptune.exceptions import ( InactiveModelVersionException, + NeedExistingModelVersionForReadOnlyMode, + NeptuneMissingRequiredInitParameter, NeptuneOfflineModeChangeStageException, ) +from neptune.internal.backends.api_model import ApiExperiment +from neptune.internal.backgroud_job_list import BackgroundJobList from neptune.internal.container_type import ContainerType +from neptune.internal.id_formats import QualifiedName +from neptune.internal.init.parameters import ( + DEFAULT_FLUSH_PERIOD, + DEFAULT_NAME, + OFFLINE_PROJECT_QUALIFIED_NAME, +) from neptune.internal.operation_processors.offline_operation_processor import OfflineOperationProcessor from neptune.internal.state import ContainerState +from neptune.internal.utils import verify_type +from neptune.internal.utils.ping_background_job import PingBackgroundJob from neptune.metadata_containers import MetadataContainer +from neptune.types.mode import Mode from neptune.types.model_version_stage import ModelVersionStage @@ -38,6 +59,72 @@ class ModelVersion(MetadataContainer): container_type = ContainerType.MODEL_VERSION + def __init__( + self, + with_id: Optional[str] = None, + *, + name: Optional[str] = None, + model: Optional[str] = 
None, + project: Optional[str] = None, + api_token: Optional[str] = None, + mode: Optional[str] = None, + flush_period: float = DEFAULT_FLUSH_PERIOD, + proxies: Optional[dict] = None, + ): + verify_type("with_id", with_id, (str, type(None))) + verify_type("name", name, (str, type(None))) + verify_type("model", model, (str, type(None))) + verify_type("project", project, (str, type(None))) + verify_type("mode", mode, (str, type(None))) + + self._model: Optional[str] = model + self._with_id: Optional[str] = with_id + self._name: Optional[str] = DEFAULT_NAME if model is None and name is None else name + + # make mode proper Enum instead of string + mode = Mode(mode or os.getenv(CONNECTION_MODE) or Mode.ASYNC.value) + + if mode == Mode.OFFLINE: + raise NeptuneException("ModelVersion can't be initialized in OFFLINE mode") + + if mode == Mode.DEBUG: + project = OFFLINE_PROJECT_QUALIFIED_NAME + + super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies) + + def _get_or_create_api_object(self) -> ApiExperiment: + project_workspace = self._project_api_object.workspace + project_name = self._project_api_object.name + project_qualified_name = f"{project_workspace}/{project_name}" + + if self._with_id is not None: + # with_id (resume existing model_version) has priority over model (creating a new model_version) + return self._backend.get_metadata_container( + container_id=QualifiedName(project_qualified_name + "/" + self._with_id), + expected_container_type=self.container_type, + ) + elif self._model is not None: + if self._mode == Mode.READ_ONLY: + raise NeedExistingModelVersionForReadOnlyMode() + + api_model = self._backend.get_metadata_container( + container_id=QualifiedName(project_qualified_name + "/" + self._model), + expected_container_type=ContainerType.MODEL, + ) + return self._backend.create_model_version(project_id=self._project_api_object.id, model_id=api_model.id) + else: + raise NeptuneMissingRequiredInitParameter( + parameter_name="model", + called_function="init_model_version", + ) + + def _prepare_background_jobs(self) -> BackgroundJobList: + return BackgroundJobList([PingBackgroundJob()]) + + def _write_initial_attributes(self): + if self._name is not None: + self[SYSTEM_NAME_ATTRIBUTE_PATH] = self._name + def _raise_if_stopped(self): if self._state == ContainerState.STOPPED: raise InactiveModelVersionException(label=self._sys_id) diff --git a/src/neptune/metadata_containers/project.py b/src/neptune/metadata_containers/project.py index f6a078113..009ae2a0a 100644 --- a/src/neptune/metadata_containers/project.py +++ b/src/neptune/metadata_containers/project.py @@ -15,7 +15,7 @@ # __all__ = ["Project"] -import threading +import os from typing import ( Any, Dict, @@ -24,8 +24,10 @@ Union, ) +from neptune.common.exceptions import NeptuneException +from neptune.envs import CONNECTION_MODE from neptune.exceptions import InactiveProjectException -from neptune.internal.backends.neptune_backend import NeptuneBackend +from neptune.internal.backends.api_model import ApiExperiment from neptune.internal.backends.nql import ( NQLAggregator, NQLAttributeOperator, @@ -33,15 +35,13 @@ NQLQueryAggregate, NQLQueryAttribute, ) -from neptune.internal.background_job import BackgroundJob from neptune.internal.container_type import ContainerType -from neptune.internal.id_formats import ( - SysId, - UniqueId, -) -from neptune.internal.operation_processors.operation_processor import OperationProcessor +from neptune.internal.init.parameters import 
DEFAULT_FLUSH_PERIOD from neptune.internal.state import ContainerState -from neptune.internal.utils import as_list +from neptune.internal.utils import ( + as_list, + verify_type, +) from neptune.internal.utils.run_state import RunState from neptune.metadata_containers import MetadataContainer from neptune.metadata_containers.metadata_containers_table import Table @@ -61,28 +61,30 @@ class Project(MetadataContainer): def __init__( self, + project: Optional[str] = None, *, - id_: UniqueId, - mode: Mode, - backend: NeptuneBackend, - op_processor: OperationProcessor, - background_job: BackgroundJob, - lock: threading.RLock, - workspace: str, - project_name: str, - sys_id: SysId, + api_token: Optional[str] = None, + mode: Optional[str] = None, + flush_period: float = DEFAULT_FLUSH_PERIOD, + proxies: Optional[dict] = None, ): - super().__init__( - id_=id_, - mode=mode, - backend=backend, - op_processor=op_processor, - background_job=background_job, - lock=lock, - project_id=id_, - project_name=project_name, - workspace=workspace, - sys_id=sys_id, + verify_type("mode", mode, (str, type(None))) + + # make mode proper Enum instead of string + mode = Mode(mode or os.getenv(CONNECTION_MODE) or Mode.ASYNC.value) + + if mode == Mode.OFFLINE: + raise NeptuneException("Project can't be initialized in OFFLINE mode") + + super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies) + + def _get_or_create_api_object(self) -> ApiExperiment: + return ApiExperiment( + id=self._project_api_object.id, + type=ContainerType.PROJECT, + sys_id=self._project_api_object.sys_id, + workspace=self._project_api_object.workspace, + project_name=self._project_api_object.name, ) def _raise_if_stopped(self): @@ -262,6 +264,7 @@ def fetch_runs_table( tags = as_list("tag", tag) nql_query = self._prepare_nql_query(ids, states, owners, tags) + return MetadataContainer._fetch_entries( self, child_type=ContainerType.RUN, diff --git a/src/neptune/metadata_containers/run.py b/src/neptune/metadata_containers/run.py index fc489d09b..b80702c27 100644 --- a/src/neptune/metadata_containers/run.py +++ b/src/neptune/metadata_containers/run.py @@ -15,26 +15,80 @@ # __all__ = ["Run"] +import os import threading +from platform import node as get_hostname from typing import ( Any, Dict, + List, Optional, + Tuple, Union, ) -from neptune.exceptions import InactiveRunException +from neptune.attributes.constants import ( + SYSTEM_DESCRIPTION_ATTRIBUTE_PATH, + SYSTEM_FAILED_ATTRIBUTE_PATH, + SYSTEM_HOSTNAME_ATTRIBUTE_PATH, + SYSTEM_NAME_ATTRIBUTE_PATH, + SYSTEM_TAGS_ATTRIBUTE_PATH, +) +from neptune.common.warnings import ( + NeptuneWarning, + warn_once, +) +from neptune.envs import ( + CONNECTION_MODE, + CUSTOM_RUN_ID_ENV_NAME, + MONITORING_NAMESPACE, + NEPTUNE_NOTEBOOK_ID, + NEPTUNE_NOTEBOOK_PATH, +) +from neptune.exceptions import ( + InactiveRunException, + NeedExistingRunForReadOnlyMode, + NeptunePossibleLegacyUsageException, + NeptuneRunResumeAndCustomIdCollision, +) +from neptune.internal.backends.api_model import ApiExperiment from neptune.internal.backends.neptune_backend import NeptuneBackend -from neptune.internal.background_job import BackgroundJob +from neptune.internal.backgroud_job_list import BackgroundJobList from neptune.internal.container_type import ContainerType -from neptune.internal.id_formats import ( - SysId, - UniqueId, +from neptune.internal.hardware.hardware_metric_reporting_job import HardwareMetricReportingJob +from neptune.internal.id_formats import QualifiedName +from 
neptune.internal.init.parameters import ( + DEFAULT_FLUSH_PERIOD, + DEFAULT_NAME, + OFFLINE_PROJECT_QUALIFIED_NAME, ) -from neptune.internal.operation_processors.operation_processor import OperationProcessor +from neptune.internal.notebooks.notebooks import create_checkpoint from neptune.internal.state import ContainerState +from neptune.internal.streams.std_capture_background_job import ( + StderrCaptureBackgroundJob, + StdoutCaptureBackgroundJob, +) +from neptune.internal.utils import ( + verify_collection_type, + verify_type, +) +from neptune.internal.utils.git import ( + discover_git_repo_location, + get_git_info, +) +from neptune.internal.utils.hashing import generate_hash +from neptune.internal.utils.limits import custom_run_id_exceeds_length +from neptune.internal.utils.ping_background_job import PingBackgroundJob +from neptune.internal.utils.runningmode import ( + in_interactive, + in_notebook, +) +from neptune.internal.utils.source_code import upload_source_code +from neptune.internal.utils.traceback_job import TracebackJob +from neptune.internal.websockets.websocket_signals_background_job import WebsocketSignalsBackgroundJob from neptune.metadata_containers import MetadataContainer from neptune.types.mode import Mode +from neptune.types.series.string_series import StringSeries class Run(MetadataContainer): @@ -106,32 +160,289 @@ class Run(MetadataContainer): def __init__( self, + with_id: Optional[str] = None, *, - id_: UniqueId, - mode: Mode, - backend: NeptuneBackend, - op_processor: OperationProcessor, - background_job: BackgroundJob, - lock: threading.RLock, - workspace: str, - project_name: str, - sys_id: SysId, - project_id: UniqueId, - monitoring_namespace: str = "monitoring", + project: Optional[str] = None, + api_token: Optional[str] = None, + custom_run_id: Optional[str] = None, + mode: Optional[str] = None, + name: Optional[str] = None, + description: Optional[str] = None, + tags: Optional[Union[List[str], str]] = None, + source_files: Optional[Union[List[str], str]] = None, + capture_stdout: Optional[bool] = None, + capture_stderr: Optional[bool] = None, + capture_hardware_metrics: Optional[bool] = None, + fail_on_exception: bool = True, + monitoring_namespace: Optional[str] = None, + flush_period: float = DEFAULT_FLUSH_PERIOD, + proxies: Optional[dict] = None, + capture_traceback: bool = True, + **kwargs, ): - super().__init__( - id_=id_, - mode=mode, - backend=backend, - op_processor=op_processor, - background_job=background_job, - lock=lock, - project_id=project_id, - project_name=project_name, - workspace=workspace, - sys_id=sys_id, + """Starts a new tracked run and adds it to the top of the runs table. + + If you provide the ID of an existing run, that run is resumed and no new run is created. + + Args: + project: Name of the project where the run should go, in the form "workspace-name/project_name". + If None (default), the value of the NEPTUNE_PROJECT environment variable is used. + api_token: User's API token. Defaults to None. + If None (default), the value of the NEPTUNE_API_TOKEN environment variable is used. + Note: To keep your API token secure, save it to the NEPTUNE_API_TOKEN environment variable rather than + placing it in plain text in the source code. + with_id: If you want to resume a run, the identifier of the existing run. + For example, 'SAN-1'. A run with such an ID must exist. + If None (default) is passed, starts a new tracked run. + custom_run_id: A unique identifier to be used when running Neptune in pipelines. 
+ Make sure to use the same identifier throughout the whole pipeline execution. + mode: Connection mode in which the tracking will work. + If None (default), the value of the NEPTUNE_MODE environment variable is used. + If no value was set for the environment variable, 'async' is used by default. + Possible values are 'async', 'sync', 'offline', 'read-only', and 'debug'. + name: Editable name of the run. Defaults to 'Untitled'. + The name is displayed in the run details and as a column in the runs table. + description: Editable description of the run. Defaults to `''`. + The description is displayed in the run details and can be added to the runs table as a column. + tags: Tags of the run as a list of strings. Defaults to `[]`. + Tags are displayed in the run details and in the runs table as a column. + You can edit the tags after the run is created, either through the app or the API. + source_files: List of source files to be uploaded. + Uploaded source files are displayed in the 'Source code' tab of the run view. + To not upload anything, pass an empty list (`[]`). + Unix style pathname pattern expansion is supported. For example, you can pass `*.py` to upload + all Python files from the current directory. + If None is passed, the Python file from which the run was created will be uploaded. + capture_stdout: Whether to log the stdout of the run. Defaults to True. + The data is logged under the monitoring namespace (see the 'monitoring_namespace' parameter). + capture_stderr: Whether to log the stderr of the run. Defaults to True. + The data is logged under the monitoring namespace (see the 'monitoring_namespace' parameter). + capture_hardware_metrics: Whether to send hardware monitoring logs (CPU, GPU, and memory utilization). + Defaults to True. + The data is logged under the monitoring namespace (see the 'monitoring_namespace' parameter). + fail_on_exception: Whether to register an uncaught exception handler to this process and, + in case of an exception, set the 'sys/failed' field of the run to True. + An exception is always logged. + monitoring_namespace: Namespace inside which all hardware monitoring logs are stored. + Defaults to 'monitoring/<hash>', where the hash is generated based on environment information, + to ensure that it's unique for each process. + flush_period: In the asynchronous (default) connection mode, how often disk flushing is triggered. + Defaults to 5 (every 5 seconds). + proxies: Argument passed to HTTP calls made via the Requests library, as dictionary of strings. + For more information, see the 'Proxies' section in the Requests documentation. + capture_traceback: Whether to log the traceback of the run in case of an exception. + Defaults to True. + The tracked metadata is stored in the '<monitoring_namespace>/traceback' namespace (see the + 'monitoring_namespace' parameter). + + Returns: + Run object that is used to manage the tracked run and log metadata to it. + + Examples: + + Creating a new run: + + >>> import neptune + + >>> # Minimal invocation + ... # (creates a run in the project specified by the NEPTUNE_PROJECT environment variable) + ... run = neptune.init_run() + + >>> # Create a tracked run with a name and description, and no source files uploaded + >>> run = neptune.init_run( + ... name="neural-net-mnist", + ... description="neural net trained on MNIST", + ... source_files=[], + ... ) + + >>> # Log all .py files from all subdirectories, excluding hidden files + ...
+            >>> # Log all .py files from all subdirectories, excluding hidden files
+            ... run = neptune.init_run(source_files="**/*.py")
+
+            >>> # Log all files and directories in the current working directory, excluding hidden files
+            ... run = neptune.init_run(source_files="*")
+
+            >>> # Larger example
+            ... run = neptune.init_run(
+            ...     project="ml-team/classification",
+            ...     name="first-pytorch-ever",
+            ...     description="Longer description of the run goes here",
+            ...     tags=["tags", "go-here", "as-list-of-strings"],
+            ...     source_files=["training_with_pytorch.py", "net.py"],
+            ...     monitoring_namespace="system_metrics",
+            ...     capture_stderr=False,
+            ... )
+
+            Connecting to an existing run:
+
+            >>> # Resume logging to an existing run with the ID "SAN-3"
+            ... run = neptune.init_run(with_id="SAN-3")
+            ... run["parameters/lr"] = 0.1  # modify or add metadata
+
+            >>> # Initialize an existing run in read-only mode (logging new data is not possible, only fetching)
+            ... run = neptune.init_run(with_id="SAN-4", mode="read-only")
+            ... learning_rate = run["parameters/lr"].fetch()
+
+            For more, see the API reference:
+            https://docs.neptune.ai/api/neptune#init_run
+        """
+        check_for_extra_kwargs("Run", kwargs)
+
+        verify_type("with_id", with_id, (str, type(None)))
+        verify_type("project", project, (str, type(None)))
+        verify_type("custom_run_id", custom_run_id, (str, type(None)))
+        verify_type("mode", mode, (str, type(None)))
+        verify_type("name", name, (str, type(None)))
+        verify_type("description", description, (str, type(None)))
+        verify_type("capture_stdout", capture_stdout, (bool, type(None)))
+        verify_type("capture_stderr", capture_stderr, (bool, type(None)))
+        verify_type("capture_hardware_metrics", capture_hardware_metrics, (bool, type(None)))
+        verify_type("fail_on_exception", fail_on_exception, bool)
+        verify_type("monitoring_namespace", monitoring_namespace, (str, type(None)))
+        verify_type("capture_traceback", capture_traceback, bool)
+        if tags is not None:
+            if isinstance(tags, str):
+                tags = [tags]
+            else:
+                verify_collection_type("tags", tags, str)
+        if source_files is not None:
+            if isinstance(source_files, str):
+                source_files = [source_files]
+            else:
+                verify_collection_type("source_files", source_files, str)
+
+        self._with_id: Optional[str] = with_id
+        self._name: Optional[str] = DEFAULT_NAME if with_id is None and name is None else name
+        self._description: Optional[str] = "" if with_id is None and description is None else description
+        self._custom_run_id: Optional[str] = custom_run_id or os.getenv(CUSTOM_RUN_ID_ENV_NAME)
+        self._hostname: str = get_hostname()
+        self._pid: int = os.getpid()
+        self._tid: int = threading.get_ident()
+        self._tags: Optional[List[str]] = tags
+        self._source_files: Optional[List[str]] = source_files
+        self._fail_on_exception: bool = fail_on_exception
+        self._capture_traceback: bool = capture_traceback
+
+        self._monitoring_namespace: str = (
+            monitoring_namespace
+            or os.getenv(MONITORING_NAMESPACE)
+            or generate_monitoring_namespace(self._hostname, self._pid, self._tid)
         )
-        self.monitoring_namespace = monitoring_namespace
+
+        # make mode a proper Enum instead of a string (fall back to the environment, then to async)
+        mode = Mode(mode or os.getenv(CONNECTION_MODE) or Mode.ASYNC.value)
+
+        self._stdout_path: str = "{}/stdout".format(self._monitoring_namespace)
+        self._capture_stdout: bool = capture_stdout
+        if capture_stdout is None:
+            self._capture_stdout = capture_only_if_non_interactive(mode=mode)
+
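+        # When the capture_* flags are left as None, capturing defaults to off in
+        # interactive sessions and notebooks for the async, sync, and offline modes,
+        # to avoid unintended consumption of logging hours
+        # (see capture_only_if_non_interactive below).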
+        self._stderr_path: str = "{}/stderr".format(self._monitoring_namespace)
+        self._capture_stderr: bool = capture_stderr
+        if capture_stderr is None:
+            self._capture_stderr = capture_only_if_non_interactive(mode=mode)
+
+        self._capture_hardware_metrics: bool = capture_hardware_metrics
+        if capture_hardware_metrics is None:
+            self._capture_hardware_metrics = capture_only_if_non_interactive(mode=mode)
+
+        if with_id and custom_run_id:
+            raise NeptuneRunResumeAndCustomIdCollision()
+
+        if mode == Mode.OFFLINE or mode == Mode.DEBUG:
+            project = OFFLINE_PROJECT_QUALIFIED_NAME
+
+        super().__init__(project=project, api_token=api_token, mode=mode, flush_period=flush_period, proxies=proxies)
+
+    def _get_or_create_api_object(self) -> ApiExperiment:
+        project_workspace = self._project_api_object.workspace
+        project_name = self._project_api_object.name
+        project_qualified_name = f"{project_workspace}/{project_name}"
+
+        if self._with_id:
+            return self._backend.get_metadata_container(
+                container_id=QualifiedName(project_qualified_name + "/" + self._with_id),
+                expected_container_type=Run.container_type,
+            )
+        else:
+            if self._mode == Mode.READ_ONLY:
+                raise NeedExistingRunForReadOnlyMode()
+
+            git_ref = get_git_info(discover_git_repo_location())
+
+            custom_run_id = self._custom_run_id
+            if custom_run_id_exceeds_length(self._custom_run_id):
+                custom_run_id = None
+
+            notebook_id, checkpoint_id = create_notebook_checkpoint(backend=self._backend)
+
+            return self._backend.create_run(
+                project_id=self._project_api_object.id,
+                git_ref=git_ref,
+                custom_run_id=custom_run_id,
+                notebook_id=notebook_id,
+                checkpoint_id=checkpoint_id,
+            )
+
+    def _prepare_background_jobs(self) -> BackgroundJobList:
+        background_jobs = [PingBackgroundJob()]
+
+        websockets_factory = self._backend.websockets_factory(self._project_api_object.id, self._id)
+        if websockets_factory:
+            background_jobs.append(WebsocketSignalsBackgroundJob(websockets_factory))
+
+        if self._capture_stdout:
+            background_jobs.append(StdoutCaptureBackgroundJob(attribute_name=self._stdout_path))
+
+        if self._capture_stderr:
+            background_jobs.append(StderrCaptureBackgroundJob(attribute_name=self._stderr_path))
+
+        if self._capture_hardware_metrics:
+            background_jobs.append(HardwareMetricReportingJob(attribute_namespace=self._monitoring_namespace))
+
+        if self._capture_traceback:
+            background_jobs.append(
+                TracebackJob(path=f"{self._monitoring_namespace}/traceback", fail_on_exception=self._fail_on_exception)
+            )
+
+        return BackgroundJobList(background_jobs)
+
+    def _write_initial_attributes(self):
+        if self._name is not None:
+            self[SYSTEM_NAME_ATTRIBUTE_PATH] = self._name
+
+        if self._description is not None:
+            self[SYSTEM_DESCRIPTION_ATTRIBUTE_PATH] = self._description
+
+        if self._hostname is not None:
+            self[f"{self._monitoring_namespace}/hostname"] = self._hostname
+            if self._with_id is None:
+                self[SYSTEM_HOSTNAME_ATTRIBUTE_PATH] = self._hostname
+
+        if self._pid is not None:
+            self[f"{self._monitoring_namespace}/pid"] = str(self._pid)
+
+        if self._tid is not None:
+            self[f"{self._monitoring_namespace}/tid"] = str(self._tid)
+
+        if self._tags is not None:
+            self[SYSTEM_TAGS_ATTRIBUTE_PATH].add(self._tags)
+
+        if self._with_id is None:
+            self[SYSTEM_FAILED_ATTRIBUTE_PATH] = False
+
+        if self._capture_stdout and not self.exists(self._stdout_path):
+            self.define(self._stdout_path, StringSeries([]))
+
+        if self._capture_stderr and not self.exists(self._stderr_path):
+            self.define(self._stderr_path, StringSeries([]))
+
+        if self._with_id is None or self._source_files is not None:
+            # upload the default sources only when creating a new run; explicitly
+            # passed source_files are uploaded in both cases
+            upload_source_code(source_files=self._source_files, run=self)
+
+    @property
+    def 
monitoring_namespace(self) -> str: + return self._monitoring_namespace def _raise_if_stopped(self): if self._state == ContainerState.STOPPED: @@ -311,3 +622,43 @@ def sync(self, *, wait: bool = True) -> None: https://docs.neptune.ai/api/run#sync """ return super().sync(wait=wait) + + +def capture_only_if_non_interactive(mode) -> bool: + if in_interactive() or in_notebook(): + if mode in {Mode.OFFLINE, Mode.SYNC, Mode.ASYNC}: + warn_once( + "To avoid unintended consumption of logging hours during interactive sessions, the" + " following monitoring options are disabled unless set to 'True' when initializing" + " the run: 'capture_stdout', 'capture_stderr', and 'capture_hardware_metrics'.", + exception=NeptuneWarning, + ) + return False + return True + + +def generate_monitoring_namespace(*descriptors) -> str: + return f"monitoring/{generate_hash(*descriptors, length=8)}" + + +def check_for_extra_kwargs(caller_name: str, kwargs: dict): + legacy_kwargs = ("project_qualified_name", "backend") + + for name in legacy_kwargs: + if name in kwargs: + raise NeptunePossibleLegacyUsageException() + + if kwargs: + first_key = next(iter(kwargs.keys())) + raise TypeError(f"{caller_name}() got an unexpected keyword argument '{first_key}'") + + +def create_notebook_checkpoint(backend: NeptuneBackend) -> Tuple[Optional[str], Optional[str]]: + notebook_id = os.getenv(NEPTUNE_NOTEBOOK_ID, None) + notebook_path = os.getenv(NEPTUNE_NOTEBOOK_PATH, None) + + checkpoint_id = None + if notebook_id is not None and notebook_path is not None: + checkpoint_id = create_checkpoint(backend=backend, notebook_id=notebook_id, notebook_path=notebook_path) + + return notebook_id, checkpoint_id diff --git a/tests/unit/neptune/new/attributes/atoms/test_artifact.py b/tests/unit/neptune/new/attributes/atoms/test_artifact.py index c8c402036..14ab2d7e8 100644 --- a/tests/unit/neptune/new/attributes/atoms/test_artifact.py +++ b/tests/unit/neptune/new/attributes/atoms/test_artifact.py @@ -22,8 +22,10 @@ from mock import ( MagicMock, call, + patch, ) +from neptune import Run from neptune.attributes.atoms.artifact import Artifact from neptune.exceptions import NeptuneUnhandledArtifactTypeException from neptune.internal.artifacts.types import ( @@ -39,12 +41,20 @@ class TestArtifact(TestAttributeBase): - def setUp(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def setUp(self, get_operation_processor): self.monkeypatch = MonkeyPatch() self.wait = self._random_wait() self.op_processor = MagicMock() - self.exp = self._create_run(processor=self.op_processor) + get_operation_processor.return_value = self.op_processor + self.exp = Run( + mode="debug", + capture_stdout=False, + capture_stderr=False, + capture_traceback=False, + capture_hardware_metrics=False, + ) self.path = self._random_path() self.path_str = path_to_str(self.path) @@ -93,6 +103,7 @@ def download_file(cls, destination: pathlib.Path, file_definition: ArtifactFileD self.test_artifact_driver = TestArtifactDriver def tearDown(self): + self.exp.stop() self.monkeypatch.undo() def test_fetch_hash(self): diff --git a/tests/unit/neptune/new/attributes/atoms/test_artifact_hash.py b/tests/unit/neptune/new/attributes/atoms/test_artifact_hash.py index 3bcb4c471..0328d7236 100644 --- a/tests/unit/neptune/new/attributes/atoms/test_artifact_hash.py +++ b/tests/unit/neptune/new/attributes/atoms/test_artifact_hash.py @@ -29,31 +29,31 @@ def test_assign_type_error(self): Artifact(MagicMock(), MagicMock()).assign(value) def test_fetch(self): - exp, path 
= self._create_run(), self._random_path() - var = Artifact(exp, path) - var._enqueue_operation( - AssignArtifact( - var._path, - ArtifactVal("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855").hash, - ), - wait=False, - ) - self.assertEqual( - ArtifactVal("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"), - var.fetch(), - ) + with self._exp() as exp: + var = Artifact(exp, self._random_path()) + var._enqueue_operation( + AssignArtifact( + var._path, + ArtifactVal("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855").hash, + ), + wait=False, + ) + self.assertEqual( + ArtifactVal("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"), + var.fetch(), + ) def test_fetch_hash(self): - exp, path = self._create_run(), self._random_path() - var = Artifact(exp, path) - var._enqueue_operation( - AssignArtifact( - var._path, - ArtifactVal("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855").hash, - ), - wait=False, - ) - self.assertEqual( - "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - var.fetch_hash(), - ) + with self._exp() as exp: + var = Artifact(exp, self._random_path()) + var._enqueue_operation( + AssignArtifact( + var._path, + ArtifactVal("e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855").hash, + ), + wait=False, + ) + self.assertEqual( + "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + var.fetch_hash(), + ) diff --git a/tests/unit/neptune/new/attributes/atoms/test_datetime.py b/tests/unit/neptune/new/attributes/atoms/test_datetime.py index ae50df5e4..7555f68df 100644 --- a/tests/unit/neptune/new/attributes/atoms/test_datetime.py +++ b/tests/unit/neptune/new/attributes/atoms/test_datetime.py @@ -16,7 +16,10 @@ from datetime import datetime -from mock import MagicMock +from mock import ( + MagicMock, + patch, +) from neptune.attributes.atoms.datetime import ( Datetime, @@ -27,7 +30,8 @@ class TestDatetime(TestAttributeBase): - def test_assign(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_assign(self, get_operation_processor): now = datetime.now() value_and_expected = [ (now, now.replace(microsecond=1000 * int(now.microsecond / 1000))), @@ -39,14 +43,15 @@ def test_assign(self): for value, expected in value_and_expected: processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), + get_operation_processor.return_value = processor + path, wait = ( self._random_path(), self._random_wait(), ) - var = Datetime(exp, path) - var.assign(value, wait=wait) - processor.enqueue_operation.assert_called_once_with(AssignDatetime(path, expected), wait=wait) + with self._exp() as exp: + var = Datetime(exp, path) + var.assign(value, wait=wait) + processor.enqueue_operation.assert_called_with(AssignDatetime(path, expected), wait=wait) def test_assign_type_error(self): values = [55, None] @@ -55,9 +60,9 @@ def test_assign_type_error(self): Datetime(MagicMock(), MagicMock()).assign(value) def test_get(self): - exp, path = self._create_run(), self._random_path() - var = Datetime(exp, path) - now = datetime.now() - now = now.replace(microsecond=int(now.microsecond / 1000) * 1000) - var.assign(now) - self.assertEqual(now, var.fetch()) + with self._exp() as exp: + var = Datetime(exp, self._random_path()) + now = datetime.now() + now = now.replace(microsecond=int(now.microsecond / 1000) * 1000) + var.assign(now) + self.assertEqual(now, var.fetch()) diff --git a/tests/unit/neptune/new/attributes/atoms/test_file.py 
b/tests/unit/neptune/new/attributes/atoms/test_file.py index 245440f97..21305b458 100644 --- a/tests/unit/neptune/new/attributes/atoms/test_file.py +++ b/tests/unit/neptune/new/attributes/atoms/test_file.py @@ -22,7 +22,10 @@ from pathlib import Path from unittest.mock import PropertyMock -from mock import MagicMock +from mock import ( + MagicMock, + patch, +) from neptune.attributes.atoms.file import ( File, @@ -44,7 +47,8 @@ class TestFile(TestAttributeBase): @unittest.skipIf(IS_WINDOWS, "Windows behaves strangely") - def test_assign(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_assign(self, get_operation_processor): def get_tmp_uploaded_file_name(tmp_upload_dir): """Get tmp file to uploaded from `upload_path` - here's assumption that we upload only one file per one path in test""" @@ -79,23 +83,25 @@ def get_tmp_uploaded_file_name(tmp_upload_dir): with tmp_context() as tmp_upload_dir: processor = MagicMock() processor._operation_storage = PropertyMock(upload_path=Path(tmp_upload_dir)) - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = File(exp, path) - var.assign(value, wait=wait) - - if value.file_type is not FileType.LOCAL_FILE: - tmp_uploaded_file = get_tmp_uploaded_file_name(tmp_upload_dir) - self.assertTrue(os.path.exists(tmp_uploaded_file)) - else: - tmp_uploaded_file = None - - processor.enqueue_operation.assert_called_once_with( - operation_factory(path, tmp_uploaded_file), wait=wait - ) + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = File(exp, path) + var.assign(value, wait=wait) + + if value.file_type is not FileType.LOCAL_FILE: + tmp_uploaded_file = get_tmp_uploaded_file_name(tmp_upload_dir) + self.assertTrue(os.path.exists(tmp_uploaded_file)) + else: + tmp_uploaded_file = None + + processor.enqueue_operation.assert_called_with( + operation_factory(path, tmp_uploaded_file), wait=wait + ) def test_assign_type_error(self): values = [55, None, []] @@ -104,36 +110,42 @@ def test_assign_type_error(self): File(MagicMock(), MagicMock()).assign(value) @unittest.skipIf(IS_WINDOWS, "Windows behaves strangely") - def test_save(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_save(self, get_operation_processor): value_and_expected = [("some/path", os.getcwd() + "/some/path")] for value, expected in value_and_expected: processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = File(exp, path) - var.upload(value, wait=wait) - processor.enqueue_operation.assert_called_once_with( - UploadFile(path=path, ext="", file_path=expected), wait=wait - ) + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = File(exp, path) + var.upload(value, wait=wait) + processor.enqueue_operation.assert_called_with( + UploadFile(path=path, ext="", file_path=expected), wait=wait + ) @unittest.skipIf(IS_WINDOWS, "Windows behaves strangely") - def test_save_files(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_save_files(self, get_operation_processor): value_and_expected = [("some/path/*", [os.getcwd() + "/some/path/*"])] for value, expected in value_and_expected: processor = MagicMock() - exp, path, wait = ( - 
self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = FileSet(exp, path) - var.upload_files(value, wait=wait) - processor.enqueue_operation.assert_called_once_with(UploadFileSet(path, expected, False), wait=wait) + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = FileSet(exp, path) + var.upload_files(value, wait=wait) + processor.enqueue_operation.assert_called_with(UploadFileSet(path, expected, False), wait=wait) def test_save_type_error(self): values = [55, None, [], FileVal] @@ -157,21 +169,21 @@ def test_fetch_extension(self): ] for value, expected_ext in value_and_expected_ext: - exp, path, wait = ( - self._create_run(), - self._random_path(), - self._random_wait(), - ) - var = File(exp, path) - var.assign(value, wait=wait) - self.assertEqual(expected_ext, var.fetch_extension()) + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = File(exp, path) + var.assign(value, wait=wait) + self.assertEqual(expected_ext, var.fetch_extension()) def test_clean_files_on_close(self): - run = self._create_run() - data_path = run._op_processor._operation_storage.data_path + with self._exp() as run: + data_path = run._op_processor._operation_storage.data_path - assert os.path.exists(data_path) + assert os.path.exists(data_path) - run.stop() + run.stop() - assert not os.path.exists(data_path) + assert not os.path.exists(data_path) diff --git a/tests/unit/neptune/new/attributes/atoms/test_float.py b/tests/unit/neptune/new/attributes/atoms/test_float.py index 95e447b94..fd2d1b51c 100644 --- a/tests/unit/neptune/new/attributes/atoms/test_float.py +++ b/tests/unit/neptune/new/attributes/atoms/test_float.py @@ -13,7 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -from mock import MagicMock +from mock import ( + MagicMock, + patch, +) from neptune.attributes.atoms.float import ( Float, @@ -24,7 +27,11 @@ class TestFloat(TestAttributeBase): - def test_assign(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_assign(self, get_operation_processor): + processor = MagicMock() + get_operation_processor.return_value = processor + value_and_expected = [ (13, 13), (15.3, 15.3), @@ -33,15 +40,14 @@ def test_assign(self): ] for value, expected in value_and_expected: - processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), + path, wait = ( self._random_path(), self._random_wait(), ) - var = Float(exp, path) - var.assign(value, wait=wait) - processor.enqueue_operation.assert_called_once_with(AssignFloat(path, expected), wait=wait) + with self._exp() as run: + var = Float(run, path) + var.assign(value, wait=wait) + processor.enqueue_operation.assert_called_with(AssignFloat(path, expected), wait=wait) def test_assign_type_error(self): values = ["string", None] @@ -50,7 +56,7 @@ def test_assign_type_error(self): Float(MagicMock(), MagicMock()).assign(value) def test_get(self): - exp, path = self._create_run(), self._random_path() - var = Float(exp, path) - var.assign(5) - self.assertEqual(5, var.fetch()) + with self._exp() as run: + var = Float(run, self._random_path()) + var.assign(5) + self.assertEqual(5, var.fetch()) diff --git a/tests/unit/neptune/new/attributes/atoms/test_string.py b/tests/unit/neptune/new/attributes/atoms/test_string.py index 6703e4dc6..f17f596aa 100644 --- a/tests/unit/neptune/new/attributes/atoms/test_string.py +++ b/tests/unit/neptune/new/attributes/atoms/test_string.py @@ -13,7 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# -from mock import MagicMock +from mock import ( + MagicMock, + patch, +) from neptune.attributes.atoms.string import ( String, @@ -24,25 +27,28 @@ class TestString(TestAttributeBase): - def test_assign(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_assign(self, get_operation_processor): + processor = MagicMock() + get_operation_processor.return_value = processor + value_and_expected = [ ("qwertyu", "qwertyu"), (StringVal("Some string"), "Some string"), ] for value, expected in value_and_expected: - processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), + path, wait = ( self._random_path(), self._random_wait(), ) - var = String(exp, path) - var.assign(value, wait=wait) - processor.enqueue_operation.assert_called_once_with(AssignString(path, expected), wait=wait) + with self._exp() as exp: + var = String(exp, path) + var.assign(value, wait=wait) + processor.enqueue_operation.assert_called_with(AssignString(path, expected), wait=wait) def test_get(self): - exp, path = self._create_run(), self._random_path() - var = String(exp, path) - var.assign("adfh") - self.assertEqual("adfh", var.fetch()) + with self._exp() as exp: + var = String(exp, self._random_path()) + var.assign("adfh") + self.assertEqual("adfh", var.fetch()) diff --git a/tests/unit/neptune/new/attributes/series/test_file_series.py b/tests/unit/neptune/new/attributes/series/test_file_series.py index b80b8cbbe..adbfe646d 100644 --- a/tests/unit/neptune/new/attributes/series/test_file_series.py +++ b/tests/unit/neptune/new/attributes/series/test_file_series.py @@ -51,86 +51,18 @@ def test_log_type_error(self): with self.assertRaises(TypeError): FileSeries(MagicMock(), MagicMock()).log(value) - def test_log_content(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_log_content(self, get_operation_processor): # given wait = self._random_wait() path = self._random_path() - op_processor = MagicMock() - exp = self._create_run(processor=op_processor) - attr = FileSeries(exp, path) - - file = File.as_image(numpy.random.rand(10, 10) * 255) - - # when - attr.log( - file, - step=3, - timestamp=self._now(), - wait=wait, - name="nazwa", - description="opis", - ) - - # then - op_processor.enqueue_operation.assert_called_once_with( - LogImages( - path, - [ - LogImages.ValueType( - ImageValue(base64_encode(file.content), "nazwa", "opis"), - 3, - self._now(), - ) - ], - ), - wait=wait, - ) + processor = MagicMock() + get_operation_processor.return_value = processor - def test_assign_content(self): - # given - wait = self._random_wait() - path = self._random_path() - op_processor = MagicMock() - exp = self._create_run(processor=op_processor) - attr = FileSeries(exp, path) - - file = File.as_image(numpy.random.rand(10, 10) * 255) - - # when - attr.assign([file], wait=wait) - - # then - op_processor.enqueue_operation.assert_has_calls( - [ - call(ClearImageLog(path), wait=False), - call( - LogImages( - path, - [ - LogImages.ValueType( - ImageValue(base64_encode(file.content), None, None), - None, - self._now(), - ) - ], - ), - wait=wait, - ), - ] - ) - - def test_log_path(self): - # given - wait = self._random_wait() - path = self._random_path() - op_processor = MagicMock() - exp = self._create_run(processor=op_processor) - attr = FileSeries(exp, path) + with self._exp() as exp: + attr = FileSeries(exp, path) - file = File.as_image(numpy.random.rand(10, 10) * 255) - stream = File.from_stream(io.BytesIO(file.content)) - with 
create_file(file.content, binary_mode=True) as tmp_filename: - saved_file = File(tmp_filename) + file = File.as_image(numpy.random.rand(10, 10) * 255) # when attr.log( @@ -138,95 +70,176 @@ def test_log_path(self): step=3, timestamp=self._now(), wait=wait, - description="something", - ) - attr.log( - [stream, saved_file], - timestamp=self._now(), - wait=wait, - description="something", + name="nazwa", + description="opis", ) # then - def generate_expected_call(wait, step): - log_operation = LogImages( - path=path, - values=[ + processor.enqueue_operation.assert_called_with( + LogImages( + path, + [ LogImages.ValueType( - value=ImageValue(base64_encode(file.content), None, "something"), - step=step, - ts=self._now(), + ImageValue(base64_encode(file.content), "nazwa", "opis"), + 3, + self._now(), ) ], - ) - return call( - log_operation, - wait=wait, - ) + ), + wait=wait, + ) + + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_assign_content(self, get_operation_processor): + # given + wait = self._random_wait() + path = self._random_path() + processor = MagicMock() + get_operation_processor.return_value = processor - op_processor.enqueue_operation.assert_has_calls( + with self._exp() as exp: + attr = FileSeries(exp, path) + + file = File.as_image(numpy.random.rand(10, 10) * 255) + + # when + attr.assign([file], wait=wait) + + # then + processor.enqueue_operation.assert_has_calls( [ - generate_expected_call(wait, step=3), - generate_expected_call(wait, step=None), - generate_expected_call(wait, step=None), + call(ClearImageLog(path), wait=False), + call( + LogImages( + path, + [ + LogImages.ValueType( + ImageValue(base64_encode(file.content), None, None), + None, + self._now(), + ) + ], + ), + wait=wait, + ), ] ) + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_log_path(self, get_operation_processor): + # given + wait = self._random_wait() + path = self._random_path() + processor = MagicMock() + get_operation_processor.return_value = processor + + with self._exp() as exp: + attr = FileSeries(exp, path) + + file = File.as_image(numpy.random.rand(10, 10) * 255) + stream = File.from_stream(io.BytesIO(file.content)) + with create_file(file.content, binary_mode=True) as tmp_filename: + saved_file = File(tmp_filename) + + # when + attr.log( + file, + step=3, + timestamp=self._now(), + wait=wait, + description="something", + ) + attr.log( + [stream, saved_file], + timestamp=self._now(), + wait=wait, + description="something", + ) + + # then + def generate_expected_call(wait, step): + log_operation = LogImages( + path=path, + values=[ + LogImages.ValueType( + value=ImageValue(base64_encode(file.content), None, "something"), + step=step, + ts=self._now(), + ) + ], + ) + return call( + log_operation, + wait=wait, + ) + + processor.enqueue_operation.assert_has_calls( + [ + generate_expected_call(wait, step=3), + generate_expected_call(wait, step=None), + generate_expected_call(wait, step=None), + ] + ) + def test_log_raise_not_image(self): # given path = self._random_path() - op_processor = MagicMock() - exp = self._create_run(processor=op_processor) - attr = FileSeries(exp, path) - file = File.from_content("some text") - stream = File.from_stream(io.BytesIO(file.content)) - with create_file(file.content, binary_mode=True) as tmp_filename: - saved_file = File(tmp_filename) + with self._exp() as exp: + attr = FileSeries(exp, path) - # when - with self.assertRaises(OperationNotSupported): - attr.log(file) - with 
self.assertRaises(OperationNotSupported): - attr.log(saved_file) - with self.assertRaises(OperationNotSupported): - attr.log(stream) + file = File.from_content("some text") + stream = File.from_stream(io.BytesIO(file.content)) + with create_file(file.content, binary_mode=True) as tmp_filename: + saved_file = File(tmp_filename) + + # when + with self.assertRaises(OperationNotSupported): + attr.log(file) + with self.assertRaises(OperationNotSupported): + attr.log(saved_file) + with self.assertRaises(OperationNotSupported): + attr.log(stream) def test_assign_raise_not_image(self): # given path = self._random_path() - op_processor = MagicMock() - exp = self._create_run(processor=op_processor) - attr = FileSeries(exp, path) - file = File.from_content("some text") - stream = File.from_stream(io.BytesIO(file.content)) - with create_file(file.content, binary_mode=True) as tmp_filename: - saved_file = File(tmp_filename) + with self._exp() as exp: + attr = FileSeries(exp, path) - # when - with self.assertRaises(OperationNotSupported): - attr.assign([file]) - with self.assertRaises(OperationNotSupported): - attr.assign([saved_file]) - with self.assertRaises(OperationNotSupported): - attr.assign([stream]) + file = File.from_content("some text") + stream = File.from_stream(io.BytesIO(file.content)) + with create_file(file.content, binary_mode=True) as tmp_filename: + saved_file = File(tmp_filename) + + # when + with self.assertRaises(OperationNotSupported): + attr.assign([file]) + with self.assertRaises(OperationNotSupported): + attr.assign([saved_file]) + with self.assertRaises(OperationNotSupported): + attr.assign([stream]) @mock.patch("neptune.internal.utils.limits._LOGGED_IMAGE_SIZE_LIMIT_MB", (10**-3)) def test_image_limit(self): """Test if we prohibit logging images greater than mocked 1KB limit size""" # given path = self._random_path() - op_processor = MagicMock() - exp = self._create_run(processor=op_processor) - attr = FileSeries(exp, path) - - file = File.as_image(numpy.random.rand(100, 100) * 255) - with create_file(file.content, binary_mode=True) as tmp_filename: - saved_file = File(tmp_filename) - # when - with pytest.warns(expected_warning=UserWarning, match=".* Neptune supports logging images smaller than .*"): - attr.assign([file]) - with pytest.warns(expected_warning=UserWarning, match=".* Neptune supports logging images smaller than .*"): - attr.assign([saved_file]) + with self._exp() as exp: + attr = FileSeries(exp, path) + + file = File.as_image(numpy.random.rand(100, 100) * 255) + with create_file(file.content, binary_mode=True) as tmp_filename: + saved_file = File(tmp_filename) + + # when + with pytest.warns( + expected_warning=UserWarning, match=".* Neptune supports logging images smaller than .*" + ): + attr.assign([file]) + with pytest.warns( + expected_warning=UserWarning, match=".* Neptune supports logging images smaller than .*" + ): + attr.assign([saved_file]) diff --git a/tests/unit/neptune/new/attributes/series/test_float_series.py b/tests/unit/neptune/new/attributes/series/test_float_series.py index 442a1dc27..81a7d5378 100644 --- a/tests/unit/neptune/new/attributes/series/test_float_series.py +++ b/tests/unit/neptune/new/attributes/series/test_float_series.py @@ -37,17 +37,17 @@ def test_log_type_error(self): FloatSeries(MagicMock(), MagicMock()).log(value) def test_get(self): - exp, path = self._create_run(), self._random_path() - var = FloatSeries(exp, path) - var.log(5) - var.log(34) - self.assertEqual(34, var.fetch_last()) + with self._exp() as exp: + var = 
FloatSeries(exp, self._random_path()) + var.log(5) + var.log(34) + self.assertEqual(34, var.fetch_last()) def test_log(self): - exp, path = self._create_run(), self._random_path() - var = FloatSeries(exp, path) - var.log([val for val in range(0, 5000)]) - self.assertEqual(4999, var.fetch_last()) - values = list(var.fetch_values()["value"].array) - expected = list(range(0, 5000)) - self.assertEqual(len(set(expected)), len(set(values))) + with self._exp() as exp: + var = FloatSeries(exp, self._random_path()) + var.log([val for val in range(0, 5000)]) + self.assertEqual(4999, var.fetch_last()) + values = list(var.fetch_values()["value"].array) + expected = list(range(0, 5000)) + self.assertEqual(len(set(expected)), len(set(values))) diff --git a/tests/unit/neptune/new/attributes/series/test_series.py b/tests/unit/neptune/new/attributes/series/test_series.py index 6aa6bbfcb..5ae07204c 100644 --- a/tests/unit/neptune/new/attributes/series/test_series.py +++ b/tests/unit/neptune/new/attributes/series/test_series.py @@ -38,7 +38,8 @@ @patch("time.time", new=TestAttributeBase._now) class TestSeries(TestAttributeBase): - def test_assign(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_assign(self, get_operation_processor): value = FloatSeriesVal([17, 3.6], min=0, max=100, unit="%") expected = [ LogFloats.ValueType(17, None, self._now()), @@ -46,34 +47,38 @@ def test_assign(self): ] processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), + get_operation_processor.return_value = processor + path, wait = ( self._random_path(), self._random_wait(), ) - var = FloatSeries(exp, path) - var.assign(value, wait=wait) - self.assertEqual(3, processor.enqueue_operation.call_count) - processor.enqueue_operation.assert_has_calls( - [ - call(ConfigFloatSeries(path, min=0, max=100, unit="%"), wait=False), - call(ClearFloatLog(path), wait=False), - call(LogFloats(path, expected), wait=wait), - ] - ) + with self._exp() as exp: + var = FloatSeries(exp, path) + var.assign(value, wait=wait) + processor.enqueue_operation.assert_has_calls( + [ + call(ConfigFloatSeries(path, min=0, max=100, unit="%"), wait=False), + call(ClearFloatLog(path), wait=False), + call(LogFloats(path, expected), wait=wait), + ] + ) - def test_assign_empty(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_assign_empty(self, get_operation_processor): processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = StringSeries(exp, path) - var.assign(StringSeriesVal([]), wait=wait) - processor.enqueue_operation.assert_called_once_with(ClearStringLog(path), wait=wait) + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = StringSeries(exp, path) + var.assign(StringSeriesVal([]), wait=wait) + processor.enqueue_operation.assert_called_with(ClearStringLog(path), wait=wait) - def test_log(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_log(self, get_operation_processor): value_and_expected = [ (13, [LogFloats.ValueType(13, None, self._now())]), (15.3, [LogFloats.ValueType(15.3, None, self._now())]), @@ -105,16 +110,19 @@ def test_log(self): for value, expected in value_and_expected: processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var 
= FloatSeries(exp, path) - var.log(value, wait=wait) - processor.enqueue_operation.assert_called_once_with(LogFloats(path, expected), wait=wait) + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = FloatSeries(exp, path) + var.log(value, wait=wait) + processor.enqueue_operation.assert_called_with(LogFloats(path, expected), wait=wait) - def test_log_with_step(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_log_with_step(self, get_operation_processor): value_step_and_expected = [ (13, 5.3, LogFloats.ValueType(13, 5.3, self._now())), (15.3, 10, LogFloats.ValueType(15.3, 10, self._now())), @@ -125,16 +133,19 @@ def test_log_with_step(self): for value, step, expected in value_step_and_expected: processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = FloatSeries(exp, path) - var.log(value, step=step, wait=wait) - processor.enqueue_operation.assert_called_once_with(LogFloats(path, [expected]), wait=wait) + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = FloatSeries(exp, path) + var.log(value, step=step, wait=wait) + processor.enqueue_operation.assert_called_with(LogFloats(path, [expected]), wait=wait) - def test_log_with_timestamp(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_log_with_timestamp(self, get_operation_processor): value_step_and_expected = [ (13, 5.3, LogFloats.ValueType(13, None, 5.3)), (15.3, 10, LogFloats.ValueType(15.3, None, 10)), @@ -142,36 +153,44 @@ def test_log_with_timestamp(self): for value, ts, expected in value_step_and_expected: processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = FloatSeries(exp, path) + var.log(value, timestamp=ts, wait=wait) + processor.enqueue_operation.assert_called_with(LogFloats(path, [expected]), wait=wait) + + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_log_value_errors(self, get_operation_processor): + processor = MagicMock() + get_operation_processor.return_value = processor + + with self._exp() as exp: + attr = FloatSeries(exp, self._random_path()) + + with self.assertRaises(ValueError): + attr.log(["str", 5]) + with self.assertRaises(ValueError): + attr.log([5, 10], step=10) + with self.assertRaises(TypeError): + attr.log(5, step="str") + with self.assertRaises(TypeError): + attr.log(5, timestamp="str") + + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_clear(self, get_operation_processor): + processor = MagicMock() + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( self._random_path(), self._random_wait(), ) var = FloatSeries(exp, path) - var.log(value, timestamp=ts, wait=wait) - processor.enqueue_operation.assert_called_once_with(LogFloats(path, [expected]), wait=wait) - - def test_log_value_errors(self): - processor = MagicMock() - exp, path = self._create_run(processor), self._random_path() - attr = FloatSeries(exp, path) - - with self.assertRaises(ValueError): - attr.log(["str", 5]) - with self.assertRaises(ValueError): - attr.log([5, 
10], step=10) - with self.assertRaises(TypeError): - attr.log(5, step="str") - with self.assertRaises(TypeError): - attr.log(5, timestamp="str") - - def test_clear(self): - processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = FloatSeries(exp, path) - var.clear(wait=wait) - processor.enqueue_operation.assert_called_once_with(ClearFloatLog(path), wait=wait) + var.clear(wait=wait) + processor.enqueue_operation.assert_called_with(ClearFloatLog(path), wait=wait) diff --git a/tests/unit/neptune/new/attributes/series/test_string_series.py b/tests/unit/neptune/new/attributes/series/test_string_series.py index 4683fe93e..92806bf9a 100644 --- a/tests/unit/neptune/new/attributes/series/test_string_series.py +++ b/tests/unit/neptune/new/attributes/series/test_string_series.py @@ -31,17 +31,17 @@ def test_assign_type_error(self): StringSeries(MagicMock(), MagicMock()).assign(value) def test_get(self): - exp, path = self._create_run(), self._random_path() - var = StringSeries(exp, path) - var.log("asdfhadh") - var.log("hej!") - self.assertEqual("hej!", var.fetch_last()) + with self._exp() as exp: + var = StringSeries(exp, self._random_path()) + var.log("asdfhadh") + var.log("hej!") + self.assertEqual("hej!", var.fetch_last()) def test_log(self): - exp, path = self._create_run(), self._random_path() - var = StringSeries(exp, path) - var.log([str(val) for val in range(0, 5000)]) - self.assertEqual("4999", var.fetch_last()) - values = list(var.fetch_values()["value"].array) - expected = list(range(0, 5000)) - self.assertEqual(len(set(expected)), len(set(values))) + with self._exp() as exp: + var = StringSeries(exp, self._random_path()) + var.log([str(val) for val in range(0, 5000)]) + self.assertEqual("4999", var.fetch_last()) + values = list(var.fetch_values()["value"].array) + expected = list(range(0, 5000)) + self.assertEqual(len(set(expected)), len(set(values))) diff --git a/tests/unit/neptune/new/attributes/sets/test_file_set.py b/tests/unit/neptune/new/attributes/sets/test_file_set.py index 341c959df..e50613d48 100644 --- a/tests/unit/neptune/new/attributes/sets/test_file_set.py +++ b/tests/unit/neptune/new/attributes/sets/test_file_set.py @@ -15,7 +15,10 @@ # import os -from mock import MagicMock +from mock import ( + MagicMock, + patch, +) from neptune.attributes.file_set import FileSet from neptune.internal.operation import ( @@ -26,42 +29,51 @@ class TestFileSet(TestAttributeBase): - def test_assign(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_assign(self, get_operation_processor): globs = ["path1", "dir/", "glob/*.py"] expected = [os.path.abspath(glob) for glob in globs] processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = FileSet(exp, path) - var.assign(globs, wait=wait) - processor.enqueue_operation.assert_called_once_with(UploadFileSet(path, expected, reset=True), wait=wait) + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = FileSet(exp, path) + var.assign(globs, wait=wait) + processor.enqueue_operation.assert_called_with(UploadFileSet(path, expected, reset=True), wait=wait) - def test_upload_files(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_upload_files(self, get_operation_processor): globs = ["path1", "dir/", 
"glob/*.py"] processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = FileSet(exp, path) - var.upload_files(globs, wait=wait) - processor.enqueue_operation.assert_called_once_with( - UploadFileSet(path, [os.path.abspath(glob) for glob in globs], reset=False), - wait=wait, - ) + get_operation_processor.return_value = processor - def test_delete_files(self): + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = FileSet(exp, path) + var.upload_files(globs, wait=wait) + processor.enqueue_operation.assert_called_with( + UploadFileSet(path, [os.path.abspath(glob) for glob in globs], reset=False), + wait=wait, + ) + + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_delete_files(self, get_operation_processor): processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = FileSet(exp, path) - var.delete_files(["path1", "dir/"], wait=wait) - processor.enqueue_operation.assert_called_once_with(DeleteFiles(path, {"path1", "dir/"}), wait=wait) + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = FileSet(exp, path) + var.delete_files(["path1", "dir/"], wait=wait) + processor.enqueue_operation.assert_called_with(DeleteFiles(path, {"path1", "dir/"}), wait=wait) diff --git a/tests/unit/neptune/new/attributes/sets/test_string_set.py b/tests/unit/neptune/new/attributes/sets/test_string_set.py index 1ec329c1f..d756ca155 100644 --- a/tests/unit/neptune/new/attributes/sets/test_string_set.py +++ b/tests/unit/neptune/new/attributes/sets/test_string_set.py @@ -16,6 +16,7 @@ from mock import ( MagicMock, call, + patch, ) from neptune.attributes.sets.string_set import ( @@ -31,33 +32,38 @@ class TestStringSet(TestAttributeBase): - def test_assign(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_assign(self, get_operation_processor): value = StringSetVal(["ert", "qwe"]) expected = {"ert", "qwe"} processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = StringSet(exp, path) - var.assign(value, wait=wait) - self.assertEqual(2, processor.enqueue_operation.call_count) - processor.enqueue_operation.assert_has_calls( - [call(ClearStringSet(path), wait=False), call(AddStrings(path, expected), wait=wait)] - ) - - def test_assign_empty(self): + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = StringSet(exp, path) + var.assign(value, wait=wait) + processor.enqueue_operation.assert_has_calls( + [call(ClearStringSet(path), wait=False), call(AddStrings(path, expected), wait=wait)] + ) + + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_assign_empty(self, get_operation_processor): processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = StringSet(exp, path) - var.assign(StringSetVal([]), wait=wait) - processor.enqueue_operation.assert_called_once_with(ClearStringSet(path), wait=wait) + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = 
StringSet(exp, path) + var.assign(StringSetVal([]), wait=wait) + processor.enqueue_operation.assert_called_with(ClearStringSet(path), wait=wait) def test_assign_type_error(self): values = [{5.0}, {"text"}, {}, [5.0], ["text"], [], 55, "string", None] @@ -65,65 +71,80 @@ def test_assign_type_error(self): with self.assertRaises(TypeError): StringSet(MagicMock(), MagicMock()).assign(value) - def test_add(self): + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_add(self, get_operation_processor): processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = StringSet(exp, path) - var.add(["a", "bb", "ccc"], wait=wait) - processor.enqueue_operation.assert_called_once_with(AddStrings(path, {"a", "bb", "ccc"}), wait=wait) - - def test_add_single_value(self): + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = StringSet(exp, path) + var.add(["a", "bb", "ccc"], wait=wait) + processor.enqueue_operation.assert_called_with(AddStrings(path, {"a", "bb", "ccc"}), wait=wait) + + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_add_single_value(self, get_operation_processor): processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = StringSet(exp, path) - var.add("ccc", wait=wait) - processor.enqueue_operation.assert_called_once_with(AddStrings(path, {"ccc"}), wait=wait) - - def test_remove(self): + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = StringSet(exp, path) + var.add("ccc", wait=wait) + processor.enqueue_operation.assert_called_with(AddStrings(path, {"ccc"}), wait=wait) + + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_remove(self, get_operation_processor): processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = StringSet(exp, path) - var.remove(["a", "bb", "ccc"], wait=wait) - processor.enqueue_operation.assert_called_once_with(RemoveStrings(path, {"a", "bb", "ccc"}), wait=wait) - - def test_remove_single_value(self): + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = StringSet(exp, path) + var.remove(["a", "bb", "ccc"], wait=wait) + processor.enqueue_operation.assert_called_with(RemoveStrings(path, {"a", "bb", "ccc"}), wait=wait) + + @patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_remove_single_value(self, get_operation_processor): processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = StringSet(exp, path) - var.remove("bb", wait=wait) - processor.enqueue_operation.assert_called_once_with(RemoveStrings(path, {"bb"}), wait=wait) - - def test_clear(self): + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = StringSet(exp, path) + var.remove("bb", wait=wait) + processor.enqueue_operation.assert_called_with(RemoveStrings(path, {"bb"}), wait=wait) + + 
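+    # The tests below follow the same migration pattern as the rest of this file:
+    # instead of injecting an OperationProcessor through the removed _create_run()
+    # helper, they patch get_operation_processor so that the Run opened by the
+    # _exp() context manager enqueues operations on a MagicMock, which the test
+    # then inspects with assert_called_with / assert_has_calls.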
@patch("neptune.metadata_containers.metadata_container.get_operation_processor") + def test_clear(self, get_operation_processor): processor = MagicMock() - exp, path, wait = ( - self._create_run(processor), - self._random_path(), - self._random_wait(), - ) - var = StringSet(exp, path) - var.clear(wait=wait) - processor.enqueue_operation.assert_called_once_with(ClearStringSet(path), wait=wait) + get_operation_processor.return_value = processor + + with self._exp() as exp: + path, wait = ( + self._random_path(), + self._random_wait(), + ) + var = StringSet(exp, path) + var.clear(wait=wait) + processor.enqueue_operation.assert_called_with(ClearStringSet(path), wait=wait) def test_get(self): - exp, path = self._create_run(), self._random_path() - var = StringSet(exp, path) - var.add(["abc", "xyz"]) - var.remove(["abc"]) - var.add(["hej", "lol"]) - self.assertEqual({"xyz", "hej", "lol"}, var.fetch()) + with self._exp() as exp: + var = StringSet(exp, self._random_path()) + var.add(["abc", "xyz"]) + var.remove(["abc"]) + var.add(["hej", "lol"]) + self.assertEqual({"xyz", "hej", "lol"}, var.fetch()) diff --git a/tests/unit/neptune/new/attributes/test_attribute_base.py b/tests/unit/neptune/new/attributes/test_attribute_base.py index 34ee433c7..2e8574b35 100644 --- a/tests/unit/neptune/new/attributes/test_attribute_base.py +++ b/tests/unit/neptune/new/attributes/test_attribute_base.py @@ -14,48 +14,28 @@ # limitations under the License. # import random -import threading import time import unittest import uuid -from typing import Optional +from contextlib import contextmanager -from mock import MagicMock - -from neptune.internal.backends.neptune_backend_mock import NeptuneBackendMock -from neptune.internal.container_type import ContainerType -from neptune.internal.id_formats import UniqueId -from neptune.internal.operation_processors.operation_processor import OperationProcessor -from neptune.internal.operation_processors.sync_operation_processor import SyncOperationProcessor -from neptune.metadata_containers import Run -from neptune.types.mode import Mode +from neptune import Run _now = time.time() class TestAttributeBase(unittest.TestCase): - # TODO: test Projects, Model and ModelVersion @staticmethod - def _create_run(processor: Optional[OperationProcessor] = None): - backend = NeptuneBackendMock() - exp = backend.create_run(UniqueId(str(uuid.uuid4()))) - if processor is None: - processor = SyncOperationProcessor(exp.id, ContainerType.RUN, backend) - _run = Run( - id_=exp.id, - mode=Mode.SYNC, - backend=backend, - op_processor=processor, - background_job=MagicMock(), - lock=threading.RLock(), - workspace=MagicMock(), - project_id=MagicMock(), - project_name=MagicMock(), - sys_id=MagicMock(), - ) - _run.sync() - _run.start() - return _run + @contextmanager + def _exp(): + with Run( + mode="debug", + capture_stderr=False, + capture_traceback=False, + capture_stdout=False, + capture_hardware_metrics=False, + ) as exp: + yield exp @staticmethod def _random_path(): diff --git a/tests/unit/neptune/new/client/test_run.py b/tests/unit/neptune/new/client/test_run.py index fd0697288..8a3d18508 100644 --- a/tests/unit/neptune/new/client/test_run.py +++ b/tests/unit/neptune/new/client/test_run.py @@ -94,7 +94,7 @@ def test_resume(self): self.assertIsInstance(exp.get_structure()["test"], String) @patch("neptune.internal.utils.source_code.get_path_executed_script", lambda: "main.py") - @patch("neptune.internal.init.run.os.path.isfile", new=lambda file: "." 
in file) + @patch("neptune.metadata_containers.run.os.path.isfile", new=lambda file: "." in file) @patch( "neptune.internal.utils.glob", new=lambda path, recursive=False: [path.replace("*", "file.txt")], @@ -140,35 +140,35 @@ def test_entrypoint_in_interactive_python(self): with self.assertRaises(MissingFieldException): exp["source_code/entrypoint"].fetch() - @patch("neptune.internal.init.run.in_interactive", new=lambda: True) - @patch("neptune.internal.init.run.TracebackJob") - @patch("neptune.internal.init.run.HardwareMetricReportingJob") - @patch("neptune.internal.init.run.StderrCaptureBackgroundJob") - @patch("neptune.internal.init.run.StdoutCaptureBackgroundJob") + @patch("neptune.metadata_containers.run.in_interactive", new=lambda: True) + @patch("neptune.metadata_containers.run.TracebackJob") + @patch("neptune.metadata_containers.run.HardwareMetricReportingJob") + @patch("neptune.metadata_containers.run.StderrCaptureBackgroundJob") + @patch("neptune.metadata_containers.run.StdoutCaptureBackgroundJob") def test_monitoring_disabled_in_interactive_python(self, stdout_job, stderr_job, hardware_job, traceback_job): with init_run(mode="debug", monitoring_namespace="monitoring"): assert not stdout_job.called assert not stderr_job.called assert not hardware_job.called - traceback_job.assert_called_once_with("monitoring/traceback", True) + traceback_job.assert_called_once_with(path="monitoring/traceback", fail_on_exception=True) - @patch("neptune.internal.init.run.in_interactive", new=lambda: False) - @patch("neptune.internal.init.run.TracebackJob") - @patch("neptune.internal.init.run.HardwareMetricReportingJob") - @patch("neptune.internal.init.run.StderrCaptureBackgroundJob") - @patch("neptune.internal.init.run.StdoutCaptureBackgroundJob") + @patch("neptune.metadata_containers.run.in_interactive", new=lambda: False) + @patch("neptune.metadata_containers.run.TracebackJob") + @patch("neptune.metadata_containers.run.HardwareMetricReportingJob") + @patch("neptune.metadata_containers.run.StderrCaptureBackgroundJob") + @patch("neptune.metadata_containers.run.StdoutCaptureBackgroundJob") def test_monitoring_enabled_in_non_interactive_python(self, stdout_job, stderr_job, hardware_job, traceback_job): with init_run(mode="debug", monitoring_namespace="monitoring"): stdout_job.assert_called_once_with(attribute_name="monitoring/stdout") stderr_job.assert_called_once_with(attribute_name="monitoring/stderr") hardware_job.assert_called_once_with(attribute_namespace="monitoring") - traceback_job.assert_called_once_with("monitoring/traceback", True) + traceback_job.assert_called_once_with(path="monitoring/traceback", fail_on_exception=True) - @patch("neptune.internal.init.run.in_interactive", new=lambda: True) - @patch("neptune.internal.init.run.TracebackJob") - @patch("neptune.internal.init.run.HardwareMetricReportingJob") - @patch("neptune.internal.init.run.StderrCaptureBackgroundJob") - @patch("neptune.internal.init.run.StdoutCaptureBackgroundJob") + @patch("neptune.metadata_containers.run.in_interactive", new=lambda: True) + @patch("neptune.metadata_containers.run.TracebackJob") + @patch("neptune.metadata_containers.run.HardwareMetricReportingJob") + @patch("neptune.metadata_containers.run.StderrCaptureBackgroundJob") + @patch("neptune.metadata_containers.run.StdoutCaptureBackgroundJob") def test_monitoring_in_interactive_explicitly_enabled(self, stdout_job, stderr_job, hardware_job, traceback_job): with init_run( mode="debug", @@ -180,11 +180,11 @@ def 
test_monitoring_in_interactive_explicitly_enabled(self, stdout_job, stderr_j stdout_job.assert_called_once_with(attribute_name="monitoring/stdout") stderr_job.assert_called_once_with(attribute_name="monitoring/stderr") hardware_job.assert_called_once_with(attribute_namespace="monitoring") - traceback_job.assert_called_once_with("monitoring/traceback", True) + traceback_job.assert_called_once_with(path="monitoring/traceback", fail_on_exception=True) @patch("neptune.internal.utils.source_code.get_path_executed_script", lambda: "main.py") @patch("neptune.internal.utils.source_code.get_common_root", new=lambda _: None) - @patch("neptune.internal.init.run.os.path.isfile", new=lambda file: "." in file) + @patch("neptune.metadata_containers.run.os.path.isfile", new=lambda file: "." in file) @patch( "neptune.internal.utils.glob", new=lambda path, recursive=False: [path.replace("*", "file.txt")], @@ -200,22 +200,22 @@ def test_entrypoint_without_common_root(self): with init_run(mode="debug", source_files=["internal/*"]) as exp: self.assertEqual(exp["source_code/entrypoint"].fetch(), "/home/user/main_dir/main.py") - @patch("neptune.internal.init.run.generate_hash", lambda *vals, length: "some_hash") - @patch("neptune.internal.init.run.TracebackJob") - @patch("neptune.internal.init.run.HardwareMetricReportingJob") - @patch("neptune.internal.init.run.StderrCaptureBackgroundJob") - @patch("neptune.internal.init.run.StdoutCaptureBackgroundJob") + @patch("neptune.metadata_containers.run.generate_hash", lambda *vals, length: "some_hash") + @patch("neptune.metadata_containers.run.TracebackJob") + @patch("neptune.metadata_containers.run.HardwareMetricReportingJob") + @patch("neptune.metadata_containers.run.StderrCaptureBackgroundJob") + @patch("neptune.metadata_containers.run.StdoutCaptureBackgroundJob") def test_monitoring_namespace_based_on_hash(self, stdout_job, stderr_job, hardware_job, traceback_job): with init_run(mode="debug"): stdout_job.assert_called_once_with(attribute_name="monitoring/some_hash/stdout") stderr_job.assert_called_once_with(attribute_name="monitoring/some_hash/stderr") hardware_job.assert_called_once_with(attribute_namespace="monitoring/some_hash") - traceback_job.assert_called_once_with("monitoring/some_hash/traceback", True) + traceback_job.assert_called_once_with(path="monitoring/some_hash/traceback", fail_on_exception=True) - @patch("neptune.internal.init.run.generate_hash", lambda *vals, length: "some_hash") - @patch("neptune.internal.init.run.get_hostname", lambda *vals: "localhost") - @patch("neptune.internal.init.run.os.getpid", lambda *vals: 1234) - @patch("neptune.internal.init.run.threading.get_ident", lambda: 56789) + @patch("neptune.metadata_containers.run.generate_hash", lambda *vals, length: "some_hash") + @patch("neptune.metadata_containers.run.get_hostname", lambda *vals: "localhost") + @patch("neptune.metadata_containers.run.os.getpid", lambda *vals: 1234) + @patch("neptune.metadata_containers.run.threading.get_ident", lambda: 56789) def test_that_hostname_and_process_info_were_logged(self): with init_run(mode="debug") as exp: assert exp["monitoring/some_hash/hostname"].fetch() == "localhost" diff --git a/tests/unit/neptune/new/test_log_handler.py b/tests/unit/neptune/new/test_log_handler.py index 06f62c553..fcf4cecb3 100644 --- a/tests/unit/neptune/new/test_log_handler.py +++ b/tests/unit/neptune/new/test_log_handler.py @@ -30,7 +30,7 @@ from neptune.integrations.python_logger import NeptuneHandler -@patch("neptune.internal.init.run.generate_hash", lambda *vals, 
length: "some_hash") +@patch("neptune.metadata_containers.run.generate_hash", lambda *vals, length: "some_hash") class TestLogHandler(unittest.TestCase): @classmethod def setUpClass(cls) -> None: