Skip to content

Commit

Permalink
Merge 1a101de into 2c1806c
Browse files Browse the repository at this point in the history
  • Loading branch information
aricart committed Feb 10, 2020
2 parents 2c1806c + 1a101de commit 04f2bb7
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 1 deletion.
10 changes: 10 additions & 0 deletions lib/nats.js
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,16 @@ Client.prototype.sendSubscriptions = function () {
proto = [SUB, sub.subject, sid + CR_LF]
}
protos += proto.join(SPC)

if (sub.max) {
const max = sub.max - sub.received
if (max > 0) {
proto = [UNSUB, sid, max + CR_LF]
} else {
proto = [UNSUB, sid + CR_LF]
}
protos += proto.join(SPC)
}
}
}
if (protos.length > 0) {
Expand Down
71 changes: 70 additions & 1 deletion test/basics.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const after = require('mocha').after
const before = require('mocha').before
const describe = require('mocha').describe
const it = require('mocha').it
const net = require('net')

describe('Basics', () => {
const PORT = 1423
Expand Down Expand Up @@ -151,7 +152,7 @@ describe('Basics', () => {
nc.publish(reply, replyMsg)
})

const sub = nc.request('foo', initMsg, reply => {
const sub = nc.request('foo', initMsg, _ => {
nc.flush(() => {
received.should.equal(expected)
nc.close()
Expand Down Expand Up @@ -1014,4 +1015,72 @@ describe('Basics', () => {
done()
})
})

it('should resend unsubs', (done) => {
let conn
let unsubs = 0
const srv = net.createServer((c) => {
c.write('INFO ' + JSON.stringify({
server_id: 'TEST',
version: '0.0.0',
host: '127.0.0.1',
port: srv.address.port,
auth_required: false
}) + '\r\n')
c.on('data', (d) => {
const r = d.toString()
const lines = r.split('\r\n')
lines.forEach((line) => {
if (line === '\r\n') {
return
}
if (/^CONNECT\s+/.test(line)) {
} else if (/^PING/.test(line)) {
c.write('PONG\r\n')
} else if (/^SUB\s+/i.test(line)) {
c.write('MSG test 2 11\r\nHello World\r\n')
} else if (/^UNSUB\s+/i.test(line)) {
unsubs++
if (unsubs === 1) {
const args = line.split(' ')
args.length.should.equal(3)
// number of messages to when to unsub
args[2].should.equal('10')
// kick the client
c.destroy()
return
}
if (unsubs === 2) {
const args = line.split(' ')
args.length.should.equal(3)
args[2].should.equal('9')
conn.close()
srv.close(() => {
done()
})
}
} else if (/^MSG\s+/i.test(line)) {
} else if (/^INFO\s+/i.test(line)) {
} else {
// unknown
}
})
})
c.on('error', () => {
// we are messing with the server so this will raise connection reset
})
})
srv.listen(0, () => {
const p = srv.address().port
const nc = NATS.connect('nats://localhost:' + p, {
reconnect: true,
reconnectTimeWait: 250
})
conn = nc
nc.on('connect', () => {
const opts = { max: 10 }
nc.subscribe('test', opts, () => {})
})
})
})
})

0 comments on commit 04f2bb7

Please sign in to comment.