Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/node-runtime-worker-thread/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
"@mongosh/browser-runtime-electron": "0.0.0-dev.0",
"@mongosh/service-provider-core": "0.0.0-dev.0",
"@mongosh/service-provider-server": "0.0.0-dev.0",
"@mongosh/types": "0.0.0-dev.0",
"bson": "^4.2.2",
"postmsg-rpc": "^2.4.0"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { ChildProcess } from 'child_process';
import { MongoshBus } from '@mongosh/types';
import { exposeAll, WithClose } from './rpc';

export class ChildProcessMongoshBus {
exposedEmitter: WithClose<MongoshBus>;

constructor(eventEmitter: MongoshBus, childProcess: ChildProcess) {
const exposedEmitter: WithClose<MongoshBus> = exposeAll(
{
emit(...args) {
eventEmitter.emit(...args);
},
on() {
throw new Error("Can't use `on` method on ChildProcessMongoshBus");
}
},
childProcess
);
this.exposedEmitter = exposedEmitter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ const evaluationListener = createCaller(
);

exposeAll(evaluationListener, workerProcess);

const messageBus = createCaller(['emit', 'on'], process);

exposeAll(messageBus, workerProcess);
56 changes: 44 additions & 12 deletions packages/node-runtime-worker-thread/src/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
import chai, { expect } from 'chai';
import sinon from 'sinon';
import sinonChai from 'sinon-chai';
import { MongoshBus } from '@mongosh/types';
import { startTestServer } from '../../../testing/integration-testing-hooks';
import { WorkerRuntime } from '../dist/index';

chai.use(sinonChai);

function createMockEventEmitter() {
return (sinon.stub({ on() {}, emit() {} }) as unknown) as MongoshBus;
}

describe('WorkerRuntime', () => {
let runtime: WorkerRuntime;

afterEach(async() => {
if (runtime) {
await runtime.terminate();
}
});

describe('evaluate', () => {
it('should evaluate and return basic values', async() => {
const runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true });
runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true });
const result = await runtime.evaluate('1+1');

expect(result.printable).to.equal(2);

await runtime.terminate();
});

describe('errors', () => {
Expand Down Expand Up @@ -46,7 +57,9 @@ describe('WorkerRuntime', () => {
});

it("should return an error if it's returned from evaluation", async() => {
const { printable } = await runtime.evaluate('new SyntaxError("Syntax!")');
const { printable } = await runtime.evaluate(
'new SyntaxError("Syntax!")'
);

expect(printable).to.be.instanceof(Error);
expect(printable).to.have.property('name', 'SyntaxError');
Expand All @@ -62,25 +75,21 @@ describe('WorkerRuntime', () => {
const testServer = startTestServer('shared');

it('should return completions', async() => {
const runtime = new WorkerRuntime(await testServer.connectionString());
runtime = new WorkerRuntime(await testServer.connectionString());
const completions = await runtime.getCompletions('db.coll1.f');

expect(completions).to.deep.contain({ completion: 'db.coll1.find' });

await runtime.terminate();
});
});

describe('getShellPrompt', () => {
const testServer = startTestServer('shared');

it('should return prompt when connected to the server', async() => {
const runtime = new WorkerRuntime(await testServer.connectionString());
runtime = new WorkerRuntime(await testServer.connectionString());
const result = await runtime.getShellPrompt();

expect(result).to.match(/>/);

await runtime.terminate();
});
});

Expand All @@ -90,16 +99,39 @@ describe('WorkerRuntime', () => {
onPrompt: sinon.spy(() => 'password123')
};

const runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true });
runtime = new WorkerRuntime('mongodb://nodb/', {}, { nodb: true });

runtime.setEvaluationListener(evalListener);

const password = await runtime.evaluate('passwordPrompt()');

expect(evalListener.onPrompt).to.have.been.called;
expect(password.printable).to.equal('password123');
});
});

await runtime.terminate();
describe('eventEmitter', () => {
const testServer = startTestServer('shared');

it('should propagate emitted events from worker', async() => {
const eventEmitter = createMockEventEmitter();

runtime = new WorkerRuntime(
await testServer.connectionString(),
{},
{},
{},
eventEmitter
);

await runtime.evaluate('db.getCollectionNames()');

expect(eventEmitter.emit).to.have.been.calledWith('mongosh:api-call', {
arguments: {},
class: 'Database',
db: 'test',
method: 'getCollectionNames'
});
});
});
});
17 changes: 16 additions & 1 deletion packages/node-runtime-worker-thread/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
import { ChildProcess, SpawnOptionsWithoutStdio } from 'child_process';
import { MongoClientOptions } from '@mongosh/service-provider-core';
import { Runtime, RuntimeEvaluationListener, RuntimeEvaluationResult } from '@mongosh/browser-runtime-core';
import { MongoshBus } from '@mongosh/types';
import { promises as fs } from 'fs';
import path from 'path';
import { EventEmitter } from 'events';
import spawnChildFromSource, { kill } from './spawn-child-from-source';
import { Caller, createCaller } from './rpc';
import { ChildProcessEvaluationListener } from './child-process-evaluation-listener';
import type { WorkerRuntime as WorkerThreadWorkerRuntime } from './worker-runtime';
import { deserializeEvaluationResult } from './serializer';
import { ChildProcessMongoshBus } from './child-process-mongosh-bus';

type ChildProcessRuntime = Caller<WorkerThreadWorkerRuntime>;

class WorkerRuntime implements Runtime {
private initOptions: {
uri: string;
Expand All @@ -23,6 +27,10 @@ class WorkerRuntime implements Runtime {

evaluationListener: RuntimeEvaluationListener | null = null;

private eventEmitter: MongoshBus;

private childProcessMongoshBus!: ChildProcessMongoshBus;

private childProcessEvaluationListener!: ChildProcessEvaluationListener;

private childProcess!: ChildProcess;
Expand All @@ -35,10 +43,12 @@ class WorkerRuntime implements Runtime {
uri: string,
driverOptions: MongoClientOptions = {},
cliOptions: { nodb?: boolean } = {},
spawnOptions: SpawnOptionsWithoutStdio = {}
spawnOptions: SpawnOptionsWithoutStdio = {},
eventEmitter: MongoshBus = new EventEmitter()
) {
this.initOptions = { uri, driverOptions, cliOptions, spawnOptions };
this.initWorkerPromise = this.initWorker();
this.eventEmitter = eventEmitter;
}

private async initWorker() {
Expand Down Expand Up @@ -76,6 +86,11 @@ class WorkerRuntime implements Runtime {
this.childProcess
);

this.childProcessMongoshBus = new ChildProcessMongoshBus(
this.eventEmitter,
this.childProcess
);

await this.childProcessRuntime.init(uri, driverOptions, cliOptions);
}

Expand Down
20 changes: 15 additions & 5 deletions packages/node-runtime-worker-thread/src/worker-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
/* ^^^ we test the dist directly, so isntanbul can't calculate the coverage correctly */

import { parentPort, isMainThread } from 'worker_threads';
import { Runtime, RuntimeEvaluationListener } from '@mongosh/browser-runtime-core';
import {
Runtime,
RuntimeEvaluationListener
} from '@mongosh/browser-runtime-core';
import { ElectronRuntime } from '@mongosh/browser-runtime-electron';
import {
MongoClientOptions,
Expand All @@ -11,6 +14,7 @@ import {
import { CompassServiceProvider } from '@mongosh/service-provider-server';
import { exposeAll, createCaller } from './rpc';
import { serializeEvaluationResult } from './serializer';
import { MongoshBus } from '@mongosh/types';

if (!parentPort || isMainThread) {
throw new Error('Worker runtime can be used only in a worker thread');
Expand All @@ -34,6 +38,15 @@ const evaluationListener = createCaller<RuntimeEvaluationListener>(
parentPort
);

const messageBus: MongoshBus = Object.assign(
createCaller(['emit'], parentPort),
{
on() {
throw new Error("Can't call `on` method on worker runtime MongoshBus");
}
}
);

export type WorkerRuntime = Runtime & {
init(
uri: string,
Expand All @@ -53,10 +66,7 @@ const workerRuntime: WorkerRuntime = {
driverOptions,
cliOptions
);
runtime = new ElectronRuntime(
provider /** , TODO: `messageBus` support for telemetry in a separate ticket */
);

runtime = new ElectronRuntime(provider, messageBus);
runtime.setEvaluationListener(evaluationListener);
},

Expand Down