Connection to Redis #53

Closed
root-io opened this issue Oct 28, 2019 · 21 comments
Labels
question Further information is requested

Comments

@root-io
Contributor

root-io commented Oct 28, 2019

bullmq: v1.0.1

I'm not sure how to pass the Redis connection. I tried different options, like the ones from the docs (https://docs.bullmq.io/guide/connections), but it either crashes with an error or connects to 127.0.0.1.
Am I doing something wrong?

const IORedis = require('ioredis');

const queue = new Queue('myqueue', new IORedis(process.env.REDIS_URL));
const queueEvents = new QueueEvents('myqueueevents', new IORedis(process.env.REDIS_URL));

const worker = new Worker('myworker', async job => { ... }, new IORedis(process.env.REDIS_URL));
@stansv
Contributor

stansv commented Oct 28, 2019

Hi, yes,

new Queue('myqueue', new IORedis(process.env.REDIS_URL));

should be

new Queue('myqueue', { redis: new IORedis(process.env.REDIS_URL) });

@stansv stansv added the question Further information is requested label Oct 28, 2019
@root-io
Contributor Author

root-io commented Oct 28, 2019

Hey, thanks for your quick answer Stanislav!

I already tried with:

const IORedis = require('ioredis');

const queue = new Queue('myqueue', { redis: new IORedis(process.env.REDIS_URL) });
const queueEvents = new QueueEvents('myqueueevents', { redis: new IORedis(process.env.REDIS_URL) });

const worker = new Worker('myworker', async job => { ... }, { redis: new IORedis(process.env.REDIS_URL) });

Unfortunately I get Error: connect ECONNREFUSED 127.0.0.1:6379

I'm not trying to connect to 127.0.0.1 but to a remote url.

@stansv
Contributor

stansv commented Oct 28, 2019

I've also tried and can confirm this is a bug. I cannot find a way to set custom connection host/port, it always tries to connect to default 127.0.0.1:6379.

Oops, I made a mistake in my test, I'll update this message with new results. 😕

UPD So yes, this should work

  const queue = new Queue('myqueue', { connection: new IORedis(process.env.REDIS_URL) });
  const queueEvents = new QueueEvents('myqueueevents', { connection: new IORedis(process.env.REDIS_URL) });

  const worker = new Worker('myworker', async job => { }, { connection: new IORedis(process.env.REDIS_URL) });

We need to fix docs with correct examples.

@stansv stansv added bug Something isn't working question Further information is requested and removed question Further information is requested bug Something isn't working labels Oct 28, 2019
@root-io
Contributor Author

root-io commented Oct 28, 2019

Awesome, it works with connection, thanks! (I thought I tested this one, damn)

@root-io root-io closed this as completed Oct 29, 2019
@euanmillar

Hi. With the greatest respect to you for the library, I just wasted a long time on this trying to debug my system as the docs are incorrect. I would really appreciate it if you could update the docs with this correct example. So I think you need to reopen this issue.

@manast
Contributor

manast commented Sep 11, 2020

@euanmillar can you please point out where in the documentation the incorrect example is, and I will fix it asap.
Also, I am wondering: didn't you get any TypeScript error, or are you using the library from JavaScript?

@euanmillar

euanmillar commented Sep 11, 2020

Hi @manast thanks for your reply. The docs say this:

import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';

const connection = new IORedis();

// Reuse the ioredis instance
const myQueue = new Queue('myqueue', { connection });
const myWorker = new Worker('myworker', async (job)=>{}, { connection });

But @stansv says it should be:

{ connection: new IORedis(process.env.REDIS_URL) }

So I think that this line:

const connection = new IORedis();

should be changed to:

const connection = new IORedis(process.env.REDIS_URL);

But to be honest, I am following @stansv example and doing this:

const newQueue = new Queue("myqueue", {
    connection: new IORedis(process.env.REDIS_URL)
  })

and I am still getting the ECONNREFUSED 127.0.0.1:6379 error. My env variable is correctly set to "redis".

I still cannot get it to connect. Also, I am not getting any TypeScript error. I am using TypeScript.

@euanmillar

Here is my code FYI:

import { REDIS_HOST, QUEUE_NAME } from '@webhooks/constants'
import { Queue, QueueEvents } from 'bullmq'
import { EventEmitter } from 'events'
import { logger } from '@webhooks/logger'
import * as IORedis from 'ioredis'

type QueueEventType = {
  jobId: string
  delay?: number
  data?: string
  returnvalue?: string
  prev?: string
  failedReason?: string
} // couldn't find a type for this, so I made it myself

let webhookQueue: Queue

export interface IQueueConnector {
  getQueue: () => Promise<Queue | null>
}

export const getQueue = () => {
  return webhookQueue
}

export async function startQueue() {
  try {
    webhookQueue = initQueue()
  } catch (error) {
    logger.error(`Can't init webhook queue: ${error}`)
    throw Error(error)
  }
}

async function removeJob(myQueue: Queue, id: string) {
  const job = await myQueue.getJob(id)
  if (job) {
    // Await the removal so a failure surfaces as a rejection of this function
    await job.remove()
  }
}

export function initQueue(): Queue {
  logger.info(`Initialising queue on REDIS_HOST: ${REDIS_HOST}`)
  const newQueue = new Queue(QUEUE_NAME, {
    connection: new IORedis(REDIS_HOST)
  })

  EventEmitter.defaultMaxListeners = 50

  const queueEvents = new QueueEvents(QUEUE_NAME, {
    connection: new IORedis(REDIS_HOST)
  })

  queueEvents.on('waiting', ({ jobId }: QueueEventType) => {
    logger.info(`A job with ID ${jobId} is waiting`)
  })

  queueEvents.on('active', ({ jobId, prev }: QueueEventType) => {
    logger.info(`Job ${jobId} is now active; previous status was ${prev}`)
  })

  queueEvents.on('completed', ({ jobId, returnvalue }: QueueEventType) => {
    logger.info(`${jobId} has completed and returned ${returnvalue}`)
    removeJob(newQueue, jobId)
  })

  queueEvents.on('failed', ({ jobId, failedReason }: QueueEventType) => {
    logger.info(`${jobId} has failed with reason ${failedReason}`)
  })
  return newQueue
}

@manast
Contributor

manast commented Sep 11, 2020

What's the output of this line in the code above?

logger.info(`Initialising queue on REDIS_HOST: ${REDIS_HOST}`)

@euanmillar

euanmillar commented Sep 11, 2020

It outputs this: Initialising queue on REDIS_HOST: redis

Looking at the ioredis docs:
https://github.com/luin/ioredis

I think that the connection should be like this:

const connection = new IORedis({
  port: 6379, // Redis port
  host: process.env.REDIS_URL
});

@manast
Contributor

manast commented Sep 11, 2020

It can also be a string, but maybe you need to format it correctly like "redis://redis:6379"
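
As a rough sketch of the formatting point above: the helper below turns a bare host name such as "redis" into a full redis:// URL and leaves existing URLs untouched. `toRedisUrl` is a hypothetical name, not part of bullmq or ioredis, and the default port of 6379 is an assumption.

```typescript
// Hypothetical helper, not part of bullmq or ioredis.
// Builds a full connection URL from either a bare host name or an existing URL.
function toRedisUrl(hostOrUrl: string, defaultPort = 6379): string {
  const value = hostOrUrl.trim();
  if (value === '') {
    throw new Error('Redis host is empty');
  }
  // Already a URL (redis:// or rediss://): leave it untouched.
  if (/^rediss?:\/\//.test(value)) {
    return value;
  }
  // "host:port" keeps its port; a bare "host" gets the default port.
  const hasPort = /:\d+$/.test(value);
  return hasPort ? `redis://${value}` : `redis://${value}:${defaultPort}`;
}

const url = toRedisUrl('redis');
console.log(url); // redis://redis:6379
```

The resulting string could then be passed to `new IORedis(url)` in the same way as the `process.env.REDIS_URL` examples earlier in the thread.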

@euanmillar

euanmillar commented Sep 11, 2020

@manast In the end I needed to pass an IP as a string, "0.0.0.0", rather than 'redis', and it seems to be connecting without errors now.

So, back to the documentation... I think that the bullmq docs on this page are incorrect and should be changed:

This:

import { Queue, Worker } from 'bullmq'

const connection1 =  {
  host: myredis.taskforce.run,
  port: 32856
}

// Create a new connection in every instance
const myQueue = new Queue('myqueue', { connection: {
  host: myredis.taskforce.run,
  port: 32856
}});

const myWorker = new Worker('myworker', async (job)=>{}, { connection: {
  host: myredis.taskforce.run,
  port: 32856
}});

...should be changed to:

import { Queue, Worker } from 'bullmq'
import IORedis from 'ioredis'

// Create a new connection in every instance
// Refer to the https://github.com/luin/ioredis docs for your connection settings
// new Redis(); // Connect to 127.0.0.1:6379
// new Redis(6380); // 127.0.0.1:6380
// new Redis(6379, "192.168.1.1"); // 192.168.1.1:6379
// new Redis("/tmp/redis.sock");
// new Redis({
//   port: 6379, // Redis port
//   host: "127.0.0.1", // Redis host
//   family: 4, // 4 (IPv4) or 6 (IPv6)
//   password: "auth",
//   db: 0,
// });

const myQueue = new Queue('myqueue', { connection: new IORedis({
  host: "0.0.0.0",
  port: 6379
})});

const myWorker = new Worker('myworker', async (job)=>{}, { connection: new IORedis({
  host: "0.0.0.0",
  port: 6379
})});

... and this:

import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';

const connection = new IORedis();

// Reuse the ioredis instance
const myQueue = new Queue('myqueue', { connection });
const myWorker = new Worker('myworker', async (job)=>{}, { connection });

... should be changed to:

import { Queue, Worker } from 'bullmq';
import IORedis from 'ioredis';


// Create a connection to be re-used

const connection = new IORedis({
  host: "0.0.0.0",
  port: 6379
});

// Refer to the https://github.com/luin/ioredis docs for your connection settings
// new Redis(); // Connect to 127.0.0.1:6379
// new Redis(6380); // 127.0.0.1:6380
// new Redis(6379, "192.168.1.1"); // 192.168.1.1:6379
// new Redis("/tmp/redis.sock");
// new Redis({
//   port: 6379, // Redis port
//   host: "127.0.0.1", // Redis host
//   family: 4, // 4 (IPv4) or 6 (IPv6)
//   password: "auth",
//   db: 0,
// });

// Reuse the ioredis instance
const myQueue = new Queue('myqueue', { connection });
const myWorker = new Worker('myworker', async (job)=>{}, { connection });

@manast
Contributor

manast commented Sep 11, 2020

I do not think the current documentation is wrong actually... if you pass an instance of IORedis then you need to read in ioredis documentation on how to create an instance that works for your setup. Otherwise you can pass the redis settings directly to the connection and it works.
And using 0.0.0.0 as your redis instance address is definitely wrong: https://serverfault.com/questions/876698/whats-the-difference-in-localhost-vs-0-0-0-0

@euanmillar

euanmillar commented Sep 11, 2020

Huge thanks @manast !

I think the docs can be improved if you explicitly state "if you pass an instance of IORedis then you need to read in ioredis documentation on how to create an instance that works for your setup." as I am suggesting in the comments in my code example above. I thought 0.0.0.0 seemed to be right for my setup as I am running in a Docker Swarm, and while it worked once, it has stopped working since. Thanks for the right URL: redis://redis:6379. The below is also fine, but note the quotes required around the host string. Without them, you will get a type error:

{
  host: "myredis.taskforce.run",
  port: 32856
}

Handlers for the connection errors also stop the logs going crazy and you might want to additionally document the connection like this TypeScript example:

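import { REDIS_HOST, QUEUE_NAME } from '@webhooks/constants'
import { Job, Queue, QueueEvents, Worker } from 'bullmq'
import { EventEmitter } from 'events'
import { logger } from '@webhooks/logger'
import * as IORedis from 'ioredis'

let redisConnection: IORedis.Redis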

export function getRedis(): IORedis.Redis {
  return redisConnection
}

const connect = async (): Promise<void> => {
  try {
    redisConnection = new IORedis({
      host: "myredis.taskforce.run",
      port: 32856
    })

    redisConnection.on('error', error => {
      logger.error('Redis connection error', error)
      process.exit(1)
    })

    redisConnection.on('exit', () => {
      logger.error(
        'Exiting...listener count',
        redisConnection.listenerCount('error')
      )
    })
  } catch (err) {
    logger.error(`Can't create Redis instance: ${err}`)
  }
}
type QueueEventType = {
  jobId: string
  delay?: number
  data?: string
  returnvalue?: string
  prev?: string
  failedReason?: string
}

export function initWorker(name: string, connection: IORedis.Redis): Worker {
  return new Worker(
    name,
    async job => {
      // do stuff
    },
    { connection }
  )
}

export function initQueue(): Queue {
  const connection = getRedis()
  logger.info(
    `Initialising queue on REDIS_HOST: ${REDIS_HOST} with connection: ${connection}`
  )
  const newQueue = new Queue(QUEUE_NAME, {
    connection
  })

  EventEmitter.defaultMaxListeners = 50

  const queueEvents = new QueueEvents(QUEUE_NAME, {
    connection
  })

  queueEvents.on('waiting', ({ jobId }: QueueEventType) => {
    logger.info(`A job with ID ${jobId} is waiting`)
  })

  queueEvents.on('active', ({ jobId, prev }: QueueEventType) => {
    logger.info(`Job ${jobId} is now active; previous status was ${prev}`)
  })

  queueEvents.on('completed', ({ jobId, returnvalue }: QueueEventType) => {
    logger.info(`${jobId} has completed and returned ${returnvalue}`)
    removeJob(newQueue, jobId)
  })

  queueEvents.on('failed', ({ jobId, failedReason }: QueueEventType) => {
    logger.info(`${jobId} has failed with reason ${failedReason}`)
  })

  const myWorker: Worker = initWorker(QUEUE_NAME, connection)

  // The handler does not use a job argument, so none is declared
  myWorker.on('drained', () => {
    logger.info(`Queue is drained, no more jobs left`)
  })

  myWorker.on('completed', (job: Job) => {
    logger.info(`job ${job.id} has completed`)
  })

  myWorker.on('failed', (job: Job) => {
    logger.info(`job ${job.id} has failed`)
  })
  return newQueue
}
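
One possible variation on the error handler above, as a minimal sketch: instead of exiting the process, log each distinct error message only once so repeated connection failures do not flood the logs. `ErrorSource` and `logErrorsOnce` are hypothetical names introduced for illustration. The interface only assumes an `on('error', ...)` method like the one used on `redisConnection` above.

```typescript
// Minimal stand-in for anything that emits 'error' events, such as the
// redisConnection object above; declared here so the sketch has no dependencies.
interface ErrorSource {
  on(event: 'error', listener: (error: Error) => void): unknown;
}

// Hypothetical helper: logs each distinct error message once instead of
// repeating it every time the same error is emitted.
function logErrorsOnce(
  source: ErrorSource,
  log: (message: string) => void
): void {
  const seen = new Set<string>();
  source.on('error', (error) => {
    if (seen.has(error.message)) {
      return;
    }
    seen.add(error.message);
    log(`Redis connection error: ${error.message}`);
  });
}

// Possible usage with the names from the example above (assumption):
// logErrorsOnce(redisConnection, message => logger.error(message))
```

Whether to exit or keep running on a connection error depends on the deployment. This sketch only changes how often the error is logged.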

@oallanmendes

I've also tried and can confirm this is a bug. I cannot find a way to set custom connection host/port, it always tries to connect to default 127.0.0.1:6379.

Oops, I made a mistake in my test, I'll update this message with new results. 😕

UPD So yes, this should work

  const queue = new Queue('myqueue', { connection: new IORedis(process.env.REDIS_URL) });
  const queueEvents = new QueueEvents('myqueueevents', { connection: new IORedis(process.env.REDIS_URL) });

  const worker = new Worker('myworker', async job => { }, { connection: new IORedis(process.env.REDIS_URL) });

We need to fix docs with correct examples.

This solved my problem! November 2020.
Thanks!!!

@grassynull33

FYI, I fixed this issue by passing a Redis connection to the Worker, which I somehow forgot to do. Everything worked as expected after!

@dan003400

Is there any reason connection does not accept just a redis connection string without the need of importing ioredis?

This seems very odd that you can pass in host / port but not only a string with full connection string...
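
Until something like that exists, one workaround is to parse the connection string yourself into the host/port object form. The sketch below uses only the standard `URL` class. `parseRedisUrl` and `RedisConnectionOptions` are hypothetical names, the fallback port 6379 is an assumption, and TLS (`rediss://`) is deliberately not handled.

```typescript
// Hypothetical helper: converts a Redis connection string into a plain
// host/port options object, so no ioredis import is needed at the call site.
interface RedisConnectionOptions {
  host: string;
  port: number;
  password?: string;
  db?: number;
}

function parseRedisUrl(connectionString: string): RedisConnectionOptions {
  const url = new URL(connectionString);
  if (url.protocol !== 'redis:') {
    throw new Error(`Unsupported protocol: ${url.protocol}`);
  }
  const options: RedisConnectionOptions = {
    host: url.hostname,
    // Assumption: fall back to the default Redis port when none is given.
    port: url.port ? Number(url.port) : 6379,
  };
  if (url.password) {
    options.password = decodeURIComponent(url.password);
  }
  // A numeric path such as "/2" is read as the database index.
  const db = url.pathname.replace(/^\//, '');
  if (/^\d+$/.test(db)) {
    options.db = Number(db);
  }
  return options;
}

const connection = parseRedisUrl('redis://:secret@redis:6379/2');
console.log(connection);
```

The returned object has the same host/port shape as the `connection: { host, port }` examples quoted from the docs earlier in this thread.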

@lewispham

lewispham commented Mar 21, 2021

I think the docs should mention that QueueEvents and Worker do not reuse connections passed into Queue. So you need to pass the initialized connection to each of QueueEvents, Worker and Queue if you want to reuse it. This behavior is actually quite weird imho. Shouldn't QueueEvents and Worker reuse the connection passed into Queue by default? New connections could be created internally, but their configurations should be reused, right?

@AdamMomen

@lewispham You are right, thanks so much for mentioning this. The docs were not clear about this point.

@hardcodet

hardcodet commented Jan 30, 2022

I'm migrating from Bull to BullMQ and ran into the same issue as people in this thread. We have our Redis connection string in our config settings, and TBH, I'd rather have BullMQ manage its IORedis connection than create it myself. As a consumer of BullMQ, those are internals that shouldn't be my business. A URL is a very simple approach that works for local development and our setups in the cloud. So maybe it's worth following up on the question from @dan003400:

Is there any reason connection does not accept just a redis connection string without the need of importing ioredis?
This seems very odd that you can pass in host / port but not only a string with full connection string...

Could we place that as a feature request, @manast ?

@manast
Contributor

manast commented Jan 30, 2022

@hardcodet sure, if you create a feature request we can add this in a couple of days (I hope).

10 participants