Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only every other message is pulled from the queue #681

Closed
mod35 opened this issue Aug 14, 2021 · 2 comments
Closed

Only every other message is pulled from the queue #681

mod35 opened this issue Aug 14, 2021 · 2 comments

Comments

@mod35
Copy link

mod35 commented Aug 14, 2021

Hello,
I've been trying to use this library and am having a bit a weird time with it. I've written a simple class and a test to show what I'm seeing. The long and short of it is that how I'm setting it up seems to make only every other message on the queue get polled. I'm assuming I've done something wrong and somebody amazing might be able to point it out 🙏

node version: v12.14.1
bullmq version: 1.40.4

The setup docker-compose.yml

version: '3'

services:
  redis:
    image: bitnami/redis
    container_name: redis
    environment:
      - ALLOW_EMPTY_PASSWORD=yes
    ports:
      - "6379:6379"

The simple implementation

import { Job, Queue, Worker } from 'bullmq'
import Redis from 'ioredis'
import * as uuid from 'uuid'

export type Connection = Redis.Redis
export interface MessagesReceived {
  check: (message: string) => void
}

export class BrokenQueue {
  private queueConnection: Connection
  private workerConnection: Connection
  private queue: Queue
  private worker: Worker
  private polling: boolean
  private messagesReceived: MessagesReceived
  private maxRetries: number

  constructor(messagesReceived: MessagesReceived) {
    this.messagesReceived = messagesReceived
  }

  async initialise() {
    console.log('initialising the queue')
    const configuration = {
      queueName: 'broken-test',
      connectionString: 'redis://127.0.0.1:6379',
    }
    this.maxRetries = 3
    this.queueConnection = new Redis(configuration.connectionString)
    this.workerConnection =new Redis(configuration.connectionString)

    this.queue = new Queue(configuration.queueName, {
      connection: this.queueConnection
    })

    this.worker = new Worker(configuration.queueName, undefined, {connection: this.workerConnection, concurrency: 1})
  }

  async dispose() {
    this.stopPollingForMessage()
    await this.queue.close()
    await this.worker.close()
    this.queueConnection.disconnect()
    this.workerConnection.disconnect()
    console.log('queue disposed')
  }

  async send (messageName: string, message: string, priority?: number): Promise<void> {
    await this.queue.add(messageName, message, {
      jobId: uuid.v4(),
      attempts: this.maxRetries,
      removeOnComplete: true,
      priority
    })
    console.log('Message added!', message)
  }

  stopPollingForMessage() {
    this.polling = false
  }

  async pollForMessages() {
    if (this.polling) {
     console.log('don\'t try to poll twice!')
    }
    this.polling = true
    while(this.polling) {
      const token = uuid.v4()
      const job = (await this.worker.getNextJob(token)) as Job<string> | undefined

      if (!job || !job.data) {
        console.log('no message received')
        continue
      }
      console.log('Received message from Redis', {redisMessage: job.data})
      this.messagesReceived.check(job.data)
      await job.moveToCompleted(job.data, token)
    }
  }
}

Test file

import { BrokenQueue } from './broken-queue'

jest.setTimeout(30000)

export async function sleep (timeoutMs: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, timeoutMs))
}
// Just a simple function I can spy on
const messagesReceived = {
  check: (message: string) => {}
}

const spy = jest.spyOn(messagesReceived, 'check')
describe('broken queue integration', () => {
  let sut: BrokenQueue

  async function purgeQueue() {
    // Is there a better way to purge the various `sub queues` apart from obliterate?
    return Promise.all(
      ['completed','wait','active','paused','delayed','failed']
        .map(jobState => sut['queue'].clean(5000, 100, jobState as any)))
  }

  describe('when sending multiple messages', () => {
    beforeAll(async () => {
      sut = new BrokenQueue(messagesReceived)
      await sut.initialise()
    })
    afterAll(async () => {
      await sut.dispose()
      await purgeQueue()
      spy.mockReset()
    })

    it('it should receive all four messages in the correct order', async () => {
      const fourMessages = ['one', 'two', 'three', 'four']
      // send all four messages - one after another to try to ensure predictable order
      for (const message of fourMessages) {
        await sut.send('test', message)
      }
      // start polling for messages
      sut.pollForMessages()
      // wait generously
      await sleep(6000)
      expect(spy).toHaveBeenCalledTimes(4)
      expect(spy).toHaveBeenNthCalledWith(1, 'one')
      expect(spy).toHaveBeenNthCalledWith(2, 'two')
      expect(spy).toHaveBeenNthCalledWith(3, 'three')
      expect(spy).toHaveBeenNthCalledWith(4, 'four')
    })
  })
})

The strange every other message output.

image

Thanks for your time - also if you notice anything basically wrong with this implementation, how its being created and torn down I'd be keen to know also. Thanks again.

@mod35 mod35 changed the title Only ever other message is pulled from the queue Only every other message is pulled from the queue Aug 14, 2021
@manast
Copy link
Contributor

manast commented Aug 18, 2021

You must realize that job.moveToCompleted(job.data, token) returns the next job in order to eliminate an extra call to Redis.
Please check this: https://docs.bullmq.io/patterns/manually-fetching-jobs#looping-through-jobs

@manast manast closed this as completed Aug 18, 2021
@mod35
Copy link
Author

mod35 commented Aug 19, 2021

@manast Thanks for clearing that up

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants