Skip to content

Commit

Permalink
fix: bad channel state on server close (#18)
Browse files Browse the repository at this point in the history
This change corrects several flow-state issues that occurred when a channel was closed by the server. Additionally, it introduces a one-microtask-tick delay when opening the channel, to ensure that the logic which determines whether the channel was closed correctly processes channel reassignments.
  • Loading branch information
AVVS committed Mar 26, 2024
1 parent f5dde07 commit 4c0fb3a
Show file tree
Hide file tree
Showing 15 changed files with 1,092 additions and 923 deletions.
18 changes: 9 additions & 9 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@
"compile": "pnpm -r clean; tsc -b tsconfig.build.json"
},
"devDependencies": {
"@makeomatic/deploy": "^13.0.6",
"@node-rs/xxhash-linux-arm64-musl": "^1.5.1",
"@node-rs/xxhash-linux-x64-musl": "^1.5.1",
"@swc-node/register": "^1.6.8",
"@swc/core": "1.3.102",
"@swc/core-linux-arm64-musl": "^1.3.102",
"@swc/core-linux-x64-musl": "^1.3.102",
"@swc/helpers": "^0.5.3",
"@makeomatic/deploy": "^13.0.7",
"@node-rs/xxhash-linux-arm64-musl": "^1.7.0",
"@node-rs/xxhash-linux-x64-musl": "^1.7.0",
"@swc-node/register": "^1.9.0",
"@swc/core": "1.4.11",
"@swc/core-linux-arm64-musl": "^1.4.11",
"@swc/core-linux-x64-musl": "^1.4.11",
"@swc/helpers": "^0.5.7",
"multi-semantic-release": "^3.0.2",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.3.3"
"typescript": "^5.4.3"
}
}
8 changes: 4 additions & 4 deletions packages/amqp-codec/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
"author": "Vitaly Aminev <v@makeomatic.ca>",
"license": "MIT",
"devDependencies": {
"@swc-node/register": "^1.6.8",
"@swc-node/register": "^1.9.0",
"@types/debug": "^4.1.12",
"@types/node": "^20.10.5",
"eslint": "^8.56.0",
"@types/node": "^20.11.30",
"eslint": "^8.57.0",
"eslint-config-makeomatic": "^6.0.0",
"rimraf": "^5.0.5",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.3.3"
"typescript": "^5.4.3"
},
"dependencies": {
"debug": "^4.3.4"
Expand Down
28 changes: 14 additions & 14 deletions packages/amqp-coffee/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
"dependencies": {
"@microfleet/amqp-codec": "workspace:^",
"async": "^3.2.5",
"bl": "^6.0.9",
"bson": "^6.2.0",
"bl": "^6.0.12",
"bson": "^6.5.0",
"bytes": "^3.1.2",
"debug": "^4.3.4",
"fastq": "^1.16.0",
"fastq": "^1.17.1",
"lodash": "^4.17.21",
"read-pkg": "^5.2.0"
},
Expand All @@ -47,30 +47,30 @@
"@makeomatic/ref-napi": "^3.0.6"
},
"devDependencies": {
"@makeomatic/deploy": "^13.0.5",
"@makeomatic/deploy": "^13.0.7",
"@types/async": "^3.2.24",
"@types/bytes": "^3.1.4",
"@types/debug": "^4.1.12",
"@types/lodash": "^4.14.202",
"@types/lodash": "^4.17.0",
"@types/mocha": "^10.0.6",
"@types/node": "^20.10.5",
"@types/readable-stream": "^4.0.10",
"@types/sinon": "^17.0.2",
"@types/uuid": "^9.0.7",
"@types/node": "^20.11.30",
"@types/readable-stream": "^4.0.11",
"@types/sinon": "^17.0.3",
"@types/uuid": "^9.0.8",
"bluebird": "^3.7.2",
"c8": "^8.0.1",
"c8": "^9.1.0",
"coffeescript": "^2.7.0",
"cross-env": "^7.0.3",
"eslint": "^8.56.0",
"eslint": "^8.57.0",
"eslint-config-makeomatic": "^6.0.0",
"mocha": "^10.2.0",
"mocha": "^10.4.0",
"rimraf": "^5.0.5",
"semantic-release": "^22.0.12",
"semantic-release": "^23.0.6",
"should": "13.2.3",
"sinon": "^17.0.1",
"source-map-support": "^0.5.21",
"tsconfig-paths": "^4.2.0",
"typescript": "^5.3.3",
"typescript": "^5.4.3",
"underscore": "^1.13.6",
"uuid": "^9.0.1"
},
Expand Down
37 changes: 20 additions & 17 deletions packages/amqp-coffee/src/lib/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ export abstract class Channel extends EventEmitter {
super({ captureRejections: true })
this.taskPush = this.taskPush.bind(this)
this.openAsync = promisify(this.open)
this.open()
queueMicrotask(() => this.open())
}

temporaryChannel() {
Expand Down Expand Up @@ -119,8 +119,17 @@ export abstract class Channel extends EventEmitter {
this.state = ChannelState.opening

if (cb) this.waitForMethod(methods.channelOpenOk, cb)
this.connection._sendMethod(this.channel, methods.channelOpen, {})
this.connection.channelManager.channelCount += 1

const { connection } = this
const { channelManager } = connection

// channelManager assigns this after constructor
// so open must always be called in the next tick
if (channelManager.isChannelClosed(this.channel)) {
channelManager.channelReassign(this)
}

connection._sendMethod(this.channel, methods.channelOpen, {})

if (this.transactional) this.temporaryChannel()
} else if (cb) {
Expand Down Expand Up @@ -291,7 +300,9 @@ export abstract class Channel extends EventEmitter {
}

async _taskWorker(task: Task): Promise<void> {
if (this.transactional) {
const { transactional, state, connection, channel } = this

if (transactional) {
this.lastChannelAccess = performance.now()
}

Expand All @@ -303,28 +314,20 @@ export abstract class Channel extends EventEmitter {
return
}

if (this.state === ChannelState.closed && this.connection.state === 'open') {
this.connection.channelManager.channelReassign(this)
if (state === ChannelState.closed && connection.state === ConnectionState.open) {
await this.openAsync().catch(noopErr)
debug(4, () => ['openAsync done: channel number', this.channel])
this.queue.unshift(task)
return
}

const { connection } = this

if (this.state !== ChannelState.open) {
if (state !== ChannelState.open) {
// if our connection is closed that ok, but if its destroyed it will not reopen
if (connection.state === ConnectionState.destroyed) {
cb?.(new Error('Connection is destroyed'))
return
}

if (this.state !== ChannelState.opening && connection.channelManager.isChannelClosed(this.channel)) {
debug(4, () => ['channel', this.channel, 'in state', this.state, 'marked as closed'])
connection.channelManager.channelReassign(this)
}

await once(this, 'open')
this.queue.unshift(task)
return
Expand All @@ -335,11 +338,11 @@ export abstract class Channel extends EventEmitter {
: null

if (type === TaskType.method) {
connection._sendMethod(this.channel, method, task.args)
connection._sendMethod(channel, method, task.args)
} else if (type === TaskType.publish) {
connection.connection.cork()
this.connection._sendMethod(this.channel, method, task.options)
this.connection._sendBody(this.channel, data, task.options)
this.connection._sendMethod(channel, method, task.options)
this.connection._sendBody(channel, data, task.options)
process.nextTick(() => connection.connection.uncork())
} else {
throw new Error(`a task was queue with an unknown type of ${type}`)
Expand Down
15 changes: 11 additions & 4 deletions packages/amqp-coffee/src/lib/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,13 @@ export class Connection extends EventEmitter {
}

_connectionClosedEvent(had_error?: boolean) {
const previousState = this.state

// set to closed for channels
if (previousState !== ConnectionState.closed && previousState !== ConnectionState.destroyed) {
this.state = ConnectionState.closed
}

debug(1, () => ['received connection closed event', had_error])

// go through all of our channels and close them
Expand All @@ -422,9 +429,9 @@ export class Connection extends EventEmitter {
}

if (this._connectTimeout) clearTimeout(this._connectTimeout)
if (this.state === ConnectionState.open) this.emit('close')
if (previousState === ConnectionState.open) this.emit('close')

if (this.state !== ConnectionState.destroyed) {
if (previousState !== ConnectionState.destroyed) {
if (!this.connectionOptions.reconnect) {
debug(1, () => 'Connection closed not reconnecting...')
return
Expand Down Expand Up @@ -534,7 +541,7 @@ export class Connection extends EventEmitter {

// called directly in tests to simulate missed heartbeat
_missedHeartbeat() {
if (this.state === 'open') {
if (this.state === ConnectionState.open) {
debug(1, () => 'We missed a heartbeat, destroying the connection.')
this.connection.destroy()
}
Expand Down Expand Up @@ -595,7 +602,7 @@ export class Connection extends EventEmitter {

_sendMethod<T extends Methods>(channel: number, method: T, args: Partial<InferOptions<T>>) {

debug(3, () => [`_sendMethod`, `state=${this.state}`, `OpeningStates=${JSON.stringify(OpeningStates)}`])
debug(3, () => [`_sendMethod`, `state=${this.state}`, `OpeningStates=${JSON.stringify(OpeningStates)}`, `channel=${channel}`])

if (channel !== 0 && OpeningStates.includes(this.state)) {
// TODO: introduce queue instead of spawning promises
Expand Down
10 changes: 9 additions & 1 deletion packages/amqp-coffee/src/lib/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,15 @@ export class Consumer extends Channel {
return { consumerTag: this.consumerTag }
}

return this._consume()
try {
return await this._consume()
} catch (err: any) {
if (err instanceof ConnectionResetError) {
throw err
} else {
throw new ServerClosedError(err)
}
}
}

async flow(active: boolean): Promise<BasicConsumeResponse | BasicCancelResponse> {
Expand Down
32 changes: 25 additions & 7 deletions packages/amqp-coffee/src/lib/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import { methods, MethodFrame, ContentHeader } from '@microfleet/amqp-codec'
import { once } from 'events'

import { Channel } from './channel'
import { Channel, ChannelState } from './channel'
import * as defaults from './defaults'

import { BasicReturnError } from './errors/basic-return-error'
Expand Down Expand Up @@ -87,11 +87,20 @@ export class Publisher extends Channel {

this.confirm = confirm != null ? confirm : false
if (this.confirm) {
this.confirmMode()
queueMicrotask(() => this.confirmMode())
}
}

confirmMode(cb?: () => void): void {
if (this.confirmState === ConfirmState.open) {
return cb?.()
}

if (this.confirmState === ConfirmState.opening) {
if (cb) this.once('confirm', cb)
return
}

this.confirmState = ConfirmState.opening
this.taskPush(methods.confirmSelect, { noWait: false }, methods.confirmSelectOk, () => {
this.confirmState = ConfirmState.open
Expand All @@ -108,6 +117,8 @@ export class Publisher extends Channel {
}

_channelClosed(message = new Error('Channel closed, try again')): void {
debug(4, () => ['_channelClosed', this.channel, 'err', message])

this.confirmState = ConfirmState.closed

for (const cb of this.seqCallbacks.values()) {
Expand All @@ -128,13 +139,13 @@ export class Publisher extends Channel {
}

_inoperableState(): boolean {
return this.state !== 'open' || (this.confirm && this.confirmState !== 'open')
return this.state !== ChannelState.open || (this.confirm && this.confirmState !== ConfirmState.open)
}

_recoverableState(): boolean {
return this.state === 'opening'
|| this.state === 'closed'
|| (this.confirm && this.confirmState === 'opening')
return this.state === ChannelState.opening
|| this.state === ChannelState.closed
|| (this.confirm && this.confirmState === ConfirmState.opening)
}

async _wait(eventName: string) {
Expand Down Expand Up @@ -175,11 +186,18 @@ export class Publisher extends Channel {
throw new Error(`Channel ${this.channel} is closed and will not re-open? ${this.state} ${this.confirm} ${this.confirmState}`)
}

debug(4, () => ['state recoverable, wait for:', this.confirm ? 'confirm' : 'open'])
debug(4, () => ['state recoverable, wait for:', this.confirm ? 'confirm' : 'open', 'on', this.channel])

// ensure we reopen the channel when requested
this.open()

// set it into confirm mode
if (this.confirm && (this.confirmState === ConfirmState.closed || this.confirmState === ConfirmState.noAck)) {
this.confirmMode()
}

await this._wait(this.confirm ? 'confirm' : 'open')
debug(4, () => ['state recovered', this.channel])
}

async publishMessageAsync(_data: unknown, options: PublishOptions) {
Expand Down
5 changes: 5 additions & 0 deletions packages/amqp-coffee/test/.eslintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"env": {
"mocha": true
}
}
35 changes: 21 additions & 14 deletions packages/amqp-coffee/test/consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -729,9 +729,27 @@ describe('Consumer', () => {

const messageProcessor = async (m: Message) => {
messagesRecieved += 1

thisproxy.interrupt()
await setTimeout(25)
await consumer.resume() // must call .close() on error

// as this won't bubble further
try {
await consumer.resume() // must call .close() on error
} catch (err) {
assert(err instanceof ServerClosedError)
err.reason.replyCode.should.eql(404)
errorCount += 1

await consumer.close()

if (errorCount === 1) {
await setTimeout(300)
errorCount.should.eql(1)
messagesRecieved.should.eql(1)
done = true
}
}
}

let queue = ''
Expand All @@ -741,19 +759,8 @@ describe('Consumer', () => {
await q.declare()
queue = q.queueOptions.queue

consumer = await amqp.consume(queue, {prefetchCount: 1}, messageProcessor)
consumer.on('error', async (error) => {
assert(error instanceof ServerClosedError)
error.reason.replyCode.should.eql(404)
errorCount += 1
await consumer.close()
if (errorCount === 1) {
await setTimeout(300)
errorCount.should.eql(1)
messagesRecieved.should.eql(1)
done = true
}
})
consumer = await amqp.consumer()
await consumer.consume(queue, messageProcessor, { prefetchCount: 1 })

await amqp.publish("", queue, testData, {confirm: true})
await verify(() => done)
Expand Down
Loading

0 comments on commit 4c0fb3a

Please sign in to comment.