Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions packages/shell-api/src/decorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,21 @@ function wrapWithApiChecks<T extends(...args: any[]) => any>(fn: T, className: s
markImplicitlyAwaited(async function(this: any, ...args: any[]): Promise<any> {
const internalState = getShellInternalState(this);
checkForDeprecation(internalState, className, fn);
const interrupted = checkInterrupted(internalState);
const interruptFlag = checkInterrupted(internalState);
const interrupt = interruptFlag?.asPromise();

let result: any;
try {
result = await Promise.race([
interrupted ? interrupted.asPromise() : new Promise(() => {}),
interrupt?.promise ?? new Promise<never>(() => {}),
fn.call(this, ...args)
]);
} catch (e) {
throw rephraseMongoError(e);
} finally {
if (interrupt) {
interrupt.destroy();
}
}
checkInterrupted(internalState);
return result;
Expand Down
39 changes: 39 additions & 0 deletions packages/shell-api/src/interruptor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,48 @@ import { EventEmitter } from 'events';
import { StubbedInstance, stubInterface } from 'ts-sinon';
import Database from './database';
import Mongo from './mongo';
import { InterruptFlag, MongoshInterruptedError } from './interruptor';
import ShellInternalState from './shell-internal-state';
import { promisify } from 'util';

describe('interruptor', () => {
describe('InterruptFlag', () => {
let interruptFlag: InterruptFlag;

beforeEach(() => {
interruptFlag = new InterruptFlag();
});

describe('asPromise', () => {
let interruptPromise: { destroy: () => void; promise: Promise<never> };

it('rejects the promise on interrupt', async() => {
interruptPromise = interruptFlag.asPromise();
let interruptError: MongoshInterruptedError | undefined;
interruptPromise.promise.catch(e => {
interruptError = e;
});
expect(interruptError).to.be.undefined;
interruptFlag.set();
await promisify(process.nextTick)();
expect(interruptError).to.be.instanceOf(MongoshInterruptedError);
});

it('rejects immediately if the interrupt happened before', async() => {
interruptFlag.set();

interruptPromise = interruptFlag.asPromise();
let interruptError: MongoshInterruptedError | undefined;
interruptPromise.promise.catch(e => {
interruptError = e;
});

await promisify(process.nextTick)();
expect(interruptError).to.be.instanceOf(MongoshInterruptedError);
});
});
});

describe('with Shell API functions', () => {
let mongo: Mongo;
let serviceProvider: StubbedInstance<ServiceProvider>;
Expand Down
49 changes: 24 additions & 25 deletions packages/shell-api/src/interruptor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { MongoshBaseError } from '@mongosh/errors';
import { EventEmitter } from 'events';
import ShellInternalState from './shell-internal-state';

const interruptEvent = 'interrupted';
const kUncatchable = Symbol.for('@@mongosh.uncatchable');

export class MongoshInterruptedError extends MongoshBaseError {
Expand All @@ -13,14 +15,7 @@ export class MongoshInterruptedError extends MongoshBaseError {

export class InterruptFlag {
private interrupted = false;
private deferred: {
reject: (e: MongoshInterruptedError) => void;
promise: Promise<never>;
};

constructor() {
this.deferred = this.defer();
}
private onInterrupt = new EventEmitter();

public isSet(): boolean {
return this.interrupted;
Expand All @@ -32,31 +27,35 @@ export class InterruptFlag {
* instance of `MongoshInterruptedError`.
* @returns Promise that is rejected when the interrupt is set
*/
public asPromise(): Promise<never> {
return this.deferred.promise;
public asPromise(): { destroy: () => void; promise: Promise<never> } {
if (this.interrupted) {
return {
destroy: () => {},
promise: Promise.reject(new MongoshInterruptedError())
};
}

let destroy: (() => void) | undefined;
const promise = new Promise<never>((_, reject) => {
destroy = () => {
this.onInterrupt.removeListener(interruptEvent, reject);
reject(null);
};
this.onInterrupt.once(interruptEvent, reject);
});
return {
destroy: destroy as unknown as () => void,
promise
};
}

public set(): void {
this.interrupted = true;
this.deferred.reject(new MongoshInterruptedError());
this.onInterrupt.emit(interruptEvent, new MongoshInterruptedError());
}

public reset(): void {
this.interrupted = false;
this.deferred = this.defer();
}

private defer(): { reject: (e: MongoshInterruptedError) => void; promise: Promise<never>; } {
const result: any = {};
result.promise = new Promise<never>((_, reject) => {
result.reject = reject;
});
result.promise.catch(() => {
// we ignore the error here - all others should be notified
// we just have to ensure there's at least one handler for it
// to prevent an UnhandledPromiseRejection
});
return result;
}
}

Expand Down