Skip to content

Commit

Permalink
First pass at making all kernel message handling asynchronous and pre…
Browse files Browse the repository at this point in the history
…serving message order.

Fixes jupyterlab#4188
  • Loading branch information
jasongrout committed Jun 8, 2018
1 parent c080cd5 commit 5067597
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 143 deletions.
12 changes: 6 additions & 6 deletions packages/services/src/kernel/comm.ts
Expand Up @@ -56,7 +56,7 @@ class CommHandler extends DisposableDelegate implements Kernel.IComm {
*
* **See also:** [[ICommClose]], [[close]]
*/
get onClose(): (msg: KernelMessage.ICommCloseMsg) => void {
get onClose(): (msg: KernelMessage.ICommCloseMsg) => Promise<void> | void {
return this._onClose;
}

Expand All @@ -69,21 +69,21 @@ class CommHandler extends DisposableDelegate implements Kernel.IComm {
*
* **See also:** [[close]]
*/
set onClose(cb: (msg: KernelMessage.ICommCloseMsg) => void) {
set onClose(cb: (msg: KernelMessage.ICommCloseMsg) => Promise<void> | void) {
this._onClose = cb;
}

/**
* Get the callback for a comm message received event.
*/
get onMsg(): (msg: KernelMessage.ICommMsgMsg) => void {
get onMsg(): (msg: KernelMessage.ICommMsgMsg) => Promise<void> | void {
return this._onMsg;
}

/**
* Set the callback for a comm message received event.
*/
set onMsg(cb: (msg: KernelMessage.ICommMsgMsg) => void) {
set onMsg(cb: (msg: KernelMessage.ICommMsgMsg) => Promise<void> | void) {
this._onMsg = cb;
}

Expand Down Expand Up @@ -180,6 +180,6 @@ class CommHandler extends DisposableDelegate implements Kernel.IComm {
private _target = '';
private _id = '';
private _kernel: Kernel.IKernel;
private _onClose: (msg: KernelMessage.ICommCloseMsg) => void;
private _onMsg: (msg: KernelMessage.ICommMsgMsg) => void;
private _onClose: (msg: KernelMessage.ICommCloseMsg) => Promise<void> | void;
private _onMsg: (msg: KernelMessage.ICommMsgMsg) => Promise<void> | void;
}
180 changes: 93 additions & 87 deletions packages/services/src/kernel/default.ts
Expand Up @@ -77,7 +77,7 @@ class DefaultKernel implements Kernel.IKernel {
this._clientId = options.clientId || uuid();
this._username = options.username || '';
this._futures = new Map<string, KernelFutureHandler>();
this._commPromises = new Map<string, Promise<Kernel.IComm>>();
this._comms = new Map<string, Promise<Kernel.IComm>>();
this._createSocket();
Private.runningKernels.push(this);
}
Expand All @@ -103,6 +103,10 @@ class DefaultKernel implements Kernel.IKernel {

/**
* A signal emitted for iopub kernel messages.
*
* #### Notes
* This signal is emitted after the iopub message is handled, which may
* asynchronously after it is received.
*/
get iopubMessage(): ISignal<this, KernelMessage.IIOPubMessage> {
return this._iopubMessage;
Expand Down Expand Up @@ -238,7 +242,7 @@ class DefaultKernel implements Kernel.IKernel {
this._futures.forEach((future, key) => {
future.dispose();
});
this._commPromises.forEach((promise, key) => {
this._comms.forEach((promise, key) => {
promise.then(comm => {
comm.dispose();
});
Expand Down Expand Up @@ -627,32 +631,33 @@ class DefaultKernel implements Kernel.IKernel {
/**
* Connect to a comm, or create a new one.
*
* TODO: should we change this to a synchronous function? There's nothing
* asynchronous about it now.
*
* #### Notes
* If a client-side comm already exists, it is returned.
*/
connectToComm(targetName: string, commId?: string): Promise<Kernel.IComm> {
async connectToComm(targetName: string, commId?: string): Promise<Kernel.IComm> {
let id = commId || uuid();
if (this._commPromises.has(id)) {
return this._commPromises.get(id);
if (this._comms.has(id)) {
return this._comms.get(id);
}
let promise = Promise.resolve(void 0).then(() => {
return new CommHandler(
targetName,
id,
this,
() => { this._unregisterComm(id); }
);
});
this._commPromises.set(id, promise);
return promise;
let comm = new CommHandler(
targetName,
id,
this,
() => { this._unregisterComm(id); }
);
this._comms.set(id, comm);
return comm;
}

/**
* Handle a message with a display id.
*
* @returns Whether the message was handled.
*/
private _handleDisplayId(displayId: string, msg: KernelMessage.IMessage): boolean {
private async _handleDisplayId(displayId: string, msg: KernelMessage.IMessage): Promise<boolean> {
let msgId = (msg.parent_header as KernelMessage.IHeader).msg_id;
let parentIds = this._displayIdToParentIds.get(displayId);
if (parentIds) {
Expand All @@ -668,12 +673,12 @@ class DefaultKernel implements Kernel.IKernel {
};
(updateMsg.header as any).msg_type = 'update_display_data';

parentIds.map((parentId) => {
await Promise.all(parentIds.map(async (parentId) => {
let future = this._futures && this._futures.get(parentId);
if (future) {
future.handleMsg(updateMsg);
await future.handleMsg(updateMsg);
}
});
}));
}

// We're done here if it's update_display.
Expand All @@ -697,7 +702,7 @@ class DefaultKernel implements Kernel.IKernel {
}
this._msgIdToDisplayIds.set(msgId, displayIds);

// Let it propagate to the intended recipient.
// Let the message propagate to the intended recipient.
return false;
}

Expand Down Expand Up @@ -774,108 +779,91 @@ class DefaultKernel implements Kernel.IKernel {
this._futures.forEach((future, key) => {
future.dispose();
});
this._commPromises.forEach((promise, key) => {
this._comms.forEach((promise, key) => {
promise.then(comm => {
comm.dispose();
});
});
this._futures = new Map<string, KernelFutureHandler>();
this._commPromises = new Map<string, Promise<Kernel.IComm>>();
this._comms = new Map<string, Promise<Kernel.IComm>>();
this._displayIdToParentIds.clear();
this._msgIdToDisplayIds.clear();
}

/**
* Handle a `comm_open` kernel message.
*/
private _handleCommOpen(msg: KernelMessage.ICommOpenMsg): void {
private async _handleCommOpen(msg: KernelMessage.ICommOpenMsg): Promise<void> {
let content = msg.content;
if (this.isDisposed) {
return;
}
let promise = Private.loadObject(content.target_name, content.target_module,
this._targetRegistry).then(target => {
let comm = new CommHandler(
content.target_name,
content.comm_id,
this,
() => { this._unregisterComm(content.comm_id); }
);
let response : any;
try {
response = target(comm, msg);
} catch (e) {
comm.close();
console.error('Exception opening new comm');
throw(e);
}
return Promise.resolve(response).then(() => {
if (this.isDisposed) {
return;
}
return comm;
});
});
this._commPromises.set(content.comm_id, promise);
let target = await Private.loadObject(content.target_name, content.target_module, this._targetRegistry);
let comm = new CommHandler(
content.target_name,
content.comm_id,
this,
() => { this._unregisterComm(content.comm_id); }
);
try {
await target(comm, msg);
} catch (e) {
// TODO: do we need to await this?
await comm.close();
console.error('Exception opening new comm');
throw(e);
}
// TODO: do we need to check if the comm is disposed?
this._comms.set(content.comm_id, comm);
}

/**
* Handle 'comm_close' kernel message.
*/
private _handleCommClose(msg: KernelMessage.ICommCloseMsg): void {
private async _handleCommClose(msg: KernelMessage.ICommCloseMsg): Promise<void> {
let content = msg.content;
let promise = this._commPromises.get(content.comm_id);
if (!promise) {
let comm = this._comms.get(content.comm_id);
if (!comm) {
console.error('Comm not found for comm id ' + content.comm_id);
return;
}
promise.then((comm) => {
if (!comm) {
return;
}
this._unregisterComm(comm.commId);
try {
let onClose = comm.onClose;
if (onClose) {
onClose(msg);
}
(comm as CommHandler).dispose();
} catch (e) {
console.error('Exception closing comm: ', e, e.stack, msg);
this._unregisterComm(comm.commId);
try {
let onClose = comm.onClose;
if (onClose) {
await onClose(msg);
}
});
(comm as CommHandler).dispose();
} catch (e) {
console.error('Exception closing comm: ', e, e.stack, msg);
}
}

/**
* Handle a 'comm_msg' kernel message.
*/
private _handleCommMsg(msg: KernelMessage.ICommMsgMsg): void {
private async _handleCommMsg(msg: KernelMessage.ICommMsgMsg): Promise<void> {
let content = msg.content;
let promise = this._commPromises.get(content.comm_id);
if (!promise) {
let comm = this._comms.get(content.comm_id);
if (!comm) {
// We do have a registered comm for this comm id, ignore.
return;
}
promise.then((comm) => {
if (!comm) {
return;
}
try {
let onMsg = comm.onMsg;
if (onMsg) {
onMsg(msg);
}
} catch (e) {
console.error('Exception handling comm msg: ', e, e.stack, msg);
try {
let onMsg = comm.onMsg;
if (onMsg) {
await onMsg(msg);
}
});
} catch (e) {
console.error('Exception handling comm msg: ', e, e.stack, msg);
}
}

/**
* Unregister a comm instance.
*/
private _unregisterComm(commId: string) {
this._commPromises.delete(commId);
this._comms.delete(commId);
}

/**
Expand Down Expand Up @@ -933,12 +921,16 @@ class DefaultKernel implements Kernel.IKernel {

/**
* Handle a websocket message, validating and routing appropriately.
*
* TODO: convert to asynchronous processing.
*/
private _onWSMessage = (evt: MessageEvent) => {
if (this._wsStopped) {
// If the socket is being closed, ignore any messages
return;
}

// Notify immediately if there is an error with the message.
let msg = serialize.deserialize(evt.data);
try {
validate.validateMessage(msg);
Expand All @@ -947,8 +939,21 @@ class DefaultKernel implements Kernel.IKernel {
return;
}

// Handle the message asynchronously, in the order received.
this._msgChain = this._msgChain.then(() => {
return this._handleMessage(msg);
});

// TODO: should this emit happen asynchronously? Should we have two events,
// one for a message received here, and then another after it is handled
// asynchronously?
this._anyMessage.emit({msg, direction: 'recv'});
}

private async _handleMessage(msg: KernelMessage.IMessage): Promise<void> {
let handled = false;

// Check to see if we have a display_id we need to reroute.
if (msg.parent_header && msg.channel === 'iopub') {
switch (msg.header.msg_type) {
case 'display_data':
Expand All @@ -958,7 +963,7 @@ class DefaultKernel implements Kernel.IKernel {
let transient = (msg.content.transient || {}) as JSONObject;
let displayId = transient['display_id'] as string;
if (displayId) {
handled = this._handleDisplayId(displayId, msg);
handled = await this._handleDisplayId(displayId, msg);
}
break;
default:
Expand All @@ -970,7 +975,7 @@ class DefaultKernel implements Kernel.IKernel {
let parentHeader = msg.parent_header as KernelMessage.IHeader;
let future = this._futures && this._futures.get(parentHeader.msg_id);
if (future) {
future.handleMsg(msg);
await future.handleMsg(msg);
} else {
// If the message was sent by us and was not iopub, it is orphaned.
let owned = parentHeader.session === this.clientId;
Expand All @@ -982,23 +987,23 @@ class DefaultKernel implements Kernel.IKernel {
if (msg.channel === 'iopub') {
switch (msg.header.msg_type) {
case 'status':
// Updating the status is synchronous, and we call no user code
this._updateStatus((msg as KernelMessage.IStatusMsg).content.execution_state);
break;
case 'comm_open':
this._handleCommOpen(msg as KernelMessage.ICommOpenMsg);
await this._handleCommOpen(msg as KernelMessage.ICommOpenMsg);
break;
case 'comm_msg':
this._handleCommMsg(msg as KernelMessage.ICommMsgMsg);
await this._handleCommMsg(msg as KernelMessage.ICommMsgMsg);
break;
case 'comm_close':
this._handleCommClose(msg as KernelMessage.ICommCloseMsg);
await this._handleCommClose(msg as KernelMessage.ICommCloseMsg);
break;
default:
break;
}
this._iopubMessage.emit(msg as KernelMessage.IIOPubMessage);
}
this._anyMessage.emit({msg, direction: 'recv'});
}

/**
Expand Down Expand Up @@ -1037,7 +1042,7 @@ class DefaultKernel implements Kernel.IKernel {
private _reconnectAttempt = 0;
private _isReady = false;
private _futures: Map<string, KernelFutureHandler>;
private _commPromises: Map<string, Promise<Kernel.IComm | undefined>>;
private _comms: Map<string, Kernel.IComm>;
private _targetRegistry: { [key: string]: (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => void; } = Object.create(null);
private _info: KernelMessage.IInfoReply | null = null;
private _pendingMessages: KernelMessage.IMessage[] = [];
Expand All @@ -1050,6 +1055,7 @@ class DefaultKernel implements Kernel.IKernel {
private _displayIdToParentIds = new Map<string, string[]>();
private _msgIdToDisplayIds = new Map<string, string[]>();
private _terminated = new Signal<this, void>(this);
private _msgChain = Promise.resolve();
private _noOp = () => { /* no-op */};
}

Expand Down

0 comments on commit 5067597

Please sign in to comment.