Skip to content

Commit

Permalink
Merge 048d1c8 into 511d88b
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Feb 11, 2020
2 parents 511d88b + 048d1c8 commit 2daf2df
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 38 deletions.
57 changes: 48 additions & 9 deletions lib/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ function Client (opts) {
events.EventEmitter.call(this)
this.parseOptions(opts)
this.initState()
// Select a server to connect to.
this.selectServer()
this.createConnection()
}

Expand Down Expand Up @@ -437,6 +439,7 @@ function Server (url) {
this.url = url
this.didConnect = false
this.reconnects = 0
this.lastConnect = 0
}

/**
Expand Down Expand Up @@ -484,6 +487,7 @@ Client.prototype.selectServer = function () {
}
}
this.servers.push(server)
return server
}

Client.prototype.checkNoEchoMismatch = function () {
Expand Down Expand Up @@ -724,9 +728,7 @@ Client.prototype.setupHandlers = function () {
if (stream.bytesRead > 0) {
this.emit('disconnect')
}
if (this.closed === true ||
this.options.reconnect === false ||
((this.reconnects >= this.options.maxReconnectAttempts) && this.options.maxReconnectAttempts !== -1)) {
if (this.closed === true || this.options.reconnect === false || this.servers.length === 0) {
this.cleanupTimers()
this.emit('close')
} else {
Expand Down Expand Up @@ -887,9 +889,6 @@ Client.prototype.createConnection = function () {
this.info = null
this.infoReceived = false

// Select a server to connect to.
this.selectServer()

// See #45 if we have a stream release the listeners otherwise in addition
// to the leaking of events, the old events will still fire.
if (this.stream) {
Expand All @@ -916,7 +915,6 @@ Client.prototype.initState = function () {
this.connected = false
this.wasConnected = false
this.reconnecting = false
this.server = null
this.pending = []
this.pout = 0
}
Expand Down Expand Up @@ -1235,6 +1233,8 @@ Client.prototype.processInbound = function () {
// send any subscriptions, and strip pending
this.sendSubscriptions()
this.stripPendingSubs()
// reset the reconnects for this server
this.currentServer.reconnects = 0
// invoke the callback
this.connectCB()
})
Expand Down Expand Up @@ -2173,11 +2173,13 @@ Client.prototype.reconnect = function () {
if (this.closed) {
return
}
this.currentServer.reconnects += 1
this.reconnects += 1
this.createConnection()
if (this.currentServer.didConnect === true) {
this.emit('reconnecting')
}
this.currentServer.lastConnect = Date.now()
}

/**
Expand All @@ -2200,7 +2202,44 @@ Client.prototype.scheduleReconnect = function () {
if (this.servers[0].didConnect === true) {
wait = this.options.reconnectTimeWait
}
// Select a server to connect to - this will be
// the first server that meets the reconnectTimeWait criteria
const now = Date.now()
let maxWait = wait
for (let i = 0; i < this.servers.length; i++) {
const srv = this.selectServer()
if (srv.reconnects >= this.options.maxReconnectAttempts && this.options.maxReconnectAttempts !== -1) {
// remove the server - we already tried connecting max number of times
this.servers.pop()
continue
}
if (srv.lastConnect === undefined) {
// never connected here, try it right away
this.reconnect()
return
}
if (srv.lastConnect + wait <= now) {
// tried before, but after the min wait, try right away
this.reconnect()
return
} else {
// find the smallest amount of time we have to wait to maybe reconnect
const m = (srv.lastConnect + wait - now)
if (maxWait > m) {
maxWait = m
}
}
}

if (this.servers.length === 0) {
// we have no more servers
this.cleanupTimers()
this.emit('close')
this.close()
return
}
// if we are here, we cannot yet reconnect, but can at maxWait
setTimeout(() => {
this.reconnect()
}, wait)
this.scheduleReconnect()
}, maxWait)
}
8 changes: 5 additions & 3 deletions test/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,16 @@ describe('Cluster', () => {
s2 = null
})
})
nc.on('reconnecting', client => {
nc.on('reconnecting', () => {
const elapsed = new Date() - startTime
elapsed.should.be.within(WAIT, 5 * WAIT)
if (nc.currentServer.lastConnect !== 0) {
elapsed.should.be.within(nc.currentServer.lastConnect - Date.now(), 2 * WAIT)
}
startTime = new Date()
numAttempts += 1
})
nc.on('close', () => {
numAttempts.should.equal(ATTEMPTS)
numAttempts.should.equal(ATTEMPTS * 2)
nc.close()
done()
})
Expand Down
12 changes: 6 additions & 6 deletions test/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ describe('Basic Connectivity', () => {
connectingEvents++
})
setTimeout(() => {
connectingEvents.should.equal(5)
connectingEvents.should.be.within(5, 7)
nc.close()
done()
}, 550)
Expand All @@ -108,12 +108,12 @@ describe('Basic Connectivity', () => {
servers: natsServers
})
let recvMsg = ''
ua.subscribe('topic1', (msg, reply, subject) => {
ua.subscribe('topic1', (msg) => {
recvMsg = msg
})
setTimeout(() => {
ub.publish('topic1', 'hello')
}, 100 * 1)
}, 100)
setTimeout(() => {
recvMsg.should.equal('hello')
ua.close()
Expand All @@ -132,13 +132,13 @@ describe('Basic Connectivity', () => {
servers: natsServers,
noRandomize: true
})
let recvMsg = ''
ua.subscribe('topic1', (msg, reply, subject) => {
let recvMsg
ua.subscribe('topic1', (msg) => {
recvMsg = msg
})
setTimeout(() => {
ub.publish('topic1', 'hello')
}, 100 * 1)
}, 100)
setTimeout(() => {
recvMsg.should.equal('hello')
ua.close()
Expand Down
37 changes: 17 additions & 20 deletions test/reconnect.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ describe('Reconnect functionality', () => {
nc.on('connect', () => {
nsc.stopServer(server)
})
nc.on('reconnecting', (client) => {
nc.on('reconnecting', () => {
done(new Error('Reconnecting improperly called'))
})
nc.on('close', () => {
Expand All @@ -70,11 +70,14 @@ describe('Reconnect functionality', () => {
startTime = new Date()
})
})
nc.on('reconnecting', (client) => {
const elapsed = new Date() - startTime
elapsed.should.be.within(WAIT, 5 * WAIT)
nc.close()
done()
// first time it tries immediately, second time it waits
nc.once('reconnecting', () => {
nc.on('reconnecting', () => {
const elapsed = new Date() - startTime
elapsed.should.be.within(WAIT, WAIT * 2)
nc.close()
done()
})
})
nc.on('disconnect', () => {
const elapsed = new Date() - startTime
Expand Down Expand Up @@ -122,17 +125,11 @@ describe('Reconnect functionality', () => {
reconnectTimeWait: WAIT,
maxReconnectAttempts: ATTEMPTS
})
let startTime
let numAttempts = 0
nc.on('connect', () => {
nsc.stopServer(server, () => {
startTime = new Date()
})
nsc.stopServer(server, () => {})
})
nc.on('reconnecting', (client) => {
const elapsed = new Date() - startTime
elapsed.should.be.within(WAIT, 5 * WAIT)
startTime = new Date()
nc.on('reconnecting', () => {
numAttempts += 1
})
nc.on('close', () => {
Expand Down Expand Up @@ -163,7 +160,7 @@ describe('Reconnect functionality', () => {
server = null
})
})
nc.on('reconnecting', (client) => {
nc.on('reconnecting', () => {
numAttempts += 1
// attempt indefinitely to reconnect
nc.reconnects.should.equal(numAttempts)
Expand Down Expand Up @@ -192,7 +189,7 @@ describe('Reconnect functionality', () => {
server = null
})
})
nc.on('reconnecting', (client) => {
nc.on('reconnecting', () => {
// restart server and make sure next flush works ok
if (server === null) {
server = nsc.startServer(PORT)
Expand Down Expand Up @@ -221,7 +218,7 @@ describe('Reconnect functionality', () => {
nc.close()
done()
})
nc.on('reconnecting', (client) => {
nc.on('reconnecting', () => {
// restart server and make sure next flush works ok
if (server === null) {
server = nsc.startServer(PORT)
Expand Down Expand Up @@ -253,7 +250,7 @@ describe('Reconnect functionality', () => {
queue: 'myReconnectQueue'
}, cb)
}
nc.on('reconnecting', (client) => {
nc.on('reconnecting', () => {
// restart server and make sure next flush works ok
if (server === null) {
server = nsc.startServer(PORT)
Expand Down Expand Up @@ -325,7 +322,7 @@ describe('Reconnect functionality', () => {
port: PORT,
reconnectTimeWait: 100
})
nc.on('reconnecting', (client) => {
nc.on('reconnecting', () => {
// restart server
if (server === null) {
nc.publish('foo', () => {
Expand Down Expand Up @@ -354,7 +351,7 @@ describe('Reconnect functionality', () => {
nc.close()
done()
})
nc.on('reconnecting', (client) => {
nc.on('reconnecting', () => {
// restart server
if (server === null) {
nc.publish('foo')
Expand Down

0 comments on commit 2daf2df

Please sign in to comment.