Skip to content

Commit

Permalink
feat: introduce a class for tracking stream specific attributes
Browse files Browse the repository at this point in the history
Similar to the `ServerDescription` we now have a `StreamDescription`
which describes details specific to a single stream connected to a
MongoDB topology
  • Loading branch information
mbroadst committed Dec 30, 2019
1 parent 0cac125 commit f6bf82c
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 25 deletions.
34 changes: 10 additions & 24 deletions lib/cmap/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const MongoError = require('../core/error').MongoError;
const MongoNetworkError = require('../core/error').MongoNetworkError;
const MongoWriteConcernError = require('../core/error').MongoWriteConcernError;
const CommandResult = require('../core/connection/command_result');
const ServerDescription = require('../core/sdam/server_description').ServerDescription;
const StreamDescription = require('./stream_description').StreamDescription;
const wp = require('../core/wireprotocol');
const apm = require('../core/connection/apm');
const updateSessionFromResponse = require('../core/sessions').updateSessionFromResponse;
Expand Down Expand Up @@ -34,6 +34,7 @@ class Connection extends EventEmitter {
this.closed = false;
this.destroyed = false;

this[kDescription] = new StreamDescription(this.address, options);
this[kGeneration] = options.generation;
this[kLastUseTime] = Date.now();

Expand Down Expand Up @@ -69,12 +70,6 @@ class Connection extends EventEmitter {
// hook the message stream up to the passed in stream
stream.pipe(this[kMessageStream]);
this[kMessageStream].pipe(stream);

if (options.compression) {
this[kDescription] = { compression: options.compression };
} else {
this[kDescription] = undefined;
}
}

get description() {
Expand All @@ -87,20 +82,7 @@ class Connection extends EventEmitter {

// the `connect` method stores the result of the handshake ismaster on the connection
set ismaster(response) {
if (response.compression) {
const compression = this[kDescription].compression;
const compressors = compression.compressors;
response.compression = {
compressor: compressors.filter(c => response.compression.indexOf(c) !== -1)[0]
};

if (compression.zlibCompressionLevel) {
response.compression.zlibCompressionLevel = compression.zlibCompressionLevel;
}
}

// TODO: This should be using a `StreamDescription`
this[kDescription] = new ServerDescription(this.address, response);
this[kDescription].receiveResponse(response);

// TODO: remove this, and only use the `StreamDescription` in the future
this[kIsMaster] = response;
Expand Down Expand Up @@ -284,15 +266,19 @@ function write(command, options, callback) {
noResponse: typeof options.noResponse === 'boolean' ? options.noResponse : false,
documentsReturnedIn: options.documentsReturnedIn,

// For BSON parsing
// for BSON parsing
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
raw: typeof options.raw === 'boolean' ? options.raw : false
};

if (this[kDescription] && this[kDescription].compression) {
operationDescription.agreedCompressor = this[kDescription].compression.compressor;
if (this[kDescription] && this[kDescription].compressor) {
operationDescription.agreedCompressor = this[kDescription].compressor;

if (this[kDescription].zlibCompressionLevel) {
operationDescription.zlibCompressionLevel = this[kDescription].zlibCompressionLevel;
}
}

if (typeof options.socketTimeout === 'number') {
Expand Down
45 changes: 45 additions & 0 deletions lib/cmap/stream_description.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
'use strict';
const parseServerType = require('../core/sdam/server_description').parseServerType;

const RESPONSE_FIELDS = [
'minWireVersion',
'maxWireVersion',
'maxBsonObjectSize',
'maxMessageSizeBytes',
'maxWriteBatchSize',
'__nodejs_mock_server__'
];

class StreamDescription {
constructor(address, options) {
this.address = address;
this.type = parseServerType(null);
this.minWireVersion = 1;
this.maxWireVersion = 2;
this.maxBsonObjectSize = 16777216;
this.maxMessageSizeBytes = 48000000;
this.maxWriteBatchSize = 100000;
this.compressors =
options && options.compression && Array.isArray(options.compression.compressors)
? options.compression.compressors
: [];
}

receiveResponse(response) {
this.type = parseServerType(response);

RESPONSE_FIELDS.forEach(field => {
if (typeof response[field] !== 'undefined') {
this[field] = response[field];
}
});

if (response.compression) {
this.compressor = this.compressors.filter(c => response.compression.indexOf(c) !== -1)[0];
}
}
}

module.exports = {
StreamDescription
};
3 changes: 2 additions & 1 deletion lib/core/sdam/server_description.js
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,6 @@ function parseServerType(ismaster) {
}

module.exports = {
ServerDescription
ServerDescription,
parseServerType
};

0 comments on commit f6bf82c

Please sign in to comment.