From 7ac3fe5fee35209b5ec2e83d257d8ccb9eb9fa16 Mon Sep 17 00:00:00 2001 From: Sergey Petushkov Date: Mon, 1 Feb 2021 19:21:36 +0100 Subject: [PATCH 1/3] feat(node-runtime-worker-thread): Add proxied eventEmitter to the worker runtime --- .../node-runtime-worker-thread/package.json | 1 + .../src/child-process-mongosh-bus.ts | 23 ++++++++ .../src/child-process-proxy.ts | 4 ++ .../src/index.spec.ts | 52 +++++++++++++++---- .../node-runtime-worker-thread/src/index.ts | 17 +++++- .../src/worker-runtime.ts | 17 ++++-- 6 files changed, 100 insertions(+), 14 deletions(-) create mode 100644 packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts diff --git a/packages/node-runtime-worker-thread/package.json b/packages/node-runtime-worker-thread/package.json index 780da41e68..049b704f97 100644 --- a/packages/node-runtime-worker-thread/package.json +++ b/packages/node-runtime-worker-thread/package.json @@ -43,6 +43,7 @@ "@mongosh/browser-runtime-electron": "0.0.0-dev.0", "@mongosh/service-provider-core": "0.0.0-dev.0", "@mongosh/service-provider-server": "0.0.0-dev.0", + "@mongosh/types": "0.0.0-dev.0", "bson": "^4.2.2", "postmsg-rpc": "^2.4.0" } diff --git a/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts b/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts new file mode 100644 index 0000000000..f916956e6e --- /dev/null +++ b/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts @@ -0,0 +1,23 @@ +import { ChildProcess } from 'child_process'; +import { MongoshBus } from '@mongosh/types'; +import { exposeAll, WithClose } from './rpc'; + +export class ChildProcessMongoshBus { + exposedEmitter: WithClose; + + constructor(eventEmitter: MongoshBus, childProcess: ChildProcess) { + const exposedEmitter: WithClose = exposeAll( + { + emit(...args) { + eventEmitter.emit(...args); + }, + on() { + // no-op + return exposedEmitter; + } + }, + childProcess + ); + this.exposedEmitter = exposedEmitter; + } +} diff --git a/packages/node-runtime-worker-thread/src/child-process-proxy.ts b/packages/node-runtime-worker-thread/src/child-process-proxy.ts index 7ef4e4d9fc..0433a04d75 100644 --- a/packages/node-runtime-worker-thread/src/child-process-proxy.ts +++ b/packages/node-runtime-worker-thread/src/child-process-proxy.ts @@ -68,3 +68,7 @@ const evaluationListener = createCaller( ); exposeAll(evaluationListener, workerProcess); + +const messageBus = createCaller(['emit', 'on'], process); + +exposeAll(messageBus, workerProcess); diff --git a/packages/node-runtime-worker-thread/src/index.spec.ts b/packages/node-runtime-worker-thread/src/index.spec.ts index bc9a17ef2d..41955fbd2f 100644 --- a/packages/node-runtime-worker-thread/src/index.spec.ts +++ b/packages/node-runtime-worker-thread/src/index.spec.ts @@ -1,20 +1,31 @@ import chai, { expect } from 'chai'; import sinon from 'sinon'; import sinonChai from 'sinon-chai'; +import { MongoshBus } from '@mongosh/types'; import { startTestServer } from '../../../testing/integration-testing-hooks'; import { WorkerRuntime } from '../dist/index'; chai.use(sinonChai); +function createMockEventEmitter() { + return (sinon.stub({ on() {}, emit() {} }) as unknown) as MongoshBus; +} + describe('WorkerRuntime', () => { + let runtime: WorkerRuntime; + + afterEach(async() => { + if (runtime) { + await runtime.terminate(); + } + }); + describe('evaluate', () => { it('should evaluate and return basic values', async() => { - const runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true }); + runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true }); const result = await runtime.evaluate('1+1'); expect(result.printable).to.equal(2); - - await runtime.terminate(); }); describe('errors', () => { @@ -46,7 +57,9 @@ describe('WorkerRuntime', () => { }); it("should return an error if it's returned from evaluation", async() => { - const { printable } = await runtime.evaluate('new SyntaxError("Syntax!")'); + const { printable } = await runtime.evaluate( + 'new SyntaxError("Syntax!")' + ); expect(printable).to.be.instanceof(Error); expect(printable).to.have.property('name', 'SyntaxError'); @@ -62,12 +75,10 @@ describe('WorkerRuntime', () => { const testServer = startTestServer('shared'); it('should return completions', async() => { - const runtime = new WorkerRuntime(await testServer.connectionString()); + runtime = new WorkerRuntime(await testServer.connectionString()); const completions = await runtime.getCompletions('db.coll1.f'); expect(completions).to.deep.contain({ completion: 'db.coll1.find' }); - - await runtime.terminate(); }); }); @@ -77,7 +88,7 @@ describe('WorkerRuntime', () => { onPrompt: sinon.spy(() => 'password123') }; - const runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true }); + runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true }); runtime.setEvaluationListener(evalListener); @@ -85,8 +96,31 @@ describe('WorkerRuntime', () => { expect(evalListener.onPrompt).to.have.been.called; expect(password.printable).to.equal('password123'); + }); + }); - await runtime.terminate(); + describe('eventEmitter', () => { + const testServer = startTestServer('shared'); + + it('should propagate emitted events from worker', async() => { + const eventEmitter = createMockEventEmitter(); + + runtime = new WorkerRuntime( + await testServer.connectionString(), + {}, + {}, + {}, + eventEmitter + ); + + await runtime.evaluate('db.getCollectionNames()'); + + expect(eventEmitter.emit).to.have.been.calledWith('mongosh:api-call', { + arguments: {}, + class: 'Database', + db: 'test', + method: 'getCollectionNames' + }); }); }); }); diff --git a/packages/node-runtime-worker-thread/src/index.ts b/packages/node-runtime-worker-thread/src/index.ts index 9dd94593ec..d9a05e900c 100644 --- a/packages/node-runtime-worker-thread/src/index.ts +++ b/packages/node-runtime-worker-thread/src/index.ts @@ -4,15 +4,19 @@ import { ChildProcess, SpawnOptionsWithoutStdio } from 'child_process'; import { MongoClientOptions } from '@mongosh/service-provider-core'; import { Runtime, RuntimeEvaluationListener, RuntimeEvaluationResult } from '@mongosh/browser-runtime-core'; +import { MongoshBus } from '@mongosh/types'; import { promises as fs } from 'fs'; import path from 'path'; +import { EventEmitter } from 'events'; import spawnChildFromSource, { kill } from './spawn-child-from-source'; import { Caller, createCaller } from './rpc'; import { ChildProcessEvaluationListener } from './child-process-evaluation-listener'; import type { WorkerRuntime as WorkerThreadWorkerRuntime } from './worker-runtime'; import { deserializeEvaluationResult } from './serializer'; +import { ChildProcessMongoshBus } from './child-process-mongosh-bus'; type ChildProcessRuntime = Caller; + class WorkerRuntime implements Runtime { private initOptions: { uri: string; @@ -23,6 +27,10 @@ class WorkerRuntime implements Runtime { evaluationListener: RuntimeEvaluationListener | null = null; + private eventEmitter: MongoshBus; + + private childProcessMongoshBus!: ChildProcessMongoshBus; + private childProcessEvaluationListener!: ChildProcessEvaluationListener; private childProcess!: ChildProcess; @@ -35,10 +43,12 @@ class WorkerRuntime implements Runtime { uri: string, driverOptions: MongoClientOptions = {}, cliOptions: { nodb?: boolean } = {}, - spawnOptions: SpawnOptionsWithoutStdio = {} + spawnOptions: SpawnOptionsWithoutStdio = {}, + eventEmitter: MongoshBus = new EventEmitter() ) { this.initOptions = { uri, driverOptions, cliOptions, spawnOptions }; this.initWorkerPromise = this.initWorker(); + this.eventEmitter = eventEmitter; } private async initWorker() { @@ -70,6 +80,11 @@ class WorkerRuntime implements Runtime { this.childProcess ); + this.childProcessMongoshBus = new ChildProcessMongoshBus( + this.eventEmitter, + this.childProcess + ); + await this.childProcessRuntime.init(uri, driverOptions, cliOptions); } diff --git a/packages/node-runtime-worker-thread/src/worker-runtime.ts b/packages/node-runtime-worker-thread/src/worker-runtime.ts index e0732c43c0..b7f069fb45 100644 --- a/packages/node-runtime-worker-thread/src/worker-runtime.ts +++ b/packages/node-runtime-worker-thread/src/worker-runtime.ts @@ -11,6 +11,7 @@ import { import { CompassServiceProvider } from '@mongosh/service-provider-server'; import { exposeAll, createCaller } from './rpc'; import { serializeEvaluationResult } from './serializer'; +import { MongoshBus } from '@mongosh/types'; if (!parentPort || isMainThread) { throw new Error('Worker runtime can be used only in a worker thread'); @@ -34,6 +35,17 @@ const evaluationListener = createCaller( parentPort ); +const messageBus: MongoshBus = Object.assign( + createCaller(['emit'], parentPort), + { + on() { + // this method is a no-op, we only want message bus to proxy emitted + // events to main thread + return messageBus; + } + } +); + export type WorkerRuntime = Runtime & { init( uri: string, @@ -53,10 +65,7 @@ const workerRuntime: WorkerRuntime = { driverOptions, cliOptions ); - runtime = new ElectronRuntime( - provider /** , TODO: `messageBus` support for telemetry in a separate ticket */ - ); - + runtime = new ElectronRuntime(provider, messageBus); runtime.setEvaluationListener(evaluationListener); }, From d01b11dc88f9cae8f19bee5289db843ff7be5dc4 Mon Sep 17 00:00:00 2001 From: Sergey Petushkov Date: Tue, 2 Feb 2021 09:26:52 +0100 Subject: [PATCH 2/3] refactor(node-runtime-worker-thread): Use 'global' runtime in tests --- packages/node-runtime-worker-thread/src/index.spec.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/node-runtime-worker-thread/src/index.spec.ts b/packages/node-runtime-worker-thread/src/index.spec.ts index d65b00827d..f4babdbfee 100644 --- a/packages/node-runtime-worker-thread/src/index.spec.ts +++ b/packages/node-runtime-worker-thread/src/index.spec.ts @@ -86,12 +86,10 @@ describe('WorkerRuntime', () => { const testServer = startTestServer('shared'); it('should return prompt when connected to the server', async() => { - const runtime = new WorkerRuntime(await testServer.connectionString()); + runtime = new WorkerRuntime(await testServer.connectionString()); const result = await runtime.getShellPrompt(); expect(result).to.match(/>/); - - await runtime.terminate(); }); }); From ad8136dd6f01dc01a0bf947ed756b71fb72234d2 Mon Sep 17 00:00:00 2001 From: Sergey Petushkov Date: Tue, 2 Feb 2021 09:37:26 +0100 Subject: [PATCH 3/3] refactor(node-runtime-worker-thread): Throw if on method called on mongosh bus --- .../src/child-process-mongosh-bus.ts | 3 +-- .../node-runtime-worker-thread/src/worker-runtime.ts | 9 +++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts b/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts index f916956e6e..de0f40d586 100644 --- a/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts +++ b/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts @@ -12,8 +12,7 @@ export class ChildProcessMongoshBus { eventEmitter.emit(...args); }, on() { - // no-op - return exposedEmitter; + throw new Error("Can't use `on` method on ChildProcessMongoshBus"); } }, childProcess diff --git a/packages/node-runtime-worker-thread/src/worker-runtime.ts b/packages/node-runtime-worker-thread/src/worker-runtime.ts index b7f069fb45..5dd4ed5303 100644 --- a/packages/node-runtime-worker-thread/src/worker-runtime.ts +++ b/packages/node-runtime-worker-thread/src/worker-runtime.ts @@ -2,7 +2,10 @@ /* ^^^ we test the dist directly, so isntanbul can't calculate the coverage correctly */ import { parentPort, isMainThread } from 'worker_threads'; -import { Runtime, RuntimeEvaluationListener } from '@mongosh/browser-runtime-core'; +import { + Runtime, + RuntimeEvaluationListener +} from '@mongosh/browser-runtime-core'; import { ElectronRuntime } from '@mongosh/browser-runtime-electron'; import { MongoClientOptions, @@ -39,9 +42,7 @@ const messageBus: MongoshBus = Object.assign( createCaller(['emit'], parentPort), { on() { - // this method is a no-op, we only want message bus to proxy emitted - // events to main thread - return messageBus; + throw new Error("Can't call `on` method on worker runtime MongoshBus"); } } );