Skip to content

Commit

Permalink
fix(NODE-3272)!: emit correct event type when SRV Polling (#2825)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken authored Jun 3, 2021
1 parent 8eb0081 commit 579119f
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 63 deletions.
5 changes: 4 additions & 1 deletion src/sdam/srv_polling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ export class SrvPoller extends TypedEventEmitter<SrvPollerEvents> {
generation: number;
_timeout?: NodeJS.Timeout;

/** @event */
static readonly SRV_RECORD_DISCOVERY = 'srvRecordDiscovery' as const;

constructor(options: SrvPollerOptions) {
super();

Expand Down Expand Up @@ -110,7 +113,7 @@ export class SrvPoller extends TypedEventEmitter<SrvPollerEvents> {
success(srvRecords: dns.SrvRecord[]): void {
this.haMode = false;
this.schedule();
this.emit('srvRecordDiscovery', new SrvPollingEvent(srvRecords));
this.emit(SrvPoller.SRV_RECORD_DISCOVERY, new SrvPollingEvent(srvRecords));
}

failure(message: string, obj?: NodeJS.ErrnoException): void {
Expand Down
100 changes: 43 additions & 57 deletions src/sdam/topology.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ export interface TopologyPrivate {

/** related to srv polling */
srvPoller?: SrvPoller;
detectTopologyDescriptionChange?: (event: TopologyDescriptionChangedEvent) => void;
handleSrvPolling?: (event: SrvPollingEvent) => void;
detectShardedTopology: (event: TopologyDescriptionChangedEvent) => void;
detectSrvRecords: (event: SrvPollingEvent) => void;
}

/** @public */
Expand Down Expand Up @@ -319,36 +319,57 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
clusterTime: undefined,

// timer management
connectionTimers: new Set<NodeJS.Timeout>()
connectionTimers: new Set<NodeJS.Timeout>(),

detectShardedTopology: ev => this.detectShardedTopology(ev),
detectSrvRecords: ev => this.detectSrvRecords(ev)
};

if (options.srvHost) {
this.s.srvPoller =
options.srvPoller ||
options.srvPoller ??
new SrvPoller({
heartbeatFrequencyMS: this.s.heartbeatFrequencyMS,
srvHost: options.srvHost
});

this.s.detectTopologyDescriptionChange = (ev: TopologyDescriptionChangedEvent) => {
const previousType = ev.previousDescription.type;
const newType = ev.newDescription.type;
this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
}
}

if (previousType !== TopologyType.Sharded && newType === TopologyType.Sharded) {
this.s.handleSrvPolling = srvPollingHandler(this);
if (this.s.srvPoller) {
// TODO(NODE-3269): it looks like there is a bug here, what if this happens twice?
this.s.srvPoller.on('srvRecordDiscovery', this.s.handleSrvPolling);
this.s.srvPoller.start();
}
}
};
private detectShardedTopology(event: TopologyDescriptionChangedEvent) {
const previousType = event.previousDescription.type;
const newType = event.newDescription.type;

const transitionToSharded =
previousType !== TopologyType.Sharded && newType === TopologyType.Sharded;
const srvListeners = this.s.srvPoller?.listeners(SrvPoller.SRV_RECORD_DISCOVERY);
const listeningToSrvPolling = !!srvListeners?.includes(this.s.detectSrvRecords);

if (transitionToSharded && !listeningToSrvPolling) {
this.s.srvPoller?.on(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
this.s.srvPoller?.start();
}
}

this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectTopologyDescriptionChange);
private detectSrvRecords(ev: SrvPollingEvent) {
const previousTopologyDescription = this.s.description;
this.s.description = this.s.description.updateFromSrvPollingEvent(ev);
if (this.s.description === previousTopologyDescription) {
// Nothing changed, so return
return;
}

// NOTE: remove this when NODE-1709 is resolved
this.setMaxListeners(Infinity);
updateServers(this);

this.emit(
Topology.TOPOLOGY_DESCRIPTION_CHANGED,
new TopologyDescriptionChangedEvent(
this.s.id,
previousTopologyDescription,
this.s.description
)
);
}

/**
Expand Down Expand Up @@ -456,20 +477,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {

if (this.s.srvPoller) {
this.s.srvPoller.stop();
if (this.s.handleSrvPolling) {
this.s.srvPoller.removeListener('srvRecordDiscovery', this.s.handleSrvPolling);
delete this.s.handleSrvPolling;
}
this.s.srvPoller.removeListener(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
}

if (this.s.detectTopologyDescriptionChange) {
// TODO(NODE-3272): This isn't the event that the detectTopologyDescriptionChange event is listening to
this.removeListener(
Topology.SERVER_DESCRIPTION_CHANGED,
this.s.detectTopologyDescriptionChange
);
delete this.s.detectTopologyDescriptionChange;
}
this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);

for (const session of this.s.sessions) {
session.endSession();
Expand All @@ -478,7 +489,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
this.s.sessionPool.endAllPooledSessions(() => {
eachAsync(
Array.from(this.s.servers.values()),
(server: Server, cb: Callback) => destroyServer(server, this, options, cb),
(server, cb) => destroyServer(server, this, options, cb),
err => {
this.s.servers.clear();

Expand Down Expand Up @@ -924,31 +935,6 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
}
}

function srvPollingHandler(topology: Topology) {
return function handleSrvPolling(ev: SrvPollingEvent) {
const previousTopologyDescription = topology.s.description;
topology.s.description = topology.s.description.updateFromSrvPollingEvent(ev);
if (topology.s.description === previousTopologyDescription) {
// Nothing changed, so return
return;
}

updateServers(topology);

topology.emit(
Topology.SERVER_DESCRIPTION_CHANGED,
// TODO(NODE-3272): This server description changed event is emitting a TopologyDescriptionChangeEvent
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-expect-error
new TopologyDescriptionChangedEvent(
topology.s.id,
previousTopologyDescription,
topology.s.description
)
);
};
}

function drainWaitQueue(queue: Denque<ServerSelectionRequest>, err?: AnyError) {
while (queue.length) {
const waitQueueMember = queue.shift();
Expand Down
2 changes: 0 additions & 2 deletions src/sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,7 @@ class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
hasEnded: boolean;
clientOptions?: MongoOptions;
supports: { causalConsistency: boolean };
/** @internal */
clusterTime?: ClusterTime;
/** @internal */
operationTime?: Timestamp;
explicit: boolean;
/** @internal */
Expand Down
10 changes: 9 additions & 1 deletion test/functional/apm.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ describe('APM', function () {
}
});

it('should correctly receive the APM events for a listIndexes command', {
// NODE-3308
it.skip('should correctly receive the APM events for a listIndexes command', {
metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.0.0' } },

test: function () {
Expand Down Expand Up @@ -908,6 +909,13 @@ describe('APM', function () {
it(test.description, {
metadata: { requires: requirements },
test: function () {
if (
test.description ===
'A successful find event with a getmore and the server kills the cursor'
) {
this.skip();
}

const client = this.configuration.newClient({}, { monitorCommands: true });
return client.connect().then(client => {
expect(client).to.exist;
Expand Down
3 changes: 2 additions & 1 deletion test/functional/unified-spec-runner/unified-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const SKIPPED_TESTS = [
// These two tests need to run against multiple mongoses
'Dirty explicit session is discarded',
// Will be implemented as part of NODE-2034
'Client side error in command starting transaction'
'Client side error in command starting transaction',
'A successful find event with a getmore and the server kills the cursor' // NODE-3308
];

describe('Unified test format runner', function unifiedTestRunner() {
Expand Down
15 changes: 14 additions & 1 deletion test/tools/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -246,12 +246,25 @@ class EventCollector {
}
}

function getSymbolFrom(target, symbolName, assertExists = true) {
const symbol = Object.getOwnPropertySymbols(target).filter(
s => s.toString() === `Symbol(${symbolName})`
)[0];

if (assertExists && !symbol) {
throw new Error(`Did not find Symbol(${symbolName}) on ${target}`);
}

return symbol;
}

module.exports = {
EventCollector,
makeTestFunction,
ensureCalledWith,
ClassWithLogger,
ClassWithoutLogger,
ClassWithUndefinedLogger,
visualizeMonitoringEvents
visualizeMonitoringEvents,
getSymbolFrom
};
111 changes: 111 additions & 0 deletions test/unit/sdam/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ const { Topology } = require('../../../src/sdam/topology');
const { Server } = require('../../../src/sdam/server');
const { ServerDescription } = require('../../../src/sdam/server_description');
const { ns, makeClientMetadata } = require('../../../src/utils');
const { TopologyDescriptionChangedEvent } = require('../../../src/sdam/events');
const { TopologyDescription } = require('../../../src/sdam/topology_description');
const { TopologyType } = require('../../../src/sdam/common');
const { SrvPoller, SrvPollingEvent } = require('../../../src/sdam/srv_polling');
const { getSymbolFrom } = require('../../tools/utils');

describe('Topology (unit)', function () {
describe('client metadata', function () {
Expand Down Expand Up @@ -314,5 +319,111 @@ describe('Topology (unit)', function () {
});
return p;
});

describe('srv event listeners', function () {
/** @type {Topology} */
let topology;

beforeEach(() => {
topology = new Topology('', { srvHost: 'fakeHost' });

expect(topology.s.detectSrvRecords).to.be.a('function');
expect(topology.s.detectShardedTopology).to.be.a('function');
});

afterEach(() => {
// The srv event starts a monitor that we need to clean up
for (const [, server] of topology.s.servers) {
const kMonitor = getSymbolFrom(server, 'monitor');
const kMonitorId = getSymbolFrom(server[kMonitor], 'monitorId');
server[kMonitor][kMonitorId].stop();
}
});

function transitionTopology(topology, from, to) {
topology.emit(
Topology.TOPOLOGY_DESCRIPTION_CHANGED,
new TopologyDescriptionChangedEvent(
2,
new TopologyDescription(from),
new TopologyDescription(to)
)
);
// We don't want the SrvPoller to actually run
clearTimeout(topology.s.srvPoller._timeout);
}

describe('srvRecordDiscovery event listener', function () {
beforeEach(() => {
// fake a transition to Sharded
transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded);
expect(topology.s.srvPoller).to.be.instanceOf(SrvPoller);

const srvPollerListeners = topology.s.srvPoller.listeners(SrvPoller.SRV_RECORD_DISCOVERY);
expect(srvPollerListeners).to.have.lengthOf(1);
expect(srvPollerListeners[0]).to.equal(topology.s.detectSrvRecords);
const topologyChangeListeners = topology.listeners(Topology.TOPOLOGY_DESCRIPTION_CHANGED);
expect(topologyChangeListeners).to.have.lengthOf(1);
expect(topologyChangeListeners[0]).to.equal(topology.s.detectShardedTopology);
});

it('should emit topologyDescriptionChange event', function () {
topology.once(Topology.TOPOLOGY_DESCRIPTION_CHANGED, ev => {
// The first event we get here is caused by the srv record discovery event below
expect(ev).to.have.nested.property('newDescription.servers');
expect(ev.newDescription.servers.get('fake:2'))
.to.be.a('object')
.with.property('address', 'fake:2');
});

topology.s.srvPoller.emit(
SrvPoller.SRV_RECORD_DISCOVERY,
new SrvPollingEvent([{ priority: 1, weight: 1, port: 2, name: 'fake' }])
);
});

it('should clean up listeners on close', function (done) {
topology.s.state = 'connected'; // fake state to test clean up logic
topology.close(e => {
const srvPollerListeners = topology.s.srvPoller.listeners(
SrvPoller.SRV_RECORD_DISCOVERY
);
expect(srvPollerListeners).to.have.lengthOf(0);
const topologyChangeListeners = topology.listeners(
Topology.TOPOLOGY_DESCRIPTION_CHANGED
);
expect(topologyChangeListeners).to.have.lengthOf(0);
done(e);
});
});
});

describe('topologyDescriptionChange event listener', function () {
it('should not add more than one srvRecordDiscovery listener', function () {
// fake a transition to Sharded
transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded); // Transition 1

const srvListenersFirstTransition = topology.s.srvPoller.listeners(
SrvPoller.SRV_RECORD_DISCOVERY
);
expect(srvListenersFirstTransition).to.have.lengthOf(1);

transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded); // Transition 2

const srvListenersSecondTransition = topology.s.srvPoller.listeners(
SrvPoller.SRV_RECORD_DISCOVERY
);
expect(srvListenersSecondTransition).to.have.lengthOf(1);
});

it('should not add srvRecordDiscovery listener if transition is not to Sharded topology', function () {
// fake a transition to **NOT** Sharded
transitionTopology(topology, TopologyType.Unknown, TopologyType.ReplicaSetWithPrimary);

const srvListeners = topology.s.srvPoller.listeners(SrvPoller.SRV_RECORD_DISCOVERY);
expect(srvListeners).to.have.lengthOf(0);
});
});
});
});
});

0 comments on commit 579119f

Please sign in to comment.