Skip to content
This repository has been archived by the owner on Aug 31, 2021. It is now read-only.

Commit

Permalink
Merge 084dccd into 7f7f770
Browse files Browse the repository at this point in the history
  • Loading branch information
jpwilliams committed Jan 27, 2018
2 parents 7f7f770 + 084dccd commit 2ffd2c2
Showing 1 changed file with 71 additions and 8 deletions.
79 changes: 71 additions & 8 deletions lib/Endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,67 @@ class Endpoint {
return this._started
}

async pause (cold) {
if (!this._started || this._paused) {
if (this._resuming) {
console.warn('Tried to pause endpoint whilst busy resuming')
}

return this._paused
}

this._paused = new Promise((resolve, reject) => {
const ops = [this._consumer.cancel(this._consumerTag)]

if (cold) {
// cold pause requsted, so let's push all messages
// back in to the queue rather than handling them
this._cold = true
ops.push(this._consumer.recover())
}

return Promise.all(ops)
.then(resolve)
.catch(reject)
})

return this._paused
}

async resume () {
if (this._resuming) return this._resuming
if (!this._started) return this.start()
if (!this._starting && !this._paused) return

this._resuming = new Promise(async (resolve, reject) => {
let consumeResult

try {
consumeResult = await this._consumer.consume(
this._options.queue,
bubble.bind(this._incoming.bind(this)),
{
noAck: true,
exclusive: false
}
)
} catch (e) {
delete this._resuming

return reject(e)
}

this._consumerTag = consumeResult.consumerTag
delete this._resuming
delete this._paused
delete this._cold

return resolve()
})

return this._resuming
}

async _incoming (message) {
if (!message) {
throwAsException(new Error('Consumer cancelled unexpectedly; this was most probably done via RabbitMQ\'s management panel'))
Expand Down Expand Up @@ -106,6 +167,10 @@ class Endpoint {
await resultOp
} else {
let finalData = await resultOp

// if a cold pause has been requested, don't process this
if (this._cold) return

finalData = serializeData(finalData)

const worker = await this
Expand Down Expand Up @@ -138,6 +203,8 @@ class Endpoint {
}

async _setup ({ queue, event, prefetch = 48 }) {
this._starting = true

try {
const worker = await this._remit._workers.acquire()

Expand All @@ -151,6 +218,7 @@ class Endpoint {

this._remit._workers.release(worker)
} catch (e) {
delete this._starting
this._remit._workers.destroy(worker)
throw e
}
Expand All @@ -172,17 +240,12 @@ class Endpoint {
event
)

await this._consumer.consume(
queue,
bubble.bind(this._incoming.bind(this)),
{
noAck: true,
exclusive: false
}
)
await this.resume()
delete this._starting

return this
} catch (e) {
delete this._starting
throwAsException(e)
}
}
Expand Down

0 comments on commit 2ffd2c2

Please sign in to comment.