Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packages/shell-api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
TypeSignature,
} from './decorators';
import { Topologies, ServerVersions } from './enums';
import { InterruptFlag } from './interruptor';

export {
AggregationCursor,
Expand Down Expand Up @@ -68,4 +69,5 @@ export {
TypeSignature,
OnLoadResult,
ShellPlugin,
InterruptFlag,
};
80 changes: 70 additions & 10 deletions packages/shell-api/src/interruptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@ export class MongoshInterruptedError extends MongoshBaseError {
}
}

export interface InterruptWatcher {
destroy: () => void;
promise: Promise<never>;
}

/**
* Contains the interruption state for a given shell instance and
* exposes ways to listen to changes of that state.
*/
export class InterruptFlag {
private interrupted = false;
private onInterruptListeners: ((err: Error) => void)[] = [];
private onInterruptListeners = new Set<
(err: Error) => void | Promise<void>
>();

/**
* Returns whether an interrupt is currently in progress, i.e.
Expand All @@ -48,24 +55,28 @@ export class InterruptFlag {
* instance of `MongoshInterruptedError`.
* @returns Promise that is rejected when the interrupt is set
*/
public asPromise(): { destroy: () => void; promise: Promise<never> } {
public asPromise(): InterruptWatcher {
if (this.interrupted) {
const promise = Promise.reject(new MongoshInterruptedError());
promise.catch(() => {
/* suppress potential unhandled rejection */
});
return {
destroy: () => {},
promise: Promise.reject(new MongoshInterruptedError()),
promise,
};
}

let destroy: (() => void) | undefined;
const promise = new Promise<never>((_, reject) => {
destroy = () => {
const index = this.onInterruptListeners.indexOf(reject);
if (index !== -1) {
this.onInterruptListeners.splice(index, 1);
}
this.onInterruptListeners.delete(reject);
reject(null);
};
this.onInterruptListeners.push(reject);
this.onInterruptListeners.add(reject);
});
promise.catch(() => {
/* suppress potential unhandled rejection */
});
return {
destroy: destroy as unknown as () => void,
Expand All @@ -75,12 +86,22 @@ export class InterruptFlag {

/**
* Mark an interrupt as having occurred.
*
* This should almost always be instantenous, although an additional listener
* installed through withOverrideInterruptBehavior() may perform additional
* cleanup work before the current connection is ready to be severed.
*/
public set(): void {
public async set(): Promise<void> {
this.interrupted = true;
const err = new MongoshInterruptedError();
for (const listener of [...this.onInterruptListeners]) {
listener(err);
try {
await listener(err);
} catch {
// Not a lot we can do about an error in an interrupt listener.
// If the listener was added via `withOverrideInterruptBehavior()`,
// then that function also propagates the error back to the caller.
}
}
}

Expand All @@ -90,4 +111,43 @@ export class InterruptFlag {
public reset(): void {
this.interrupted = false;
}

/**
* Run a function while providing a way to run specific cleanup code
* before an interrupt inside it fires. This is different from a
* try/finally in that a finally block may not run before mongosh's own
* interruption handling code, including closing MongoClients to abort
* connections.
*/
public async withOverrideInterruptBehavior<
Action extends (watcher: InterruptWatcher) => any,
OnInterrupt extends () => Promise<void> | void
>(fn: Action, onInterrupt: OnInterrupt): Promise<ReturnType<Action>> {
const watcher = this.asPromise();
let listener!: () => Promise<void>;
const onInterruptFinishPromise = new Promise<void>((resolve) => {
listener = async () => {
const interruptHandlerResult = onInterrupt();
resolve(interruptHandlerResult);
return interruptHandlerResult;
};
});
this.onInterruptListeners.add(listener);
try {
this.checkpoint();
const resultPromise = fn(watcher);
resultPromise.catch(() => {
/* suppress potential unhandled rejection */
});
return await Promise.race([resultPromise, watcher.promise]);
} catch (err) {
if (this.interrupted) {
await onInterruptFinishPromise;
}
throw err;
} finally {
watcher.destroy();
this.onInterruptListeners.delete(listener);
}
}
}
2 changes: 1 addition & 1 deletion packages/shell-api/src/shell-instance-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ export default class ShellInstanceState {
}

async onInterruptExecution(): Promise<boolean> {
this.interrupted.set();
await this.interrupted.set();
this.currentCursor = null;

this.resumeMongosAfterInterrupt = await Promise.all(
Expand Down
35 changes: 14 additions & 21 deletions packages/shell-api/src/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,29 +72,22 @@ export class Streams extends ShellApiWithMongoClass {
};
const sp = this.getProcessor(name);

const stopAndDrop = () => {
sp.stop()
.then(() => sp.drop())
.catch(() => {
/* ignore */
});
};

try {
await sp._sampleFrom(cursorId);
} catch (err) {
// try to stop and drop the temp processor on error
// wait until execution resumed if its interrupted
const isInterrupted = err instanceof MongoshInterruptedError;
Promise.resolve(
isInterrupted ? this._instanceState.onResumeExecution() : 0
)
.then(stopAndDrop)
.catch(() => void 0);

throw err;
async function stopAndDrop() {
try {
await sp.stop();
await sp.drop();
} catch {
/* ignore */
}
}

await this._instanceState.interrupted.withOverrideInterruptBehavior(
async () => {
await sp._sampleFrom(cursorId);
},
stopAndDrop
);

// stop and drop the temp processor if reached the end of sample
return stopAndDrop();
}
Expand Down