diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index 39a2cc715..7c5a18611 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -12,10 +12,29 @@ import { import type { Streams } from './streams'; import type { MQLPipeline } from './mql-types'; +export type StreamProcessorData = Document & { name: string }; + @shellApiClassDefault export class StreamProcessor extends ShellApiWithMongoClass { - constructor(public _streams: Streams, public name: string) { + private _streams: Streams; + + public name: string; + + constructor(_streams: Streams, data: StreamProcessorData) { super(); + + this._streams = _streams; + this.name = data.name; + + // We may overwrite the name property but that should be fine + for (const [key, value] of Object.entries(data)) { + Object.defineProperty(this, key, { + value, + configurable: true, + enumerable: true, + writable: true, + }); + } } get _mongo(): Mongo { @@ -23,7 +42,19 @@ export class StreamProcessor extends ShellApiWithMongoClass { } [asPrintable]() { - return `Atlas Stream Processor: ${this.name}`; + const result: Document = {}; + const descriptors = Object.getOwnPropertyDescriptors(this); + for (const [key, value] of Object.entries(descriptors)) { + if (key.startsWith('_')) { + return; + } + + if (value.value && value.enumerable) { + result[key] = value.value; + } + } + + return result; } @returnsPromise @@ -142,7 +173,7 @@ export class StreamProcessor extends ShellApiWithMongoClass { try { await Promise.race([ this._instanceState.shellApi.sleep(1000), // wait 1 second - interruptable.promise, // unless interruppted + interruptable.promise, // unless interrupted ]); } finally { interruptable.destroy(); diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 68995d790..ff015a171 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -6,6 +6,7 @@ import { Database } from './database'; import { Streams } from './streams'; import { InterruptFlag, MongoshInterruptedError } from './interruptor'; import type { MongoshInvalidInputError } from '@mongosh/errors'; +import { asPrintable } from './enums'; describe('Streams', function () { let mongo: Mongo; @@ -60,7 +61,7 @@ describe('Streams', function () { const pipeline = [{ $match: { foo: 'bar' } }]; const result = await streams.createStreamProcessor('spm', pipeline); - expect(result).to.eql(streams.getProcessor('spm')); + expect(result).to.eql(streams.getProcessor({ name: 'spm', pipeline })); const cmd = { createStreamProcessor: 'spm', pipeline }; expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; @@ -172,7 +173,7 @@ describe('Streams', function () { .resolves({ ok: 1 }); const pipeline = [{ $match: { foo: 'bar' } }]; const processor = await streams.createStreamProcessor(name, pipeline); - expect(processor).to.eql(streams.getProcessor(name)); + expect(processor).to.eql(streams.getProcessor({ name, pipeline })); const cmd = { createStreamProcessor: name, pipeline }; expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; return { runCmdStub, processor }; @@ -313,6 +314,104 @@ describe('Streams', function () { }); }); + describe('listStreamProcessors', function () { + it('passes filter parameter correctly', async function () { + const runCmdStub = sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1, streamProcessors: [] }); + + const complexFilter = { + name: { $regex: '^test' }, + state: { $in: ['STARTED', 'STOPPED'] }, + 'options.dlq.connectionName': 'atlas-sql', + }; + + await streams.listStreamProcessors(complexFilter); + + const cmd = { listStreamProcessors: 1, filter: complexFilter }; + expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + }); + + it('returns error when command fails', async function () { + const error = { ok: 0, errmsg: 'Command failed' }; + sinon.stub(mongo._serviceProvider, 'runCommand').resolves(error); + + const filter = { name: 'test' }; + const result = await streams.listStreamProcessors(filter); + expect(result).to.eql(error); + }); + + it('returns empty array when no processors exist', async function () { + const runCmdStub = sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1, streamProcessors: [] }); + + const filter = {}; + const result = await streams.listStreamProcessors(filter); + + expect(Array.isArray(result)).to.be.true; + expect(result.length).to.equal(0); + + const cmd = { listStreamProcessors: 1, filter }; + expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + }); + + it('ensures complete stream processor objects are defined in response', async function () { + const completeProcessor = { + id: '6916541aa9733d72cff41f27', + name: 'complete-processor', + pipeline: [ + { $match: { status: 'active' } }, + { $project: { _id: 1, name: 1, timestamp: 1 } }, + ], + state: 'STARTED', + tier: 'SP2', + errorMsg: '', + lastModified: new Date('2023-01-01T00:00:00Z'), + lastStateChange: new Date('2023-01-01T00:00:00Z'), + }; + + sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1, streamProcessors: [completeProcessor] }); + + const result = await streams.listStreamProcessors({}); + + // Verify the raw processor data is preserved in asPrintable + const rawProcessors = result[asPrintable](); + expect(rawProcessors).to.have.length(1); + + // deep comparison to ensure all fields are present an equal + expect(rawProcessors[0]).to.eql(completeProcessor); + }); + + it('ensures you can drill down into individual processor attributes', async function () { + const completeProcessor = { + id: '6916541aa9733d72cff41f27', + name: 'complete-processor', + pipeline: [ + { $match: { status: 'active' } }, + { $project: { _id: 1, name: 1, timestamp: 1 } }, + ], + state: 'STARTED', + tier: 'SP2', + errorMsg: '', + lastModified: new Date('2023-01-01T00:00:00Z'), + lastStateChange: new Date('2023-01-01T00:00:00Z'), + }; + + sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1, streamProcessors: [completeProcessor] }); + + const result = await streams.listStreamProcessors({}); + + // verify users can access properties on individual processors + expect(result[0].name).to.equal(completeProcessor.name); + expect(result[0].pipeline).to.eql(completeProcessor.pipeline); + }); + }); + describe('listWorkspaceDefaults', function () { it('returns tier and maxTierSize', async function () { const runCmdStub = sinon diff --git a/packages/shell-api/src/streams.ts b/packages/shell-api/src/streams.ts index f2c2047e2..a77083e62 100644 --- a/packages/shell-api/src/streams.ts +++ b/packages/shell-api/src/streams.ts @@ -6,6 +6,7 @@ import { shellApiClassDefault, ShellApiWithMongoClass, } from './decorators'; +import type { StreamProcessorData } from './stream-processor'; import StreamProcessor from './stream-processor'; import { ADMIN_DB, asPrintable, shellApiType } from './enums'; import type { Database, DatabaseWithSchema } from './database'; @@ -34,7 +35,7 @@ export class Streams< return v; } if (typeof prop === 'string' && !prop.startsWith('_')) { - return target.getProcessor(prop); + return target.getProcessor({ name: prop }); } }, }); @@ -55,8 +56,8 @@ export class Streams< return 'Atlas Stream Processing'; } - getProcessor(name: string): StreamProcessor { - return new StreamProcessor(this, name); + getProcessor(data: StreamProcessorData): StreamProcessor { + return new StreamProcessor(this, data); } @returnsPromise @@ -82,7 +83,7 @@ export class Streams< limit: number; cursorId: number; }; - const sp = this.getProcessor(name); + const sp = this.getProcessor({ name }); async function dropSp() { try { @@ -129,8 +130,7 @@ export class Streams< if (result.ok !== 1) { return result; } - - return this.getProcessor(name); + return this.getProcessor({ name, pipeline, ...options }); } @returnsPromise @@ -143,9 +143,15 @@ export class Streams< return result; } const rawProcessors = result.streamProcessors; - const sps = rawProcessors.map((sp: StreamProcessor) => - this.getProcessor(sp.name) - ); + const sps = rawProcessors + .map((sp: Document) => { + if (sp.name) { + return this.getProcessor(sp as StreamProcessorData); + } + + return undefined; + }) + .filter((sp: Document | undefined) => !!sp); return Object.defineProperties(sps, { [asPrintable]: { value: () => rawProcessors }, @@ -174,7 +180,7 @@ export class Streams< })) as WorkspaceDefaults; } - async _runStreamCommand(cmd: Document, options: Document = {}) { + _runStreamCommand(cmd: Document, options: Document = {}): Promise { return this._mongo._serviceProvider.runCommand(ADMIN_DB, cmd, options); // run cmd } }