diff --git a/packages/shell-api/src/index.ts b/packages/shell-api/src/index.ts index e7636e59d7..e5f0fb4066 100644 --- a/packages/shell-api/src/index.ts +++ b/packages/shell-api/src/index.ts @@ -33,6 +33,7 @@ import { TypeSignature, } from './decorators'; import { Topologies, ServerVersions } from './enums'; +import { InterruptFlag } from './interruptor'; export { AggregationCursor, @@ -68,4 +69,5 @@ export { TypeSignature, OnLoadResult, ShellPlugin, + InterruptFlag, }; diff --git a/packages/shell-api/src/interruptor.ts b/packages/shell-api/src/interruptor.ts index 8aeacfd4b4..4445a23919 100644 --- a/packages/shell-api/src/interruptor.ts +++ b/packages/shell-api/src/interruptor.ts @@ -15,13 +15,20 @@ export class MongoshInterruptedError extends MongoshBaseError { } } +export interface InterruptWatcher { + destroy: () => void; + promise: Promise; +} + /** * Contains the interruption state for a given shell instance and * exposes ways to listen to changes of that state. */ export class InterruptFlag { private interrupted = false; - private onInterruptListeners: ((err: Error) => void)[] = []; + private onInterruptListeners = new Set< + (err: Error) => void | Promise + >(); /** * Returns whether an interrupt is currently in progress, i.e. @@ -48,24 +55,28 @@ export class InterruptFlag { * instance of `MongoshInterruptedError`. * @returns Promise that is rejected when the interrupt is set */ - public asPromise(): { destroy: () => void; promise: Promise } { + public asPromise(): InterruptWatcher { if (this.interrupted) { + const promise = Promise.reject(new MongoshInterruptedError()); + promise.catch(() => { + /* suppress potential unhandled rejection */ + }); return { destroy: () => {}, - promise: Promise.reject(new MongoshInterruptedError()), + promise, }; } let destroy: (() => void) | undefined; const promise = new Promise((_, reject) => { destroy = () => { - const index = this.onInterruptListeners.indexOf(reject); - if (index !== -1) { - this.onInterruptListeners.splice(index, 1); - } + this.onInterruptListeners.delete(reject); reject(null); }; - this.onInterruptListeners.push(reject); + this.onInterruptListeners.add(reject); + }); + promise.catch(() => { + /* suppress potential unhandled rejection */ }); return { destroy: destroy as unknown as () => void, @@ -75,12 +86,22 @@ export class InterruptFlag { /** * Mark an interrupt as having occurred. + * + * This should almost always be instantenous, although an additional listener + * installed through withOverrideInterruptBehavior() may perform additional + * cleanup work before the current connection is ready to be severed. */ - public set(): void { + public async set(): Promise { this.interrupted = true; const err = new MongoshInterruptedError(); for (const listener of [...this.onInterruptListeners]) { - listener(err); + try { + await listener(err); + } catch { + // Not a lot we can do about an error in an interrupt listener. + // If the listener was added via `withOverrideInterruptBehavior()`, + // then that function also propagates the error back to the caller. + } } } @@ -90,4 +111,43 @@ export class InterruptFlag { public reset(): void { this.interrupted = false; } + + /** + * Run a function while providing a way to run specific cleanup code + * before an interrupt inside it fires. This is different from a + * try/finally in that a finally block may not run before mongosh's own + * interruption handling code, including closing MongoClients to abort + * connections. + */ + public async withOverrideInterruptBehavior< + Action extends (watcher: InterruptWatcher) => any, + OnInterrupt extends () => Promise | void + >(fn: Action, onInterrupt: OnInterrupt): Promise> { + const watcher = this.asPromise(); + let listener!: () => Promise; + const onInterruptFinishPromise = new Promise((resolve) => { + listener = async () => { + const interruptHandlerResult = onInterrupt(); + resolve(interruptHandlerResult); + return interruptHandlerResult; + }; + }); + this.onInterruptListeners.add(listener); + try { + this.checkpoint(); + const resultPromise = fn(watcher); + resultPromise.catch(() => { + /* suppress potential unhandled rejection */ + }); + return await Promise.race([resultPromise, watcher.promise]); + } catch (err) { + if (this.interrupted) { + await onInterruptFinishPromise; + } + throw err; + } finally { + watcher.destroy(); + this.onInterruptListeners.delete(listener); + } + } } diff --git a/packages/shell-api/src/shell-instance-state.ts b/packages/shell-api/src/shell-instance-state.ts index 4f022cd55d..4afbfe6b03 100644 --- a/packages/shell-api/src/shell-instance-state.ts +++ b/packages/shell-api/src/shell-instance-state.ts @@ -437,7 +437,7 @@ export default class ShellInstanceState { } async onInterruptExecution(): Promise { - this.interrupted.set(); + await this.interrupted.set(); this.currentCursor = null; this.resumeMongosAfterInterrupt = await Promise.all( diff --git a/packages/shell-api/src/streams.ts b/packages/shell-api/src/streams.ts index 99491297e6..64ff5738e7 100644 --- a/packages/shell-api/src/streams.ts +++ b/packages/shell-api/src/streams.ts @@ -72,29 +72,22 @@ export class Streams extends ShellApiWithMongoClass { }; const sp = this.getProcessor(name); - const stopAndDrop = () => { - sp.stop() - .then(() => sp.drop()) - .catch(() => { - /* ignore */ - }); - }; - - try { - await sp._sampleFrom(cursorId); - } catch (err) { - // try to stop and drop the temp processor on error - // wait until execution resumed if its interrupted - const isInterrupted = err instanceof MongoshInterruptedError; - Promise.resolve( - isInterrupted ? this._instanceState.onResumeExecution() : 0 - ) - .then(stopAndDrop) - .catch(() => void 0); - - throw err; + async function stopAndDrop() { + try { + await sp.stop(); + await sp.drop(); + } catch { + /* ignore */ + } } + await this._instanceState.interrupted.withOverrideInterruptBehavior( + async () => { + await sp._sampleFrom(cursorId); + }, + stopAndDrop + ); + // stop and drop the temp processor if reached the end of sample return stopAndDrop(); }