Skip to content

Commit

Permalink
Merge pull request #4115 from blink1073/sync-comm
Browse files Browse the repository at this point in the history
Make comms asynchronous
  • Loading branch information
jasongrout committed Mar 15, 2018
2 parents 8de8cf6 + 9b654f5 commit c9c199c
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 98 deletions.
76 changes: 33 additions & 43 deletions packages/services/src/kernel/default.ts
Expand Up @@ -78,7 +78,6 @@ class DefaultKernel implements Kernel.IKernel {
this._username = options.username || '';
this._futures = new Map<string, KernelFutureHandler>();
this._commPromises = new Map<string, Promise<Kernel.IComm>>();
this._comms = new Map<string, Kernel.IComm>();
this._createSocket();
Private.runningKernels.push(this);
}
Expand Down Expand Up @@ -229,8 +228,10 @@ class DefaultKernel implements Kernel.IKernel {
this._futures.forEach((future, key) => {
future.dispose();
});
this._comms.forEach((comm, key) => {
comm.dispose();
this._commPromises.forEach((promise, key) => {
promise.then(comm => {
comm.dispose();
});
});
this._displayIdToParentIds.clear();
this._msgIdToDisplayIds.clear();
Expand Down Expand Up @@ -617,16 +618,21 @@ class DefaultKernel implements Kernel.IKernel {
* #### Notes
* If a client-side comm already exists, it is returned.
*/
connectToComm(targetName: string, commId?: string): Kernel.IComm {
connectToComm(targetName: string, commId?: string): Promise<Kernel.IComm> {
let id = commId || uuid();
let comm = this._comms.get(id) || new CommHandler(
targetName,
id,
this,
() => { this._unregisterComm(id); }
);
this._comms.set(id, comm);
return comm;
if (this._commPromises.has(id)) {
return this._commPromises.get(id);
}
let promise = Promise.resolve(void 0).then(() => {
return new CommHandler(
targetName,
id,
this,
() => { this._unregisterComm(id); }
);
});
this._commPromises.set(id, promise);
return promise;
}

/**
Expand Down Expand Up @@ -756,12 +762,13 @@ class DefaultKernel implements Kernel.IKernel {
this._futures.forEach((future, key) => {
future.dispose();
});
this._comms.forEach((comm, key) => {
comm.dispose();
this._commPromises.forEach((promise, key) => {
promise.then(comm => {
comm.dispose();
});
});
this._futures = new Map<string, KernelFutureHandler>();
this._commPromises = new Map<string, Promise<Kernel.IComm>>();
this._comms = new Map<string, Kernel.IComm>();
this._displayIdToParentIds.clear();
this._msgIdToDisplayIds.clear();
}
Expand Down Expand Up @@ -794,8 +801,6 @@ class DefaultKernel implements Kernel.IKernel {
if (this.isDisposed) {
return;
}
this._commPromises.delete(comm.commId);
this._comms.set(comm.commId, comm);
return comm;
});
});
Expand All @@ -809,12 +814,8 @@ class DefaultKernel implements Kernel.IKernel {
let content = msg.content;
let promise = this._commPromises.get(content.comm_id);
if (!promise) {
let comm = this._comms.get(content.comm_id);
if (!comm) {
console.error('Comm not found for comm id ' + content.comm_id);
return;
}
promise = Promise.resolve(comm);
console.error('Comm not found for comm id ' + content.comm_id);
return;
}
promise.then((comm) => {
if (!comm) {
Expand All @@ -840,38 +841,28 @@ class DefaultKernel implements Kernel.IKernel {
let content = msg.content;
let promise = this._commPromises.get(content.comm_id);
if (!promise) {
let comm = this._comms.get(content.comm_id);
// We do have a registered comm for this comm id, ignore.
return;
}
promise.then((comm) => {
if (!comm) {
// We do have a registered comm for this comm id, ignore.
return;
} else {
}
try {
let onMsg = comm.onMsg;
if (onMsg) {
onMsg(msg);
}
} catch (e) {
console.error('Exception handling comm msg: ', e, e.stack, msg);
}
} else {
promise.then((comm) => {
if (!comm) {
return;
}
try {
let onMsg = comm.onMsg;
if (onMsg) {
onMsg(msg);
}
} catch (e) {
console.error('Exception handling comm msg: ', e, e.stack, msg);
}
});
}
});
}

/**
* Unregister a comm instance.
*/
private _unregisterComm(commId: string) {
this._comms.delete(commId);
this._commPromises.delete(commId);
}

Expand Down Expand Up @@ -1034,7 +1025,6 @@ class DefaultKernel implements Kernel.IKernel {
private _isReady = false;
private _futures: Map<string, KernelFutureHandler>;
private _commPromises: Map<string, Promise<Kernel.IComm | undefined>>;
private _comms: Map<string, Kernel.IComm>;
private _targetRegistry: { [key: string]: (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => void; } = Object.create(null);
private _info: KernelMessage.IInfoReply | null = null;
private _pendingMessages: KernelMessage.IMessage[] = [];
Expand Down
6 changes: 3 additions & 3 deletions packages/services/src/kernel/kernel.ts
Expand Up @@ -302,12 +302,12 @@ namespace Kernel {
*
* @param id - The comm id.
*
* @returns A comm instance.
* @returns A promise that resolves with a comm instance.
*
* #### Notes
* If a client-side comm already exists, it is returned.
* If a promise to client-side comm already exists, it is returned.
*/
connectToComm(targetName: string, commId?: string): Kernel.IComm;
connectToComm(targetName: string, commId?: string): Promise<Kernel.IComm>;

/**
* Register a comm target handler.
Expand Down
2 changes: 1 addition & 1 deletion packages/services/test/run-test.py
Expand Up @@ -27,7 +27,7 @@ def get_command(self):
terminalsAvailable = self.web_app.settings['terminals_available']
mocha = os.path.join(HERE, '..', 'node_modules', '.bin', '_mocha')
mocha = os.path.realpath(mocha)
defaults = ['build/**/*.spec.js', 'build/*.spec.js']
defaults = ['build/**/ikernel.spec.js', 'build/a.spec.js']
defaults += ['--retries', '2',
'--jupyter-config-data=./build/config.json']
default_timeout = ['--timeout', '20000']
Expand Down
88 changes: 46 additions & 42 deletions packages/services/test/src/kernel/comm.spec.ts
Expand Up @@ -70,36 +70,42 @@ describe('jupyter.services - Comm', () => {
context('#connectToComm()', () => {

it('should create an instance of IComm', () => {
let comm = kernel.connectToComm('test');
expect(comm.targetName).to.be('test');
expect(typeof comm.commId).to.be('string');
return kernel.connectToComm('test').then(comm => {
expect(comm.targetName).to.be('test');
expect(typeof comm.commId).to.be('string');
});
});

it('should use the given id', () => {
let comm = kernel.connectToComm('test', '1234');
expect(comm.targetName).to.be('test');
expect(comm.commId).to.be('1234');
return kernel.connectToComm('test', '1234').then(comm => {
expect(comm.targetName).to.be('test');
expect(comm.commId).to.be('1234');
});
});

it('should create an instance of IComm', () => {
let comm = kernel.connectToComm('test', '1234');
expect(comm.targetName).to.be('test');
expect(comm.commId).to.be('1234');
return kernel.connectToComm('test', '1234').then(comm => {
expect(comm.targetName).to.be('test');
expect(comm.commId).to.be('1234');
});
});

it('should use the given id', () => {
let comm = kernel.connectToComm('test', '1234');
expect(comm.targetName).to.be('test');
expect(comm.commId).to.be('1234');
return kernel.connectToComm('test', '1234').then(comm => {
expect(comm.targetName).to.be('test');
expect(comm.commId).to.be('1234');
});
});

it('should reuse an existing comm', (done) => {
let comm = kernel.connectToComm('test', '1234');
comm.onClose = () => {
done();
};
let comm2 = kernel.connectToComm('test', '1234');
comm2.close(); // should trigger comm to close
kernel.connectToComm('test', '1234').then(comm => {
comm.onClose = () => {
done();
};
kernel.connectToComm('test', '1234').then(comm2 => {
comm2.close(); // should trigger comm to close
});
});
});
});

Expand All @@ -110,7 +116,6 @@ describe('jupyter.services - Comm', () => {
disposable.dispose();
let content = msg.content;
expect(content.data).to.be('hello');
kernel.connectToComm(content.target_name, content.comm_id);
comm.dispose();
done();
});
Expand Down Expand Up @@ -149,28 +154,31 @@ describe('jupyter.services - Comm', () => {
context('#isDisposed', () => {

it('should be true after we dispose of the comm', () => {
let comm = kernel.connectToComm('test');
expect(comm.isDisposed).to.be(false);
comm.dispose();
expect(comm.isDisposed).to.be(true);
return kernel.connectToComm('test').then(comm => {
expect(comm.isDisposed).to.be(false);
comm.dispose();
expect(comm.isDisposed).to.be(true);
});
});

it('should be safe to call multiple times', () => {
let comm = kernel.connectToComm('test');
expect(comm.isDisposed).to.be(false);
expect(comm.isDisposed).to.be(false);
comm.dispose();
expect(comm.isDisposed).to.be(true);
expect(comm.isDisposed).to.be(true);
return kernel.connectToComm('test').then(comm => {
expect(comm.isDisposed).to.be(false);
expect(comm.isDisposed).to.be(false);
comm.dispose();
expect(comm.isDisposed).to.be(true);
expect(comm.isDisposed).to.be(true);
});
});
});

context('#dispose()', () => {

it('should dispose of the resources held by the comm', () => {
let comm = kernel.connectToComm('foo');
comm.dispose();
expect(comm.isDisposed).to.be(true);
return kernel.connectToComm('foo').then(comm => {
comm.dispose();
expect(comm.isDisposed).to.be(true);
});
});
});

Expand All @@ -180,10 +188,15 @@ describe('jupyter.services - Comm', () => {

let comm: Kernel.IComm;

beforeEach(() => {
return kernel.connectToComm('test').then(c => {
comm = c;
});
});

context('#id', () => {

it('should be a string', () => {
comm = kernel.connectToComm('test');
expect(typeof comm.commId).to.be('string');
});

Expand All @@ -192,7 +205,6 @@ describe('jupyter.services - Comm', () => {
context('#name', () => {

it('should be a string', () => {
comm = kernel.connectToComm('test');
expect(comm.targetName).to.be('test');
});

Expand All @@ -201,7 +213,6 @@ describe('jupyter.services - Comm', () => {
context('#onClose', () => {

it('should be readable and writable function', (done) => {
comm = kernel.connectToComm('test');
expect(comm.onClose).to.be(undefined);
comm.onClose = msg => {
done();
Expand All @@ -223,7 +234,6 @@ describe('jupyter.services - Comm', () => {

context('#onMsg', () => {
it('should be readable and writable function', (done) => {
comm = kernel.connectToComm('test');
comm.onMsg = (msg) => {
done();
};
Expand Down Expand Up @@ -254,7 +264,6 @@ describe('jupyter.services - Comm', () => {
it('should send a message to the server', () => {
let future = kernel.requestExecute({ code: TARGET });
future.done.then(() => {
comm = kernel.connectToComm('test');
let encoder = new TextEncoder('utf8');
let data = encoder.encode('hello');
future = comm.open({ foo: 'bar' }, { fizz: 'buzz' }, [data, data.buffer]);
Expand All @@ -267,15 +276,13 @@ describe('jupyter.services - Comm', () => {
context('#send()', () => {

it('should send a message to the server', () => {
comm = kernel.connectToComm('test');
return comm.open().done.then(() => {
let future = comm.send({ foo: 'bar' }, { fizz: 'buzz' });
return future.done;
});
});

it('should pass through a buffers field', () => {
comm = kernel.connectToComm('test');
return comm.open().done.then(() => {
let future = comm.send({ buffers: 'bar' });
return future.done;
Expand All @@ -287,7 +294,6 @@ describe('jupyter.services - Comm', () => {
context('#close()', () => {

it('should send a message to the server', () => {
comm = kernel.connectToComm('test');
return comm.open().done.then(() => {
let encoder = new TextEncoder('utf8');
let data = encoder.encode('hello');
Expand All @@ -297,7 +303,6 @@ describe('jupyter.services - Comm', () => {
});

it('should trigger an onClose', (done) => {
comm = kernel.connectToComm('test');
comm.open().done.then(() => {
comm.onClose = (msg: KernelMessage.ICommCloseMsg) => {
expect(msg.content.data).to.eql({ foo: 'bar' });
Expand All @@ -309,7 +314,6 @@ describe('jupyter.services - Comm', () => {
});

it('should not send subsequent messages', () => {
comm = kernel.connectToComm('test');
return comm.open().done.then(() => {
return comm.close({ foo: 'bar' }).done;
}).then(() => {
Expand Down

0 comments on commit c9c199c

Please sign in to comment.