
bullmq with redis cluster error #1232

Closed
amirAlamian opened this issue May 5, 2022 · 6 comments

@amirAlamian

amirAlamian commented May 5, 2022

Hi, I have this configuration for a Redis cluster:

import { Queue } from 'bullmq';
import { Cluster } from 'ioredis';

const queue = new Queue('test', {
  connection: new Cluster(
    [
      { host: '127.0.0.1', port: 6379 },
      { host: '127.0.0.1', port: 6380 },
      { host: '127.0.0.1', port: 6381 },
      { host: '127.0.0.1', port: 6382 },
      { host: '127.0.0.1', port: 6383 },
      { host: '127.0.0.1', port: 6384 },
    ],
    { enableOfflineQueue: false },
  ),
  prefix: '{BULLMQ}',
});

and this is my Redis cluster health check result:

192.176.0.3:6379 (c3ced8bc...) -> 1 keys | 5462 slots | 1 slaves.
192.176.0.2:6379 (6116dd14...) -> 0 keys | 5461 slots | 1 slaves.
192.176.0.4:6379 (21a96a96...) -> 0 keys | 5461 slots | 1 slaves.
[OK] 1 keys in 3 masters.
0.00 keys per slot on average.
>>> Performing Cluster Check (using node 192.176.0.3:6379)
M: c3ced8bcc60a43c1832c8e38555cb2abe2eced40 192.176.0.3:6379
   slots:[5461-10922] (5462 slots) master
   1 additional replica(s)
M: 6116dd145795322f3fade7c8c48e250c1cb95ac4 192.176.0.2:6379
   slots:[0-5460] (5461 slots) master
   1 additional replica(s)
S: 03494c6c5d634c4c5db40a358178066280ac937a 192.176.0.5:6379
   slots: (0 slots) slave
   replicates 21a96a96bd9c3b20147225062fc0f1c1e8873ebf
S: d25cb78eadb13070fb2a850d8a7f44fcff08f776 192.176.0.7:6379
   slots: (0 slots) slave
   replicates c3ced8bcc60a43c1832c8e38555cb2abe2eced40
M: 21a96a96bd9c3b20147225062fc0f1c1e8873ebf 192.176.0.4:6379
   slots:[10923-16383] (5461 slots) master
   1 additional replica(s)
S: 6e7e0449aadb82559a0278af9020f2174c90c681 192.176.0.6:6379
   slots: (0 slots) slave
   replicates 6116dd145795322f3fade7c8c48e250c1cb95ac4
[OK] All nodes agree about slots configuration.
>>> Check for open slots...
>>> Check slots coverage...
[OK] All 16384 slots covered.

All the Redis instances run in Docker, and I get this error when I try to create a queue:

ReplyError: CROSSSLOT Keys in request don't hash to the same slot
    at parseError (/node_modules/redis-parser/lib/parser.js:179:12)
    at parseType (/node_modules/redis-parser/lib/parser.js:302:14) {
  command: {
    name: 'eval',
    args: [
      '--[[\n' +
        '  Updates the delay set, by picking a delayed job that should\n' +
        '  be processed now.\n' +
        '    Input:\n' +
        "      KEYS[1] 'delayed'\n" +
        "      KEYS[2] 'wait'\n" +
        "      KEYS[3] 'priority'\n" +
        "      KEYS[4] 'paused'\n" +
        "      KEYS[5] 'meta'\n" +
        "      KEYS[6] event's stream\n" +
        '      KEYS[7] delayed stream\n' +
        "      ARGV[1] queue.toKey('')\n" +
        '      ARGV[2] delayed timestamp\n' +
        '     Events:\n' +
        "      'waiting'\n" +
        ']]\n' +
        'local rcall = redis.call\n' +
        '-- Try to get as much as 1000 jobs at once\n' +
        'local jobs = rcall("ZRANGEBYSCORE", KEYS[1], 0, tonumber(ARGV[2]) * 0x1000,\n' +
        '                   "LIMIT", 0, 1000)\n' +
        'if (#jobs > 0) then\n' +
        '    rcall("ZREM", KEYS[1], unpack(jobs))\n' +
        '    -- check if we need to use push in paused instead of waiting\n' +
        '    local target\n' +
        '    if rcall("HEXISTS", KEYS[5], "paused") ~= 1 then\n' +
        '        target = KEYS[2]\n' +
        '    else\n' +
        '        target = KEYS[4]\n' +
        '    end\n' +
        '    for _, jobId in ipairs(jobs) do\n' +
        '        local priority =\n' +
        '            tonumber(rcall("HGET", ARGV[1] .. jobId, "priority")) or 0\n' +
        '        if priority == 0 then\n' +
        '            -- LIFO or FIFO\n' +
        '            rcall("LPUSH", target, jobId)\n' +
        '        else\n' +
        '            -- Priority add\n' +
        '            rcall("ZADD", KEYS[3], priority, jobId)\n' +
        '            local count = rcall("ZCOUNT", KEYS[3], 0, priority)\n' +
        '            local len = rcall("LLEN", target)\n' +
        '            local id = rcall("LINDEX", target, len - (count - 1))\n' +
        '            if id then\n' +
        '                rcall("LINSERT", target, "BEFORE", id, jobId)\n' +
        '            else\n' +
        '                rcall("RPUSH", target, jobId)\n' +
        '            end\n' +
        '        end\n' +
        '        -- Emit waiting event\n' +
        '        rcall("XADD", KEYS[6], "*", "event", "waiting", "jobId", jobId, "prev",\n' +
        '              "delayed")\n' +
        '        rcall("HSET", ARGV[1] .. jobId, "delay", 0)\n' +
        '    end\n' +
        'end\n' +
        'local nextTimestamp = rcall("ZRANGE", KEYS[1], 0, 0, "WITHSCORES")[2]\n' +
        'local id\n' +
        'if (nextTimestamp ~= nil) then\n' +
        '    nextTimestamp = nextTimestamp / 0x1000\n' +
        '    id = rcall("XADD", KEYS[7], "*", "nextTimestamp", nextTimestamp)\n' +
        'end\n' +
        'return {nextTimestamp, id}\n',
      '7',
      'bull:googleNotificationQueue:delayed',
      'bull:googleNotificationQueue:wait',
      'bull:googleNotificationQueue:priority',
      'bull:googleNotificationQueue:paused',
      'bull:googleNotificationQueue:meta',
      'bull:googleNotificationQueue:events',
      'bull:googleNotificationQueue:delay',
      'bull:googleNotificationQueue:',
      '1651755709752'
    ]
  }
}

Can you tell me what the problem is? Thanks.

@amirAlamian
Author

amirAlamian commented May 5, 2022

I fixed this problem by adding the prefix to the new QueueScheduler as well, like this:

    this.queueScheduler[queueName] = new QueueScheduler(queueName, {
      connection: this.connectionToRedis,
      prefix: '{BULLMQ}',
    });

@bartoszhernas

I just learned this after debugging for hours: for any class you instantiate from bullmq, check whether it accepts the "prefix" option and pass the same value. E.g. new Worker() also needs it (see the sketch below).
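
A minimal sketch of what that looks like, assuming the same cluster connection and the hash-tagged '{BULLMQ}' prefix from above (the queue name and processor are illustrative): every BullMQ class that operates on the same queue receives the same prefix.

import { Queue, QueueScheduler, Worker } from 'bullmq';
import { Cluster } from 'ioredis';

const connection = new Cluster(
  [{ host: '127.0.0.1', port: 6379 }],
  { enableOfflineQueue: false },
);

// The hash tag in the prefix keeps all of the queue's keys in a single cluster slot.
const prefix = '{BULLMQ}';

const queue = new Queue('test', { connection, prefix });
const queueScheduler = new QueueScheduler('test', { connection, prefix });
const worker = new Worker(
  'test',
  async (job) => {
    // process the job here
  },
  { connection, prefix },
);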

@bartoszhernas

Additionally, from another issue: you may also get errors like "Missing lock for job X". This can be fixed by removing the keyPrefix option from ioredis. In my case I created separate connections: one for bullmq (no keyPrefix) and a normal one for the rest of my backend (I share the Redis cluster between projects, so I need keyPrefix there). A sketch of that split is below.
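
A minimal sketch of that split, assuming ioredis v5 (the 'myapp:' prefix and variable names are illustrative, not from the original report):

import { Queue } from 'bullmq';
import { Cluster } from 'ioredis';

const nodes = [{ host: '127.0.0.1', port: 6379 }];

// Dedicated BullMQ connection: no keyPrefix here; namespacing is done only
// through BullMQ's own hash-tagged prefix option.
const bullConnection = new Cluster(nodes, { enableOfflineQueue: false });
const queue = new Queue('test', { connection: bullConnection, prefix: '{BULLMQ}' });

// Separate connection for the rest of the backend, where keyPrefix is fine
// because BullMQ never uses this connection.
const appConnection = new Cluster(nodes, { keyPrefix: 'myapp:' });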

@manast
Contributor

manast commented Aug 3, 2022

The keyPrefix feature in ioredis is unfortunately not compatible with BullMQ.

@bartoszhernas

Good to know! This should be a big red warning in the docs, and maybe even a runtime error if keyPrefix is provided to ioredis? It's not mentioned anywhere as far as I can tell, and the issues it introduces are hard to debug (and hard to connect back to keyPrefix being present).

@manast
Contributor

manast commented Aug 3, 2022

I updated the documentation; this was documented in the older Bull but not in BullMQ. However, the best solution would be to raise an exception if we detect that the connection is using keyPrefix.
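
Not BullMQ's actual implementation, just a rough sketch of the kind of guard suggested here, assuming the provided ioredis instance exposes its constructor options on its options property:

import { Cluster, Redis } from 'ioredis';

// Hypothetical helper: reject connections that were configured with an ioredis keyPrefix.
function assertNoKeyPrefix(connection: Redis | Cluster): void {
  // Both Redis and Cluster instances keep the options they were created with on `options`.
  if (connection.options.keyPrefix) {
    throw new Error(
      'BullMQ: the ioredis keyPrefix option is not supported; ' +
        "use BullMQ's own prefix option (with a hash tag on a cluster) instead.",
    );
  }
}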
