Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix SSL errors after forking process #1353

Merged
merged 5 commits into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

### Fixes
- Added support of project visibility exception ([#1343](https://github.com/neptune-ai/neptune-client/pull/1343))
- Fix SSL errors after forking process ([#1353](https://github.com/neptune-ai/neptune-client/pull/1353))

### Changes
- Added support of active projects limit exception ([#1348](https://github.com/neptune-ai/neptune-client/pull/1348))
Expand Down
16 changes: 16 additions & 0 deletions src/neptune/common/backends/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
NeptuneSSLVerificationError,
Unauthorized,
)
from neptune.common.utils import reset_internal_ssl_state

_logger = logging.getLogger(__name__)

Expand All @@ -55,6 +56,7 @@

def with_api_exceptions_handler(func):
def wrapper(*args, **kwargs):
ssl_error_occurred = False
last_exception = None
start_time = time.monotonic()
for retry in itertools.count(0):
Expand All @@ -68,6 +70,20 @@ def wrapper(*args, **kwargs):
raise NeptuneInvalidApiTokenException()
raise
except requests.exceptions.SSLError as e:
"""
OpenSSL's internal random number generator does not properly handle forked processes.
Applications must change the PRNG state of the parent process
if they use any SSL feature with os.fork().
Any successful call of RAND_add(), RAND_bytes() or RAND_pseudo_bytes() is sufficient.
https://docs.python.org/3/library/ssl.html#multi-processing

On Linux it looks like it does not help much but does not break anything either.
But single retry seems to solve the issue.
"""
if not ssl_error_occurred:
ssl_error_occurred = True
reset_internal_ssl_state()
continue
raise NeptuneSSLVerificationError() from e
except (
BravadoConnectionError,
Expand Down
11 changes: 11 additions & 0 deletions src/neptune/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import math
import os
import re
import ssl
import sys

import numpy as np
Expand All @@ -43,6 +44,16 @@
IS_MACOS = sys.platform == "darwin"


def reset_internal_ssl_state():
"""
OpenSSL's internal random number generator does not properly handle forked processes.
Applications must change the PRNG state of the parent process if they use any SSL feature with os.fork().
Any successful call of RAND_add(), RAND_bytes() or RAND_pseudo_bytes() is sufficient.
https://docs.python.org/3/library/ssl.html#multi-processing
"""
ssl.RAND_bytes(100)


def map_values(f_value, dictionary):
return dict((k, f_value(v)) for k, v in dictionary.items())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import logging
import os
import sys
import threading
from datetime import datetime
from time import (
Expand Down Expand Up @@ -77,31 +76,19 @@ def __init__(
self._last_version = 0
self._consumed_version = 0
self._consumer = self.ConsumerThread(self, sleep_time, batch_size)
self._drop_operations = False

# Caller is responsible for taking this lock
self._waiting_cond = threading.Condition(lock=lock)

if sys.version_info >= (3, 7):
try:
os.register_at_fork(after_in_child=self._handle_fork_in_child)
except AttributeError:
pass

@staticmethod
def _init_data_path(container_id: UniqueId, container_type: ContainerType):
now = datetime.now()
container_dir = f"{NEPTUNE_DATA_DIRECTORY}/{ASYNC_DIRECTORY}/{container_type.create_dir_name(container_id)}"
data_path = f"{container_dir}/exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}"
data_path = f"{container_dir}/exec-{now.timestamp()}-{now.strftime('%Y-%m-%d_%H.%M.%S.%f')}-{os.getpid()}"
data_path = data_path.replace(" ", "_").replace(":", ".")
return data_path

def _handle_fork_in_child(self):
self._drop_operations = True

def enqueue_operation(self, op: Operation, *, wait: bool) -> None:
if self._drop_operations:
return
self._last_version = self._queue.put(op)
if self._queue.size() > self._batch_size / 2:
self._consumer.wake_up()
Expand Down
34 changes: 34 additions & 0 deletions src/neptune/metadata_containers/metadata_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import abc
import atexit
import itertools
import os
import threading
import time
import traceback
Expand All @@ -37,6 +38,7 @@
from neptune.attributes.namespace import Namespace as NamespaceAttr
from neptune.attributes.namespace import NamespaceBuilder
from neptune.common.exceptions import UNIX_STYLES
from neptune.common.utils import reset_internal_ssl_state
from neptune.common.warnings import warn_about_unsupported_type
from neptune.exceptions import (
MetadataInconsistency,
Expand Down Expand Up @@ -107,6 +109,7 @@ def __init__(
verify_type("proxies", proxies, (dict, type(None)))

self._mode: Mode = mode
self._flush_period = flush_period
self._lock: threading.RLock = threading.RLock()
self._state: ContainerState = ContainerState.CREATED

Expand Down Expand Up @@ -143,6 +146,37 @@ def __init__(

self._startup(debug_mode=mode == Mode.DEBUG)

os.register_at_fork(after_in_child=self._handle_fork_in_child, after_in_parent=self._handle_fork_in_parent)
Raalsky marked this conversation as resolved.
Show resolved Hide resolved

"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this comment to reset_internal_ssl_state as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

OpenSSL's internal random number generator does not properly handle forked processes.
Raalsky marked this conversation as resolved.
Show resolved Hide resolved
Applications must change the PRNG state of the parent process if they use any SSL feature with os.fork().
Any successful call of RAND_add(), RAND_bytes() or RAND_pseudo_bytes() is sufficient.
https://docs.python.org/3/library/ssl.html#multi-processing

On Linux it looks like it does not help much but does not break anything either.
"""

def _handle_fork_in_parent(self):
reset_internal_ssl_state()

def _handle_fork_in_child(self):
reset_internal_ssl_state()
self._op_processor: OperationProcessor = get_operation_processor(
mode=self._mode,
container_id=self._id,
container_type=self.container_type,
backend=self._backend,
lock=self._lock,
flush_period=self._flush_period,
)

# TODO: Every implementation of background job should handle fork by itself.
self._bg_job = BackgroundJobList([])

if self._state == ContainerState.STARTED:
self._op_processor.start()

def _prepare_background_jobs_if_non_read_only(self) -> BackgroundJobList:
if self._mode != Mode.READ_ONLY:
return self._prepare_background_jobs()
Expand Down
Loading