diff --git a/packages/async-rewriter2/README.md b/packages/async-rewriter2/README.md index 9c8f0b815d..40cdf16b8b 100644 --- a/packages/async-rewriter2/README.md +++ b/packages/async-rewriter2/README.md @@ -33,7 +33,7 @@ they reach their first `await` expression, and the fact that we can determine which `Promise`s need `await`ing by marking them as such using decorators on the API surface. -The transformation takes place in two main steps. +The transformation takes place in three main steps. ### Step one: IIFE wrapping @@ -63,7 +63,64 @@ function foo() { Note how identifiers remain accessible in the outside environment, including top-level functions being hoisted to the outside. -### Step two: Async function wrapping +### Step two: Making certain exceptions uncatchable + +In order to support Ctrl+C properly, we add a type of exception that is not +catchable by userland code. + +For example, + +```js +try { + foo3(); +} catch { + bar3(); +} +``` + +is transformed into + +```js +try { + foo3(); +} catch (_err) { + if (!err || !_err[Symbol.for('@@mongosh.uncatchable')]) { + bar3(); + } else { + throw _err; + } +} +``` + +and + +```js +try { + foo1(); +} catch (err) { + bar1(err); +} finally { + baz(); +} +``` + +into + +```js +let _isCatchable; + +try { + foo1(); +} catch (err) { + _isCatchable = !err || !err[Symbol.for('@@mongosh.uncatchable')]; + + if (_isCatchable) bar1(err); else throw err; +} finally { + if (_isCatchable) baz(); +} +``` + +### Step three: Async function wrapping We perform three operations: diff --git a/packages/async-rewriter2/src/async-writer-babel.spec.ts b/packages/async-rewriter2/src/async-writer-babel.spec.ts index 38d0491029..c19a65b239 100644 --- a/packages/async-rewriter2/src/async-writer-babel.spec.ts +++ b/packages/async-rewriter2/src/async-writer-babel.spec.ts @@ -49,6 +49,11 @@ describe('AsyncWriter', () => { return Object.assign( Promise.resolve(implicitlyAsyncValue), { [Symbol.for('@@mongosh.syntheticPromise')]: true }); + }, + throwUncatchable() { + throw Object.assign( + new Error('uncatchable!'), + { [Symbol.for('@@mongosh.uncatchable')]: true }); } }); runTranspiledCode = (code: string, context?: any) => { @@ -760,4 +765,182 @@ describe('AsyncWriter', () => { }); }); }); + + context('uncatchable exceptions', () => { + it('allows catching regular exceptions', () => { + const result = runTranspiledCode(` + (() => { + try { + throw new Error('generic error'); + } catch (err) { + return ({ caught: err }); + } + })();`); + expect(result.caught.message).to.equal('generic error'); + }); + + it('allows catching regular exceptions with destructuring catch (object)', () => { + const result = runTranspiledCode(` + (() => { + try { + throw new Error('generic error'); + } catch ({ message }) { + return ({ caught: message }); + } + })();`); + expect(result.caught).to.equal('generic error'); + }); + + + it('allows catching regular exceptions with destructuring catch (array)', () => { + const result = runTranspiledCode(` + (() => { + try { + throw [ 'foo' ]; + } catch ([message]) { + return ({ caught: message }); + } + })();`); + expect(result.caught).to.equal('foo'); + }); + + it('allows catching regular exceptions with destructuring catch (assignable)', () => { + const result = runTranspiledCode(` + (() => { + try { + throw [ 'foo' ]; + } catch ([message]) { + message = 42; + return ({ caught: message }); + } + })();`); + expect(result.caught).to.equal(42); + }); + + it('allows rethrowing regular exceptions', () => { + try { + runTranspiledCode(` + (() => { + try { + throw new Error('generic error'); + } catch (err) { + throw err; + } + })();`); + expect.fail('missed exception'); + } catch (err) { + expect(err.message).to.equal('generic error'); + } + }); + + it('allows returning from finally', () => { + const result = runTranspiledCode(` + (() => { + try { + throw new Error('generic error'); + } catch (err) { + return ({ caught: err }); + } finally { + return 'finally'; + } + })();`); + expect(result).to.equal('finally'); + }); + + it('allows finally without catch', () => { + const result = runTranspiledCode(` + (() => { + try { + throw new Error('generic error'); + } finally { + return 'finally'; + } + })();`); + expect(result).to.equal('finally'); + }); + + it('allows throwing primitives', () => { + const result = runTranspiledCode(` + (() => { + try { + throw null; + } catch (err) { + return ({ caught: err }); + } + })();`); + expect(result.caught).to.equal(null); + }); + + it('allows throwing primitives with finally', () => { + const result = runTranspiledCode(` + (() => { + try { + throw null; + } catch (err) { + return ({ caught: err }); + } finally { + return 'finally'; + } + })();`); + expect(result).to.equal('finally'); + }); + + it('does not catch uncatchable exceptions', () => { + try { + runTranspiledCode(` + (() => { + try { + throwUncatchable(); + } catch (err) { + return ({ caught: err }); + } + })();`); + expect.fail('missed exception'); + } catch (err) { + expect(err.message).to.equal('uncatchable!'); + } + }); + + it('does not catch uncatchable exceptions with empty catch clause', () => { + try { + runTranspiledCode(` + (() => { + try { + throwUncatchable(); + } catch { } + })();`); + expect.fail('missed exception'); + } catch (err) { + expect(err.message).to.equal('uncatchable!'); + } + }); + + it('does not catch uncatchable exceptions with finalizer', () => { + try { + runTranspiledCode(` + (() => { + try { + throwUncatchable(); + } catch { } finally { return; } + })();`); + expect.fail('missed exception'); + } catch (err) { + expect(err.message).to.equal('uncatchable!'); + } + }); + + it('does not catch uncatchable exceptions with only finalizer', () => { + try { + runTranspiledCode(` + (() => { + try { + throwUncatchable(); + } finally { return; } + })();`); + expect.fail('missed exception'); + } catch (err) { + expect(err.message).to.equal('uncatchable!'); + } + }); + }); }); diff --git a/packages/async-rewriter2/src/async-writer-babel.ts b/packages/async-rewriter2/src/async-writer-babel.ts index b07b1b4b00..2008f9a840 100644 --- a/packages/async-rewriter2/src/async-writer-babel.ts +++ b/packages/async-rewriter2/src/async-writer-babel.ts @@ -2,13 +2,14 @@ import * as babel from '@babel/core'; import runtimeSupport from './runtime-support.nocov'; import wrapAsFunctionPlugin from './stages/wrap-as-iife'; +import uncatchableExceptionPlugin from './stages/uncatchable-exceptions'; import makeMaybeAsyncFunctionPlugin from './stages/transform-maybe-await'; import { AsyncRewriterErrors } from './error-codes'; /** * General notes for this package: * - * This package contains two babel plugins used in async rewriting, plus a helper + * This package contains three babel plugins used in async rewriting, plus a helper * to apply these plugins to plain code. * * If you have not worked with babel plugins, @@ -51,6 +52,7 @@ export default class AsyncWriter { require('@babel/plugin-transform-destructuring').default ]); code = this.step(code, [wrapAsFunctionPlugin]); + code = this.step(code, [uncatchableExceptionPlugin]); code = this.step(code, [ [ makeMaybeAsyncFunctionPlugin, diff --git a/packages/async-rewriter2/src/stages/uncatchable-exceptions.ts b/packages/async-rewriter2/src/stages/uncatchable-exceptions.ts new file mode 100644 index 0000000000..480795947f --- /dev/null +++ b/packages/async-rewriter2/src/stages/uncatchable-exceptions.ts @@ -0,0 +1,106 @@ +import * as babel from '@babel/core'; +import * as BabelTypes from '@babel/types'; + +/** + * In this step, we transform try/catch statements so that there are specific + * types of exceptions that they cannot catch (marked by + * Symbol.for('@@mongosh.uncatchable')) or run finally blocks for. + */ +export default ({ types: t }: { types: typeof BabelTypes }): babel.PluginObj<{}> => { + // We mark already-visited try/catch statements using these symbols. + function asNodeKey(v: any): keyof babel.types.Node { return v; } + const isGeneratedTryCatch = asNodeKey(Symbol('isGeneratedTryCatch')); + const notUncatchableCheck = babel.template.expression(` + (!ERR_IDENTIFIER || !ERR_IDENTIFIER[Symbol.for('@@mongosh.uncatchable')]) + `); + + return { + visitor: { + TryStatement(path) { + if (path.node[isGeneratedTryCatch]) return; + const { block, finalizer } = path.node; + let catchParam: babel.types.Identifier; + let handler: babel.types.CatchClause; + const fallbackCatchParam = path.scope.generateUidIdentifier('err'); + + if (path.node.handler) { + if (path.node.handler.param?.type === 'Identifier') { + // Classic catch(err) { ... }. We're good, no need to change anything. + catchParam = path.node.handler.param; + handler = path.node.handler; + } else if (path.node.handler.param) { + // Destructuring catch({ ... }) { ... body ... }. Transform to + // catch(err) { let ... = err; ... body ... }. + catchParam = fallbackCatchParam; + handler = t.catchClause(catchParam, t.blockStatement([ + t.variableDeclaration('let', [ + t.variableDeclarator(path.node.handler.param, catchParam) + ]), + path.node.handler.body + ])); + } else { + // + catchParam = fallbackCatchParam; + handler = path.node.handler; + } + } else { + // try {} finally {} without 'catch' is valid -- if we encounter that, + // pretend that there is a dummy catch (err) { throw err; } + catchParam = fallbackCatchParam; + handler = t.catchClause(catchParam, t.blockStatement([ + t.throwStatement(catchParam) + ])); + } + + if (!finalizer) { + // No finalizer -> no need to keep track of state outside the catch {} + // block itself. This is a bit simpler. + path.replaceWith(Object.assign( + t.tryStatement( + block, + t.catchClause( + catchParam, + t.blockStatement([ + // if (!err[isUncatchableSymbol]) { ... } else throw err; + t.ifStatement( + notUncatchableCheck({ ERR_IDENTIFIER: catchParam }), + handler.body, + t.throwStatement(catchParam)) + ]) + ) + ), + { [isGeneratedTryCatch]: true })); + } else { + // finalizer -> need to store whether the exception was catchable + // (i.e. whether the finalizer is allowed to run) outside of the + // try/catch/finally block. + const isCatchable = path.scope.generateUidIdentifier('_isCatchable'); + path.replaceWithMultiple([ + t.variableDeclaration('let', [t.variableDeclarator(isCatchable)]), + Object.assign( + t.tryStatement( + block, + t.catchClause( + catchParam, + t.blockStatement([ + // isCatchable = !err[isUncatchableSymbol] + t.expressionStatement( + t.assignmentExpression('=', + isCatchable, + notUncatchableCheck({ ERR_IDENTIFIER: catchParam }))), + // if (isCatchable) { ... } else throw err; + t.ifStatement(isCatchable, handler.body, t.throwStatement(catchParam)) + ]), + ), + t.blockStatement([ + // if (isCatchable) { ... } + t.ifStatement(isCatchable, finalizer) + ]) + ), + { [isGeneratedTryCatch]: true }) + ]); + } + } + } + }; +}; diff --git a/packages/cli-repl/src/async-repl.spec.ts b/packages/cli-repl/src/async-repl.spec.ts index e19f231f33..e028173d7f 100644 --- a/packages/cli-repl/src/async-repl.spec.ts +++ b/packages/cli-repl/src/async-repl.spec.ts @@ -67,11 +67,10 @@ describe('AsyncRepl', () => { it('allows sync interruption through SIGINT', async function() { if (process.platform === 'win32') { - this.skip(); // No SIGINT on Windows. - return; + return this.skip(); // No SIGINT on Windows. } - const { input, output } = createDefaultAsyncRepl({ breakEvalOnSigint: true }); + const { input, output } = createDefaultAsyncRepl({ onAsyncSigint: () => false }); input.write('while (true) { process.kill(process.pid, "SIGINT"); }\n'); await expectInStream(output, 'execution was interrupted'); @@ -79,16 +78,17 @@ describe('AsyncRepl', () => { it('allows async interruption through SIGINT', async function() { if (process.platform === 'win32') { - this.skip(); // No SIGINT on Windows. - return; + return this.skip(); // No SIGINT on Windows. } - const { input, output } = createDefaultAsyncRepl({ breakEvalOnSigint: true }); + const onAsyncSigint = sinon.stub().resolves(false); + const { input, output } = createDefaultAsyncRepl({ onAsyncSigint: onAsyncSigint }); input.write('new Promise(oopsIdontResolve => 0)\n'); await delay(100); process.kill(process.pid, 'SIGINT'); await expectInStream(output, 'execution was interrupted'); + expect(onAsyncSigint).to.have.been.calledOnce; }); it('handles synchronous exceptions well', async() => { diff --git a/packages/cli-repl/src/async-repl.ts b/packages/cli-repl/src/async-repl.ts index 42e730ca97..837e829d9d 100644 --- a/packages/cli-repl/src/async-repl.ts +++ b/packages/cli-repl/src/async-repl.ts @@ -1,11 +1,11 @@ /* eslint-disable chai-friendly/no-unused-expressions */ -import type { REPLServer, ReplOptions } from 'repl'; -import { Interface, ReadLineOptions } from 'readline'; +import { Domain } from 'domain'; import type { EventEmitter } from 'events'; -import { Recoverable, start as originalStart } from 'repl'; import isRecoverableError from 'is-recoverable-error'; +import { Interface, ReadLineOptions } from 'readline'; +import type { ReplOptions, REPLServer } from 'repl'; +import { Recoverable, start as originalStart } from 'repl'; import { promisify } from 'util'; -import { Domain } from 'domain'; // Utility, inverse of Readonly type Mutable = { @@ -15,10 +15,11 @@ type Mutable = { export type OriginalEvalFunction = (input: string, context: any, filename: string) => Promise; export type AsyncEvalFunction = (originalEval: OriginalEvalFunction, input: string, context: any, filename: string) => Promise; -export type AsyncREPLOptions = ReadLineOptions & Omit & { +export type AsyncREPLOptions = ReadLineOptions & Omit & { start?: typeof originalStart, wrapCallbackError?: (err: Error) => Error; asyncEval: AsyncEvalFunction; + onAsyncSigint?: () => Promise | boolean; }; export type EvalStartEvent = { @@ -57,8 +58,16 @@ function getPrompt(repl: any): string { // Start a REPLServer that supports asynchronous evaluation, rather than just // synchronous, and integrates nicely with Ctrl+C handling in that respect. export function start(opts: AsyncREPLOptions): REPLServer { + const { + asyncEval, + wrapCallbackError = err => err, + onAsyncSigint + } = opts; + if (onAsyncSigint) { + (opts as ReplOptions).breakEvalOnSigint = true; + } + const repl = (opts.start ?? originalStart)(opts); - const { asyncEval, wrapCallbackError = err => err, breakEvalOnSigint } = opts; const originalEval = promisify(wrapNoSyncDomainError(repl.eval.bind(repl))); (repl as Mutable).eval = async( @@ -97,14 +106,22 @@ export function start(opts: AsyncREPLOptions): REPLServer { try { result = await new Promise((resolve, reject) => { - if (breakEvalOnSigint) { + if (onAsyncSigint) { // Handle SIGINT (Ctrl+C) that occurs while we are stuck in `await` // by racing a listener for 'SIGINT' against the evalResult Promise. // We remove all 'SIGINT' listeners and install our own. - sigintListener = (): void => { - // Reject with an exception similar to one thrown by Node.js - // itself if the `customEval` itself is interrupted. - reject(new Error('Asynchronous execution was interrupted by `SIGINT`')); + sigintListener = async(): Promise => { + let interruptHandled = false; + try { + interruptHandled = await onAsyncSigint(); + } catch (e) { + // ignore + } finally { + // Reject with an exception similar to one thrown by Node.js + // itself if the `customEval` itself is interrupted + // and the asyncSigint handler did not deal with it + reject(interruptHandled ? undefined : new Error('Asynchronous execution was interrupted by `SIGINT`')); + } }; replSigint = disableEvent(repl, 'SIGINT'); diff --git a/packages/cli-repl/src/cli-repl.spec.ts b/packages/cli-repl/src/cli-repl.spec.ts index 54e6b7a9be..f4145f84f4 100644 --- a/packages/cli-repl/src/cli-repl.spec.ts +++ b/packages/cli-repl/src/cli-repl.spec.ts @@ -5,8 +5,9 @@ import http from 'http'; import path from 'path'; import { Duplex, PassThrough } from 'stream'; import { promisify } from 'util'; -import { MongodSetup, startTestServer } from '../../../testing/integration-testing-hooks'; -import { expect, fakeTTYProps, readReplLogfile, useTmpdir, waitBus, waitCompletion, waitEval, tick } from '../test/repl-helpers'; +import { MongodSetup, skipIfServerVersion, startTestServer } from '../../../testing/integration-testing-hooks'; +import { expect, fakeTTYProps, readReplLogfile, tick, useTmpdir, waitBus, waitCompletion, waitEval } from '../test/repl-helpers'; +import { eventually } from '../test/helpers'; import CliRepl, { CliReplOptions } from './cli-repl'; import { CliReplErrors } from './error-codes'; @@ -178,6 +179,7 @@ describe('CliRepl', () => { const filenameA = path.resolve(__dirname, '..', 'test', 'fixtures', 'load', 'a.js'); input.write(`load(${JSON.stringify(filenameA)})\n`); await waitEval(cliRepl.bus); + expect(output).to.contain('Hi!'); input.write('variableFromA\n'); await waitEval(cliRepl.bus); expect(output).to.include('yes from A'); @@ -187,6 +189,7 @@ describe('CliRepl', () => { const filenameB = path.resolve(__dirname, '..', 'test', 'fixtures', 'load', 'b.js'); input.write(`load(${JSON.stringify(filenameB)})\n`); await waitEval(cliRepl.bus); + expect(output).to.contain('Hi!'); input.write('variableFromA + " " + variableFromB\n'); await waitEval(cliRepl.bus); expect(output).to.include('yes from A yes from A from B'); @@ -230,7 +233,6 @@ describe('CliRepl', () => { it('emits error for inaccessible home directory', async function() { if (process.platform === 'win32') { this.skip(); // TODO: Figure out why this doesn't work on Windows. - return; } cliReplOptions.shellHomePaths.shellRoamingDataPath = '/nonexistent/inaccesible'; cliReplOptions.shellHomePaths.shellLocalDataPath = '/nonexistent/inaccesible'; @@ -777,6 +779,150 @@ describe('CliRepl', () => { expect(output).to.include('on clirepltest> '); }); }); + + context('pressing CTRL-C', () => { + before(function() { + if (process.platform === 'win32') { // cannot trigger SIGINT on Windows + this.skip(); + } + }); + + beforeEach(async() => { + await cliRepl.start(await testServer.connectionString(), {}); + await tick(); + input.write('db.ctrlc.insertOne({ hello: "there" })\n'); + await waitEval(cliRepl.bus); + }); + + afterEach(async() => { + input.write('db.ctrlc.drop()\n'); + await waitEval(cliRepl.bus); + }); + + context('for server < 4.1', () => { + skipIfServerVersion(testServer, '>= 4.1'); + + it('prints a warning to manually terminate operations', async() => { + input.write('sleep(500); print(db.ctrlc.find({}));\n'); + await delay(100); + + output = ''; + process.kill(process.pid, 'SIGINT'); + + await waitBus(cliRepl.bus, 'mongosh:interrupt-complete'); + expect(output).to.match(/^Stopping execution.../m); + expect(output).to.match(/^WARNING: Operations running on the server cannot be killed automatically/m); + }); + }); + + context('for server >= 4.1', () => { + skipIfServerVersion(testServer, '< 4.1'); + + it('terminates operations on the server side', async() => { + input.write('db.ctrlc.find({ $where: \'while(true) { /* loop1 */ }\' })\n'); + await delay(100); + process.kill(process.pid, 'SIGINT'); + await waitBus(cliRepl.bus, 'mongosh:interrupt-complete'); + expect(output).to.match(/Stopping execution.../m); + + input.write('use admin\n'); + await waitEval(cliRepl.bus); + + await eventually(async() => { + output = ''; + input.write('db.aggregate([ {$currentOp: {} }, { $match: { \'command.find\': \'ctrlc\' } }, { $project: { command: 1 } } ])\n'); + await waitEval(cliRepl.bus); + + expect(output).to.not.include('MongoError'); + expect(output).to.not.include('loop1'); + }); + }); + + it('terminates operations also for explicitly created Mongo instances', async() => { + input.write('dbname = db.getName()\n'); + await waitEval(cliRepl.bus); + input.write(`client = Mongo("${await testServer.connectionString()}")\n`); + await waitEval(cliRepl.bus); + input.write('clientCtrlcDb = client.getDB(dbname);\n'); + await waitEval(cliRepl.bus); + input.write('clientAdminDb = client.getDB(\'admin\');\n'); + await waitEval(cliRepl.bus); + + input.write('clientCtrlcDb.ctrlc.find({ $where: \'while(true) { /* loop2 */ }\' })\n'); + await delay(100); + process.kill(process.pid, 'SIGINT'); + await waitBus(cliRepl.bus, 'mongosh:interrupt-complete'); + expect(output).to.match(/Stopping execution.../m); + + await eventually(async() => { + output = ''; + input.write('clientAdminDb.aggregate([ {$currentOp: {} }, { $match: { \'command.find\': \'ctrlc\' } }, { $project: { command: 1 } } ])\n'); + await waitEval(cliRepl.bus); + + expect(output).to.not.include('MongoError'); + expect(output).to.not.include('loop2'); + }); + }); + }); + + it('does not reconnect until the evaluation finishes', async() => { + input.write('sleep(500); print(db.ctrlc.find({}));\n'); + await delay(100); + + output = ''; + process.kill(process.pid, 'SIGINT'); + + await waitBus(cliRepl.bus, 'mongosh:interrupt-complete'); + expect(output).to.match(/^Stopping execution.../m); + expect(output).to.not.include('MongoError'); + expect(output).to.not.include('MongoshInternalError'); + expect(output).to.not.include('hello'); + expect(output).to.match(/>\s+$/); + + output = ''; + await delay(1000); + expect(output).to.be.empty; + + input.write('db.ctrlc.find({})\n'); + await waitEval(cliRepl.bus); + expect(output).to.contain('hello'); + }); + + it('cancels shell API commands that do not use the server', async() => { + output = ''; + input.write('while(true) { print("I am alive"); };\n'); + await tick(); + process.kill(process.pid, 'SIGINT'); + + await waitBus(cliRepl.bus, 'mongosh:interrupt-complete'); + expect(output).to.match(/^Stopping execution.../m); + expect(output).to.not.include('MongoError'); + expect(output).to.not.include('Mongosh'); + expect(output).to.match(/>\s+$/); + + output = ''; + await delay(100); + expect(output).to.not.include('alive'); + }); + + it('ensures user code cannot catch the interrupt exception', async() => { + output = ''; + input.write('nope = false; while(true) { try { print("I am alive"); } catch { nope = true; } };\n'); + await tick(); + process.kill(process.pid, 'SIGINT'); + + await waitBus(cliRepl.bus, 'mongosh:interrupt-complete'); + expect(output).to.match(/^Stopping execution.../m); + expect(output).to.not.include('MongoError'); + expect(output).to.not.include('Mongosh'); + expect(output).to.match(/>\s+$/); + + output = ''; + input.write('nope\n'); + await waitEval(cliRepl.bus); + expect(output).to.not.contain(true); + }); + }); }); context('with a replset node', () => { diff --git a/packages/cli-repl/src/mongosh-repl.ts b/packages/cli-repl/src/mongosh-repl.ts index 5dc079004b..24799f2b1e 100644 --- a/packages/cli-repl/src/mongosh-repl.ts +++ b/packages/cli-repl/src/mongosh-repl.ts @@ -1,25 +1,25 @@ import completer from '@mongosh/autocomplete'; import { MongoshCommandFailed, MongoshInternalError, MongoshWarning } from '@mongosh/errors'; import { changeHistory } from '@mongosh/history'; -import type { ServiceProvider, AutoEncryptionOptions } from '@mongosh/service-provider-core'; -import { EvaluationListener, ShellCliOptions, ShellInternalState, OnLoadResult } from '@mongosh/shell-api'; +import type { AutoEncryptionOptions, ServiceProvider } from '@mongosh/service-provider-core'; +import { EvaluationListener, OnLoadResult, ShellCliOptions, ShellInternalState } from '@mongosh/shell-api'; import { ShellEvaluator, ShellResult } from '@mongosh/shell-evaluator'; -import type { MongoshBus, CliUserConfig, ConfigProvider } from '@mongosh/types'; +import type { CliUserConfig, ConfigProvider, MongoshBus } from '@mongosh/types'; import askpassword from 'askpassword'; import { Console } from 'console'; import { once } from 'events'; import prettyRepl from 'pretty-repl'; import { ReplOptions, REPLServer, start as replStart } from 'repl'; -import { Readable, Writable, PassThrough } from 'stream'; +import { PassThrough, Readable, Writable } from 'stream'; import type { ReadStream, WriteStream } from 'tty'; import { callbackify, promisify } from 'util'; import * as asyncRepl from './async-repl'; import clr, { StyleDefinition } from './clr'; import { MONGOSH_WIKI, TELEMETRY_GREETING_MESSAGE } from './constants'; import formatOutput, { formatError } from './format-output'; -import { LineByLineInput } from './line-by-line-input'; -import { parseAnyLogEntry, LogEntry } from './log-entry'; import { makeMultilineJSIntoSingleLine } from './js-multiline-to-singleline'; +import { LineByLineInput } from './line-by-line-input'; +import { LogEntry, parseAnyLogEntry } from './log-entry'; export type MongoshCliOptions = ShellCliOptions & { redactInfo?: boolean; @@ -108,6 +108,8 @@ class MongoshNodeRepl implements EvaluationListener { inspectDepth = 0; started = false; showStackTraces = false; + loadNestingLevel = 0; + constructor(options: MongoshNodeReplOptions) { this.input = options.input; @@ -146,6 +148,7 @@ class MongoshNodeRepl implements EvaluationListener { historySize: await this.getConfig('historyLength'), wrapCallbackError: (err: Error) => Object.assign(new MongoshInternalError(err.message), { stack: err.stack }), + onAsyncSigint: this.onAsyncSigint.bind(this), ...this.nodeReplOptions }); fixupReplForNodeBug38314(repl); @@ -387,6 +390,18 @@ class MongoshNodeRepl implements EvaluationListener { // at all and instead leave that to the @mongosh/autocomplete package. return shellResult.type !== null ? null : shellResult.rawValue; } catch (err) { + if (this.runtimeState().internalState.interrupted.isSet()) { + this.bus.emit('mongosh:eval-interrupted'); + // The shell is interrupted by CTRL-C - so we ignore any errors + // that happened during evaluation. + const result: ShellResult = { + type: null, + rawValue: undefined, + printable: undefined + }; + return result; + } + if (!isErrorLike(err)) { throw new Error(this.formatOutput({ value: err @@ -397,7 +412,10 @@ class MongoshNodeRepl implements EvaluationListener { if (!this.insideAutoCompleteOrGetPrompt) { repl.setPrompt(await this.getShellPrompt()); } - this.bus.emit('mongosh:eval-complete'); // For testing purposes. + + if (this.loadNestingLevel <= 1) { + this.bus.emit('mongosh:eval-complete'); // For testing purposes. + } } } @@ -409,7 +427,14 @@ class MongoshNodeRepl implements EvaluationListener { return { resolvedFilename: absolutePath, - evaluate: async() => { await this.loadExternalCode(contents, absolutePath); } + evaluate: async() => { + this.loadNestingLevel += 1; + try { + await this.loadExternalCode(contents, absolutePath); + } finally { + this.loadNestingLevel -= 1; + } + } }; } @@ -422,6 +447,41 @@ class MongoshNodeRepl implements EvaluationListener { return await promisify(repl.eval.bind(repl))(code, repl.context, filename); } + async onAsyncSigint(): Promise { + const { internalState } = this.runtimeState(); + if (internalState.interrupted.isSet()) { + return true; + } + this.output.write('Stopping execution...'); + + const mongodVersion: string = internalState.connectionInfo.buildInfo?.version; + if (mongodVersion.match(/^(4\.0\.|3\.)\d+/)) { + this.output.write(this.clr( + `\nWARNING: Operations running on the server cannot be killed automatically for MongoDB ${mongodVersion}.` + + '\n Please make sure to kill them manually. Killing operations is supported starting with MongoDB 4.1.', + ['bold', 'yellow'] + )); + } + + const fullyInterrupted = await internalState.onInterruptExecution(); + // this is an async interrupt - the evaluation is still running in the background + // we wait until it finally completes (which should happen immediately) + await Promise.race([ + once(this.bus, 'mongosh:eval-interrupted'), + new Promise(resolve => setImmediate(resolve)) + ]); + + const fullyResumed = await internalState.onResumeExecution(); + if (!fullyInterrupted || !fullyResumed) { + this.output.write(this.formatError({ + name: 'MongoshInternalError', + message: 'Could not re-establish all connections, we suggest to restart the shell.' + })); + } + this.bus.emit('mongosh:interrupt-complete'); // For testing purposes. + return true; + } + /** * Format the result to a string so it can be written to the output stream. */ diff --git a/packages/cli-repl/test/e2e.spec.ts b/packages/cli-repl/test/e2e.spec.ts index 755bfd0366..8152cc9c93 100644 --- a/packages/cli-repl/test/e2e.spec.ts +++ b/packages/cli-repl/test/e2e.spec.ts @@ -429,16 +429,17 @@ describe('e2e', function() { describe('Ctrl+C aka SIGINT', () => { before(function() { if (process.platform === 'win32') { - this.skip(); // There is no SIGINT on Windows. + return this.skip(); // Cannot trigger SIGINT programmatically on Windows } }); - let shell; + let shell: TestShell; beforeEach(async() => { shell = TestShell.start({ args: [ '--nodb' ], removeSigintListeners: true }); await shell.waitForPrompt(); shell.assertNoErrors(); }); + it('interrupts sync execution', async() => { await shell.executeLine('void process.removeAllListeners("SIGINT")'); const result = shell.executeLine('while(true);'); @@ -450,13 +451,14 @@ describe('e2e', function() { const result = shell.executeLine('new Promise(() => {});'); setTimeout(() => shell.kill('SIGINT'), 3000); await result; - shell.assertContainsError('interrupted'); + shell.assertContainsOutput('Stopping execution...'); }); it('interrupts load()', async() => { const filename = path.resolve(__dirname, 'fixtures', 'load', 'infinite-loop.js'); const result = shell.executeLine(`load(${JSON.stringify(filename)})`); setTimeout(() => shell.kill('SIGINT'), 3000); await result; + // The while loop in the script is run as "sync" code shell.assertContainsError('interrupted'); }); it('behaves normally after an exception', async() => { @@ -466,6 +468,7 @@ describe('e2e', function() { await shell.waitForPrompt(); await new Promise((resolve) => setTimeout(resolve, 100)); shell.assertNotContainsOutput('interrupted'); + shell.assertNotContainsOutput('Stopping execution'); }); }); @@ -847,8 +850,7 @@ describe('e2e', function() { it('keeps working when the config file is present but not writable', async function() { if (process.platform === 'win32' || process.getuid() === 0 || process.geteuid() === 0) { - this.skip(); // There is no meaningful chmod on Windows, and root can ignore permissions. - return; + return this.skip(); // There is no meaningful chmod on Windows, and root can ignore permissions. } await fs.mkdir(path.dirname(configPath), { recursive: true }); await fs.writeFile(configPath, '{}'); diff --git a/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts b/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts index c9dab87ec4..684805da75 100644 --- a/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts +++ b/packages/node-runtime-worker-thread/src/child-process-mongosh-bus.ts @@ -13,6 +13,9 @@ export class ChildProcessMongoshBus { }, on() { throw new Error("Can't use `on` method on ChildProcessMongoshBus"); + }, + once() { + throw new Error("Can't use `once` method on ChildProcessMongoshBus"); } }, childProcess diff --git a/packages/node-runtime-worker-thread/src/worker-runtime.ts b/packages/node-runtime-worker-thread/src/worker-runtime.ts index be603f51ec..0e81ba151f 100644 --- a/packages/node-runtime-worker-thread/src/worker-runtime.ts +++ b/packages/node-runtime-worker-thread/src/worker-runtime.ts @@ -61,6 +61,9 @@ const messageBus: MongoshBus = Object.assign( { on() { throw new Error("Can't call `on` method on worker runtime MongoshBus"); + }, + once() { + throw new Error("Can't call `once` method on worker runtime MongoshBus"); } } ); diff --git a/packages/service-provider-core/src/closable.ts b/packages/service-provider-core/src/closable.ts index d4202aafc9..1e54df4789 100644 --- a/packages/service-provider-core/src/closable.ts +++ b/packages/service-provider-core/src/closable.ts @@ -5,4 +5,10 @@ export default interface Closable { * @param {boolean} force - Whether to force close. */ close(force: boolean): Promise; + + /** + * Suspends the connection, i.e. temporarily force-closes it + * and returns a function that will re-open the connection. + */ + suspend(): Promise<() => Promise>; } diff --git a/packages/service-provider-server/src/cli-service-provider.ts b/packages/service-provider-server/src/cli-service-provider.ts index dea27b1adc..c53f8400c6 100644 --- a/packages/service-provider-server/src/cli-service-provider.ts +++ b/packages/service-provider-server/src/cli-service-provider.ts @@ -81,6 +81,7 @@ import { } from '@mongosh/service-provider-core'; import { MongoshCommandFailed, MongoshInternalError, MongoshRuntimeError } from '@mongosh/errors'; +import { ensureMongoNodeNativePatchesAreApplied } from './mongodb-patches'; const bsonlib = { Binary, @@ -238,6 +239,8 @@ class CliServiceProvider extends ServiceProviderCore implements ServiceProvider */ constructor(mongoClient: MongoClient, clientOptions: MongoClientOptions = {}, uri?: ConnectionString) { super(bsonlib); + ensureMongoNodeNativePatchesAreApplied(); + this.mongoClient = mongoClient; this.uri = uri; this.platform = ReplPlatform.CLI; @@ -454,6 +457,13 @@ class CliServiceProvider extends ServiceProviderCore implements ServiceProvider return this.mongoClient.close(force); } + async suspend(): Promise<() => Promise> { + await this.close(true); + return async() => { + await this.resetConnectionOptions({}); + }; + } + /** * Deprecated count command. * diff --git a/packages/service-provider-server/src/mongodb-patches.ts b/packages/service-provider-server/src/mongodb-patches.ts new file mode 100644 index 0000000000..b951436a26 --- /dev/null +++ b/packages/service-provider-server/src/mongodb-patches.ts @@ -0,0 +1,76 @@ +import { MongoshInternalError } from '@mongosh/errors'; +import { Callback, CloseOptions, Connection, ConnectionPool } from 'mongodb'; + +let alreadyPatched = false; + +// TODO: revisit whether we still need monkey patching in light of NODE-3263 +export function ensureMongoNodeNativePatchesAreApplied(): void { + if (alreadyPatched) { + return; + } + + patchConnectionPoolTracking(); + + alreadyPatched = true; +} + +const poolToConnections = new Map>(); + +function patchConnectionPoolTracking(): void { + const connectionPoolPrototype: ConnectionPool = require('mongodb/lib/cmap/connection_pool').ConnectionPool.prototype; + if (!connectionPoolPrototype) { + throw new MongoshInternalError('Failed to setup connection handling'); + } + + const originalCheckOut = connectionPoolPrototype.checkOut; + const newCheckOut: typeof originalCheckOut = function(this: ConnectionPool, cb: Callback): void { + // eslint-disable-next-line @typescript-eslint/no-this-alias + const pool = this; + originalCheckOut.call(this, function(this: any, error, connection) { + if (connection) { + let connections = poolToConnections.get(pool); + if (!connections) { + connections = new Set(); + poolToConnections.set(pool, connections); + } + connections.add(connection); + } + + cb.call(this, error, connection); + }); + }; + connectionPoolPrototype.checkOut = newCheckOut; + + const originalCheckIn = connectionPoolPrototype.checkIn; + const newCheckIn: typeof originalCheckIn = function(this: ConnectionPool, connection: Connection): void { + if (connection) { + const connections = poolToConnections.get(this); + if (connections) { + connections.delete(connection); + } + } + originalCheckIn.call(this, connection); + }; + connectionPoolPrototype.checkIn = newCheckIn; + + const originalClose = connectionPoolPrototype.close; + const newClose: typeof originalClose = function(this: ConnectionPool, options: CloseOptions | Callback, cb?: Callback): void { + // eslint-disable-next-line @typescript-eslint/no-this-alias + const pool = this; + const connections = poolToConnections.get(pool); + if (!pool.closed && connections && typeof options === 'object' && options.force) { + const originalCallback = cb; + cb = function(this: any, error: any, result: any) { + poolToConnections.delete(pool); + [...connections].forEach(c => c.destroy({ force: true })); + + if (originalCallback) { + originalCallback.call(this, error, result); + } + }; + } + + originalClose.call(this, options as any, cb as any); + }; + connectionPoolPrototype.close = newClose; +} diff --git a/packages/shell-api/src/abstract-cursor.ts b/packages/shell-api/src/abstract-cursor.ts index 65302b79a4..8400c33411 100644 --- a/packages/shell-api/src/abstract-cursor.ts +++ b/packages/shell-api/src/abstract-cursor.ts @@ -1,9 +1,9 @@ import { shellApiClassNoHelp, - ShellApiClass, - returnsPromise, toShellResult, - returnType + returnType, + ShellApiWithMongoClass, + returnsPromise } from './decorators'; import type Mongo from './mongo'; import type { @@ -17,7 +17,7 @@ import { CursorIterationResult } from './result'; import { iterate, validateExplainableVerbosity, markAsExplainOutput } from './helpers'; @shellApiClassNoHelp -export abstract class AbstractCursor extends ShellApiClass { +export abstract class AbstractCursor extends ShellApiWithMongoClass { _mongo: Mongo; abstract _cursor: ServiceProviderAggregationCursor | ServiceProviderCursor; _currentIterationResult: CursorIterationResult | null = null; diff --git a/packages/shell-api/src/bulk.spec.ts b/packages/shell-api/src/bulk.spec.ts index 53ce06a004..3e58611199 100644 --- a/packages/shell-api/src/bulk.spec.ts +++ b/packages/shell-api/src/bulk.spec.ts @@ -4,7 +4,7 @@ import { fail } from 'assert'; import chai, { expect } from 'chai'; import sinonChai from 'sinon-chai'; import { EventEmitter } from 'events'; -import sinon, { StubbedInstance, stubInterface } from 'ts-sinon'; +import { StubbedInstance, stubInterface } from 'ts-sinon'; import Bulk, { BulkFindOp } from './bulk'; import Collection from './collection'; import { ALL_PLATFORMS, ALL_SERVER_VERSIONS, ALL_TOPOLOGIES } from './enums'; @@ -47,8 +47,8 @@ describe('Bulk API', () => { }); describe('Metadata', () => { describe('toShellResult', () => { - const mongo = sinon.spy(); - const b = new Bulk(mongo, { + const collection = stubInterface(); + const b = new Bulk(collection, { batches: [1, 2, 3, 4] } as any); it('value', async() => { diff --git a/packages/shell-api/src/bulk.ts b/packages/shell-api/src/bulk.ts index 414774b3df..42267fcdcc 100644 --- a/packages/shell-api/src/bulk.ts +++ b/packages/shell-api/src/bulk.ts @@ -1,4 +1,4 @@ -import { returnsPromise, ShellApiClass, shellApiClassDefault, returnType, deprecated } from './decorators'; +import { returnsPromise, shellApiClassDefault, returnType, deprecated, ShellApiWithMongoClass } from './decorators'; import Mongo from './mongo'; import { CommonErrors, MongoshInvalidInputError, MongoshUnimplementedError } from '@mongosh/errors'; import { @@ -17,7 +17,7 @@ import { BulkWriteResult } from './result'; import type Collection from './collection'; @shellApiClassDefault -export class BulkFindOp extends ShellApiClass { +export class BulkFindOp extends ShellApiWithMongoClass { _serviceProviderBulkFindOp: FindOperators; _parentBulk: Bulk; _hint: Document | undefined; @@ -28,6 +28,10 @@ export class BulkFindOp extends ShellApiClass { this._parentBulk = parentBulk; } + get _mongo(): Mongo { + return this._parentBulk._mongo; + } + [asPrintable](): string { return 'BulkFindOp'; } @@ -131,7 +135,7 @@ export class BulkFindOp extends ShellApiClass { @shellApiClassDefault -export default class Bulk extends ShellApiClass { +export default class Bulk extends ShellApiWithMongoClass { _mongo: Mongo; _collection: Collection; _batchCounts: any; @@ -139,7 +143,7 @@ export default class Bulk extends ShellApiClass { _serviceProviderBulkOp: OrderedBulkOperation | UnorderedBulkOperation; _ordered: boolean; - constructor(collection: any, innerBulk: OrderedBulkOperation | UnorderedBulkOperation, ordered = false) { + constructor(collection: Collection, innerBulk: OrderedBulkOperation | UnorderedBulkOperation, ordered = false) { super(); this._collection = collection; this._mongo = collection._mongo; diff --git a/packages/shell-api/src/change-stream-cursor.ts b/packages/shell-api/src/change-stream-cursor.ts index 3a786dd99a..2f05a7bb95 100644 --- a/packages/shell-api/src/change-stream-cursor.ts +++ b/packages/shell-api/src/change-stream-cursor.ts @@ -2,8 +2,8 @@ import { shellApiClassDefault, returnsPromise, returnType, - ShellApiClass, - deprecated + deprecated, + ShellApiWithMongoClass } from './decorators'; import { ChangeStream, @@ -22,7 +22,7 @@ import { printWarning } from './deprecation-warning'; import Mongo from './mongo'; @shellApiClassDefault -export default class ChangeStreamCursor extends ShellApiClass { +export default class ChangeStreamCursor extends ShellApiWithMongoClass { _mongo: Mongo; _cursor: ChangeStream; _currentIterationResult: CursorIterationResult | null = null; diff --git a/packages/shell-api/src/collection.ts b/packages/shell-api/src/collection.ts index 3572c43572..4411c38e0d 100644 --- a/packages/shell-api/src/collection.ts +++ b/packages/shell-api/src/collection.ts @@ -6,10 +6,10 @@ import { returnsPromise, returnType, serverVersions, - ShellApiClass, shellApiClassDefault, topologies, - deprecated + deprecated, + ShellApiWithMongoClass } from './decorators'; import { ADMIN_DB, asPrintable, namespaceInfo, ServerVersions, Topologies } from './enums'; import { @@ -79,7 +79,7 @@ type CollStatsShellOptions = CollStatsOptions & { @shellApiClassDefault @addSourceToResults -export default class Collection extends ShellApiClass { +export default class Collection extends ShellApiWithMongoClass { _mongo: Mongo; _database: Database; _name: string; diff --git a/packages/shell-api/src/database.ts b/packages/shell-api/src/database.ts index 53ca12ffb7..8d179bbd5b 100644 --- a/packages/shell-api/src/database.ts +++ b/packages/shell-api/src/database.ts @@ -5,10 +5,10 @@ import { returnsPromise, returnType, serverVersions, - ShellApiClass, shellApiClassDefault, topologies, - deprecated + deprecated, + ShellApiWithMongoClass } from './decorators'; import { ADMIN_DB, asPrintable, ServerVersions, Topologies } from './enums'; import { @@ -46,7 +46,7 @@ import { ShellApiErrors } from './error-codes'; type AuthDoc = {user: string, pwd: string, authDb?: string, mechanism?: string}; @shellApiClassDefault -export default class Database extends ShellApiClass { +export default class Database extends ShellApiWithMongoClass { _mongo: Mongo; _name: string; _collections: Record; diff --git a/packages/shell-api/src/decorators.ts b/packages/shell-api/src/decorators.ts index 09d3680402..7b27fee6a9 100644 --- a/packages/shell-api/src/decorators.ts +++ b/packages/shell-api/src/decorators.ts @@ -1,17 +1,17 @@ /* eslint-disable complexity */ -import Help from './help'; +import { MongoshInternalError } from '@mongosh/errors'; +import type { ReplPlatform } from '@mongosh/service-provider-core'; +import { Mongo, ShellInternalState } from '.'; import { - Topologies, ALL_PLATFORMS, - ALL_TOPOLOGIES, ALL_SERVER_VERSIONS, - shellApiType, + ALL_TOPOLOGIES, asPrintable, - namespaceInfo + namespaceInfo, shellApiType, Topologies } from './enums'; -import { MongoshInternalError } from '@mongosh/errors'; -import type { ReplPlatform } from '@mongosh/service-provider-core'; +import Help from './help'; import { addHiddenDataProperty } from './helpers'; +import { checkInterrupted } from './interruptor'; const addSourceToResultsSymbol = Symbol.for('@@mongosh.addSourceToResults'); const resultSource = Symbol.for('@@mongosh.resultSource'); @@ -52,8 +52,11 @@ export interface ShellResult { source?: ShellResultSourceInformation; } -export class ShellApiClass implements ShellApiInterface { - help: any; +export abstract class ShellApiClass implements ShellApiInterface { + public help: any; + + abstract get _internalState(): ShellInternalState; + get [shellApiType](): string { throw new MongoshInternalError('Shell API Type did not use decorators'); } @@ -68,6 +71,25 @@ export class ShellApiClass implements ShellApiInterface { } } +export abstract class ShellApiWithMongoClass extends ShellApiClass { + abstract get _mongo(): Mongo; + + get _internalState(): ShellInternalState { + // _mongo can be undefined in tests + return this._mongo?._internalState; + } +} + +export abstract class ShellApiValueClass extends ShellApiClass { + get _mongo(): Mongo { + throw new MongoshInternalError('Not supported on this value class'); + } + + get _internalState(): ShellInternalState { + throw new MongoshInternalError('Not supported on this value class'); + } +} + export function getShellApiType(rawValue: any): string | null { return (rawValue && rawValue[shellApiType]) ?? null; } @@ -133,6 +155,27 @@ function wrapWithAddSourceToResult(fn: Function): Function { return wrapper; } +function wrapWithInterruptChecks any>(fn: T): (args: Parameters) => ReturnType { + const wrapper = (fn as any).returnsPromise ? + markImplicitlyAwaited(async function(this: any, ...args: any[]): Promise { + const interrupted = checkInterrupted(this); + const result = await Promise.race([ + interrupted ? interrupted.asPromise() : new Promise(() => {}), + fn.call(this, ...args) + ]); + checkInterrupted(this); + return result; + }) : function(this: any, ...args: any[]): any { + checkInterrupted(this); + const result = fn.call(this, ...args); + checkInterrupted(this); + return result; + }; + Object.setPrototypeOf(wrapper, Object.getPrototypeOf(fn)); + Object.defineProperties(wrapper, Object.getOwnPropertyDescriptors(fn)); + return wrapper; +} + // This is a bit more restrictive than `AutocompleteParameters` used in the // internal state code, so that it can also be accessed by testing code in the // autocomplete package. You can expand this type to be closed to `AutocompleteParameters` @@ -224,6 +267,7 @@ export function shellApiClassGeneric(constructor: Function, hasHelp: boolean): v if ((constructor as any)[addSourceToResultsSymbol]) { method = wrapWithAddSourceToResult(method); } + method = wrapWithInterruptChecks(method); method.serverVersions = method.serverVersions || ALL_SERVER_VERSIONS; method.topologies = method.topologies || ALL_TOPOLOGIES; @@ -359,11 +403,25 @@ export function topologies(topologiesArray: Topologies[]): Function { } export const nonAsyncFunctionsReturningPromises: string[] = []; // For testing. export function returnsPromise(_target: any, _propertyKey: string, descriptor: PropertyDescriptor): void { - const orig = descriptor.value; - orig.returnsPromise = true; - descriptor.value = markImplicitlyAwaited(descriptor.value); - if (orig.constructor.name !== 'AsyncFunction') { - nonAsyncFunctionsReturningPromises.push(orig.name); + const originalFunction = descriptor.value; + originalFunction.returnsPromise = true; + + async function wrapper(this: any, ...args: any[]) { + try { + return await originalFunction.call(this, ...args); + } finally { + if (typeof setTimeout === 'function' && typeof setImmediate === 'function') { + // Not all JS environments have setImmediate + await new Promise(setImmediate); + } + } + } + Object.setPrototypeOf(wrapper, Object.getPrototypeOf(originalFunction)); + Object.defineProperties(wrapper, Object.getOwnPropertyDescriptors(originalFunction)); + descriptor.value = markImplicitlyAwaited(wrapper); + + if (originalFunction.constructor.name !== 'AsyncFunction') { + nonAsyncFunctionsReturningPromises.push(originalFunction.name); } } // This is use to mark functions that are executable in the shell in a POSIX-shell-like diff --git a/packages/shell-api/src/explainable.ts b/packages/shell-api/src/explainable.ts index b26c955549..3c2254e25a 100644 --- a/packages/shell-api/src/explainable.ts +++ b/packages/shell-api/src/explainable.ts @@ -4,9 +4,9 @@ import ExplainableCursor from './explainable-cursor'; import { returnsPromise, returnType, - ShellApiClass, shellApiClassDefault, - serverVersions + serverVersions, + ShellApiWithMongoClass } from './decorators'; import { asPrintable, ServerVersions } from './enums'; import { @@ -29,7 +29,7 @@ import type { } from '@mongosh/service-provider-core'; @shellApiClassDefault -export default class Explainable extends ShellApiClass { +export default class Explainable extends ShellApiWithMongoClass { _mongo: Mongo; _collection: Collection; _verbosity: ExplainVerbosityLike; diff --git a/packages/shell-api/src/field-level-encryption.ts b/packages/shell-api/src/field-level-encryption.ts index 4b80831d51..57af46bb8a 100644 --- a/packages/shell-api/src/field-level-encryption.ts +++ b/packages/shell-api/src/field-level-encryption.ts @@ -2,8 +2,8 @@ import { classPlatforms, returnsPromise, returnType, - ShellApiClass, - shellApiClassDefault + shellApiClassDefault, + ShellApiWithMongoClass } from './decorators'; import { ClientEncryption as MongoCryptClientEncryption, @@ -44,7 +44,7 @@ export interface ClientSideFieldLevelEncryptionOptions { @shellApiClassDefault @classPlatforms([ ReplPlatform.CLI ] ) -export class ClientEncryption extends ShellApiClass { +export class ClientEncryption extends ShellApiWithMongoClass { public _mongo: Mongo; public _libmongocrypt: MongoCryptClientEncryption; @@ -96,7 +96,7 @@ export class ClientEncryption extends ShellApiClass { @shellApiClassDefault @classPlatforms([ ReplPlatform.CLI ] ) -export class KeyVault extends ShellApiClass { +export class KeyVault extends ShellApiWithMongoClass { public _mongo: Mongo; public _clientEncryption: ClientEncryption; private _keyColl: Collection; diff --git a/packages/shell-api/src/interruptor.spec.ts b/packages/shell-api/src/interruptor.spec.ts new file mode 100644 index 0000000000..3c3db1edec --- /dev/null +++ b/packages/shell-api/src/interruptor.spec.ts @@ -0,0 +1,64 @@ +import { bson, ServiceProvider } from '@mongosh/service-provider-core'; +import { expect } from 'chai'; +import { EventEmitter } from 'events'; +import { StubbedInstance, stubInterface } from 'ts-sinon'; +import Database from './database'; +import Mongo from './mongo'; +import ShellInternalState from './shell-internal-state'; + +describe('interruptor', () => { + describe('with Shell API functions', () => { + let mongo: Mongo; + let serviceProvider: StubbedInstance; + let database: Database; + let bus: StubbedInstance; + let internalState: ShellInternalState; + + beforeEach(() => { + bus = stubInterface(); + serviceProvider = stubInterface(); + serviceProvider.initialDb = 'test'; + serviceProvider.bsonLibrary = bson; + serviceProvider.runCommand.resolves({ ok: 1 }); + serviceProvider.runCommandWithCheck.resolves({ ok: 1 }); + internalState = new ShellInternalState(serviceProvider, bus); + mongo = new Mongo(internalState, undefined, undefined, undefined, serviceProvider); + database = new Database(mongo, 'db1'); + }); + + it('causes an interrupt error to be thrown on entry', async() => { + internalState.interrupted.set(); + try { + await database.runCommand({ some: 1 }); + } catch (e) { + expect(e.name).to.equal('MongoshInterruptedError'); + expect(serviceProvider.runCommand).to.not.have.been.called; + expect(serviceProvider.runCommandWithCheck).to.not.have.been.called; + return; + } + expect.fail('Expected error'); + }); + + it('causes an interrupt error to be thrown on exit', async() => { + let resolveCall: (result: any) => void; + serviceProvider.runCommandWithCheck.callsFake(() => { + return new Promise(resolve => { + resolveCall = resolve; + }); + }); + + const runCommand = database.runCommand({ some: 1 }); + internalState.interrupted.set(); + resolveCall({ ok: 1 }); + + try { + await runCommand; + } catch (e) { + expect(e.name).to.equal('MongoshInterruptedError'); + expect(serviceProvider.runCommandWithCheck).to.have.been.called; + return; + } + expect.fail('Expected error'); + }); + }); +}); diff --git a/packages/shell-api/src/interruptor.ts b/packages/shell-api/src/interruptor.ts new file mode 100644 index 0000000000..a2ee55be4f --- /dev/null +++ b/packages/shell-api/src/interruptor.ts @@ -0,0 +1,74 @@ +import { MongoshBaseError, MongoshInternalError } from '@mongosh/errors'; +import { ShellApiClass } from './decorators'; +import { shellApiType } from './enums'; + +const kUncatchable = Symbol.for('@@mongosh.uncatchable'); + +export class MongoshInterruptedError extends MongoshBaseError { + [kUncatchable] = true; + + constructor() { + super('MongoshInterruptedError', 'execution was interrupted'); + } +} + +export class InterruptFlag { + private interrupted = false; + private deferred: { + reject: (e: MongoshInterruptedError) => void; + promise: Promise; + }; + + constructor() { + this.deferred = this.defer(); + } + + public isSet(): boolean { + return this.interrupted; + } + + /** + * The returned promise will never be resolved but is rejected + * when the interrupt is set. The rejection happens with an + * instance of `MongoshInterruptedError`. + * @returns Promise that is rejected when the interrupt is set + */ + public asPromise(): Promise { + return this.deferred.promise; + } + + public set(): void { + this.interrupted = true; + this.deferred.reject(new MongoshInterruptedError()); + } + + public reset(): void { + this.interrupted = false; + this.deferred = this.defer(); + } + + private defer(): { reject: (e: MongoshInterruptedError) => void; promise: Promise; } { + const result: any = {}; + result.promise = new Promise((_, reject) => { + result.reject = reject; + }); + result.promise.catch(() => { + // we ignore the error here - all others should be notified + // we just have to ensure there's at least one handler for it + // to prevent an UnhandledPromiseRejection + }); + return result; + } +} + +export function checkInterrupted(apiClass: any): InterruptFlag | undefined { + if (!apiClass[shellApiType]) { + throw new MongoshInternalError('checkInterrupted can only be called for functions from shell API classes'); + } + // internalState can be undefined in tests + const internalState = (apiClass as ShellApiClass)._internalState; + if (internalState?.interrupted?.isSet()) { + throw new MongoshInterruptedError(); + } + return internalState?.interrupted; +} diff --git a/packages/shell-api/src/mongo.ts b/packages/shell-api/src/mongo.ts index eabc496618..e2f00bd7e7 100644 --- a/packages/shell-api/src/mongo.ts +++ b/packages/shell-api/src/mongo.ts @@ -309,6 +309,10 @@ export default class Mongo extends ShellApiClass { await this._serviceProvider.close(force); } + async _suspend(): Promise<() => Promise> { + return await this._serviceProvider.suspend(); + } + getReadPrefMode(): ReadPreferenceModeId { return this._serviceProvider.getReadPreference().mode; } diff --git a/packages/shell-api/src/plan-cache.ts b/packages/shell-api/src/plan-cache.ts index d7d71f0f74..9b51d23a06 100644 --- a/packages/shell-api/src/plan-cache.ts +++ b/packages/shell-api/src/plan-cache.ts @@ -1,17 +1,18 @@ import { returnsPromise, serverVersions, - ShellApiClass, shellApiClassDefault, - deprecated + deprecated, + ShellApiWithMongoClass } from './decorators'; import { Document } from '@mongosh/service-provider-core'; import Collection from './collection'; import { asPrintable, ServerVersions } from './enums'; import { MongoshDeprecatedError } from '@mongosh/errors'; +import Mongo from './mongo'; @shellApiClassDefault -export default class PlanCache extends ShellApiClass { +export default class PlanCache extends ShellApiWithMongoClass { _collection: Collection; constructor(collection: Collection) { @@ -19,6 +20,10 @@ export default class PlanCache extends ShellApiClass { this._collection = collection; } + get _mongo(): Mongo { + return this._collection._mongo; + } + /** * Internal method to determine what is printed for this class. */ diff --git a/packages/shell-api/src/replica-set.ts b/packages/shell-api/src/replica-set.ts index 620fd3dacd..cf5416bf4a 100644 --- a/packages/shell-api/src/replica-set.ts +++ b/packages/shell-api/src/replica-set.ts @@ -1,9 +1,9 @@ import Database from './database'; import { shellApiClassDefault, - ShellApiClass, returnsPromise, - deprecated + deprecated, + ShellApiWithMongoClass } from './decorators'; import { Document @@ -13,9 +13,10 @@ import { assertArgsDefinedType } from './helpers'; import { CommonErrors, MongoshDeprecatedError, MongoshInvalidInputError, MongoshRuntimeError } from '@mongosh/errors'; import { CommandResult } from './result'; import { redactCredentials } from '@mongosh/history'; +import { Mongo } from '.'; @shellApiClassDefault -export default class ReplicaSet extends ShellApiClass { +export default class ReplicaSet extends ShellApiWithMongoClass { _database: Database; constructor(database: Database) { @@ -23,6 +24,10 @@ export default class ReplicaSet extends ShellApiClass { this._database = database; } + get _mongo(): Mongo { + return this._database._mongo; + } + /** * rs.initiate calls replSetInitiate admin command. * diff --git a/packages/shell-api/src/result.ts b/packages/shell-api/src/result.ts index b7ec849ae9..b2356e7af2 100644 --- a/packages/shell-api/src/result.ts +++ b/packages/shell-api/src/result.ts @@ -1,9 +1,9 @@ -import { ShellApiClass, shellApiClassDefault } from './decorators'; +import { shellApiClassDefault, ShellApiValueClass } from './decorators'; import { shellApiType, asPrintable } from './enums'; import { Document, ObjectIdType } from '@mongosh/service-provider-core'; @shellApiClassDefault -export class CommandResult extends ShellApiClass { +export class CommandResult extends ShellApiValueClass { value: unknown; type: string; constructor(type: string, value: unknown) { @@ -22,7 +22,7 @@ export class CommandResult extends ShellApiClass { } @shellApiClassDefault -export class BulkWriteResult extends ShellApiClass { +export class BulkWriteResult extends ShellApiValueClass { acknowledged: boolean; insertedCount: number; insertedIds: {[index: number]: ObjectIdType}; @@ -53,7 +53,7 @@ export class BulkWriteResult extends ShellApiClass { } @shellApiClassDefault -export class InsertManyResult extends ShellApiClass { +export class InsertManyResult extends ShellApiValueClass { acknowledged: boolean; insertedIds: { [key: number]: ObjectIdType }; constructor(acknowledged: boolean, insertedIds: { [key: number]: ObjectIdType }) { @@ -64,7 +64,7 @@ export class InsertManyResult extends ShellApiClass { } @shellApiClassDefault -export class InsertOneResult extends ShellApiClass { +export class InsertOneResult extends ShellApiValueClass { acknowledged: boolean; insertedId: ObjectIdType | undefined; constructor(acknowledged: boolean, insertedId?: ObjectIdType) { @@ -75,7 +75,7 @@ export class InsertOneResult extends ShellApiClass { } @shellApiClassDefault -export class UpdateResult extends ShellApiClass { +export class UpdateResult extends ShellApiValueClass { acknowledged: boolean; insertedId: ObjectIdType; matchedCount: number; @@ -97,7 +97,7 @@ export class UpdateResult extends ShellApiClass { } @shellApiClassDefault -export class DeleteResult extends ShellApiClass { +export class DeleteResult extends ShellApiValueClass { acknowledged: boolean; deletedCount: number | undefined; constructor(acknowledged: boolean, deletedCount: number | undefined) { @@ -108,7 +108,7 @@ export class DeleteResult extends ShellApiClass { } @shellApiClassDefault -export class CursorIterationResult extends ShellApiClass { +export class CursorIterationResult extends ShellApiValueClass { cursorHasMore: boolean; documents: Document[]; diff --git a/packages/shell-api/src/session.ts b/packages/shell-api/src/session.ts index 5de76fcb86..659f53d6ce 100644 --- a/packages/shell-api/src/session.ts +++ b/packages/shell-api/src/session.ts @@ -2,8 +2,8 @@ import { classPlatforms, classReturnsPromise, returnsPromise, - ShellApiClass, - shellApiClassDefault + shellApiClassDefault, + ShellApiWithMongoClass } from './decorators'; import { Document, @@ -25,11 +25,11 @@ import { assertArgsDefinedType } from './helpers'; @shellApiClassDefault @classReturnsPromise @classPlatforms([ ReplPlatform.CLI ] ) -export default class Session extends ShellApiClass { +export default class Session extends ShellApiWithMongoClass { public id: ServerSessionId | undefined; public _session: ClientSession; public _options: ClientSessionOptions; - private _mongo: Mongo; + public _mongo: Mongo; private _databases: Record; constructor(mongo: Mongo, options: ClientSessionOptions, session: ClientSession) { diff --git a/packages/shell-api/src/shard.ts b/packages/shell-api/src/shard.ts index e4bf98d213..ac1adba3dc 100644 --- a/packages/shell-api/src/shard.ts +++ b/packages/shell-api/src/shard.ts @@ -1,7 +1,7 @@ import Database from './database'; import { shellApiClassDefault, - ShellApiClass, returnsPromise, serverVersions + returnsPromise, serverVersions, ShellApiWithMongoClass } from './decorators'; import type { Document } from '@mongosh/service-provider-core'; @@ -9,9 +9,10 @@ import { assertArgsDefinedType, getConfigDB, getPrintableShardStatus } from './h import { ServerVersions, asPrintable } from './enums'; import { CommandResult, UpdateResult } from './result'; import { redactCredentials } from '@mongosh/history'; +import Mongo from './mongo'; @shellApiClassDefault -export default class Shard extends ShellApiClass { +export default class Shard extends ShellApiWithMongoClass { _database: Database; constructor(database: Database) { @@ -19,6 +20,10 @@ export default class Shard extends ShellApiClass { this._database = database; } + get _mongo(): Mongo { + return this._database._mongo; + } + /** * Internal method to determine what is printed for this class. */ diff --git a/packages/shell-api/src/shell-api.ts b/packages/shell-api/src/shell-api.ts index b4b99f0e64..8bff5fafbb 100644 --- a/packages/shell-api/src/shell-api.ts +++ b/packages/shell-api/src/shell-api.ts @@ -106,7 +106,7 @@ export default class ShellApi extends ShellApiClass { this.config = new ShellConfig(internalState); } - get internalState(): ShellInternalState { + get _internalState(): ShellInternalState { return this[internalStateSymbol]; } @@ -121,24 +121,24 @@ export default class ShellApi extends ShellApiClass { @directShellCommand @shellCommandCompleter(useCompleter) use(db: string): any { - return this.internalState.currentDb._mongo.use(db); + return this._internalState.currentDb._mongo.use(db); } @directShellCommand @returnsPromise @shellCommandCompleter(showCompleter) async show(cmd: string, arg?: string): Promise { - return await this.internalState.currentDb._mongo.show(cmd, arg); + return await this._internalState.currentDb._mongo.show(cmd, arg); } @directShellCommand @returnsPromise @platforms([ ReplPlatform.CLI ] ) async exit(): Promise { - assertCLI(this.internalState.initialServiceProvider.platform, 'the exit/quit commands'); - await this.internalState.close(true); + assertCLI(this._internalState.initialServiceProvider.platform, 'the exit/quit commands'); + await this._internalState.close(true); // This should never actually return. - await this.internalState.evaluationListener.onExit?.(); + await this._internalState.evaluationListener.onExit?.(); throw new MongoshInternalError('.onExit listener returned'); } @@ -156,10 +156,10 @@ export default class ShellApi extends ShellApiClass { uri?: string, fleOptions?: ClientSideFieldLevelEncryptionOptions, otherOptions?: { api?: ServerApi | ServerApiVersionId }): Promise { - assertCLI(this.internalState.initialServiceProvider.platform, 'new Mongo connections'); - const mongo = new Mongo(this.internalState, uri, fleOptions, otherOptions); + assertCLI(this._internalState.initialServiceProvider.platform, 'new Mongo connections'); + const mongo = new Mongo(this._internalState, uri, fleOptions, otherOptions); await mongo.connect(); - this.internalState.mongos.push(mongo); + this._internalState.mongos.push(mongo); return mongo; } @@ -168,10 +168,10 @@ export default class ShellApi extends ShellApiClass { @platforms([ ReplPlatform.CLI ] ) async connect(uri: string, user?: string, pwd?: string): Promise { assertArgsDefinedType([uri, user, pwd], ['string', [undefined, 'string'], [undefined, 'string']], 'connect'); - assertCLI(this.internalState.initialServiceProvider.platform, 'new Mongo connections'); - const mongo = new Mongo(this.internalState, uri); + assertCLI(this._internalState.initialServiceProvider.platform, 'new Mongo connections'); + const mongo = new Mongo(this._internalState, uri); await mongo.connect(user, pwd); - this.internalState.mongos.push(mongo); + this._internalState.mongos.push(mongo); const db = mongo._serviceProvider.initialDb || DEFAULT_DB; return mongo.getDB(db); } @@ -179,10 +179,10 @@ export default class ShellApi extends ShellApiClass { @directShellCommand @returnsPromise async it(): Promise { - if (!this.internalState.currentCursor) { + if (!this._internalState.currentCursor) { return new CursorIterationResult(); } - return await this.internalState.currentCursor._it(); + return await this._internalState.currentCursor._it(); } version(): string { @@ -193,21 +193,21 @@ export default class ShellApi extends ShellApiClass { @returnsPromise async load(filename: string): Promise { assertArgsDefinedType([filename], ['string'], 'load'); - if (!this.internalState.evaluationListener.onLoad) { + if (!this._internalState.evaluationListener.onLoad) { throw new MongoshUnimplementedError( 'load is not currently implemented for this platform', CommonErrors.NotImplemented ); } - this.internalState.messageBus.emit('mongosh:api-load-file', { + this._internalState.messageBus.emit('mongosh:api-load-file', { nested: this.loadCallNestingLevel > 0, filename }); const { resolvedFilename, evaluate - } = await this.internalState.evaluationListener.onLoad(filename); + } = await this._internalState.evaluationListener.onLoad(filename); - const context = this.internalState.context; + const context = this._internalState.context; const previousFilename = context.__filename; context.__filename = resolvedFilename; context.__dirname = dirname(resolvedFilename); @@ -230,7 +230,7 @@ export default class ShellApi extends ShellApiClass { @returnsPromise @platforms([ ReplPlatform.CLI ] ) async enableTelemetry(): Promise { - const result = await this.internalState.evaluationListener.setConfig?.('enableTelemetry', true); + const result = await this._internalState.evaluationListener.setConfig?.('enableTelemetry', true); if (result === 'success') { return i18n.__('cli-repl.cli-repl.enabledTelemetry'); } @@ -239,7 +239,7 @@ export default class ShellApi extends ShellApiClass { @returnsPromise @platforms([ ReplPlatform.CLI ] ) async disableTelemetry(): Promise { - const result = await this.internalState.evaluationListener.setConfig?.('enableTelemetry', false); + const result = await this._internalState.evaluationListener.setConfig?.('enableTelemetry', false); if (result === 'success') { return i18n.__('cli-repl.cli-repl.disabledTelemetry'); } @@ -248,7 +248,7 @@ export default class ShellApi extends ShellApiClass { @returnsPromise @platforms([ ReplPlatform.CLI ] ) async passwordPrompt(): Promise { - const { evaluationListener } = this.internalState; + const { evaluationListener } = this._internalState; if (!evaluationListener.onPrompt) { throw new MongoshUnimplementedError('passwordPrompt() is not available in this shell', CommonErrors.NotImplemented); } @@ -262,7 +262,7 @@ export default class ShellApi extends ShellApiClass { @returnsPromise async print(...origArgs: any[]): Promise { - const { evaluationListener } = this.internalState; + const { evaluationListener } = this._internalState; const args: ShellResult[] = await Promise.all(origArgs.map(arg => toShellResult(arg))); await evaluationListener.onPrint?.(args); @@ -276,7 +276,7 @@ export default class ShellApi extends ShellApiClass { @directShellCommand @returnsPromise async cls(): Promise { - const { evaluationListener } = this.internalState; + const { evaluationListener } = this._internalState; await evaluationListener.onClearCommand?.(); } } diff --git a/packages/shell-api/src/shell-internal-state.ts b/packages/shell-api/src/shell-internal-state.ts index f02612f34c..b36a4937a5 100644 --- a/packages/shell-api/src/shell-internal-state.ts +++ b/packages/shell-api/src/shell-internal-state.ts @@ -8,7 +8,7 @@ import { TopologyDescription, TopologyTypeId } from '@mongosh/service-provider-core'; -import type { ApiEvent, MongoshBus, ConfigProvider, ShellUserConfig } from '@mongosh/types'; +import type { ApiEvent, ConfigProvider, MongoshBus, ShellUserConfig } from '@mongosh/types'; import { EventEmitter } from 'events'; import redactInfo from 'mongodb-redact'; import ChangeStreamCursor from './change-stream-cursor'; @@ -26,6 +26,7 @@ import { ShellApi, ShellResult } from './index'; +import { InterruptFlag } from './interruptor'; import NoDatabase from './no-db'; import constructShellBson from './shell-bson'; @@ -106,6 +107,12 @@ export default class ShellInternalState { public mongocryptdSpawnPath: string | null; public batchSizeFromDBQuery: number | undefined = undefined; + public readonly interrupted = new InterruptFlag(); + public resumeMongosAfterInterrupt: Array<{ + mongo: Mongo, + resume: (() => Promise) | null + }> | undefined; + constructor(initialServiceProvider: ServiceProvider, messageBus: any = new EventEmitter(), cliOptions: ShellCliOptions = {}) { this.initialServiceProvider = initialServiceProvider; this.messageBus = messageBus; @@ -311,6 +318,49 @@ export default class ShellInternalState { }; } + async onInterruptExecution(): Promise { + this.interrupted.set(); + this.currentCursor = null; + + this.resumeMongosAfterInterrupt = await Promise.all(this.mongos.map(async m => { + try { + return { + mongo: m, + resume: await m._suspend() + }; + } catch (e) { + return { + mongo: m, + resume: null + }; + } + })); + return !this.resumeMongosAfterInterrupt.find(r => r.resume === null); + } + + async onResumeExecution(): Promise { + const promises = this.resumeMongosAfterInterrupt?.map(async r => { + if (!this.mongos.find(m => m === r.mongo)) { + // we do not resume mongo instances that we don't track anymore + return true; + } + if (r.resume === null) { + return false; + } + try { + await r.resume(); + return true; + } catch (e) { + return false; + } + }) ?? []; + this.resumeMongosAfterInterrupt = undefined; + + const result = await Promise.all(promises); + this.interrupted.reset(); + return !result.find(r => r === false); + } + async getDefaultPrompt(): Promise { return `${this.getDefaultPromptPrefix()}${this.getTopologySpecificPrompt()}> `; } diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 622db8aa34..8ef21fcec7 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -92,17 +92,20 @@ export interface MongoshBusEventsMap { 'mongosh:mongoshrc-load': () => void; 'mongosh:mongoshrc-mongorc-warn': () => void; 'mongosh:eval-cli-script': () => void; + 'mongosh:eval-interrupted': () => void; 'mongosh:mongocryptd-tryspawn': (ev: MongocryptdTrySpawnEvent) => void; 'mongosh:mongocryptd-error': (ev: MongocryptdErrorEvent) => void; 'mongosh:mongocryptd-log': (ev: MongocryptdLogEvent) => void; 'mongosh:closed': () => void; // For testing. 'mongosh:eval-complete': () => void; // For testing. 'mongosh:autocompletion-complete': () => void; // For testing. + 'mongosh:interrupt-complete': () => void; // For testing. } export interface MongoshBus { // TypeScript uses something like this itself for its EventTarget definitions. on(event: K, listener: MongoshBusEventsMap[K]): this; + once(event: K, listener: MongoshBusEventsMap[K]): this; emit(event: K, ...args: MongoshBusEventsMap[K] extends (...args: infer P) => any ? P : never): unknown; }