From 4c771b7be54b8ca4e112516d7101bf8f0fc21218 Mon Sep 17 00:00:00 2001 From: wajcha Date: Wed, 14 Jun 2023 10:39:06 +0200 Subject: [PATCH 1/5] Fix SSL errors after forking process --- src/neptune/common/backends/utils.py | 6 ++++- .../async_operation_processor.py | 17 ++++---------- .../metadata_containers/metadata_container.py | 22 +++++++++++++++++++ 3 files changed, 31 insertions(+), 14 deletions(-) diff --git a/src/neptune/common/backends/utils.py b/src/neptune/common/backends/utils.py index b814cad4c..ab1c70b2f 100644 --- a/src/neptune/common/backends/utils.py +++ b/src/neptune/common/backends/utils.py @@ -18,6 +18,7 @@ import itertools import logging import os +import ssl import time import requests @@ -68,7 +69,10 @@ def wrapper(*args, **kwargs): raise NeptuneInvalidApiTokenException() raise except requests.exceptions.SSLError as e: - raise NeptuneSSLVerificationError() from e + if retry == 0: + continue + else: + raise NeptuneSSLVerificationError() from e except ( BravadoConnectionError, BravadoTimeoutError, diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index db150d9e8..273fbd8d7 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -17,6 +17,7 @@ import logging import os +import random import sys import threading from datetime import datetime @@ -73,35 +74,25 @@ def __init__( self._container_id = container_id self._container_type = container_type self._backend = backend + self._lock = lock + self._sleep_time = sleep_time self._batch_size = batch_size self._last_version = 0 self._consumed_version = 0 self._consumer = self.ConsumerThread(self, sleep_time, batch_size) - self._drop_operations = False # Caller is responsible for taking this lock self._waiting_cond = threading.Condition(lock=lock) - if sys.version_info >= (3, 7): - try: - os.register_at_fork(after_in_child=self._handle_fork_in_child) - except AttributeError: - pass - @staticmethod def _init_data_path(container_id: UniqueId, container_type: ContainerType): now = datetime.now() container_dir = f"{NEPTUNE_DATA_DIRECTORY}/{ASYNC_DIRECTORY}/{container_type.create_dir_name(container_id)}" - data_path = f"{container_dir}/exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}" + data_path = f"{container_dir}/exec-{os.getpid()}-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}" data_path = data_path.replace(" ", "_").replace(":", ".") return data_path - def _handle_fork_in_child(self): - self._drop_operations = True - def enqueue_operation(self, op: Operation, *, wait: bool) -> None: - if self._drop_operations: - return self._last_version = self._queue.put(op) if self._queue.size() > self._batch_size / 2: self._consumer.wake_up() diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index 9bee5f1fb..ead682f23 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -18,6 +18,8 @@ import abc import atexit import itertools +import os +import sys import threading import time import traceback @@ -107,6 +109,7 @@ def __init__( verify_type("proxies", proxies, (dict, type(None))) self._mode: Mode = mode + self._flush_period = flush_period self._lock: threading.RLock = threading.RLock() self._state: ContainerState = ContainerState.CREATED @@ -143,6 +146,25 @@ def __init__( self._startup(debug_mode=mode == Mode.DEBUG) + if sys.version_info >= (3, 7): + try: + os.register_at_fork(after_in_child=self._handle_fork_in_child) + except AttributeError: + pass + + def _handle_fork_in_child(self): + self._op_processor: OperationProcessor = get_operation_processor( + mode=self._mode, + container_id=self._id, + container_type=self.container_type, + backend=self._backend, + lock=self._lock, + flush_period=self._flush_period, + ) + self._bg_job = BackgroundJobList([]) + if self._state == ContainerState.STARTED: + self._op_processor.start() + def _prepare_background_jobs_if_non_read_only(self) -> BackgroundJobList: if self._mode != Mode.READ_ONLY: return self._prepare_background_jobs() From 76c0033e16b413fbe5eb4ccfb870d9ab2b22bc4e Mon Sep 17 00:00:00 2001 From: wajcha Date: Wed, 14 Jun 2023 11:07:25 +0200 Subject: [PATCH 2/5] Fix --- CHANGELOG.md | 1 + src/neptune/common/backends/utils.py | 18 +++++++++++++--- .../async_operation_processor.py | 2 -- .../metadata_containers/metadata_container.py | 21 ++++++++++++++++++- 4 files changed, 36 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c825c066..b3e741500 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Fixes - Added support of project visibility exception ([#1343](https://github.com/neptune-ai/neptune-client/pull/1343)) +- Fix SSL errors after forking process ([#1353](https://github.com/neptune-ai/neptune-client/pull/1353)) ### Changes - Added support of active projects limit exception ([#1348](https://github.com/neptune-ai/neptune-client/pull/1348)) diff --git a/src/neptune/common/backends/utils.py b/src/neptune/common/backends/utils.py index ab1c70b2f..e8d31a98d 100644 --- a/src/neptune/common/backends/utils.py +++ b/src/neptune/common/backends/utils.py @@ -56,6 +56,7 @@ def with_api_exceptions_handler(func): def wrapper(*args, **kwargs): + ssl_error_occurred = False last_exception = None start_time = time.monotonic() for retry in itertools.count(0): @@ -69,10 +70,21 @@ def wrapper(*args, **kwargs): raise NeptuneInvalidApiTokenException() raise except requests.exceptions.SSLError as e: - if retry == 0: + """ + OpenSSL's internal random number generator does not properly handle forked processes. + Applications must change the PRNG state of the parent process + if they use any SSL feature with os.fork(). + Any successful call of RAND_add(), RAND_bytes() or RAND_pseudo_bytes() is sufficient. + https://docs.python.org/3/library/ssl.html#multi-processing + + On Linux it looks like it does not help much but does not break anything either. + But single retry seems to solve the issue. + """ + if not ssl_error_occurred: + ssl_error_occurred = True + ssl.RAND_bytes(100) continue - else: - raise NeptuneSSLVerificationError() from e + raise NeptuneSSLVerificationError() from e except ( BravadoConnectionError, BravadoTimeoutError, diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 273fbd8d7..0d7131ac1 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -17,8 +17,6 @@ import logging import os -import random -import sys import threading from datetime import datetime from time import ( diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index ead682f23..ee425ce2a 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -19,6 +19,7 @@ import atexit import itertools import os +import ssl import sys import threading import time @@ -148,11 +149,26 @@ def __init__( if sys.version_info >= (3, 7): try: - os.register_at_fork(after_in_child=self._handle_fork_in_child) + os.register_at_fork( + after_in_child=self._handle_fork_in_child, after_in_parent=self._handle_fork_in_parent + ) except AttributeError: pass + """ + OpenSSL's internal random number generator does not properly handle forked processes. + Applications must change the PRNG state of the parent process if they use any SSL feature with os.fork(). + Any successful call of RAND_add(), RAND_bytes() or RAND_pseudo_bytes() is sufficient. + https://docs.python.org/3/library/ssl.html#multi-processing + + On Linux it looks like it does not help much but does not break anything either. + """ + + def _handle_fork_in_parent(self): + ssl.RAND_bytes(100) + def _handle_fork_in_child(self): + ssl.RAND_bytes(100) self._op_processor: OperationProcessor = get_operation_processor( mode=self._mode, container_id=self._id, @@ -161,7 +177,10 @@ def _handle_fork_in_child(self): lock=self._lock, flush_period=self._flush_period, ) + + # TODO: Every implementation of background job should handle fork by itself. self._bg_job = BackgroundJobList([]) + if self._state == ContainerState.STARTED: self._op_processor.start() From e7c46aa7eeb495bd8f1fb01e8414ecb4fae907a7 Mon Sep 17 00:00:00 2001 From: wajcha Date: Wed, 14 Jun 2023 12:50:18 +0200 Subject: [PATCH 3/5] Review fixes --- src/neptune/common/backends/utils.py | 3 ++- src/neptune/common/utils.py | 5 +++++ .../async_operation_processor.py | 4 +--- .../metadata_containers/metadata_container.py | 13 ++++--------- 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/neptune/common/backends/utils.py b/src/neptune/common/backends/utils.py index e8d31a98d..0761cc9fa 100644 --- a/src/neptune/common/backends/utils.py +++ b/src/neptune/common/backends/utils.py @@ -35,6 +35,7 @@ HTTPTooManyRequests, HTTPUnauthorized, ) +from neptune.common.utils import reset_internal_ssl_state from urllib3.exceptions import NewConnectionError from neptune.common.envs import NEPTUNE_RETRIES_TIMEOUT_ENV @@ -82,7 +83,7 @@ def wrapper(*args, **kwargs): """ if not ssl_error_occurred: ssl_error_occurred = True - ssl.RAND_bytes(100) + reset_internal_ssl_state() continue raise NeptuneSSLVerificationError() from e except ( diff --git a/src/neptune/common/utils.py b/src/neptune/common/utils.py index 6fe71bd62..ba91cd36f 100644 --- a/src/neptune/common/utils.py +++ b/src/neptune/common/utils.py @@ -20,6 +20,7 @@ import math import os import re +import ssl import sys import numpy as np @@ -43,6 +44,10 @@ IS_MACOS = sys.platform == "darwin" +def reset_internal_ssl_state(): + ssl.RAND_bytes(100) + + def map_values(f_value, dictionary): return dict((k, f_value(v)) for k, v in dictionary.items()) diff --git a/src/neptune/internal/operation_processors/async_operation_processor.py b/src/neptune/internal/operation_processors/async_operation_processor.py index 0d7131ac1..61f4f504e 100644 --- a/src/neptune/internal/operation_processors/async_operation_processor.py +++ b/src/neptune/internal/operation_processors/async_operation_processor.py @@ -72,8 +72,6 @@ def __init__( self._container_id = container_id self._container_type = container_type self._backend = backend - self._lock = lock - self._sleep_time = sleep_time self._batch_size = batch_size self._last_version = 0 self._consumed_version = 0 @@ -86,7 +84,7 @@ def __init__( def _init_data_path(container_id: UniqueId, container_type: ContainerType): now = datetime.now() container_dir = f"{NEPTUNE_DATA_DIRECTORY}/{ASYNC_DIRECTORY}/{container_type.create_dir_name(container_id)}" - data_path = f"{container_dir}/exec-{os.getpid()}-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}" + data_path = f"{container_dir}/exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}" data_path = data_path.replace(" ", "_").replace(":", ".") return data_path diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index ee425ce2a..c992b3bb5 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -40,6 +40,7 @@ from neptune.attributes.namespace import Namespace as NamespaceAttr from neptune.attributes.namespace import NamespaceBuilder from neptune.common.exceptions import UNIX_STYLES +from neptune.common.utils import reset_internal_ssl_state from neptune.common.warnings import warn_about_unsupported_type from neptune.exceptions import ( MetadataInconsistency, @@ -147,13 +148,7 @@ def __init__( self._startup(debug_mode=mode == Mode.DEBUG) - if sys.version_info >= (3, 7): - try: - os.register_at_fork( - after_in_child=self._handle_fork_in_child, after_in_parent=self._handle_fork_in_parent - ) - except AttributeError: - pass + os.register_at_fork(after_in_child=self._handle_fork_in_child, after_in_parent=self._handle_fork_in_parent) """ OpenSSL's internal random number generator does not properly handle forked processes. @@ -165,10 +160,10 @@ def __init__( """ def _handle_fork_in_parent(self): - ssl.RAND_bytes(100) + reset_internal_ssl_state() def _handle_fork_in_child(self): - ssl.RAND_bytes(100) + reset_internal_ssl_state() self._op_processor: OperationProcessor = get_operation_processor( mode=self._mode, container_id=self._id, From 1910634689feb2723b804da76c7807b139f3b327 Mon Sep 17 00:00:00 2001 From: wajcha Date: Wed, 14 Jun 2023 13:03:15 +0200 Subject: [PATCH 4/5] Review fix 2 --- src/neptune/common/utils.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/neptune/common/utils.py b/src/neptune/common/utils.py index ba91cd36f..c788f7afa 100644 --- a/src/neptune/common/utils.py +++ b/src/neptune/common/utils.py @@ -45,6 +45,12 @@ def reset_internal_ssl_state(): + """ + OpenSSL's internal random number generator does not properly handle forked processes. + Applications must change the PRNG state of the parent process if they use any SSL feature with os.fork(). + Any successful call of RAND_add(), RAND_bytes() or RAND_pseudo_bytes() is sufficient. + https://docs.python.org/3/library/ssl.html#multi-processing + """ ssl.RAND_bytes(100) From ab0b851d7e10a2976172abf1cbfff5247a695344 Mon Sep 17 00:00:00 2001 From: wajcha Date: Wed, 14 Jun 2023 13:22:03 +0200 Subject: [PATCH 5/5] Fix imports --- src/neptune/common/backends/utils.py | 3 +-- src/neptune/metadata_containers/metadata_container.py | 2 -- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/neptune/common/backends/utils.py b/src/neptune/common/backends/utils.py index 0761cc9fa..ed3ee6943 100644 --- a/src/neptune/common/backends/utils.py +++ b/src/neptune/common/backends/utils.py @@ -18,7 +18,6 @@ import itertools import logging import os -import ssl import time import requests @@ -35,7 +34,6 @@ HTTPTooManyRequests, HTTPUnauthorized, ) -from neptune.common.utils import reset_internal_ssl_state from urllib3.exceptions import NewConnectionError from neptune.common.envs import NEPTUNE_RETRIES_TIMEOUT_ENV @@ -48,6 +46,7 @@ NeptuneSSLVerificationError, Unauthorized, ) +from neptune.common.utils import reset_internal_ssl_state _logger = logging.getLogger(__name__) diff --git a/src/neptune/metadata_containers/metadata_container.py b/src/neptune/metadata_containers/metadata_container.py index c992b3bb5..cbba0e85a 100644 --- a/src/neptune/metadata_containers/metadata_container.py +++ b/src/neptune/metadata_containers/metadata_container.py @@ -19,8 +19,6 @@ import atexit import itertools import os -import ssl -import sys import threading import time import traceback