From f74918b45301b1ddd3ba8508cc56d74754a14c3c Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Fri, 22 Nov 2024 03:20:37 +0000 Subject: [PATCH 1/3] feat(shell-api): add options in stream processor start, stop, and drop MONGOSH-1920 --- packages/shell-api/src/stream-processor.ts | 13 +++-- packages/shell-api/src/streams.spec.ts | 55 +++++++++++++++++++++- 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index a70b9c8c04..789fc9dd74 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -26,27 +26,30 @@ export default class StreamProcessor extends ShellApiWithMongoClass { } @returnsPromise - async start() { + async start(options: Document = {}) { return await this._streams._runStreamCommand({ startStreamProcessor: this.name, + ...options, }); } @returnsPromise - async stop() { + async stop(options: Document = {}) { return await this._streams._runStreamCommand({ stopStreamProcessor: this.name, + ...options, }); } @returnsPromise - async drop() { - return this._drop(); + async drop(options: Document = {}) { + return this._drop(options); } - async _drop() { + async _drop(options: Document = {}) { return await this._streams._runStreamCommand({ dropStreamProcessor: this.name, + ...options, }); } diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 135db78566..488930ad58 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -7,7 +7,7 @@ import { Streams } from './streams'; import { InterruptFlag, MongoshInterruptedError } from './interruptor'; import type { MongoshInvalidInputError } from '@mongosh/errors'; -describe('Streams', function () { +describe.only('Streams', function () { let mongo: Mongo; let streams: Streams; const identity = (a: unknown) => a; @@ -164,6 +164,59 @@ describe('Streams', function () { }); }); + // Validate supplying options in start,stop, and drop commands. + describe('options', function () { + it('supplies options in start, stop, and drop', async function () { + // Create the stream processor. + const runCmdStub = sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1 }); + const name = 'optionsTest'; + const pipeline = [{ $match: { foo: 'bar' } }]; + const processor = await streams.createStreamProcessor(name, pipeline); + expect(processor).to.eql(streams.getProcessor(name)); + const cmd = { createStreamProcessor: name, pipeline }; + expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + + // Start the stream processor with an extra option. + await processor.start({ resumeFromCheckpoint: false }); + expect( + runCmdStub.calledWithExactly( + 'admin', + { startStreamProcessor: name, resumeFromCheckpoint: false }, + {} + ) + ).to.be.true; + + // Stop the stream processor with an extra option. + await processor.stop({ force: true }); + expect( + runCmdStub.calledWithExactly( + 'admin', + { stopStreamProcessor: name, force: true }, + {} + ) + ).to.be.true; + + // Drop the stream processor with a few extra options. + const opts = { + force: true, + ttl: { unit: 'day', size: 30 }, + }; + await processor.drop(opts); + expect( + runCmdStub.calledWithExactly( + 'admin', + { + dropStreamProcessor: name, + ...opts, + }, + {} + ) + ).to.be.true; + }); + }); + describe('modify', function () { it('throws with invalid parameters', async function () { // Create the stream processor. From a2861683dae17f44bbce4ad2cb6d0ec7bebb5d76 Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Fri, 22 Nov 2024 03:26:43 +0000 Subject: [PATCH 2/3] remove .only() for local testing --- packages/shell-api/src/streams.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 488930ad58..171fc3c138 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -7,7 +7,7 @@ import { Streams } from './streams'; import { InterruptFlag, MongoshInterruptedError } from './interruptor'; import type { MongoshInvalidInputError } from '@mongosh/errors'; -describe.only('Streams', function () { +describe('Streams', function () { let mongo: Mongo; let streams: Streams; const identity = (a: unknown) => a; From d166bc910480eb589a4a66ade08f904d13964874 Mon Sep 17 00:00:00 2001 From: Matthew Normyle Date: Fri, 22 Nov 2024 03:33:29 +0000 Subject: [PATCH 3/3] add test utility for creating processor --- packages/shell-api/src/streams.spec.ts | 49 ++++++++++---------------- 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 171fc3c138..8a26b6d71b 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -164,19 +164,24 @@ describe('Streams', function () { }); }); + // Create a stream processor. + const createProcessor = async (name: string) => { + const runCmdStub = sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1 }); + const pipeline = [{ $match: { foo: 'bar' } }]; + const processor = await streams.createStreamProcessor(name, pipeline); + expect(processor).to.eql(streams.getProcessor(name)); + const cmd = { createStreamProcessor: name, pipeline }; + expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + return { runCmdStub, processor }; + }; + // Validate supplying options in start,stop, and drop commands. describe('options', function () { it('supplies options in start, stop, and drop', async function () { - // Create the stream processor. - const runCmdStub = sinon - .stub(mongo._serviceProvider, 'runCommand') - .resolves({ ok: 1 }); - const name = 'optionsTest'; - const pipeline = [{ $match: { foo: 'bar' } }]; - const processor = await streams.createStreamProcessor(name, pipeline); - expect(processor).to.eql(streams.getProcessor(name)); - const cmd = { createStreamProcessor: name, pipeline }; - expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + const name = 'testOptions'; + const { runCmdStub, processor } = await createProcessor(name); // Start the stream processor with an extra option. await processor.start({ resumeFromCheckpoint: false }); @@ -219,16 +224,7 @@ describe('Streams', function () { describe('modify', function () { it('throws with invalid parameters', async function () { - // Create the stream processor. - const runCmdStub = sinon - .stub(mongo._serviceProvider, 'runCommand') - .resolves({ ok: 1 }); - const name = 'p1'; - const pipeline = [{ $match: { foo: 'bar' } }]; - const processor = await streams.createStreamProcessor(name, pipeline); - expect(processor).to.eql(streams.getProcessor(name)); - const cmd = { createStreamProcessor: name, pipeline }; - expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + const { processor } = await createProcessor('testModify'); // No arguments to modify. const caught = await processor @@ -259,17 +255,8 @@ describe('Streams', function () { }); it('works with pipeline and options arguments', async function () { - const runCmdStub = sinon - .stub(mongo._serviceProvider, 'runCommand') - .resolves({ ok: 1 }); - - // Create the stream processor. - const name = 'p1'; - const pipeline = [{ $match: { foo: 'bar' } }]; - const processor = await streams.createStreamProcessor(name, pipeline); - expect(processor).to.eql(streams.getProcessor(name)); - const cmd = { createStreamProcessor: name, pipeline }; - expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + const name = 'testModify'; + const { runCmdStub, processor } = await createProcessor(name); // Start the stream processor. await processor.start();