From 39129c4892caafde52b96aba336afaefad67f369 Mon Sep 17 00:00:00 2001 From: Taylor Cannon Date: Thu, 13 Nov 2025 16:02:41 -0600 Subject: [PATCH 1/4] STREAMS-1136: Add ability to drill into stream processor attributes --- packages/shell-api/src/stream-processor.ts | 53 ++++++++++- packages/shell-api/src/streams.spec.ts | 103 ++++++++++++++++++++- packages/shell-api/src/streams.ts | 26 ++++-- 3 files changed, 167 insertions(+), 15 deletions(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index 39a2cc7150..41252c58ba 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -12,10 +12,30 @@ import { import type { Streams } from './streams'; import type { MQLPipeline } from './mql-types'; +export type StreamProcessorData = Document & { name: string }; + @shellApiClassDefault export class StreamProcessor extends ShellApiWithMongoClass { - constructor(public _streams: Streams, public name: string) { + public name: string; + public id?: string; + public pipeline?: MQLPipeline; + public state?: string; + public tier?: string; + public errorMsg?: string; + public lastModified?: Date; + public lastStateChange?: Date; + + constructor(public _streams: Streams, data: StreamProcessorData) { super(); + + this.name = data.name; + this.id = data.id; + this.pipeline = data.pipeline; + this.state = data.state; + this.tier = data.tier; + this.errorMsg = data.errorMsg; + this.lastModified = data.lastModified; + this.lastStateChange = data.lastStateChange; } get _mongo(): Mongo { @@ -23,7 +43,34 @@ export class StreamProcessor extends ShellApiWithMongoClass { } [asPrintable]() { - return `Atlas Stream Processor: ${this.name}`; + const result: Document = { + name: this.name, + }; + + // Only add keys onto the document if they have values + if (this.id) { + result.id = this.id; + } + if (this.pipeline) { + result.pipeline = this.pipeline; + } + if (this.state) { + result.state = this.state; + } + if (this.tier) { + result.tier = this.tier; + } + if (this.errorMsg) { + result.errorMsg = this.errorMsg; + } + if (this.lastModified) { + result.lastModified = this.lastModified; + } + if (this.lastStateChange) { + result.lastStateChange = this.lastStateChange; + } + + return result; } @returnsPromise @@ -142,7 +189,7 @@ export class StreamProcessor extends ShellApiWithMongoClass { try { await Promise.race([ this._instanceState.shellApi.sleep(1000), // wait 1 second - interruptable.promise, // unless interruppted + interruptable.promise, // unless interrupted ]); } finally { interruptable.destroy(); diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 68995d790d..074007f68d 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -6,6 +6,7 @@ import { Database } from './database'; import { Streams } from './streams'; import { InterruptFlag, MongoshInterruptedError } from './interruptor'; import type { MongoshInvalidInputError } from '@mongosh/errors'; +import { asPrintable } from './enums'; describe('Streams', function () { let mongo: Mongo; @@ -60,7 +61,7 @@ describe('Streams', function () { const pipeline = [{ $match: { foo: 'bar' } }]; const result = await streams.createStreamProcessor('spm', pipeline); - expect(result).to.eql(streams.getProcessor('spm')); + expect(result).to.eql(streams.getProcessor({ name: 'spm', pipeline })); const cmd = { createStreamProcessor: 'spm', pipeline }; expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; @@ -172,7 +173,7 @@ describe('Streams', function () { .resolves({ ok: 1 }); const pipeline = [{ $match: { foo: 'bar' } }]; const processor = await streams.createStreamProcessor(name, pipeline); - expect(processor).to.eql(streams.getProcessor(name)); + expect(processor).to.eql(streams.getProcessor({ name, pipeline })); const cmd = { createStreamProcessor: name, pipeline }; expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; return { runCmdStub, processor }; @@ -313,6 +314,104 @@ describe('Streams', function () { }); }); + describe('listStreamProcessors', function () { + it('passes filter parameter correctly', async function () { + const runCmdStub = sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1, streamProcessors: [] }); + + const complexFilter = { + name: { $regex: '^test' }, + state: { $in: ['STARTED', 'STOPPED'] }, + 'options.dlq.connectionName': 'atlas-sql', + }; + + await streams.listStreamProcessors(complexFilter); + + const cmd = { listStreamProcessors: 1, filter: complexFilter }; + expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + }); + + it('returns error when command fails', async function () { + const error = { ok: 0, errmsg: 'Command failed' }; + sinon.stub(mongo._serviceProvider, 'runCommand').resolves(error); + + const filter = { name: 'test' }; + const result = await streams.listStreamProcessors(filter); + expect(result).to.eql(error); + }); + + it('returns empty array when no processors exist', async function () { + const runCmdStub = sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1, streamProcessors: [] }); + + const filter = {}; + const result = await streams.listStreamProcessors(filter); + + expect(Array.isArray(result)).to.be.true; + expect(result.length).to.equal(0); + + const cmd = { listStreamProcessors: 1, filter }; + expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true; + }); + + it('ensures complete stream processor objects are defined in response', async function () { + const completeProcessor = { + id: '6916541aa9733d72cff41f27', + name: 'complete-processor', + pipeline: [ + { $match: { status: 'active' } }, + { $project: { _id: 1, name: 1, timestamp: 1 } }, + ], + state: 'STARTED', + tier: 'SP2', + errorMsg: '', + lastModified: new Date('2023-01-01T00:00:00Z'), + lastStateChange: new Date('2023-01-01T00:00:00Z'), + }; + + sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1, streamProcessors: [completeProcessor] }); + + const result = await streams.listStreamProcessors({}); + + // Verify the raw processor data is preserved in asPrintable + const rawProcessors = result[asPrintable](); + expect(rawProcessors).to.have.length(1); + + // deep comparison to ensure all fields are present an equal + expect(rawProcessors[0]).to.eql(completeProcessor); + }); + + it('ensures you can drill down into individual processor attributes', async function () { + const completeProcessor = { + id: '6916541aa9733d72cff41f27', + name: 'complete-processor', + pipeline: [ + { $match: { status: 'active' } }, + { $project: { _id: 1, name: 1, timestamp: 1 } }, + ], + state: 'STARTED', + tier: 'SP2', + errorMsg: '', + lastModified: new Date('2023-01-01T00:00:00Z'), + lastStateChange: new Date('2023-01-01T00:00:00Z'), + }; + + sinon + .stub(mongo._serviceProvider, 'runCommand') + .resolves({ ok: 1, streamProcessors: [completeProcessor] }); + + const result = await streams.listStreamProcessors({}); + + // verify users an access properties on individual processors + expect(result[0].name).to.equal(completeProcessor.name); + expect(result[0].pipeline).to.eql(completeProcessor.pipeline); + }); + }); + describe('listWorkspaceDefaults', function () { it('returns tier and maxTierSize', async function () { const runCmdStub = sinon diff --git a/packages/shell-api/src/streams.ts b/packages/shell-api/src/streams.ts index f2c2047e2b..4afc40829e 100644 --- a/packages/shell-api/src/streams.ts +++ b/packages/shell-api/src/streams.ts @@ -6,6 +6,7 @@ import { shellApiClassDefault, ShellApiWithMongoClass, } from './decorators'; +import type { StreamProcessorData } from './stream-processor'; import StreamProcessor from './stream-processor'; import { ADMIN_DB, asPrintable, shellApiType } from './enums'; import type { Database, DatabaseWithSchema } from './database'; @@ -34,7 +35,7 @@ export class Streams< return v; } if (typeof prop === 'string' && !prop.startsWith('_')) { - return target.getProcessor(prop); + return target.getProcessor({ name: prop }); } }, }); @@ -55,8 +56,8 @@ export class Streams< return 'Atlas Stream Processing'; } - getProcessor(name: string): StreamProcessor { - return new StreamProcessor(this, name); + getProcessor(data: StreamProcessorData): StreamProcessor { + return new StreamProcessor(this, data); } @returnsPromise @@ -82,7 +83,7 @@ export class Streams< limit: number; cursorId: number; }; - const sp = this.getProcessor(name); + const sp = this.getProcessor({ name }); async function dropSp() { try { @@ -129,8 +130,7 @@ export class Streams< if (result.ok !== 1) { return result; } - - return this.getProcessor(name); + return this.getProcessor({ name, pipeline, ...options }); } @returnsPromise @@ -143,9 +143,15 @@ export class Streams< return result; } const rawProcessors = result.streamProcessors; - const sps = rawProcessors.map((sp: StreamProcessor) => - this.getProcessor(sp.name) - ); + const sps = rawProcessors + .map((sp: Document) => { + if (sp.name) { + return this.getProcessor(sp as StreamProcessorData); + } + + return; + }) + .filter((sp: Document | undefined) => !!sp); return Object.defineProperties(sps, { [asPrintable]: { value: () => rawProcessors }, @@ -174,7 +180,7 @@ export class Streams< })) as WorkspaceDefaults; } - async _runStreamCommand(cmd: Document, options: Document = {}) { + _runStreamCommand(cmd: Document, options: Document = {}): Promise { return this._mongo._serviceProvider.runCommand(ADMIN_DB, cmd, options); // run cmd } } From 5544ece9eecd8161fccdd77113129d4b3b349243 Mon Sep 17 00:00:00 2001 From: Taylor Aron Cannon Date: Fri, 14 Nov 2025 21:14:53 -0600 Subject: [PATCH 2/4] Code review feedback --- packages/shell-api/src/stream-processor.ts | 9 ++++++--- packages/shell-api/src/streams.spec.ts | 2 +- packages/shell-api/src/streams.ts | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index 41252c58ba..a722187079 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -60,9 +60,6 @@ export class StreamProcessor extends ShellApiWithMongoClass { if (this.tier) { result.tier = this.tier; } - if (this.errorMsg) { - result.errorMsg = this.errorMsg; - } if (this.lastModified) { result.lastModified = this.lastModified; } @@ -70,6 +67,12 @@ export class StreamProcessor extends ShellApiWithMongoClass { result.lastStateChange = this.lastStateChange; } + // Check explicitly for undefined so that empty string can be exposed + // back to the user + if (this.errorMsg !== undefined) { + result.errorMsg = this.errorMsg; + } + return result; } diff --git a/packages/shell-api/src/streams.spec.ts b/packages/shell-api/src/streams.spec.ts index 074007f68d..ff015a171f 100644 --- a/packages/shell-api/src/streams.spec.ts +++ b/packages/shell-api/src/streams.spec.ts @@ -406,7 +406,7 @@ describe('Streams', function () { const result = await streams.listStreamProcessors({}); - // verify users an access properties on individual processors + // verify users can access properties on individual processors expect(result[0].name).to.equal(completeProcessor.name); expect(result[0].pipeline).to.eql(completeProcessor.pipeline); }); diff --git a/packages/shell-api/src/streams.ts b/packages/shell-api/src/streams.ts index 4afc40829e..a77083e622 100644 --- a/packages/shell-api/src/streams.ts +++ b/packages/shell-api/src/streams.ts @@ -149,7 +149,7 @@ export class Streams< return this.getProcessor(sp as StreamProcessorData); } - return; + return undefined; }) .filter((sp: Document | undefined) => !!sp); From 4d5ec3fbf01b1a469c1f6d1bd94243f84105e813 Mon Sep 17 00:00:00 2001 From: Taylor Aron Cannon Date: Mon, 17 Nov 2025 11:36:52 -0600 Subject: [PATCH 3/4] Make the data structure more dynamic --- packages/shell-api/src/stream-processor.ts | 67 ++++++++-------------- 1 file changed, 23 insertions(+), 44 deletions(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index a722187079..62a08e641e 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -16,26 +16,23 @@ export type StreamProcessorData = Document & { name: string }; @shellApiClassDefault export class StreamProcessor extends ShellApiWithMongoClass { + private _streams: Streams; + public name: string; - public id?: string; - public pipeline?: MQLPipeline; - public state?: string; - public tier?: string; - public errorMsg?: string; - public lastModified?: Date; - public lastStateChange?: Date; - - constructor(public _streams: Streams, data: StreamProcessorData) { + + constructor(_streams: Streams, data: StreamProcessorData) { super(); + this._streams = _streams; this.name = data.name; - this.id = data.id; - this.pipeline = data.pipeline; - this.state = data.state; - this.tier = data.tier; - this.errorMsg = data.errorMsg; - this.lastModified = data.lastModified; - this.lastStateChange = data.lastStateChange; + + // We may overwrite the name property but that should be + // fine + Object.keys(data).forEach((key) => { + Object.defineProperty(this, key, { + value: data[key], + }); + }); } get _mongo(): Mongo { @@ -43,35 +40,17 @@ export class StreamProcessor extends ShellApiWithMongoClass { } [asPrintable]() { - const result: Document = { - name: this.name, - }; - - // Only add keys onto the document if they have values - if (this.id) { - result.id = this.id; - } - if (this.pipeline) { - result.pipeline = this.pipeline; - } - if (this.state) { - result.state = this.state; - } - if (this.tier) { - result.tier = this.tier; - } - if (this.lastModified) { - result.lastModified = this.lastModified; - } - if (this.lastStateChange) { - result.lastStateChange = this.lastStateChange; - } + const result: Document = {}; + const descriptors = Object.getOwnPropertyDescriptors(this); + Object.getOwnPropertyNames(descriptors).forEach((prop) => { + if (prop.startsWith('_')) { + return; + } - // Check explicitly for undefined so that empty string can be exposed - // back to the user - if (this.errorMsg !== undefined) { - result.errorMsg = this.errorMsg; - } + if (descriptors[prop].value) { + result[prop] = descriptors[prop].value; + } + }); return result; } From a556a8c9f0f6d5af5a7a5baa4c1805bd9e7bd460 Mon Sep 17 00:00:00 2001 From: Taylor Aron Cannon Date: Tue, 18 Nov 2025 12:37:12 -0600 Subject: [PATCH 4/4] code review feedback --- packages/shell-api/src/stream-processor.ts | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/packages/shell-api/src/stream-processor.ts b/packages/shell-api/src/stream-processor.ts index 62a08e641e..7c5a18611b 100644 --- a/packages/shell-api/src/stream-processor.ts +++ b/packages/shell-api/src/stream-processor.ts @@ -26,13 +26,15 @@ export class StreamProcessor extends ShellApiWithMongoClass { this._streams = _streams; this.name = data.name; - // We may overwrite the name property but that should be - // fine - Object.keys(data).forEach((key) => { + // We may overwrite the name property but that should be fine + for (const [key, value] of Object.entries(data)) { Object.defineProperty(this, key, { - value: data[key], + value, + configurable: true, + enumerable: true, + writable: true, }); - }); + } } get _mongo(): Mongo { @@ -42,15 +44,15 @@ export class StreamProcessor extends ShellApiWithMongoClass { [asPrintable]() { const result: Document = {}; const descriptors = Object.getOwnPropertyDescriptors(this); - Object.getOwnPropertyNames(descriptors).forEach((prop) => { - if (prop.startsWith('_')) { + for (const [key, value] of Object.entries(descriptors)) { + if (key.startsWith('_')) { return; } - if (descriptors[prop].value) { - result[prop] = descriptors[prop].value; + if (value.value && value.enumerable) { + result[key] = value.value; } - }); + } return result; }