diff --git a/lib/storage/metadata/mongoclient/MongoClientInterface.js b/lib/storage/metadata/mongoclient/MongoClientInterface.js index 403f708ed..8bf6cf10c 100644 --- a/lib/storage/metadata/mongoclient/MongoClientInterface.js +++ b/lib/storage/metadata/mongoclient/MongoClientInterface.js @@ -1640,10 +1640,13 @@ class MongoClientInterface { internalListObject(bucketName, params, extension, vFormat, log, cb) { const c = this.getCollection(bucketName); const getLatestVersion = this.getLatestVersion; + const cbOnce = jsutil.once(cb); let stream; + let baseStream; if (!params.secondaryStreamParams) { // listing masters only (DelimiterMaster) stream = new MongoReadStream(c, params.mainStreamParams, params.mongifiedSearch); + baseStream = stream; if (vFormat === BUCKET_VERSIONS.v1) { /** * When listing masters only in v1 we can't just skip PHD @@ -1685,6 +1688,22 @@ class MongoClientInterface { }, }); stream = stream.pipe(resolvePhdKey); + // Propagate the 'end' event from resolvePhdKey to stream + // to properly cleanup resources. + resolvePhdKey.on('end', () => { + baseStream.emit('end'); + }); + baseStream.on('error', err => { + const logObj = { + rawError: err, + error: err.message, + errorStack: err.stack, + }; + log.error( + 'internalListObjectV1: error listing objects', logObj); + baseStream.destroy(); + return cbOnce(err); + }); } } else { // listing both master and version keys (delimiterVersion Algo) @@ -1699,7 +1718,6 @@ class MongoClientInterface { extension, gte: gteParams, }); - const cbOnce = jsutil.once(cb); skip.setListingEndCb(() => { stream.emit('end'); stream.destroy(); @@ -1736,10 +1754,14 @@ class MongoClientInterface { }; log.error( 'internalListObjectV1: error listing objects', logObj); + // call explicitly the destroy method to clean the mongodb cursor + stream.destroy(); cbOnce(err); }) .on('end', () => { const data = extension.result(); + // call explicitly the destroy method to clean the mongodb cursor + stream.destroy(); cbOnce(null, data); }); return undefined; diff --git a/package.json b/package.json index a58c587ca..ce46184df 100644 --- a/package.json +++ b/package.json @@ -3,7 +3,7 @@ "engines": { "node": ">=16" }, - "version": "8.1.112", + "version": "8.1.113", "description": "Common utilities for the S3 project components", "main": "build/index.js", "repository": { diff --git a/tests/functional/metadata/mongodb/listObject.spec.js b/tests/functional/metadata/mongodb/listObject.spec.js index 6dcaa3df1..676e82001 100644 --- a/tests/functional/metadata/mongodb/listObject.spec.js +++ b/tests/functional/metadata/mongodb/listObject.spec.js @@ -8,6 +8,8 @@ const MetadataWrapper = require('../../../../lib/storage/metadata/MetadataWrapper'); const { versioning } = require('../../../../index'); const { BucketVersioningKeyFormat } = versioning.VersioningConstants; +const sinon = require('sinon'); +const MongoReadStream = require('../../../../lib/storage/metadata/mongoclient/readStream'); const IMPL_NAME = 'mongodb'; const DB_NAME = 'metadata'; @@ -477,6 +479,50 @@ describe('MongoClientInterface::metadata.listObject', () => { }), ], done); }); + + it('Should properly destroy the MongoDBReadStream', done => { + // eslint-disable-next-line func-names + const destroyStub = sinon.stub(MongoReadStream.prototype, 'destroy').callsFake(function (...args) { + // You can add extra logic here if needed + MongoReadStream.prototype.destroy.wrappedMethod.apply(this, ...args); + }); + const params = { + listingType: 'DelimiterMaster', + maxKeys: 100, + }; + return metadata.listObject(BUCKET_NAME, params, logger, err => { + assert.ifError(err); + assert(destroyStub.called, 'Destroy should have been called on MongoReadStream'); + // Restore original destroy method + destroyStub.restore(); + return done(); + }); + }); + + + it('Should properly destroy the MongoDBReadStream on error', done => { + // eslint-disable-next-line func-names + const destroyStub = sinon.stub(MongoReadStream.prototype, 'destroy').callsFake(function (...args) { + // You can add extra logic here if needed + MongoReadStream.prototype.destroy.wrappedMethod.apply(this, ...args); + }); + // stub the cursor creation to emit an error + // eslint-disable-next-line func-names + const readStub = sinon.stub(MongoReadStream.prototype, '_read').callsFake(function () { + this.emit('error', new Error('error')); + }); + const params = { + listingType: 'DelimiterMaster', + maxKeys: 100, + }; + return metadata.listObject(BUCKET_NAME, params, logger, err => { + assert(err, 'Expected an error'); + assert(destroyStub.called, 'Destroy should have been called on MongoReadStream'); + destroyStub.restore(); + readStub.restore(); + return done(); + }); + }); }); }); });