Skip to content

Commit

Permalink
Emit events from the kernels service and gateway client (jupyter-serv…
Browse files Browse the repository at this point in the history
…er#1252)

* Emit events from the kernels service and gateway client

* clean-up after code review

* Make GatewayKernelManager a compatible subclass to ServerKernelManager

* Add events to gateway lifecycle test

* Access event_logger trait from ServerApp

---------

Co-authored-by: Raj Musuku <r_musuku@apple.com>
Co-authored-by: Zach Sailer <zsailer@apple.com>
Co-authored-by: Kevin Bates <kbates4@gmail.com>
  • Loading branch information
4 people committed Apr 17, 2023
1 parent 8144a0d commit 5c49253
Show file tree
Hide file tree
Showing 9 changed files with 459 additions and 13 deletions.
40 changes: 40 additions & 0 deletions jupyter_server/event_schemas/gateway_client/v1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"$id": https://events.jupyter.org/jupyter_server/gateway_client/v1
version: 1
title: Gateway Client activities.
personal-data: true
description: |
Record events of a gateway client.
type: object
required:
- status
- msg
properties:
status:
enum:
- error
- success
description: |
Status received by Gateway client based on the rest api operation to gateway kernel.
This is a required field.
Possible values:
1. error
Error response from a rest api operation to gateway kernel.
2. success
Success response from a rest api operation to gateway kernel.
status_code:
type: number
description: |
Http response codes from a rest api operation to gateway kernel.
Examples: 200, 400, 502, 503, 599 etc.
msg:
type: string
description: |
Description of the event being emitted.
gateway_url:
type: string
description: |
Gateway url where the remote server exist.
71 changes: 71 additions & 0 deletions jupyter_server/event_schemas/kernel_actions/v1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
"$id": https://events.jupyter.org/jupyter_server/kernel_actions/v1
version: 1
title: Kernel Manager activities
personal-data: true
description: |
Record events of a kernel manager.
type: object
required:
- action
- kernel_id
- msg
properties:
action:
enum:
- start
- interrupt
- shutdown
- restart
description: |
Action performed by the Kernel Manager.
This is a required field.
Possible values:
1. start
A kernel has been started with the given kernel id.
2. interrupt
A kernel has been interrupted for the given kernel id.
3. shutdown
A kernel has been shut down for the given kernel id.
4. restart
A kernel has been restarted for the given kernel id.
kernel_id:
type: string
description: |
Kernel id.
This is a required field.
kernel_name:
type: string
description: |
Name of the kernel.
status:
enum:
- error
- success
description: |
Status received from a rest api operation to kernel server.
This is a required field.
Possible values:
1. error
Error response from a rest api operation to kernel server.
2. success
Success response from a rest api operation to kernel server.
status_code:
type: number
description: |
Http response codes from a rest api operation to kernel server.
Examples: 200, 400, 502, 503, 599 etc
msg:
type: string
description: |
Description of the event specified in action.
61 changes: 57 additions & 4 deletions jupyter_server/gateway/gateway_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,31 @@
from http.cookies import SimpleCookie
from socket import gaierror

from jupyter_events import EventLogger
from tornado import web
from tornado.httpclient import AsyncHTTPClient, HTTPClientError, HTTPResponse
from traitlets import Bool, Float, Int, TraitError, Type, Unicode, default, observe, validate
from traitlets import (
Bool,
Float,
Instance,
Int,
TraitError,
Type,
Unicode,
default,
observe,
validate,
)
from traitlets.config import LoggingConfigurable, SingletonConfigurable

from jupyter_server import DEFAULT_EVENTS_SCHEMA_PATH, JUPYTER_SERVER_EVENTS_URI

ERROR_STATUS = "error"
SUCCESS_STATUS = "success"
STATUS_KEY = "status"
STATUS_CODE_KEY = "status_code"
MESSAGE_KEY = "msg"

if ty.TYPE_CHECKING:
from http.cookies import Morsel

Expand Down Expand Up @@ -71,10 +91,30 @@ def get_token(
class GatewayClient(SingletonConfigurable):
"""This class manages the configuration. It's its own singleton class so
that we can share these values across all objects. It also contains some
helper methods to build request arguments out of the various config
options.
helper methods to build request arguments out of the various config
"""

event_schema_id = JUPYTER_SERVER_EVENTS_URI + "/gateway_client/v1"
event_logger = Instance(EventLogger).tag(config=True)

@default("event_logger")
def _default_event_logger(self):
if self.parent and hasattr(self.parent, "event_logger"):
# Event logger is attached from serverapp.
return self.parent.event_logger
else:
# If parent does not have an event logger, create one.
logger = EventLogger()
schema_path = DEFAULT_EVENTS_SCHEMA_PATH / "gateway_client" / "v1.yaml"
logger.register_event_schema(schema_path)
self.log.info("Event is registered in GatewayClient.")
return logger

def emit(self, data):
"""Emit event using the core event schema from Jupyter Server's Gateway Client."""
self.event_logger.emit(schema_id=self.event_schema_id, data=data)

url = Unicode(
default_value=None,
allow_none=True,
Expand All @@ -97,7 +137,9 @@ def _url_validate(self, proposal):
value = proposal["value"]
# Ensure value, if present, starts with 'http'
if value is not None and len(value) > 0 and not str(value).lower().startswith("http"):
raise TraitError("GatewayClient url must start with 'http': '%r'" % value)
message = "GatewayClient url must start with 'http': '%r'" % value
self.emit(data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 400, MESSAGE_KEY: message})
raise TraitError(message)
return value

ws_url = Unicode(
Expand All @@ -123,7 +165,9 @@ def _ws_url_validate(self, proposal):
value = proposal["value"]
# Ensure value, if present, starts with 'ws'
if value is not None and len(value) > 0 and not str(value).lower().startswith("ws"):
raise TraitError("GatewayClient ws_url must start with 'ws': '%r'" % value)
message = "GatewayClient ws_url must start with 'ws': '%r'" % value
self.emit(data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 400, MESSAGE_KEY: message})
raise TraitError(message)
return value

kernels_endpoint_default_value = "/api/kernels"
Expand Down Expand Up @@ -728,6 +772,9 @@ async def gateway_request(endpoint: str, **kwargs: ty.Any) -> HTTPResponse:
# NOTE: We do this here since this handler is called during the server's startup and subsequent refreshes
# of the tree view.
except HTTPClientError as e:
GatewayClient.instance().emit(
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: e.code, MESSAGE_KEY: str(e.message)}
)
error_reason = f"Exception while attempting to connect to Gateway server url '{GatewayClient.instance().url}'"
error_message = e.message
if e.response:
Expand All @@ -744,12 +791,18 @@ async def gateway_request(endpoint: str, **kwargs: ty.Any) -> HTTPResponse:
"Ensure gateway url is valid and the Gateway instance is running.",
) from e
except ConnectionError as e:
GatewayClient.instance().emit(
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 503, MESSAGE_KEY: str(e)}
)
raise web.HTTPError(
503,
f"ConnectionError was received from Gateway server url '{GatewayClient.instance().url}'. "
"Check to be sure the Gateway instance is running.",
) from e
except gaierror as e:
GatewayClient.instance().emit(
data={STATUS_KEY: ERROR_STATUS, STATUS_CODE_KEY: 404, MESSAGE_KEY: str(e)}
)
raise web.HTTPError(
404,
f"The Gateway server specified in the gateway_url '{GatewayClient.instance().url}' doesn't "
Expand Down
29 changes: 23 additions & 6 deletions jupyter_server/gateway/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@
from jupyter_client.asynchronous.client import AsyncKernelClient
from jupyter_client.clientabc import KernelClientABC
from jupyter_client.kernelspec import KernelSpecManager
from jupyter_client.manager import AsyncKernelManager
from jupyter_client.managerabc import KernelManagerABC
from jupyter_core.utils import ensure_async
from tornado import web
from tornado.escape import json_decode, json_encode, url_escape, utf8
from traitlets import DottedObjectName, Instance, Type, default

from .._tz import UTC
from ..services.kernels.kernelmanager import AsyncMappingKernelManager
from .._tz import UTC, utcnow
from ..services.kernels.kernelmanager import (
AsyncMappingKernelManager,
ServerKernelManager,
emit_kernel_action_event,
)
from ..services.sessions.sessionmanager import SessionManager
from ..utils import url_path_join
from .gateway_client import GatewayClient, gateway_request
Expand Down Expand Up @@ -79,7 +82,6 @@ async def start_kernel(self, *, kernel_id=None, path=None, **kwargs):
await km.start_kernel(kernel_id=kernel_id, **kwargs)
kernel_id = km.kernel_id
self._kernels[kernel_id] = km

# Initialize culling if not already
if not self._initialized_culler:
self.initialize_culler()
Expand Down Expand Up @@ -366,7 +368,7 @@ async def kernel_culled(self, kernel_id: str) -> bool: # typing: ignore
return km is None


class GatewayKernelManager(AsyncKernelManager):
class GatewayKernelManager(ServerKernelManager):
"""Manages a single kernel remotely via a Gateway Server."""

kernel_id: Optional[str] = None # type:ignore[assignment]
Expand All @@ -385,7 +387,8 @@ def __init__(self, **kwargs):
self.kernel_url: str
self.kernel = self.kernel_id = None
# simulate busy/activity markers:
self.execution_state = self.last_activity = None
self.execution_state = "starting"
self.last_activity = utcnow()

@property
def has_kernel(self):
Expand Down Expand Up @@ -458,6 +461,9 @@ async def refresh_model(self, model=None):
# Kernel management
# --------------------------------------------------------------------------

@emit_kernel_action_event(
success_msg="Kernel {kernel_id} was started.",
)
async def start_kernel(self, **kwargs):
"""Starts a kernel via HTTP in an asynchronous manner.
Expand Down Expand Up @@ -509,6 +515,9 @@ async def start_kernel(self, **kwargs):
self.kernel = await self.refresh_model()
self.log.info(f"GatewayKernelManager using existing kernel: {self.kernel_id}")

@emit_kernel_action_event(
success_msg="Kernel {kernel_id} was shutdown.",
)
async def shutdown_kernel(self, now=False, restart=False):
"""Attempts to stop the kernel process cleanly via HTTP."""

Expand All @@ -523,6 +532,9 @@ async def shutdown_kernel(self, now=False, restart=False):
else:
raise

@emit_kernel_action_event(
success_msg="Kernel {kernel_id} was restarted.",
)
async def restart_kernel(self, **kw):
"""Restarts a kernel via HTTP."""
if self.has_kernel:
Expand All @@ -537,6 +549,9 @@ async def restart_kernel(self, **kw):
)
self.log.debug("Restart kernel response: %d %s", response.code, response.reason)

@emit_kernel_action_event(
success_msg="Kernel {kernel_id} was interrupted.",
)
async def interrupt_kernel(self):
"""Interrupts the kernel via an HTTP request."""
if self.has_kernel:
Expand All @@ -556,8 +571,10 @@ async def is_alive(self):
if self.has_kernel:
# Go ahead and issue a request to get the kernel
self.kernel = await self.refresh_model()
self.log.debug(f"The kernel: {self.kernel} is alive.")
return True
else: # we don't have a kernel
self.log.debug(f"The kernel: {self.kernel} no longer exists.")
return False

def cleanup_resources(self, restart=False):
Expand Down
2 changes: 2 additions & 0 deletions jupyter_server/serverapp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1964,6 +1964,8 @@ def init_event_logger(self):
# events URI, `JUPYTER_SERVER_EVENTS_URI`.
schema_ids = [
"https://events.jupyter.org/jupyter_server/contents_service/v1",
"https://events.jupyter.org/jupyter_server/gateway_client/v1",
"https://events.jupyter.org/jupyter_server/kernel_actions/v1",
]
for schema_id in schema_ids:
# Get the schema path from the schema ID.
Expand Down
Loading

0 comments on commit 5c49253

Please sign in to comment.