Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix shutdown and cleanup behavior #772

Merged
merged 22 commits into from
Apr 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ jobs:
- name: Run the tests
if: ${{ !startsWith(matrix.python-version, 'pypy') && !startsWith(matrix.os, 'windows') }}
run: |
args="-vv --cov jupyter_client --cov-branch --cov-report term-missing:skip-covered --cov-fail-under 70"
python -m pytest $args || python -m pytest $args --lf
args="-vv --cov jupyter_client --cov-branch --cov-report term-missing:skip-covered"
python -m pytest $args --cov-fail-under 70 || python -m pytest $args --lf
- name: Run the tests on pypy and windows
if: ${{ startsWith(matrix.python-version, 'pypy') || startsWith(matrix.os, 'windows') }}
run: |
Expand Down Expand Up @@ -150,3 +150,5 @@ jobs:
steps:
  - uses: jupyterlab/maintainer-tools/.github/actions/base-setup@v1
  - uses: jupyterlab/maintainer-tools/.github/actions/test-sdist@v1
    with:
      # Run the suite verbosely; if it fails, retry only the tests that
      # failed last time (--lf). Verbosity is the short option -vv:
      # pytest has no --vv long option, so the first command would
      # otherwise always exit with a usage error.
      test_command: pytest -vv || pytest -vv --lf
1 change: 1 addition & 0 deletions jupyter_client/channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ def run(self) -> None:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self._async_run())
loop.close()

async def _async_run(self) -> None:
"""The thread's main activity. Call start() instead."""
Expand Down
25 changes: 18 additions & 7 deletions jupyter_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class KernelClient(ConnectionFileMixin):
# The PyZMQ Context to use for communication with the kernel.
context = Instance(zmq.asyncio.Context)

_created_context: Bool = Bool(False)
_created_context = Bool(False)

def _context_default(self) -> zmq.asyncio.Context:
self._created_context = True
Expand All @@ -116,6 +116,23 @@ def _context_default(self) -> zmq.asyncio.Context:
# flag for whether execute requests should be allowed to call raw_input:
allow_stdin: bool = True

def __del__(self) -> None:
    """Handle garbage collection.  Destroy context if applicable.

    The zmq context is destroyed only when all of the following hold:
    this client created the context itself (``_created_context`` is
    set), the context is not already closed, and no channels are
    still running.
    """
    # ``_created_context`` is checked first so that the ``context``
    # attribute is only touched when this client created it.
    if self._created_context and self.context and not self.context.closed:
        if self.channels_running:
            # Channels are still running, so the context is left alone;
            # only report that it was not destroyed.
            if self.log:
                self.log.warning("Could not destroy zmq context for %s", self)
        else:
            if self.log:
                self.log.debug("Destroying zmq context for %s", self)
            self.context.destroy()
    # Chain to a parent ``__del__`` only if one exists; an
    # ``AttributeError`` from the lookup means there is nothing to call.
    try:
        super_del = super().__del__
    except AttributeError:
        pass
    else:
        super_del()

# --------------------------------------------------------------------------
# Channel proxy methods
# --------------------------------------------------------------------------
Expand Down Expand Up @@ -286,9 +303,6 @@ def start_channels(
:meth:`start_kernel`. If the channels have been stopped and you
call this, :class:`RuntimeError` will be raised.
"""
# Create the context if needed.
if not self._created_context:
self.context = self._context_default()
if iopub:
self.iopub_channel.start()
if shell:
Expand Down Expand Up @@ -318,9 +332,6 @@ def stop_channels(self) -> None:
self.hb_channel.stop()
if self.control_channel.is_alive():
self.control_channel.stop()
if self._created_context:
self._created_context = False
self.context.destroy()

@property
def channels_running(self) -> bool:
Expand Down
70 changes: 31 additions & 39 deletions jupyter_client/multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ def create_kernel_manager(*args: Any, **kwargs: Any) -> KernelManager:
help="Share a single zmq.Context to talk to all my kernels",
).tag(config=True)

_created_context = Bool(False)

context = Instance("zmq.Context")

_created_context = Bool(False)

_pending_kernels = Dict()

@property
Expand All @@ -111,7 +111,12 @@ def _context_default(self) -> zmq.Context:
self._created_context = True
return zmq.Context()

connection_dir = Unicode("")

_kernels = Dict()

def __del__(self):
"""Handle garbage collection. Destroy context if applicable."""
if self._created_context and self.context and not self.context.closed:
if self.log:
self.log.debug("Destroying zmq context for %s", self)
Expand All @@ -123,10 +128,6 @@ def __del__(self):
else:
super_del()

connection_dir = Unicode("")

_kernels = Dict()

def list_kernel_ids(self) -> t.List[str]:
"""Return a list of the kernel ids of the active kernels."""
# Create a copy so we can iterate over kernels in operations
Expand Down Expand Up @@ -171,17 +172,19 @@ async def _add_kernel_when_ready(
try:
await kernel_awaitable
self._kernels[kernel_id] = km
finally:
self._pending_kernels.pop(kernel_id, None)
except Exception as e:
self.log.exception(e)

async def _remove_kernel_when_ready(
self, kernel_id: str, kernel_awaitable: t.Awaitable
) -> None:
try:
await kernel_awaitable
self.remove_kernel(kernel_id)
finally:
self._pending_kernels.pop(kernel_id, None)
except Exception as e:
self.log.exception(e)

def _using_pending_kernels(self):
"""Returns a boolean; a clearer method for determining if
Expand All @@ -207,15 +210,15 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg
kwargs['kernel_id'] = kernel_id # Make kernel_id available to manager and provisioner

starter = ensure_async(km.start_kernel(**kwargs))
fut = asyncio.ensure_future(self._add_kernel_when_ready(kernel_id, km, starter))
self._pending_kernels[kernel_id] = fut
task = asyncio.create_task(self._add_kernel_when_ready(kernel_id, km, starter))
self._pending_kernels[kernel_id] = task
# Handling a Pending Kernel
if self._using_pending_kernels():
# If using pending kernels, do not block
# on the kernel start.
self._kernels[kernel_id] = km
else:
await fut
await task
# raise an exception if one occurred during kernel startup.
if km.ready.exception():
raise km.ready.exception() # type: ignore
Expand All @@ -224,22 +227,6 @@ async def _async_start_kernel(self, kernel_name: t.Optional[str] = None, **kwarg

start_kernel = run_sync(_async_start_kernel)

async def _shutdown_kernel_when_ready(
self,
kernel_id: str,
now: t.Optional[bool] = False,
restart: t.Optional[bool] = False,
) -> None:
"""Wait for a pending kernel to be ready
before shutting the kernel down.
"""
# Only do this if using pending kernels
if self._using_pending_kernels():
kernel = self._kernels[kernel_id]
await kernel.ready
# Once out of a pending state, we can call shutdown.
await ensure_async(self.shutdown_kernel(kernel_id, now=now, restart=restart))

async def _async_shutdown_kernel(
self,
kernel_id: str,
Expand All @@ -258,22 +245,21 @@ async def _async_shutdown_kernel(
Will the kernel be restarted?
"""
self.log.info("Kernel shutdown: %s" % kernel_id)
# If we're using pending kernels, block shutdown when a kernel is pending.
if self._using_pending_kernels() and kernel_id in self._pending_kernels:
raise RuntimeError("Kernel is in a pending state. Cannot shutdown.")
# If the kernel is still starting, wait for it to be ready.
elif kernel_id in self._pending_kernels:
kernel = self._pending_kernels[kernel_id]
if kernel_id in self._pending_kernels:
task = self._pending_kernels[kernel_id]
try:
await kernel
await task
km = self.get_kernel(kernel_id)
await t.cast(asyncio.Future, km.ready)
except asyncio.CancelledError:
pass
except Exception:
self.remove_kernel(kernel_id)
return
km = self.get_kernel(kernel_id)
# If a pending kernel raised an exception, remove it.
if km.ready.exception():
if not km.ready.cancelled() and km.ready.exception():
self.remove_kernel(kernel_id)
return
stopper = ensure_async(km.shutdown_kernel(now, restart))
Expand Down Expand Up @@ -320,13 +306,19 @@ async def _async_shutdown_all(self, now: bool = False) -> None:
"""Shutdown all kernels."""
kids = self.list_kernel_ids()
kids += list(self._pending_kernels)
futs = [ensure_async(self._shutdown_kernel_when_ready(kid, now=now)) for kid in set(kids)]
kms = list(self._kernels.values())
futs = [ensure_async(self.shutdown_kernel(kid, now=now)) for kid in set(kids)]
await asyncio.gather(*futs)
# When using "shutdown all", all pending kernels
# should be awaited before exiting this method.
# If using pending kernels, the kernels will not have been fully shut down.
if self._using_pending_kernels():
for km in self._kernels.values():
await km.ready
for km in kms:
try:
await km.ready
except asyncio.CancelledError:
self._pending_kernels[km.kernel_id].cancel()
except Exception:
# Will have been logged in _add_kernel_when_ready
pass

shutdown_all = run_sync(_async_shutdown_all)

Expand Down
2 changes: 2 additions & 0 deletions jupyter_client/tests/test_kernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,15 @@ async def test_async_signal_kernel_subprocesses(self, name, install, expected):
class TestKernelManager:
def test_lifecycle(self, km):
km.start_kernel(stdout=PIPE, stderr=PIPE)
kc = km.client()
assert km.is_alive()
is_done = km.ready.done()
assert is_done
km.restart_kernel(now=True)
assert km.is_alive()
km.interrupt_kernel()
assert isinstance(km, KernelManager)
kc.stop_channels()
km.shutdown_kernel(now=True)
assert km.context.closed

Expand Down
12 changes: 7 additions & 5 deletions jupyter_client/tests/test_multikernelmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ def _run_lifecycle(km, test_kid=None):
assert kid in km.list_kernel_ids()
km.interrupt_kernel(kid)
k = km.get_kernel(kid)
kc = k.client()
assert isinstance(k, KernelManager)
km.shutdown_kernel(kid, now=True)
assert kid not in km, f"{kid} not in {km}"
kc.stop_channels()

def _run_cinfo(self, km, transport, ip):
kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
Expand Down Expand Up @@ -158,8 +160,10 @@ def test_start_sequence_ipc_kernels(self):

def tcp_lifecycle_with_loop(self):
# Ensure each thread has an event loop
asyncio.set_event_loop(asyncio.new_event_loop())
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self.test_tcp_lifecycle()
loop.close()

def test_start_parallel_thread_kernels(self):
self.test_tcp_lifecycle()
Expand Down Expand Up @@ -415,10 +419,6 @@ async def test_use_pending_kernels_early_shutdown(self):
kernel = km.get_kernel(kid)
assert not kernel.ready.done()
# Try shutting down while the kernel is pending
with pytest.raises(RuntimeError):
await ensure_future(km.shutdown_kernel(kid, now=True))
await kernel.ready
# Shutdown once the kernel is ready
await ensure_future(km.shutdown_kernel(kid, now=True))
# Wait for the kernel to shutdown
await kernel.ready
Expand Down Expand Up @@ -476,6 +476,7 @@ def tcp_lifecycle_with_loop(self):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(self.raw_tcp_lifecycle())
loop.close()

# static so picklable for multiprocessing on Windows
@classmethod
Expand All @@ -491,6 +492,7 @@ def raw_tcp_lifecycle_sync(cls, test_kid=None):
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(cls.raw_tcp_lifecycle(test_kid=test_kid))
loop.close()

@gen_test
async def test_start_parallel_thread_kernels(self):
Expand Down
3 changes: 3 additions & 0 deletions jupyter_client/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,9 @@ def stop(self) -> None:
self.close()
self.ioloop = None

def __del__(self) -> None:
    """Call ``close()`` when this object is garbage collected."""
    self.close()

def close(self) -> None:
if self.ioloop is not None:
try:
Expand Down
8 changes: 6 additions & 2 deletions jupyter_client/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ def wrapped(*args, **kwargs):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# Workaround for bugs.python.org/issue39529.
try:
loop = asyncio.get_event_loop_policy().get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
import nest_asyncio # type: ignore

nest_asyncio.apply(loop)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ filterwarnings= [
# Fail on warnings
"error",

# Workarounds for https://github.com/pytest-dev/pytest-asyncio/issues/77
# We need to properly handle closing event loops as part of https://github.com/jupyter/jupyter_client/issues/755.
"ignore:unclosed <socket.socket:ResourceWarning",
"ignore:unclosed event loop:ResourceWarning",

Expand Down