From 9b654f5f2cdf993e7cf7d80728dbfc67629353ff Mon Sep 17 00:00:00 2001 From: Steven Silvester Date: Mon, 5 Mar 2018 10:17:50 -0600 Subject: [PATCH] Make comms synchronous --- packages/services/src/kernel/default.ts | 76 +++++++--------- packages/services/src/kernel/kernel.ts | 6 +- packages/services/test/run-test.py | 2 +- .../services/test/src/kernel/comm.spec.ts | 88 ++++++++++--------- .../services/test/src/kernel/ikernel.spec.ts | 13 +-- 5 files changed, 87 insertions(+), 98 deletions(-) diff --git a/packages/services/src/kernel/default.ts b/packages/services/src/kernel/default.ts index b3b9b50219f4..9143a05717d3 100644 --- a/packages/services/src/kernel/default.ts +++ b/packages/services/src/kernel/default.ts @@ -78,7 +78,6 @@ class DefaultKernel implements Kernel.IKernel { this._username = options.username || ''; this._futures = new Map(); this._commPromises = new Map>(); - this._comms = new Map(); this._createSocket(); Private.runningKernels.push(this); } @@ -229,8 +228,10 @@ class DefaultKernel implements Kernel.IKernel { this._futures.forEach((future, key) => { future.dispose(); }); - this._comms.forEach((comm, key) => { - comm.dispose(); + this._commPromises.forEach((promise, key) => { + promise.then(comm => { + comm.dispose(); + }); }); this._displayIdToParentIds.clear(); this._msgIdToDisplayIds.clear(); @@ -617,16 +618,21 @@ class DefaultKernel implements Kernel.IKernel { * #### Notes * If a client-side comm already exists, it is returned. */ - connectToComm(targetName: string, commId?: string): Kernel.IComm { + connectToComm(targetName: string, commId?: string): Promise { let id = commId || uuid(); - let comm = this._comms.get(id) || new CommHandler( - targetName, - id, - this, - () => { this._unregisterComm(id); } - ); - this._comms.set(id, comm); - return comm; + if (this._commPromises.has(id)) { + return this._commPromises.get(id); + } + let promise = Promise.resolve(void 0).then(() => { + return new CommHandler( + targetName, + id, + this, + () => { this._unregisterComm(id); } + ); + }); + this._commPromises.set(id, promise); + return promise; } /** @@ -756,12 +762,13 @@ class DefaultKernel implements Kernel.IKernel { this._futures.forEach((future, key) => { future.dispose(); }); - this._comms.forEach((comm, key) => { - comm.dispose(); + this._commPromises.forEach((promise, key) => { + promise.then(comm => { + comm.dispose(); + }); }); this._futures = new Map(); this._commPromises = new Map>(); - this._comms = new Map(); this._displayIdToParentIds.clear(); this._msgIdToDisplayIds.clear(); } @@ -794,8 +801,6 @@ class DefaultKernel implements Kernel.IKernel { if (this.isDisposed) { return; } - this._commPromises.delete(comm.commId); - this._comms.set(comm.commId, comm); return comm; }); }); @@ -809,12 +814,8 @@ class DefaultKernel implements Kernel.IKernel { let content = msg.content; let promise = this._commPromises.get(content.comm_id); if (!promise) { - let comm = this._comms.get(content.comm_id); - if (!comm) { - console.error('Comm not found for comm id ' + content.comm_id); - return; - } - promise = Promise.resolve(comm); + console.error('Comm not found for comm id ' + content.comm_id); + return; } promise.then((comm) => { if (!comm) { @@ -840,38 +841,28 @@ class DefaultKernel implements Kernel.IKernel { let content = msg.content; let promise = this._commPromises.get(content.comm_id); if (!promise) { - let comm = this._comms.get(content.comm_id); + // We do have a registered comm for this comm id, ignore. + return; + } + promise.then((comm) => { if (!comm) { - // We do have a registered comm for this comm id, ignore. return; - } else { + } + try { let onMsg = comm.onMsg; if (onMsg) { onMsg(msg); } + } catch (e) { + console.error('Exception handling comm msg: ', e, e.stack, msg); } - } else { - promise.then((comm) => { - if (!comm) { - return; - } - try { - let onMsg = comm.onMsg; - if (onMsg) { - onMsg(msg); - } - } catch (e) { - console.error('Exception handling comm msg: ', e, e.stack, msg); - } - }); - } + }); } /** * Unregister a comm instance. */ private _unregisterComm(commId: string) { - this._comms.delete(commId); this._commPromises.delete(commId); } @@ -1034,7 +1025,6 @@ class DefaultKernel implements Kernel.IKernel { private _isReady = false; private _futures: Map; private _commPromises: Map>; - private _comms: Map; private _targetRegistry: { [key: string]: (comm: Kernel.IComm, msg: KernelMessage.ICommOpenMsg) => void; } = Object.create(null); private _info: KernelMessage.IInfoReply | null = null; private _pendingMessages: KernelMessage.IMessage[] = []; diff --git a/packages/services/src/kernel/kernel.ts b/packages/services/src/kernel/kernel.ts index 5921c5b9df5d..26d2d9f941de 100644 --- a/packages/services/src/kernel/kernel.ts +++ b/packages/services/src/kernel/kernel.ts @@ -302,12 +302,12 @@ namespace Kernel { * * @param id - The comm id. * - * @returns A comm instance. + * @returns A promise that resolves with a comm instance. * * #### Notes - * If a client-side comm already exists, it is returned. + * If a promise to client-side comm already exists, it is returned. */ - connectToComm(targetName: string, commId?: string): Kernel.IComm; + connectToComm(targetName: string, commId?: string): Promise; /** * Register a comm target handler. diff --git a/packages/services/test/run-test.py b/packages/services/test/run-test.py index 5bbb8ba53e6b..bae27de0aa89 100644 --- a/packages/services/test/run-test.py +++ b/packages/services/test/run-test.py @@ -27,7 +27,7 @@ def get_command(self): terminalsAvailable = self.web_app.settings['terminals_available'] mocha = os.path.join(HERE, '..', 'node_modules', '.bin', '_mocha') mocha = os.path.realpath(mocha) - defaults = ['build/**/*.spec.js', 'build/*.spec.js'] + defaults = ['build/**/ikernel.spec.js', 'build/a.spec.js'] defaults += ['--retries', '2', '--jupyter-config-data=./build/config.json'] default_timeout = ['--timeout', '20000'] diff --git a/packages/services/test/src/kernel/comm.spec.ts b/packages/services/test/src/kernel/comm.spec.ts index 3585031d5a60..317364fda128 100644 --- a/packages/services/test/src/kernel/comm.spec.ts +++ b/packages/services/test/src/kernel/comm.spec.ts @@ -70,36 +70,42 @@ describe('jupyter.services - Comm', () => { context('#connectToComm()', () => { it('should create an instance of IComm', () => { - let comm = kernel.connectToComm('test'); - expect(comm.targetName).to.be('test'); - expect(typeof comm.commId).to.be('string'); + return kernel.connectToComm('test').then(comm => { + expect(comm.targetName).to.be('test'); + expect(typeof comm.commId).to.be('string'); + }); }); it('should use the given id', () => { - let comm = kernel.connectToComm('test', '1234'); - expect(comm.targetName).to.be('test'); - expect(comm.commId).to.be('1234'); + return kernel.connectToComm('test', '1234').then(comm => { + expect(comm.targetName).to.be('test'); + expect(comm.commId).to.be('1234'); + }); }); it('should create an instance of IComm', () => { - let comm = kernel.connectToComm('test', '1234'); - expect(comm.targetName).to.be('test'); - expect(comm.commId).to.be('1234'); + return kernel.connectToComm('test', '1234').then(comm => { + expect(comm.targetName).to.be('test'); + expect(comm.commId).to.be('1234'); + }); }); it('should use the given id', () => { - let comm = kernel.connectToComm('test', '1234'); - expect(comm.targetName).to.be('test'); - expect(comm.commId).to.be('1234'); + return kernel.connectToComm('test', '1234').then(comm => { + expect(comm.targetName).to.be('test'); + expect(comm.commId).to.be('1234'); + }); }); it('should reuse an existing comm', (done) => { - let comm = kernel.connectToComm('test', '1234'); - comm.onClose = () => { - done(); - }; - let comm2 = kernel.connectToComm('test', '1234'); - comm2.close(); // should trigger comm to close + kernel.connectToComm('test', '1234').then(comm => { + comm.onClose = () => { + done(); + }; + kernel.connectToComm('test', '1234').then(comm2 => { + comm2.close(); // should trigger comm to close + }); + }); }); }); @@ -110,7 +116,6 @@ describe('jupyter.services - Comm', () => { disposable.dispose(); let content = msg.content; expect(content.data).to.be('hello'); - kernel.connectToComm(content.target_name, content.comm_id); comm.dispose(); done(); }); @@ -149,28 +154,31 @@ describe('jupyter.services - Comm', () => { context('#isDisposed', () => { it('should be true after we dispose of the comm', () => { - let comm = kernel.connectToComm('test'); - expect(comm.isDisposed).to.be(false); - comm.dispose(); - expect(comm.isDisposed).to.be(true); + return kernel.connectToComm('test').then(comm => { + expect(comm.isDisposed).to.be(false); + comm.dispose(); + expect(comm.isDisposed).to.be(true); + }); }); it('should be safe to call multiple times', () => { - let comm = kernel.connectToComm('test'); - expect(comm.isDisposed).to.be(false); - expect(comm.isDisposed).to.be(false); - comm.dispose(); - expect(comm.isDisposed).to.be(true); - expect(comm.isDisposed).to.be(true); + return kernel.connectToComm('test').then(comm => { + expect(comm.isDisposed).to.be(false); + expect(comm.isDisposed).to.be(false); + comm.dispose(); + expect(comm.isDisposed).to.be(true); + expect(comm.isDisposed).to.be(true); + }); }); }); context('#dispose()', () => { it('should dispose of the resources held by the comm', () => { - let comm = kernel.connectToComm('foo'); - comm.dispose(); - expect(comm.isDisposed).to.be(true); + return kernel.connectToComm('foo').then(comm => { + comm.dispose(); + expect(comm.isDisposed).to.be(true); + }); }); }); @@ -180,10 +188,15 @@ describe('jupyter.services - Comm', () => { let comm: Kernel.IComm; + beforeEach(() => { + return kernel.connectToComm('test').then(c => { + comm = c; + }); + }); + context('#id', () => { it('should be a string', () => { - comm = kernel.connectToComm('test'); expect(typeof comm.commId).to.be('string'); }); @@ -192,7 +205,6 @@ describe('jupyter.services - Comm', () => { context('#name', () => { it('should be a string', () => { - comm = kernel.connectToComm('test'); expect(comm.targetName).to.be('test'); }); @@ -201,7 +213,6 @@ describe('jupyter.services - Comm', () => { context('#onClose', () => { it('should be readable and writable function', (done) => { - comm = kernel.connectToComm('test'); expect(comm.onClose).to.be(undefined); comm.onClose = msg => { done(); @@ -223,7 +234,6 @@ describe('jupyter.services - Comm', () => { context('#onMsg', () => { it('should be readable and writable function', (done) => { - comm = kernel.connectToComm('test'); comm.onMsg = (msg) => { done(); }; @@ -254,7 +264,6 @@ describe('jupyter.services - Comm', () => { it('should send a message to the server', () => { let future = kernel.requestExecute({ code: TARGET }); future.done.then(() => { - comm = kernel.connectToComm('test'); let encoder = new TextEncoder('utf8'); let data = encoder.encode('hello'); future = comm.open({ foo: 'bar' }, { fizz: 'buzz' }, [data, data.buffer]); @@ -267,7 +276,6 @@ describe('jupyter.services - Comm', () => { context('#send()', () => { it('should send a message to the server', () => { - comm = kernel.connectToComm('test'); return comm.open().done.then(() => { let future = comm.send({ foo: 'bar' }, { fizz: 'buzz' }); return future.done; @@ -275,7 +283,6 @@ describe('jupyter.services - Comm', () => { }); it('should pass through a buffers field', () => { - comm = kernel.connectToComm('test'); return comm.open().done.then(() => { let future = comm.send({ buffers: 'bar' }); return future.done; @@ -287,7 +294,6 @@ describe('jupyter.services - Comm', () => { context('#close()', () => { it('should send a message to the server', () => { - comm = kernel.connectToComm('test'); return comm.open().done.then(() => { let encoder = new TextEncoder('utf8'); let data = encoder.encode('hello'); @@ -297,7 +303,6 @@ describe('jupyter.services - Comm', () => { }); it('should trigger an onClose', (done) => { - comm = kernel.connectToComm('test'); comm.open().done.then(() => { comm.onClose = (msg: KernelMessage.ICommCloseMsg) => { expect(msg.content.data).to.eql({ foo: 'bar' }); @@ -309,7 +314,6 @@ describe('jupyter.services - Comm', () => { }); it('should not send subsequent messages', () => { - comm = kernel.connectToComm('test'); return comm.open().done.then(() => { return comm.close({ foo: 'bar' }).done; }).then(() => { diff --git a/packages/services/test/src/kernel/ikernel.spec.ts b/packages/services/test/src/kernel/ikernel.spec.ts index 8429835db561..3cb030b98c6e 100644 --- a/packages/services/test/src/kernel/ikernel.spec.ts +++ b/packages/services/test/src/kernel/ikernel.spec.ts @@ -384,28 +384,21 @@ describe('Kernel.IKernel', () => { it('should dispose of the resources held by the kernel', () => { return Kernel.connectTo(defaultKernel.model).then(kernel => { let future = kernel.requestExecute({ code: 'foo' }); - let comm = kernel.connectToComm('foo'); expect(future.isDisposed).to.be(false); - expect(comm.isDisposed).to.be(false); kernel.dispose(); expect(future.isDisposed).to.be(true); - expect(comm.isDisposed).to.be(true); }); }); it('should be safe to call twice', () => { return Kernel.connectTo(defaultKernel.model).then(kernel => { let future = kernel.requestExecute({ code: 'foo' }); - let comm = kernel.connectToComm('foo'); expect(future.isDisposed).to.be(false); - expect(comm.isDisposed).to.be(false); kernel.dispose(); expect(future.isDisposed).to.be(true); - expect(comm.isDisposed).to.be(true); expect(kernel.isDisposed).to.be(true); kernel.dispose(); expect(future.isDisposed).to.be(true); - expect(comm.isDisposed).to.be(true); expect(kernel.isDisposed).to.be(true); }); }); @@ -575,11 +568,13 @@ describe('Kernel.IKernel', () => { it('should dispose of existing comm and future objects', () => { let kernel = defaultKernel; - let comm = kernel.connectToComm('test'); + let commFuture = kernel.connectToComm('test'); let future = kernel.requestExecute({ code: 'foo' }); return kernel.restart().then(() => { - expect(comm.isDisposed).to.be(true); expect(future.isDisposed).to.be(true); + return commFuture; + }).then(comm => { + expect(comm.isDisposed).to.be(true); }); });