
Enhance kernel activity tracking: ignore comm messages for activity tracking when MappingKernelManager.cull_connected=true #7260

Closed
miloaissatu opened this issue Feb 14, 2024 · 1 comment
Labels: enhancement, status:Needs Triage


miloaissatu commented Feb 14, 2024

Problem

The idle kernel culling implementation (ref: https://discourse.jupyter.org/t/which-is-the-correct-way-to-cull-idle-kernels-and-notebook/8123) bases its last_activity time (introduced in https://github.com/jupyter/notebook/pull/1827/files) on any iopub message: https://github.com/jupyter/notebook/pull/1827/files#diff-0f0220b079fea446d9d5a2bda9710dcabe6116b03e6195c92ba02ec818aac779R203-R226

This is a problem in our environment:

  • JupyterHub on Kubernetes
  • users tend to use the same singleuser pod for long periods of time (days/weeks)
  • users commonly have notebooks running long tasks, so they may kick one off towards the end of the day or week and come back to it the next business day
  • users often do not close unused notebook sessions/kernels
  • refreshing the Jupyter web UI, or opening a new browser session to the notebook server, triggers a flood of kernel_info_request, debug_request, comm_info_request, etc. messages on the ZMQ iopub channel

The requirement that users be able to come back to an existing session after an extended period (e.g. overnight or over a weekend), coupled with every frontend refresh/reload resetting all kernel last-activity timers to utcnow(), means the native idle culler basically never kicks in for us. This leads to large amounts of memory being held and wasted by idle sessions.

Proposed Solution

When MappingKernelManager.cull_connected is true, ignore the following iopub message types, i.e. don't update last_activity based on these msg_types:

  • kernel_info_request
  • comm_open
  • comm_msg
  • comm_info_request
  • debug_request
  • usage_request
  • shutdown_request
  • shutdown_reply

Additionally, use parent_header.msg_id to track when an execute_request starts (execution_state=busy) and ends (execution_state=idle): for each kernel, store a key-value pair when a new parent_header.msg_id transitions to execution_state=busy, and pop it when it transitions back to idle. If this key-value store is not empty on the next activity check, set last_activity to utcnow(), because it means something is still running.
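As a rough sketch, the busy-dict tracking described above could be distilled into a small helper. The name track_activity, the IGNORE_TYPES set, and the message-dict shape here are illustrative assumptions for this issue, not existing Jupyter APIs:

```python
import time

# Hypothetical ignore list mirroring the proposal above; not part of any Jupyter API.
IGNORE_TYPES = {
    "kernel_info_request", "comm_open", "comm_msg", "comm_info_request",
    "debug_request", "usage_request", "shutdown_request", "shutdown_reply",
}

def track_activity(msg, busy, last_activity, now=None):
    """Update (busy, last_activity) from one deserialized iopub status message.

    `msg` is assumed to be a dict with "parent_header" and "content" keys,
    as produced by jupyter_client's Session.deserialize.
    """
    now = time.time() if now is None else now
    content = msg.get("content", {})
    parent = msg.get("parent_header", {})
    if "execution_state" not in content:
        return busy, last_activity        # not a status message
    if parent.get("msg_type", "__IGNORE__") in IGNORE_TYPES:
        return busy, last_activity        # frontend refresh / comm noise
    if content["execution_state"] == "busy":
        busy[parent["msg_id"]] = now      # an execute_request started
        last_activity = now
    else:
        busy.pop(parent["msg_id"], None)  # finished; tolerate missed transitions
    return busy, last_activity
```

An activity check would then treat a non-empty `busy` dict as "still running" and refresh last_activity, exactly as described above.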

Additional context

Here's an example implementation for a specific kernel instance that demonstrates the proposed solution.

#!/usr/bin/env python
# Ref https://github.com/zeromq/pyzmq/blob/main/examples/eventloop/activity_cb.py
"""A basic ZMQ activity-tracking callback using the tornado eventloop,

without relying on tornado stream integration.
"""

import json
import os
import time
from typing import Any

import zmq
from jupyter_client import AsyncKernelClient
from tornado import ioloop


# -----------------------------------------------------------------------------
# Message deserialize parameters
# -----------------------------------------------------------------------------

# Ref https://github.com/jupyter/jupyter_client/blob/4ffbfcf5809ad8913724dd1359089ac3d42031e1/jupyter_client/session.py#L1022
minlen = 5
copy = True
content = True
mode = zmq.NOBLOCK
DELIM = b"<IDS|MSG>"
# -----------------------------------------------------------------------------
# Activity tracking
# -----------------------------------------------------------------------------
# cull_idle_timeout = int(os.getenv("CULL_IDLE_TIMEOUT", 432000))
cull_idle_timeout = int(os.getenv("CULL_IDLE_TIMEOUT", 60))
cull_interval = int(os.getenv("CULL_INTERVAL", 30))
kernel_max_active_duration = int(os.getenv("KERNEL_MAX_ACTIVE_DURATION", 4 * 24 * 60 * 60))
busy = {}
last_activity = time.time()
ignore_types=[
    "__IGNORE__",
    "kernel_info_request",
    "comm_open",
    "comm_msg",
    "comm_info_request",
    "debug_request",
    "usage_request",
    "shutdown_request",
    "shutdown_reply"
    ]

# Ref https://github.com/jupyter/jupyter_client/blob/main/jupyter_client/session.py#L119C1-L124C25
def json_unpacker(s: str | bytes) -> Any:
    """Convert a json bytes or string to an object."""
    if isinstance(s, bytes):
        s = s.decode("utf8", "replace")
    return json.loads(s)

def activity_cb(sock, events):
    global last_activity
    if not sock.EVENTS & zmq.POLLIN:
        # not a read event
        return
    try:
        msg_list = sock.recv_multipart(mode, copy=copy)
    except zmq.ZMQError as e:
        if e.errno == zmq.EAGAIN:
            # nothing to read after all
            return
        raise
    idx = msg_list.index(DELIM)
    content_list = msg_list[idx + 1 :]
    if len(content_list) < minlen:
        msg = "malformed message, must have at least %i elements" % minlen
        raise TypeError(msg)
    # content_list is [signature, header, parent_header, metadata, content, ...]
    message = {}
    message["header"] = json_unpacker(content_list[1])
    message["parent_header"] = json_unpacker(content_list[2])
    message["metadata"] = json_unpacker(content_list[3])
    if content:
        message["content"] = json_unpacker(content_list[4])
    else:
        message["content"] = content_list[4]
    print(message, flush=True)
    if "execution_state" in message["content"]:
        if message["parent_header"].get("msg_type", "__IGNORE__") not in ignore_types:
            now = time.time()
            if message["content"]["execution_state"] == "busy":
                busy[message["parent_header"]["msg_id"]] = now
                last_activity = now
            else:
                # tolerate idle messages whose busy transition was never seen
                busy.pop(message["parent_header"]["msg_id"], None)
            print(f"busy is {busy}", flush=True)
    # avoid starving due to the edge-triggered event FD
    # if there is more than one read event waiting
    if sock.EVENTS & zmq.POLLIN:
        ioloop.IOLoop.current().add_callback(activity_cb, sock, events)



def idle_culler():
    # TODO: should kernels that have been 'busy' for longer than
    # kernel_max_active_duration be pruned, in case the busy->idle
    # transition was missed?
    global last_activity
    now = time.time()
    if len(busy) > 0:
        # something is still executing, so this counts as activity
        last_activity = now
    print(f"LastActivity: {last_activity}", flush=True)
    cull = now - last_activity > cull_idle_timeout
    print(f"cull is {cull}", flush=True)
    if cull:
        try:
            kc = AsyncKernelClient(connection_file=connection_file)
            kc.load_connection_file()
            # send shutdown_request on the kernel's control channel
            kc.shutdown(restart=False)
            kc.stop_channels()
        except Exception as e:
            print("Error:", e)
    return cull
connection_file = "/juno-tmp/jupyter-data/runtime/kernel-ccf08bbf-a234-455d-9d8b-33491ec8e183.json"
ctx = zmq.Context.instance()

socket = ctx.socket(zmq.SUB)
# ZMQ topic subscribe/filter is prefix based; iopub topics start with "kernel",
# so anything that doesn't start with "kernel" is ignored.
topic = "kernel"
shell_port = 35001
iopub_port = 50173
stdin_port = 36563
control_port = 54833
hb_port = 45409
# Only the iopub channel is a PUB socket; a SUB socket connected to the
# ROUTER/REP sockets on the other ports receives nothing.
socket.connect(f"tcp://localhost:{iopub_port}")
socket.subscribe(topic)


loop = ioloop.IOLoop.current()
loop.add_handler(socket, activity_cb, loop.READ)
periodic_cull = ioloop.PeriodicCallback(idle_culler, cull_interval * 1000)
periodic_cull.start()
loop.start()
jtpio (Member) commented Feb 14, 2024

Thanks @miloaissatu.

You may want to open that issue on the jupyter_server repo instead, since this is where the kernel activity tracking is now implemented: https://github.com/jupyter-server/jupyter_server
