Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions packages/shell-api/src/stream-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,49 @@ import {
import type { Streams } from './streams';
import type { MQLPipeline } from './mql-types';

export type StreamProcessorData = Document & { name: string };

@shellApiClassDefault
export class StreamProcessor extends ShellApiWithMongoClass {
constructor(public _streams: Streams, public name: string) {
private _streams: Streams;

public name: string;

constructor(_streams: Streams, data: StreamProcessorData) {
super();

this._streams = _streams;
this.name = data.name;

// We may overwrite the name property but that should be fine
for (const [key, value] of Object.entries(data)) {
Object.defineProperty(this, key, {
value,
configurable: true,
enumerable: true,
writable: true,
});
}
}

get _mongo(): Mongo {
return this._streams._mongo;
}

[asPrintable]() {
return `Atlas Stream Processor: ${this.name}`;
const result: Document = {};
const descriptors = Object.getOwnPropertyDescriptors(this);
for (const [key, value] of Object.entries(descriptors)) {
if (key.startsWith('_')) {
return;
}

if (value.value && value.enumerable) {
result[key] = value.value;
}
}

return result;
}

@returnsPromise
Expand Down Expand Up @@ -142,7 +173,7 @@ export class StreamProcessor extends ShellApiWithMongoClass {
try {
await Promise.race([
this._instanceState.shellApi.sleep(1000), // wait 1 second
interruptable.promise, // unless interruppted
interruptable.promise, // unless interrupted
]);
} finally {
interruptable.destroy();
Expand Down
103 changes: 101 additions & 2 deletions packages/shell-api/src/streams.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Database } from './database';
import { Streams } from './streams';
import { InterruptFlag, MongoshInterruptedError } from './interruptor';
import type { MongoshInvalidInputError } from '@mongosh/errors';
import { asPrintable } from './enums';

describe('Streams', function () {
let mongo: Mongo;
Expand Down Expand Up @@ -60,7 +61,7 @@ describe('Streams', function () {

const pipeline = [{ $match: { foo: 'bar' } }];
const result = await streams.createStreamProcessor('spm', pipeline);
expect(result).to.eql(streams.getProcessor('spm'));
expect(result).to.eql(streams.getProcessor({ name: 'spm', pipeline }));

const cmd = { createStreamProcessor: 'spm', pipeline };
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
Expand Down Expand Up @@ -172,7 +173,7 @@ describe('Streams', function () {
.resolves({ ok: 1 });
const pipeline = [{ $match: { foo: 'bar' } }];
const processor = await streams.createStreamProcessor(name, pipeline);
expect(processor).to.eql(streams.getProcessor(name));
expect(processor).to.eql(streams.getProcessor({ name, pipeline }));
const cmd = { createStreamProcessor: name, pipeline };
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
return { runCmdStub, processor };
Expand Down Expand Up @@ -313,6 +314,104 @@ describe('Streams', function () {
});
});

describe('listStreamProcessors', function () {
it('passes filter parameter correctly', async function () {
const runCmdStub = sinon
.stub(mongo._serviceProvider, 'runCommand')
.resolves({ ok: 1, streamProcessors: [] });

const complexFilter = {
name: { $regex: '^test' },
state: { $in: ['STARTED', 'STOPPED'] },
'options.dlq.connectionName': 'atlas-sql',
};

await streams.listStreamProcessors(complexFilter);

const cmd = { listStreamProcessors: 1, filter: complexFilter };
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
});

it('returns error when command fails', async function () {
const error = { ok: 0, errmsg: 'Command failed' };
sinon.stub(mongo._serviceProvider, 'runCommand').resolves(error);

const filter = { name: 'test' };
const result = await streams.listStreamProcessors(filter);
expect(result).to.eql(error);
});

it('returns empty array when no processors exist', async function () {
const runCmdStub = sinon
.stub(mongo._serviceProvider, 'runCommand')
.resolves({ ok: 1, streamProcessors: [] });

const filter = {};
const result = await streams.listStreamProcessors(filter);

expect(Array.isArray(result)).to.be.true;
expect(result.length).to.equal(0);

const cmd = { listStreamProcessors: 1, filter };
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
});

it('ensures complete stream processor objects are defined in response', async function () {
const completeProcessor = {
id: '6916541aa9733d72cff41f27',
name: 'complete-processor',
pipeline: [
{ $match: { status: 'active' } },
{ $project: { _id: 1, name: 1, timestamp: 1 } },
],
state: 'STARTED',
tier: 'SP2',
errorMsg: '',
lastModified: new Date('2023-01-01T00:00:00Z'),
lastStateChange: new Date('2023-01-01T00:00:00Z'),
};

sinon
.stub(mongo._serviceProvider, 'runCommand')
.resolves({ ok: 1, streamProcessors: [completeProcessor] });

const result = await streams.listStreamProcessors({});

// Verify the raw processor data is preserved in asPrintable
const rawProcessors = result[asPrintable]();
expect(rawProcessors).to.have.length(1);

// deep comparison to ensure all fields are present an equal
expect(rawProcessors[0]).to.eql(completeProcessor);
});

it('ensures you can drill down into individual processor attributes', async function () {
const completeProcessor = {
id: '6916541aa9733d72cff41f27',
name: 'complete-processor',
pipeline: [
{ $match: { status: 'active' } },
{ $project: { _id: 1, name: 1, timestamp: 1 } },
],
state: 'STARTED',
tier: 'SP2',
errorMsg: '',
lastModified: new Date('2023-01-01T00:00:00Z'),
lastStateChange: new Date('2023-01-01T00:00:00Z'),
};

sinon
.stub(mongo._serviceProvider, 'runCommand')
.resolves({ ok: 1, streamProcessors: [completeProcessor] });

const result = await streams.listStreamProcessors({});

// verify users can access properties on individual processors
expect(result[0].name).to.equal(completeProcessor.name);
expect(result[0].pipeline).to.eql(completeProcessor.pipeline);
});
});

describe('listWorkspaceDefaults', function () {
it('returns tier and maxTierSize', async function () {
const runCmdStub = sinon
Expand Down
26 changes: 16 additions & 10 deletions packages/shell-api/src/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
shellApiClassDefault,
ShellApiWithMongoClass,
} from './decorators';
import type { StreamProcessorData } from './stream-processor';
import StreamProcessor from './stream-processor';
import { ADMIN_DB, asPrintable, shellApiType } from './enums';
import type { Database, DatabaseWithSchema } from './database';
Expand Down Expand Up @@ -34,7 +35,7 @@ export class Streams<
return v;
}
if (typeof prop === 'string' && !prop.startsWith('_')) {
return target.getProcessor(prop);
return target.getProcessor({ name: prop });
}
},
});
Expand All @@ -55,8 +56,8 @@ export class Streams<
return 'Atlas Stream Processing';
}

getProcessor(name: string): StreamProcessor {
return new StreamProcessor(this, name);
getProcessor(data: StreamProcessorData): StreamProcessor {
return new StreamProcessor(this, data);
}

@returnsPromise
Expand All @@ -82,7 +83,7 @@ export class Streams<
limit: number;
cursorId: number;
};
const sp = this.getProcessor(name);
const sp = this.getProcessor({ name });

async function dropSp() {
try {
Expand Down Expand Up @@ -129,8 +130,7 @@ export class Streams<
if (result.ok !== 1) {
return result;
}

return this.getProcessor(name);
return this.getProcessor({ name, pipeline, ...options });
}

@returnsPromise
Expand All @@ -143,9 +143,15 @@ export class Streams<
return result;
}
const rawProcessors = result.streamProcessors;
const sps = rawProcessors.map((sp: StreamProcessor) =>
this.getProcessor(sp.name)
);
const sps = rawProcessors
.map((sp: Document) => {
if (sp.name) {
return this.getProcessor(sp as StreamProcessorData);
}

return undefined;
})
.filter((sp: Document | undefined) => !!sp);

return Object.defineProperties(sps, {
[asPrintable]: { value: () => rawProcessors },
Expand Down Expand Up @@ -174,7 +180,7 @@ export class Streams<
})) as WorkspaceDefaults;
}

async _runStreamCommand(cmd: Document, options: Document = {}) {
_runStreamCommand(cmd: Document, options: Document = {}): Promise<Document> {
return this._mongo._serviceProvider.runCommand(ADMIN_DB, cmd, options); // run cmd
}
}
Loading