Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-3272)!: emit correct event type when SRV Polling #2825

Merged
merged 7 commits into from Jun 3, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/sdam/srv_polling.ts
Expand Up @@ -59,6 +59,9 @@ export class SrvPoller extends TypedEventEmitter<SrvPollerEvents> {
generation: number;
_timeout?: NodeJS.Timeout;

/** @event */
static readonly SRV_RECORD_DISCOVERY = 'srvRecordDiscovery' as const;

constructor(options: SrvPollerOptions) {
super();

Expand Down Expand Up @@ -110,7 +113,7 @@ export class SrvPoller extends TypedEventEmitter<SrvPollerEvents> {
success(srvRecords: dns.SrvRecord[]): void {
this.haMode = false;
this.schedule();
this.emit('srvRecordDiscovery', new SrvPollingEvent(srvRecords));
this.emit(SrvPoller.SRV_RECORD_DISCOVERY, new SrvPollingEvent(srvRecords));
}

failure(message: string, obj?: NodeJS.ErrnoException): void {
Expand Down
100 changes: 43 additions & 57 deletions src/sdam/topology.ts
Expand Up @@ -128,8 +128,8 @@ export interface TopologyPrivate {

/** related to srv polling */
srvPoller?: SrvPoller;
detectTopologyDescriptionChange?: (event: TopologyDescriptionChangedEvent) => void;
handleSrvPolling?: (event: SrvPollingEvent) => void;
detectShardedTopology: (event: TopologyDescriptionChangedEvent) => void;
detectSrvRecords: (event: SrvPollingEvent) => void;
}

/** @public */
Expand Down Expand Up @@ -319,36 +319,57 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
clusterTime: undefined,

// timer management
connectionTimers: new Set<NodeJS.Timeout>()
connectionTimers: new Set<NodeJS.Timeout>(),

detectShardedTopology: ev => this.detectShardedTopology(ev),
detectSrvRecords: ev => this.detectSrvRecords(ev)
};

if (options.srvHost) {
this.s.srvPoller =
options.srvPoller ||
options.srvPoller ??
new SrvPoller({
heartbeatFrequencyMS: this.s.heartbeatFrequencyMS,
srvHost: options.srvHost
});

this.s.detectTopologyDescriptionChange = (ev: TopologyDescriptionChangedEvent) => {
const previousType = ev.previousDescription.type;
const newType = ev.newDescription.type;
this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);
}
}

if (previousType !== TopologyType.Sharded && newType === TopologyType.Sharded) {
this.s.handleSrvPolling = srvPollingHandler(this);
if (this.s.srvPoller) {
// TODO(NODE-3269): it looks like there is a bug here, what if this happens twice?
this.s.srvPoller.on('srvRecordDiscovery', this.s.handleSrvPolling);
this.s.srvPoller.start();
}
}
};
private detectShardedTopology(event: TopologyDescriptionChangedEvent) {
const previousType = event.previousDescription.type;
const newType = event.newDescription.type;

const transitionToSharded =
previousType !== TopologyType.Sharded && newType === TopologyType.Sharded;
const srvListeners = this.s.srvPoller?.listeners(SrvPoller.SRV_RECORD_DISCOVERY);
const listeningToSrvPolling = !!srvListeners?.includes(this.s.detectSrvRecords);

if (transitionToSharded && !listeningToSrvPolling) {
this.s.srvPoller?.on(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
this.s.srvPoller?.start();
}
}

this.on(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectTopologyDescriptionChange);
private detectSrvRecords(ev: SrvPollingEvent) {
const previousTopologyDescription = this.s.description;
this.s.description = this.s.description.updateFromSrvPollingEvent(ev);
if (this.s.description === previousTopologyDescription) {
// Nothing changed, so return
return;
}

// NOTE: remove this when NODE-1709 is resolved
this.setMaxListeners(Infinity);
updateServers(this);

this.emit(
Topology.TOPOLOGY_DESCRIPTION_CHANGED,
new TopologyDescriptionChangedEvent(
this.s.id,
previousTopologyDescription,
this.s.description
)
);
}

/**
Expand Down Expand Up @@ -456,20 +477,10 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {

if (this.s.srvPoller) {
this.s.srvPoller.stop();
if (this.s.handleSrvPolling) {
this.s.srvPoller.removeListener('srvRecordDiscovery', this.s.handleSrvPolling);
delete this.s.handleSrvPolling;
}
this.s.srvPoller.removeListener(SrvPoller.SRV_RECORD_DISCOVERY, this.s.detectSrvRecords);
}

if (this.s.detectTopologyDescriptionChange) {
// TODO(NODE-3272): This isn't the event that the detectTopologyDescriptionChange event is listening to
this.removeListener(
Topology.SERVER_DESCRIPTION_CHANGED,
this.s.detectTopologyDescriptionChange
);
delete this.s.detectTopologyDescriptionChange;
}
this.removeListener(Topology.TOPOLOGY_DESCRIPTION_CHANGED, this.s.detectShardedTopology);

for (const session of this.s.sessions) {
session.endSession();
Expand All @@ -478,7 +489,7 @@ export class Topology extends TypedEventEmitter<TopologyEvents> {
this.s.sessionPool.endAllPooledSessions(() => {
eachAsync(
Array.from(this.s.servers.values()),
(server: Server, cb: Callback) => destroyServer(server, this, options, cb),
(server, cb) => destroyServer(server, this, options, cb),
err => {
this.s.servers.clear();

Expand Down Expand Up @@ -924,31 +935,6 @@ function updateServers(topology: Topology, incomingServerDescription?: ServerDes
}
}

function srvPollingHandler(topology: Topology) {
return function handleSrvPolling(ev: SrvPollingEvent) {
const previousTopologyDescription = topology.s.description;
topology.s.description = topology.s.description.updateFromSrvPollingEvent(ev);
if (topology.s.description === previousTopologyDescription) {
// Nothing changed, so return
return;
}

updateServers(topology);

topology.emit(
Topology.SERVER_DESCRIPTION_CHANGED,
// TODO(NODE-3272): This server description changed event is emitting a TopologyDescriptionChangeEvent
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-expect-error
new TopologyDescriptionChangedEvent(
topology.s.id,
previousTopologyDescription,
topology.s.description
)
);
};
}

function drainWaitQueue(queue: Denque<ServerSelectionRequest>, err?: AnyError) {
while (queue.length) {
const waitQueueMember = queue.shift();
Expand Down
2 changes: 0 additions & 2 deletions src/sessions.ts
Expand Up @@ -85,9 +85,7 @@ class ClientSession extends TypedEventEmitter<ClientSessionEvents> {
hasEnded: boolean;
clientOptions?: MongoOptions;
supports: { causalConsistency: boolean };
/** @internal */
clusterTime?: ClusterTime;
/** @internal */
operationTime?: Timestamp;
explicit: boolean;
/** @internal */
Expand Down
10 changes: 9 additions & 1 deletion test/functional/apm.test.js
Expand Up @@ -114,7 +114,8 @@ describe('APM', function () {
}
});

it('should correctly receive the APM events for a listIndexes command', {
// NODE-3308
it.skip('should correctly receive the APM events for a listIndexes command', {
metadata: { requires: { topology: ['replicaset'], mongodb: '>=3.0.0' } },

test: function () {
Expand Down Expand Up @@ -908,6 +909,13 @@ describe('APM', function () {
it(test.description, {
metadata: { requires: requirements },
test: function () {
if (
test.description ===
'A successful find event with a getmore and the server kills the cursor'
) {
this.skip();
}

const client = this.configuration.newClient({}, { monitorCommands: true });
return client.connect().then(client => {
expect(client).to.exist;
Expand Down
3 changes: 2 additions & 1 deletion test/functional/unified-spec-runner/unified-runner.test.ts
Expand Up @@ -7,7 +7,8 @@ const SKIPPED_TESTS = [
// These two tests need to run against multiple mongoses
'Dirty explicit session is discarded',
// Will be implemented as part of NODE-2034
'Client side error in command starting transaction'
'Client side error in command starting transaction',
'A successful find event with a getmore and the server kills the cursor' // NODE-3308
];

describe('Unified test format runner', function unifiedTestRunner() {
Expand Down
15 changes: 14 additions & 1 deletion test/tools/utils.js
Expand Up @@ -246,12 +246,25 @@ class EventCollector {
}
}

function getSymbolFrom(target, symbolName, assertExists = true) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool util! 🎉

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, and good catch on the other ticket I'll be sure to note it in the merge

const symbol = Object.getOwnPropertySymbols(target).filter(
s => s.toString() === `Symbol(${symbolName})`
)[0];

if (assertExists && !symbol) {
throw new Error(`Did not find Symbol(${symbolName}) on ${target}`);
}

return symbol;
}

module.exports = {
EventCollector,
makeTestFunction,
ensureCalledWith,
ClassWithLogger,
ClassWithoutLogger,
ClassWithUndefinedLogger,
visualizeMonitoringEvents
visualizeMonitoringEvents,
getSymbolFrom
};
108 changes: 108 additions & 0 deletions test/unit/sdam/topology.test.js
Expand Up @@ -7,6 +7,11 @@ const { Topology } = require('../../../src/sdam/topology');
const { Server } = require('../../../src/sdam/server');
const { ServerDescription } = require('../../../src/sdam/server_description');
const { ns, makeClientMetadata } = require('../../../src/utils');
const { TopologyDescriptionChangedEvent } = require('../../../src/sdam/events');
const { TopologyDescription } = require('../../../src/sdam/topology_description');
const { TopologyType } = require('../../../src/sdam/common');
const { SrvPoller, SrvPollingEvent } = require('../../../src/sdam/srv_polling');
const { getSymbolFrom } = require('../../tools/utils');

describe('Topology (unit)', function () {
describe('client metadata', function () {
Expand Down Expand Up @@ -314,5 +319,108 @@ describe('Topology (unit)', function () {
});
return p;
});

describe('srv event listeners', function () {
/** @type {Topology} */
let topology;

beforeEach(() => {
topology = new Topology('', { srvHost: 'fakeHost' });

expect(topology.s.detectSrvRecords).to.be.a('function');
expect(topology.s.detectShardedTopology).to.be.a('function');
});

function transitionTopology(topology, from, to) {
topology.emit(
Topology.TOPOLOGY_DESCRIPTION_CHANGED,
new TopologyDescriptionChangedEvent(
2,
new TopologyDescription(from),
new TopologyDescription(to)
)
);
// We don't want the SrvPoller to actually run
clearTimeout(topology.s.srvPoller._timeout);
}

describe('srvRecordDiscovery event listener', function () {
beforeEach(() => {
// fake a transition to Sharded
transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded);
expect(topology.s.srvPoller).to.be.instanceOf(SrvPoller);

const srvPollerListeners = topology.s.srvPoller.listeners(SrvPoller.SRV_RECORD_DISCOVERY);
expect(srvPollerListeners).to.have.lengthOf(1);
expect(srvPollerListeners[0]).to.equal(topology.s.detectSrvRecords);
const topologyChangeListeners = topology.listeners(Topology.TOPOLOGY_DESCRIPTION_CHANGED);
expect(topologyChangeListeners).to.have.lengthOf(1);
expect(topologyChangeListeners[0]).to.equal(topology.s.detectShardedTopology);
});

it('should emit topologyDescriptionChange event', function () {
topology.once(Topology.TOPOLOGY_DESCRIPTION_CHANGED, ev => {
// The first event we get here is caused by the srv record discovery event below
expect(ev).to.have.nested.property('newDescription.servers');
expect(ev.newDescription.servers.get('fake:2'))
.to.be.a('object')
.with.property('address', 'fake:2');
});

topology.s.srvPoller.emit(
SrvPoller.SRV_RECORD_DISCOVERY,
new SrvPollingEvent([{ priority: 1, weight: 1, port: 2, name: 'fake' }])
);

// The srv event starts a monitor that we don't actually want running
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's move the cleanup into an afterEach and iterate over all servers

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good idea 🚀

const server = topology.s.servers.get('fake:2');
const kMonitor = getSymbolFrom(server, 'monitor');
const kMonitorId = getSymbolFrom(server[kMonitor], 'monitorId');
server[kMonitor][kMonitorId].stop();
});

it('should clean up listeners on close', function (done) {
topology.s.state = 'connected'; // fake state to test clean up logic
topology.close(e => {
const srvPollerListeners = topology.s.srvPoller.listeners(
SrvPoller.SRV_RECORD_DISCOVERY
);
expect(srvPollerListeners).to.have.lengthOf(0);
const topologyChangeListeners = topology.listeners(
Topology.TOPOLOGY_DESCRIPTION_CHANGED
);
expect(topologyChangeListeners).to.have.lengthOf(0);
done(e);
});
});
});

describe('topologyDescriptionChange event listener', function () {
it('should not add more than one srvRecordDiscovery listener', function () {
// fake a transition to Sharded
transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded); // Transition 1

const srvListenersFirstTransition = topology.s.srvPoller.listeners(
SrvPoller.SRV_RECORD_DISCOVERY
);
expect(srvListenersFirstTransition).to.have.lengthOf(1);

transitionTopology(topology, TopologyType.Unknown, TopologyType.Sharded); // Transition 2

const srvListenersSecondTransition = topology.s.srvPoller.listeners(
SrvPoller.SRV_RECORD_DISCOVERY
);
expect(srvListenersSecondTransition).to.have.lengthOf(1);
});

it('should not add srvRecordDiscovery listener if transition is not to Sharded topology', function () {
// fake a transition to **NOT** Sharded
transitionTopology(topology, TopologyType.Unknown, TopologyType.ReplicaSetWithPrimary);

const srvListeners = topology.s.srvPoller.listeners(SrvPoller.SRV_RECORD_DISCOVERY);
expect(srvListeners).to.have.lengthOf(0);
});
});
});
});
});