Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { Readable } from 'stream';
import type { Binary, Document, Timestamp } from './bson';
import { Collection } from './collection';
import { CHANGE, CLOSE, END, ERROR, INIT, MORE, RESPONSE, RESUME_TOKEN_CHANGED } from './constants';
import { type CursorStreamOptions, CursorTimeoutContext } from './cursor/abstract_cursor';
import { CursorTimeoutContext } from './cursor/abstract_cursor';
import { ChangeStreamCursor, type ChangeStreamCursorOptions } from './cursor/change_stream_cursor';
import { Db } from './db';
import {
Expand Down Expand Up @@ -598,7 +598,6 @@ export class ChangeStream<
type: symbol;
/** @internal */
private cursor: ChangeStreamCursor<TSchema, TChange>;
streamOptions?: CursorStreamOptions;
/** @internal */
private cursorStream?: Readable & AsyncIterable<TChange>;
/** @internal */
Expand Down Expand Up @@ -866,13 +865,12 @@ export class ChangeStream<
*
* @throws MongoChangeStreamError if the underlying cursor or the change stream is closed
*/
stream(options?: CursorStreamOptions): Readable & AsyncIterable<TChange> {
stream(): Readable & AsyncIterable<TChange> {
if (this.closed) {
throw new MongoChangeStreamError(CHANGESTREAM_CLOSED_ERROR);
}

this.streamOptions = options;
return this.cursor.stream(options);
return this.cursor.stream();
}

/** @internal */
Expand Down
35 changes: 2 additions & 33 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Readable, Transform } from 'stream';
import { Readable } from 'stream';

import { type BSONSerializeOptions, type Document, Long, pluckBSONSerializeOptions } from '../bson';
import { type OnDemandDocumentDeserializeOptions } from '../cmap/wire_protocol/on_demand/document';
Expand Down Expand Up @@ -59,12 +59,6 @@ export const CURSOR_FLAGS = [
'partial'
] as const;

/** @public */
export interface CursorStreamOptions {
/** A transformation method applied to each document emitted by the stream */
transform?(this: void, doc: Document): Document;
}

/** @public */
export type CursorFlag = (typeof CURSOR_FLAGS)[number];

Expand Down Expand Up @@ -523,7 +517,7 @@ export abstract class AbstractCursor<
}
}

stream(options?: CursorStreamOptions): Readable & AsyncIterable<TSchema> {
stream(): Readable & AsyncIterable<TSchema> {
const readable = new ReadableCursorStream(this);
const abortListener = addAbortListener(this.signal, function () {
readable.destroy(this.reason);
Expand All @@ -532,31 +526,6 @@ export abstract class AbstractCursor<
abortListener?.[kDispose]();
});

if (options?.transform) {
const transform = options.transform;

const transformedStream = readable.pipe(
new Transform({
objectMode: true,
highWaterMark: 1,
transform(chunk, _, callback) {
try {
const transformed = transform(chunk);
callback(undefined, transformed);
} catch (err) {
callback(err);
}
}
})
);

// Bubble errors to transformed stream, because otherwise no way
// to handle this error.
readable.on('error', err => transformedStream.emit('error', err));

return transformedStream;
}

return readable;
}

Expand Down
3 changes: 1 addition & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,7 @@ export type {
export type {
AbstractCursorEvents,
AbstractCursorOptions,
CursorFlag,
CursorStreamOptions
CursorFlag
} from './cursor/abstract_cursor';
export type {
CursorTimeoutContext,
Expand Down
3 changes: 2 additions & 1 deletion test/integration/change-streams/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,8 @@ describe('Change Streams', function () {

const transform = doc => ({ doc: JSON.stringify(doc) });
changeStream
.stream({ transform })
.stream()
.map(transform)
.on('error', () => null)
.pipe(outStream)
.on('error', () => null);
Expand Down
7 changes: 2 additions & 5 deletions test/integration/crud/misc_cursors.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1823,9 +1823,7 @@ describe('Cursor', function () {

const filename = path.join(os.tmpdir(), '_nodemongodbnative_stream_out.txt');
const out = fs.createWriteStream(filename);
const stream = collection.find().stream({
transform: doc => JSON.stringify(doc)
});
const stream = collection.find().stream().map(JSON.stringify);

stream.pipe(out);
// Wait for output stream to close
Expand Down Expand Up @@ -3746,14 +3744,13 @@ describe('Cursor', function () {
{ _id: 2, a: { b: 1, c: 0 } }
];
const resultSet = new Set();
const transformParam = transformFunc != null ? { transform: transformFunc } : null;
Promise.resolve()
.then(() => db.createCollection(collectionName))
.then(() => (collection = db.collection(collectionName)))
.then(() => collection.insertMany(docs))
.then(() => {
cursor = collection.find();
return cursor.stream(transformParam);
return cursor.stream().map(transformFunc ?? (doc => doc));
})
.then(stream => {
stream.on('data', function (doc) {
Expand Down
38 changes: 0 additions & 38 deletions test/integration/node-specific/abstract_cursor.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { expect } from 'chai';
import { once } from 'events';
import * as sinon from 'sinon';
import { Transform } from 'stream';
import { inspect } from 'util';

import {
Expand All @@ -16,7 +15,6 @@ import {
type MongoClient,
MongoCursorExhaustedError,
MongoOperationTimeoutError,
MongoServerError,
TimeoutContext
} from '../../mongodb';
import { clearFailPoint, configureFailPoint } from '../../tools/utils';
Expand Down Expand Up @@ -317,42 +315,6 @@ describe('class AbstractCursor', function () {
});
});

describe('transform stream error handling', function () {
let client: MongoClient;
let collection: Collection;
const docs = [{ count: 0 }];

beforeEach(async function () {
client = this.configuration.newClient();

collection = client.db('abstract_cursor_integration').collection('test');

await collection.insertMany(docs);
});

afterEach(async function () {
await collection.deleteMany({});
await client.close();
});

it('propagates errors to transform stream', async function () {
const transform = new Transform({
transform(data, encoding, callback) {
callback(null, data);
}
});

// MongoServerError: unknown operator: $bar
const stream = collection.find({ foo: { $bar: 25 } }).stream({ transform });

const error: Error | null = await new Promise(resolve => {
stream.on('error', error => resolve(error));
stream.on('end', () => resolve(null));
});
expect(error).to.be.instanceof(MongoServerError);
});
});

describe('cursor end state', function () {
let client: MongoClient;
let cursor: FindCursor;
Expand Down