Skip to content

Commit

Permalink
Fix #153 - handle nats connections that are already connected (#154)
Browse files Browse the repository at this point in the history
* FIX #153 - refactored the connection handler setup so as to be aware of a nats connection that is already connected.

In that case the handlers will need to be added as they are, but the protocol initialization will have to be triggered by the library.
  • Loading branch information
aricart committed Jan 24, 2020
1 parent fec6cbe commit c7a288c
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 104 deletions.
217 changes: 115 additions & 102 deletions lib/stan.js
Original file line number Diff line number Diff line change
Expand Up @@ -271,116 +271,104 @@ Stan.prototype.initState = function () {
* Stan#error
*/

/**
* Connect to a NATS Streaming subsystem
* @fires Stan#connect, Stan#close, Stan#reconnecting, Stan#error
*/
Stan.prototype.createConnection = function () {
if (typeof this.options.nc === 'object') {
if (this.options.nc.encoding !== 'binary') {
throw new Error(BINARY_ENCODING_REQUIRED)
} else {
this.nc = this.options.nc
}
}
if (this.nc === undefined) {
const encoding = this.options.encoding
if (encoding && encoding !== 'binary') {
throw new Error(BINARY_ENCODING_REQUIRED)
} else {
this.options.encoding = 'binary'
}
this.nc = nats.connect(this.options)
this.ncOwned = true
}

this.nc.on('connect', () => {
// heartbeat processing
const hbInbox = nats.createInbox()
this.hbSubscription = this.nc.subscribe(hbInbox, (msg, reply) => {
this.nc.publish(reply)
})
// @private
Stan.prototype.initConnection = function () {
// heartbeat processing
const hbInbox = nats.createInbox()
this.hbSubscription = this.nc.subscribe(hbInbox, (msg, reply) => {
this.nc.publish(reply)
})

this.pingInbox = nats.createInbox()
this.pingSubscription = this.nc.subscribe(this.pingInbox, (msg) => {
if (msg) {
const pingResponse = proto.pb.PingResponse.deserializeBinary(Buffer.from(msg, 'binary'))
const err = pingResponse.getError()
if (err) {
this.closeWithError('connection_lost', err)
return
}
this.pingInbox = nats.createInbox()
this.pingSubscription = this.nc.subscribe(this.pingInbox, (msg) => {
if (msg) {
const pingResponse = proto.pb.PingResponse.deserializeBinary(Buffer.from(msg, 'binary'))
const err = pingResponse.getError()
if (err) {
this.closeWithError('connection_lost', err)
return
}
this.pingOut = 0
})
}
this.pingOut = 0
})

this.ackSubscription = this.nc.subscribe(this.ackSubject, this.processAck())
this.ackSubscription = this.nc.subscribe(this.ackSubject, this.processAck())

const discoverSubject = this.options.discoverPrefix + '.' + this.clusterID
// noinspection JSUnresolvedFunction
this.connId = Buffer.from(nuid.next(), 'utf8')
const req = new proto.pb.ConnectRequest()
req.setClientId(this.clientID)
req.setHeartbeatInbox(hbInbox)
req.setProtocol(PROTOCOL_ONE)
req.setConnId(this.connId)
req.setPingInterval(Math.ceil(this.options.stanPingInterval / 1000))
req.setPingMaxOut(this.options.stanMaxPingOut)

this.nc.requestOne(discoverSubject, Buffer.from(req.serializeBinary()), this.options.connectTimeout, (msg) => {
if (msg instanceof nats.NatsError) {
let err = msg
if (msg.code === nats.REQ_TIMEOUT) {
err = new nats.NatsError(CONNECT_REQ_TIMEOUT, CONNECT_REQ_TIMEOUT, err)
}
this.closeWithError('error', err)
return
const discoverSubject = this.options.discoverPrefix + '.' + this.clusterID
// noinspection JSUnresolvedFunction
this.connId = Buffer.from(nuid.next(), 'utf8')
const req = new proto.pb.ConnectRequest()
req.setClientId(this.clientID)
req.setHeartbeatInbox(hbInbox)
req.setProtocol(PROTOCOL_ONE)
req.setConnId(this.connId)
req.setPingInterval(Math.ceil(this.options.stanPingInterval / 1000))
req.setPingMaxOut(this.options.stanMaxPingOut)

this.nc.requestOne(discoverSubject, Buffer.from(req.serializeBinary()), this.options.connectTimeout, (msg) => {
if (msg instanceof nats.NatsError) {
let err = msg
if (msg.code === nats.REQ_TIMEOUT) {
err = new nats.NatsError(CONNECT_REQ_TIMEOUT, CONNECT_REQ_TIMEOUT, err)
}
this.closeWithError('error', err)
return
}

const cr = proto.pb.ConnectResponse.deserializeBinary(Buffer.from(msg, 'binary'))
if (cr.getError() !== '') {
this.closeWithError('error', cr.getError())
return
}
this.pubPrefix = cr.getPubPrefix()
this.subRequests = cr.getSubRequests()
this.unsubRequests = cr.getUnsubRequests()
this.subCloseRequests = cr.getSubCloseRequests()
this.closeRequests = cr.getCloseRequests()

let unsubPingSub = true
if (cr.getProtocol() >= PROTOCOL_ONE) {
if (cr.getPingInterval() !== 0) {
unsubPingSub = false

this.pingRequests = cr.getPingRequests()
this.stanPingInterval = cr.getPingInterval() * 1000
this.stanMaxPingOut = cr.getPingMaxOut()

const ping = new proto.pb.Ping()
ping.setConnId(this.connId)
this.pingBytes = Buffer.from(ping.serializeBinary())

this.pingOut = 0
const that = this
this.pingTimer = setTimeout(function pingFun () {
that.pingOut++
if (that.pingOut > that.stanMaxPingOut) {
that.closeWithError('connection_lost', new Error(MAX_PINGS_EXCEEDED))
return
}
that.nc.publish(that.pingRequests, that.pingBytes, that.pingInbox)
that.pingTimer = setTimeout(pingFun, that.stanPingInterval)
}, this.stanPingInterval)
}
}
if (unsubPingSub) {
this.nc.unsubscribe(this.pingSubscription)
this.pingSubscription = null
const cr = proto.pb.ConnectResponse.deserializeBinary(Buffer.from(msg, 'binary'))
if (cr.getError() !== '') {
this.closeWithError('error', cr.getError())
return
}
this.pubPrefix = cr.getPubPrefix()
this.subRequests = cr.getSubRequests()
this.unsubRequests = cr.getUnsubRequests()
this.subCloseRequests = cr.getSubCloseRequests()
this.closeRequests = cr.getCloseRequests()

let unsubPingSub = true
if (cr.getProtocol() >= PROTOCOL_ONE) {
if (cr.getPingInterval() !== 0) {
unsubPingSub = false

this.pingRequests = cr.getPingRequests()
this.stanPingInterval = cr.getPingInterval() * 1000
this.stanMaxPingOut = cr.getPingMaxOut()

const ping = new proto.pb.Ping()
ping.setConnId(this.connId)
this.pingBytes = Buffer.from(ping.serializeBinary())

this.pingOut = 0
const that = this
this.pingTimer = setTimeout(function pingFun () {
that.pingOut++
if (that.pingOut > that.stanMaxPingOut) {
that.closeWithError('connection_lost', new Error(MAX_PINGS_EXCEEDED))
return
}
that.nc.publish(that.pingRequests, that.pingBytes, that.pingInbox)
that.pingTimer = setTimeout(pingFun, that.stanPingInterval)
}, this.stanPingInterval)
}
}
if (unsubPingSub) {
this.nc.unsubscribe(this.pingSubscription)
this.pingSubscription = null
}

this.emit('connect', this)
})
this.emit('connect', this)
})
}

// @private
Stan.prototype.setupHandlers = function () {
// if they gave us a connection, it could be connected, so the `connect` event may never fire.
if (!this.ncOwned && this.nc.connected) {
this.initConnection()
}
this.nc.on('connect', () => {
this.initConnection()
})

this.nc.on('close', () => {
Expand All @@ -406,6 +394,31 @@ Stan.prototype.createConnection = function () {
})
}

/**
* Connect to a NATS Streaming subsystem
* @fires Stan#connect, Stan#close, Stan#reconnecting, Stan#error
*/
Stan.prototype.createConnection = function () {
if (typeof this.options.nc === 'object') {
if (this.options.nc.encoding !== 'binary') {
throw new Error(BINARY_ENCODING_REQUIRED)
} else {
this.nc = this.options.nc
}
}
if (this.nc === undefined) {
const encoding = this.options.encoding
if (encoding && encoding !== 'binary') {
throw new Error(BINARY_ENCODING_REQUIRED)
} else {
this.options.encoding = 'binary'
}
this.nc = nats.connect(this.options)
this.ncOwned = true
}
this.setupHandlers()
}

/**
* Close stan invoking the event notification with the
* specified error, followed by a close notification.
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "node-nats-streaming",
"version": "0.3.0-2",
"version": "0.3.0-3",
"description": "Node.js client for NATS Streaming, a lightweight, high-performance cloud native messaging system",
"keywords": [
"nats",
Expand Down
18 changes: 18 additions & 0 deletions test/stan_connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,24 @@ describe('Stan Connect', () => {
})
})

it('assigning already connected nats-connection should work', (done) => {
const nc = NATS.connect({
uri: uri,
encoding: 'binary'
})
nc.on('connect', () => {
const opts = {
nc: nc
}
const stan = STAN.connect(cluster, nuid.next(), opts)
stan.on('connect', () => {
stan.close()
nc.close()
done()
})
})
})

it('non-binary nats connection should fail', (done) => {
const nc = NATS.connect({
uri: uri,
Expand Down

0 comments on commit c7a288c

Please sign in to comment.