diff --git a/packages/services/src/kernel/comm.ts b/packages/services/src/kernel/comm.ts index ca7757503822..7db0ac828ea3 100644 --- a/packages/services/src/kernel/comm.ts +++ b/packages/services/src/kernel/comm.ts @@ -56,7 +56,7 @@ class CommHandler extends DisposableDelegate implements Kernel.IComm { * * **See also:** [[ICommClose]], [[close]] */ - get onClose(): (msg: KernelMessage.ICommCloseMsg) => void { + get onClose(): (msg: KernelMessage.ICommCloseMsg) => Promise | void { return this._onClose; } @@ -69,21 +69,21 @@ class CommHandler extends DisposableDelegate implements Kernel.IComm { * * **See also:** [[close]] */ - set onClose(cb: (msg: KernelMessage.ICommCloseMsg) => void) { + set onClose(cb: (msg: KernelMessage.ICommCloseMsg) => Promise | void) { this._onClose = cb; } /** * Get the callback for a comm message received event. */ - get onMsg(): (msg: KernelMessage.ICommMsgMsg) => void { + get onMsg(): (msg: KernelMessage.ICommMsgMsg) => Promise | void { return this._onMsg; } /** * Set the callback for a comm message received event. */ - set onMsg(cb: (msg: KernelMessage.ICommMsgMsg) => void) { + set onMsg(cb: (msg: KernelMessage.ICommMsgMsg) => Promise | void) { this._onMsg = cb; } @@ -180,6 +180,6 @@ class CommHandler extends DisposableDelegate implements Kernel.IComm { private _target = ''; private _id = ''; private _kernel: Kernel.IKernel; - private _onClose: (msg: KernelMessage.ICommCloseMsg) => void; - private _onMsg: (msg: KernelMessage.ICommMsgMsg) => void; + private _onClose: (msg: KernelMessage.ICommCloseMsg) => Promise | void; + private _onMsg: (msg: KernelMessage.ICommMsgMsg) => Promise | void; } diff --git a/packages/services/src/kernel/default.ts b/packages/services/src/kernel/default.ts index 09a44b92361a..26812d4115e6 100644 --- a/packages/services/src/kernel/default.ts +++ b/packages/services/src/kernel/default.ts @@ -77,7 +77,7 @@ class DefaultKernel implements Kernel.IKernel { this._clientId = options.clientId || uuid(); this._username = options.username || ''; this._futures = new Map(); - this._commPromises = new Map>(); + this._comms = new Map>(); this._createSocket(); Private.runningKernels.push(this); } @@ -103,6 +103,10 @@ class DefaultKernel implements Kernel.IKernel { /** * A signal emitted for iopub kernel messages. + * + * #### Notes + * This signal is emitted after the iopub message is handled, which may + * asynchronously after it is received. */ get iopubMessage(): ISignal { return this._iopubMessage; @@ -238,7 +242,7 @@ class DefaultKernel implements Kernel.IKernel { this._futures.forEach((future, key) => { future.dispose(); }); - this._commPromises.forEach((promise, key) => { + this._comms.forEach((promise, key) => { promise.then(comm => { comm.dispose(); }); @@ -627,24 +631,25 @@ class DefaultKernel implements Kernel.IKernel { /** * Connect to a comm, or create a new one. * + * TODO: should we change this to a synchronous function? There's nothing + * asynchronous about it now. + * * #### Notes * If a client-side comm already exists, it is returned. */ - connectToComm(targetName: string, commId?: string): Promise { + async connectToComm(targetName: string, commId?: string): Promise { let id = commId || uuid(); - if (this._commPromises.has(id)) { - return this._commPromises.get(id); + if (this._comms.has(id)) { + return this._comms.get(id); } - let promise = Promise.resolve(void 0).then(() => { - return new CommHandler( - targetName, - id, - this, - () => { this._unregisterComm(id); } - ); - }); - this._commPromises.set(id, promise); - return promise; + let comm = new CommHandler( + targetName, + id, + this, + () => { this._unregisterComm(id); } + ); + this._comms.set(id, comm); + return comm; } /** @@ -652,7 +657,7 @@ class DefaultKernel implements Kernel.IKernel { * * @returns Whether the message was handled. */ - private _handleDisplayId(displayId: string, msg: KernelMessage.IMessage): boolean { + private async _handleDisplayId(displayId: string, msg: KernelMessage.IMessage): Promise { let msgId = (msg.parent_header as KernelMessage.IHeader).msg_id; let parentIds = this._displayIdToParentIds.get(displayId); if (parentIds) { @@ -668,12 +673,12 @@ class DefaultKernel implements Kernel.IKernel { }; (updateMsg.header as any).msg_type = 'update_display_data'; - parentIds.map((parentId) => { + await Promise.all(parentIds.map(async (parentId) => { let future = this._futures && this._futures.get(parentId); if (future) { - future.handleMsg(updateMsg); + await future.handleMsg(updateMsg); } - }); + })); } // We're done here if it's update_display. @@ -697,7 +702,7 @@ class DefaultKernel implements Kernel.IKernel { } this._msgIdToDisplayIds.set(msgId, displayIds); - // Let it propagate to the intended recipient. + // Let the message propagate to the intended recipient. return false; } @@ -774,13 +779,13 @@ class DefaultKernel implements Kernel.IKernel { this._futures.forEach((future, key) => { future.dispose(); }); - this._commPromises.forEach((promise, key) => { + this._comms.forEach((promise, key) => { promise.then(comm => { comm.dispose(); }); }); this._futures = new Map(); - this._commPromises = new Map>(); + this._comms = new Map>(); this._displayIdToParentIds.clear(); this._msgIdToDisplayIds.clear(); } @@ -788,94 +793,77 @@ class DefaultKernel implements Kernel.IKernel { /** * Handle a `comm_open` kernel message. */ - private _handleCommOpen(msg: KernelMessage.ICommOpenMsg): void { + private async _handleCommOpen(msg: KernelMessage.ICommOpenMsg): Promise { let content = msg.content; if (this.isDisposed) { return; } - let promise = Private.loadObject(content.target_name, content.target_module, - this._targetRegistry).then(target => { - let comm = new CommHandler( - content.target_name, - content.comm_id, - this, - () => { this._unregisterComm(content.comm_id); } - ); - let response : any; - try { - response = target(comm, msg); - } catch (e) { - comm.close(); - console.error('Exception opening new comm'); - throw(e); - } - return Promise.resolve(response).then(() => { - if (this.isDisposed) { - return; - } - return comm; - }); - }); - this._commPromises.set(content.comm_id, promise); + let target = await Private.loadObject(content.target_name, content.target_module, this._targetRegistry); + let comm = new CommHandler( + content.target_name, + content.comm_id, + this, + () => { this._unregisterComm(content.comm_id); } + ); + try { + await target(comm, msg); + } catch (e) { + // TODO: do we need to await this? + await comm.close(); + console.error('Exception opening new comm'); + throw(e); + } + // TODO: do we need to check if the comm is disposed? + this._comms.set(content.comm_id, comm); } /** * Handle 'comm_close' kernel message. */ - private _handleCommClose(msg: KernelMessage.ICommCloseMsg): void { + private async _handleCommClose(msg: KernelMessage.ICommCloseMsg): Promise { let content = msg.content; - let promise = this._commPromises.get(content.comm_id); - if (!promise) { + let comm = this._comms.get(content.comm_id); + if (!comm) { console.error('Comm not found for comm id ' + content.comm_id); return; } - promise.then((comm) => { - if (!comm) { - return; - } - this._unregisterComm(comm.commId); - try { - let onClose = comm.onClose; - if (onClose) { - onClose(msg); - } - (comm as CommHandler).dispose(); - } catch (e) { - console.error('Exception closing comm: ', e, e.stack, msg); + this._unregisterComm(comm.commId); + try { + let onClose = comm.onClose; + if (onClose) { + await onClose(msg); } - }); + (comm as CommHandler).dispose(); + } catch (e) { + console.error('Exception closing comm: ', e, e.stack, msg); + } } /** * Handle a 'comm_msg' kernel message. */ - private _handleCommMsg(msg: KernelMessage.ICommMsgMsg): void { + private async _handleCommMsg(msg: KernelMessage.ICommMsgMsg): Promise { let content = msg.content; - let promise = this._commPromises.get(content.comm_id); - if (!promise) { + let comm = this._comms.get(content.comm_id); + if (!comm) { // We do have a registered comm for this comm id, ignore. return; } - promise.then((comm) => { - if (!comm) { - return; - } - try { - let onMsg = comm.onMsg; - if (onMsg) { - onMsg(msg); - } - } catch (e) { - console.error('Exception handling comm msg: ', e, e.stack, msg); + try { + let onMsg = comm.onMsg; + if (onMsg) { + await onMsg(msg); } - }); + } catch (e) { + console.error('Exception handling comm msg: ', e, e.stack, msg); + } } /** * Unregister a comm instance. */ private _unregisterComm(commId: string) { - this._commPromises.delete(commId); + this._comms.delete(commId); } /** @@ -933,12 +921,16 @@ class DefaultKernel implements Kernel.IKernel { /** * Handle a websocket message, validating and routing appropriately. + * + * TODO: convert to asynchronous processing. */ private _onWSMessage = (evt: MessageEvent) => { if (this._wsStopped) { // If the socket is being closed, ignore any messages return; } + + // Notify immediately if there is an error with the message. let msg = serialize.deserialize(evt.data); try { validate.validateMessage(msg); @@ -947,8 +939,21 @@ class DefaultKernel implements Kernel.IKernel { return; } + // Handle the message asynchronously, in the order received. + this._msgChain = this._msgChain.then(() => { + return this._handleMessage(msg); + }); + + // TODO: should this emit happen asynchronously? Should we have two events, + // one for a message received here, and then another after it is handled + // asynchronously? + this._anyMessage.emit({msg, direction: 'recv'}); + } + + private async _handleMessage(msg: KernelMessage.IMessage): Promise { let handled = false; + // Check to see if we have a display_id we need to reroute. if (msg.parent_header && msg.channel === 'iopub') { switch (msg.header.msg_type) { case 'display_data': @@ -958,7 +963,7 @@ class DefaultKernel implements Kernel.IKernel { let transient = (msg.content.transient || {}) as JSONObject; let displayId = transient['display_id'] as string; if (displayId) { - handled = this._handleDisplayId(displayId, msg); + handled = await this._handleDisplayId(displayId, msg); } break; default: @@ -970,7 +975,7 @@ class DefaultKernel implements Kernel.IKernel { let parentHeader = msg.parent_header as KernelMessage.IHeader; let future = this._futures && this._futures.get(parentHeader.msg_id); if (future) { - future.handleMsg(msg); + await future.handleMsg(msg); } else { // If the message was sent by us and was not iopub, it is orphaned. let owned = parentHeader.session === this.clientId; @@ -982,23 +987,23 @@ class DefaultKernel implements Kernel.IKernel { if (msg.channel === 'iopub') { switch (msg.header.msg_type) { case 'status': + // Updating the status is synchronous, and we call no user code this._updateStatus((msg as KernelMessage.IStatusMsg).content.execution_state); break; case 'comm_open': - this._handleCommOpen(msg as KernelMessage.ICommOpenMsg); + await this._handleCommOpen(msg as KernelMessage.ICommOpenMsg); break; case 'comm_msg': - this._handleCommMsg(msg as KernelMessage.ICommMsgMsg); + await this._handleCommMsg(msg as KernelMessage.ICommMsgMsg); break; case 'comm_close': - this._handleCommClose(msg as KernelMessage.ICommCloseMsg); + await this._handleCommClose(msg as KernelMessage.ICommCloseMsg); break; default: break; } this._iopubMessage.emit(msg as KernelMessage.IIOPubMessage); } - this._anyMessage.emit({msg, direction: 'recv'}); } /** @@ -1037,7 +1042,7 @@ class DefaultKernel implements Kernel.IKernel { private _reconnectAttempt = 0; private _isReady = false; private _futures: Map; - private _commPromises: Map>; + private _comms: Map; private _targetRegistry: { [key: string]: (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => void; } = Object.create(null); private _info: KernelMessage.IInfoReply | null = null; private _pendingMessages: KernelMessage.IMessage[] = []; @@ -1050,6 +1055,7 @@ class DefaultKernel implements Kernel.IKernel { private _displayIdToParentIds = new Map(); private _msgIdToDisplayIds = new Map(); private _terminated = new Signal(this); + private _msgChain = Promise.resolve(); private _noOp = () => { /* no-op */}; } diff --git a/packages/services/src/kernel/future.ts b/packages/services/src/kernel/future.ts index 53b60616b11d..1a0fb8764c09 100644 --- a/packages/services/src/kernel/future.ts +++ b/packages/services/src/kernel/future.ts @@ -17,7 +17,6 @@ import { KernelMessage } from './messages'; - /** * Implementation of a kernel future. */ @@ -53,42 +52,42 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture { /** * Get the reply handler. */ - get onReply(): (msg: KernelMessage.IShellMessage) => void { + get onReply(): (msg: KernelMessage.IShellMessage) => Promise | void { return this._reply; } /** * Set the reply handler. */ - set onReply(cb: (msg: KernelMessage.IShellMessage) => void) { + set onReply(cb: (msg: KernelMessage.IShellMessage) => Promise | void) { this._reply = cb; } /** * Get the iopub handler. */ - get onIOPub(): (msg: KernelMessage.IIOPubMessage) => void { + get onIOPub(): (msg: KernelMessage.IIOPubMessage) => Promise | void { return this._iopub; } /** * Set the iopub handler. */ - set onIOPub(cb: (msg: KernelMessage.IIOPubMessage) => void) { + set onIOPub(cb: (msg: KernelMessage.IIOPubMessage) => Promise | void) { this._iopub = cb; } /** * Get the stdin handler. */ - get onStdin(): (msg: KernelMessage.IStdinMessage) => void { + get onStdin(): (msg: KernelMessage.IStdinMessage) => Promise | void { return this._stdin; } /** * Set the stdin handler. */ - set onStdin(cb: (msg: KernelMessage.IStdinMessage) => void) { + set onStdin(cb: (msg: KernelMessage.IStdinMessage) => Promise | void) { this._stdin = cb; } @@ -105,7 +104,7 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture { * If a hook is registered during the hook processing, it won't run until the next message. * If a hook is removed during the hook processing, it will be deactivated immediately. */ - registerMessageHook(hook: (msg: KernelMessage.IIOPubMessage) => boolean): void { + registerMessageHook(hook: (msg: KernelMessage.IIOPubMessage) => Promise | boolean): void { this._hooks.add(hook); } @@ -117,7 +116,7 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture { * #### Notes * If a hook is removed during the hook processing, it will be deactivated immediately. */ - removeMessageHook(hook: (msg: KernelMessage.IIOPubMessage) => boolean): void { + removeMessageHook(hook: (msg: KernelMessage.IIOPubMessage) => Promise | boolean): void { if (this.isDisposed) { return; } @@ -148,25 +147,25 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture { /** * Handle an incoming kernel message. */ - handleMsg(msg: KernelMessage.IMessage): void { + async handleMsg(msg: KernelMessage.IMessage): Promise { switch (msg.channel) { case 'shell': - this._handleReply(msg as KernelMessage.IShellMessage); + await this._handleReply(msg as KernelMessage.IShellMessage); break; case 'stdin': - this._handleStdin(msg as KernelMessage.IStdinMessage); + await this._handleStdin(msg as KernelMessage.IStdinMessage); break; case 'iopub': - this._handleIOPub(msg as KernelMessage.IIOPubMessage); + await this._handleIOPub(msg as KernelMessage.IIOPubMessage); break; default: break; } } - private _handleReply(msg: KernelMessage.IShellMessage): void { + private async _handleReply(msg: KernelMessage.IShellMessage): Promise { let reply = this._reply; - if (reply) { reply(msg); } + if (reply) { await reply(msg); } this._replyMsg = msg; this._setFlag(Private.KernelFutureFlag.GotReply); if (this._testFlag(Private.KernelFutureFlag.GotIdle)) { @@ -174,15 +173,15 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture { } } - private _handleStdin(msg: KernelMessage.IStdinMessage): void { + private async _handleStdin(msg: KernelMessage.IStdinMessage): Promise { let stdin = this._stdin; - if (stdin) { stdin(msg); } + if (stdin) { await stdin(msg); } } - private _handleIOPub(msg: KernelMessage.IIOPubMessage): void { - let process = this._hooks.process(msg); + private async _handleIOPub(msg: KernelMessage.IIOPubMessage): Promise { + let process = await this._hooks.process(msg); let iopub = this._iopub; - if (process && iopub) { iopub(msg); } + if (process && iopub) { await iopub(msg); } if (KernelMessage.isStatusMsg(msg) && msg.content.execution_state === 'idle') { this._setFlag(Private.KernelFutureFlag.GotIdle); @@ -221,9 +220,9 @@ class KernelFutureHandler extends DisposableDelegate implements Kernel.IFuture { private _msg: KernelMessage.IShellMessage; private _status = 0; - private _stdin: (msg: KernelMessage.IStdinMessage) => void = Private.noOp; - private _iopub: (msg: KernelMessage.IIOPubMessage) => void = Private.noOp; - private _reply: (msg: KernelMessage.IShellMessage) => void = Private.noOp; + private _stdin: (msg: KernelMessage.IStdinMessage) => Promise | void = Private.noOp; + private _iopub: (msg: KernelMessage.IIOPubMessage) => Promise | void = Private.noOp; + private _reply: (msg: KernelMessage.IShellMessage) => Promise | void = Private.noOp; private _done = new PromiseDelegate(); private _replyMsg: KernelMessage.IShellMessage; private _hooks = new Private.HookList(); @@ -238,11 +237,6 @@ namespace Private { export const noOp = () => { /* no-op */ }; - /** - * A polyfill for a function to run code outside of the current execution context. - */ - let defer = typeof requestAnimationFrame === 'function' ? requestAnimationFrame : setImmediate; - export class HookList { /** @@ -250,7 +244,7 @@ namespace Private { * * @param hook - The callback to register. */ - add(hook: (msg: T) => boolean): void { + add(hook: (msg: T) => boolean | Promise): void { this.remove(hook); this._hooks.push(hook); } @@ -260,7 +254,7 @@ namespace Private { * * @param hook - The callback to remove. */ - remove(hook: (msg: T) => boolean): void { + remove(hook: (msg: T) => boolean | Promise): void { let index = this._hooks.indexOf(hook); if (index >= 0) { this._hooks[index] = null; @@ -272,28 +266,42 @@ namespace Private { * Process a message through the hooks. * * #### Notes - * The most recently registered hook is run first. - * If the hook returns false, any later hooks will not run. - * If a hook throws an error, the error is logged to the console and the next hook is run. - * If a hook is registered during the hook processing, it won't run until the next message. - * If a hook is removed during the hook processing, it will be deactivated immediately. + * The hooks can be asynchronous, returning a promise, and hook processing + * pauses until the promise resolves. The most recently registered hook is + * run first. If the hook returns false, any later hooks will not run. If a + * hook throws an error, the error is logged to the console and the next + * hook is run. If a hook is registered during the hook processing, it won't + * run until the next message. If a hook is removed during the hook + * processing, it will be deactivated immediately. */ - process(msg: T): boolean { + async process(msg: T): Promise { + // Wait until we can start a new process run. + await this._processing; + + // Reserve a process run for ourselves. + let processing = new PromiseDelegate(); + this._processing = processing.promise; + let continueHandling: boolean; - // most recently-added hook is called first + + // Call the end hook (most recently-added) first. Starting at the end also + // guarantees that hooks added during the processing will not be run in + // this invocation. for (let i = this._hooks.length - 1; i >= 0; i--) { let hook = this._hooks[i]; if (hook === null) { continue; } try { - continueHandling = hook(msg); + continueHandling = await hook(msg); } catch (err) { continueHandling = true; console.error(err); } if (continueHandling === false) { + processing.resolve(undefined); return false; } } + processing.resolve(undefined); return true; } @@ -301,10 +309,13 @@ namespace Private { * Schedule a cleanup of the list, removing any hooks that have been nulled out. */ private _scheduleCompact(): void { - if (!this._cleanupScheduled) { - this._cleanupScheduled = true; - defer(() => { - this._cleanupScheduled = false; + if (!this._compactScheduled) { + this._compactScheduled = true; + + // Make sure we compact the list between processing phases. We may want + // to rate-limit this compaction with a requestAnimationFrame as well. + this._processing = this._processing.then(() => { + this._compactScheduled = false; this._compact(); }); } @@ -326,8 +337,9 @@ namespace Private { this._hooks.length -= numNulls; } - private _hooks: (((msg: T) => boolean) | null)[] = []; - private _cleanupScheduled: boolean; + private _hooks: (((msg: T) => boolean | Promise) | null)[] = []; + private _compactScheduled: boolean; + private _processing: Promise; } /** diff --git a/packages/services/src/kernel/kernel.ts b/packages/services/src/kernel/kernel.ts index d258ed203d39..d9f9f11b7872 100644 --- a/packages/services/src/kernel/kernel.ts +++ b/packages/services/src/kernel/kernel.ts @@ -323,7 +323,7 @@ namespace Kernel { * callback will be overidden. A registered comm target handler will take * precedence over a comm which specifies a `target_module`. */ - registerCommTarget(targetName: string, callback: (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => void): IDisposable; + registerCommTarget(targetName: string, callback: (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => Promise): IDisposable; /** * Register an IOPub message hook. @@ -694,18 +694,30 @@ namespace Kernel { /** * The reply handler for the kernel future. + * + * #### Notes + * If the handler returns a promise, message processing pauses until the + * promise is resolved. */ - onReply: (msg: KernelMessage.IShellMessage) => void; + onReply: (msg: KernelMessage.IShellMessage) => Promise | void; /** * The stdin handler for the kernel future. + * + * #### Notes + * If the handler returns a promise, message processing pauses until the + * promise is resolved. */ - onStdin: (msg: KernelMessage.IStdinMessage) => void; + onStdin: (msg: KernelMessage.IStdinMessage) => Promise | void; /** * The iopub handler for the kernel future. + * + * #### Notes + * If the handler returns a promise, message processing pauses until the + * promise is resolved. */ - onIOPub: (msg: KernelMessage.IIOPubMessage) => void; + onIOPub: (msg: KernelMessage.IIOPubMessage) => Promise | void; /** * Register hook for IOPub messages. @@ -760,12 +772,12 @@ namespace Kernel { * This is called when the comm is closed from either the server or * client. */ - onClose: (msg: KernelMessage.ICommCloseMsg) => void; + onClose: (msg: KernelMessage.ICommCloseMsg) => Promise | void; /** * Callback for a comm message received event. */ - onMsg: (msg: KernelMessage.ICommMsgMsg) => void; + onMsg: (msg: KernelMessage.ICommMsgMsg) => Promise | void; /** * Open a comm with optional data and metadata.