This issue was moved to a discussion.

Question: how to connect redis cluster using BullMQ ? #409

Closed
restuwahyu13 opened this issue Feb 23, 2021 · 0 comments

restuwahyu13 commented Feb 23, 2021

Please check my code. My problem is that I can't connect to the Redis cluster. How do I connect BullMQ to a Redis cluster? Can you give me a simple example? Thank you.

Custom Queue

import { Queue } from 'bullmq'
import Redis from 'ioredis'
import { IPublisher } from '../interface/interface.publisher'

export class Publisher {
	private serviceName: string
	private speakerName: string
	private connections: Array<Record<string, any>>

	constructor(option: Readonly<IPublisher>) {
		this.serviceName = option.serviceName
		this.speakerName = option.speakerName
		this.connections = option.connections
	}

	queue(): InstanceType<typeof Queue> {
		const clusterConnection = new Redis.Cluster(this.connections)
		const serviceName = new Queue(this.serviceName, { connection: clusterConnection, limiter: { duration: 1000, max: 25 }  }) as Queue<any, any, string>
		return serviceName
	}

	async speaker(data: Record<string, any>, options?: Record<string, any>): Promise<void> {
		await this.queue().add(this.speakerName, { ...data }, { ...options })
	}
}

Custom Worker

import Redis from 'ioredis'
import { Worker, QueueEvents } from 'bullmq'
import consola from 'consola'
import { ISubscriber } from '../interface/interface.subscriber'

export class Subscriber {
	private serviceName: string
	private listenerName: string
	private queueEvent: InstanceType<typeof QueueEvents>
	private connections: Array<Record<string, any>>

	constructor(option: Readonly<ISubscriber>) {
		this.serviceName = option.serviceName
		this.listenerName = option.listenerName
		this.connections = option.connections
		this.queueEvent = new QueueEvents(this.serviceName)
	}

	private _worker(): void {
		const clusterConnection = new Redis.Cluster(this.connections)
		new Worker(
			this.serviceName,
			async (job) => {
				if (job.name == this.listenerName) {
					await this.queueEvent.emit(this.listenerName, JSON.stringify({ data: job.data }))
					return job.name
				}
			},
			{ connection: clusterConnection, limiter: { duration: 1000, max: 25 } }
		) as Worker<any, any, string>
	}

	private async _notifications(): Promise<void> {
		await this._worker()
		this.queueEvent.on('completed', (job) => consola.success(`${this.listenerName} completed ${job.jobId}`))
		this.queueEvent.on('waiting', (job) => consola.info(`${this.listenerName} waiting ${job.jobId}`))
		this.queueEvent.on('active', (job) => consola.info(`${this.listenerName} active ${job.jobId}`))
		this.queueEvent.on('failed', (job) => consola.error(`${this.listenerName} failed ${job.jobId}`))
	}

	async listener(): Promise<Record<string, any>> {
		await this._notifications()
		return new Promise((resolve, _) => {
			this.queueEvent.on(this.listenerName, async (data) => {
				resolve(JSON.parse(data).data)
			})
		})
	}
}

Login Queue

import { Publisher } from '../../utils/util.publisher'

export const loginPublisher = new Publisher({
	serviceName: 'login',
	speakerName: 'login:speaker',
	connections: [
		{ host: '127.0.0.1', port: 6379 },
		{ host: '127.0.0.1', port: 6380 },
		{ host: '127.0.0.1', port: 6381 }
	]
})

export const setLoginPublisher = async (data: Record<string, any>): Promise<any> => {
	if (data && Object.keys(data).length > 0) {
		await loginPublisher.speaker({ ...data }, { removeOnComplete: 1000, removeOnFail: 1000 })
	} else {
		await loginPublisher.speaker({}, {})
	}
}

Login Worker

import { Subscriber } from '../../utils/util.subscriber'
import { userSchema } from '../../models/model.user'
import { UsersDTO } from '../../dto/dto.users'
import { IUser } from '../../interface/interface.user'

const loginSubscriber = new Subscriber({
	serviceName: 'login',
	listenerName: 'login:speaker',
	connections: [
		{ host: '127.0.0.1', port: 6379 },
		{ host: '127.0.0.1', port: 6380 },
		{ host: '127.0.0.1', port: 6381 }
	]
})

export const getLoginSubscriber = (): Promise<Record<string, any>> => {
	return new Promise((resolve, reject) => {
		loginSubscriber.listener().then(async (res: IUser) => {
			try {
				const checkUser: UsersDTO = await userSchema.findOne({ email: res.email })

				// Return after resolving so a missing user does not fall through to the checks below
				if (!checkUser) {
					return resolve({
						statusCode: 404,
						message: 'user account is not exist, please register new account'
					})
				}

				if (checkUser.active == false) {
					return resolve({
						statusCode: 400,
						message: 'user account is not active, please resend new activation token'
					})
				}

				await userSchema.findByIdAndUpdate(checkUser._id, {
					firstLogin: new Date(),
					updatedAt: new Date()
				})

				resolve({
					statusCode: 200,
					message: 'login successfully',
					data: checkUser
				})
			} catch (err) {
				reject({
					statusCode: 500,
					message: 'internal server error'
				})
			}
		})
	})
}

I tried Redis Cluster with Bull and it works. How about BullMQ? I tried the same thing, but it doesn't work.

const Queue = require('bull')
const Redis = require('ioredis')

const clusterQueue = new Queue('Name', {
	prefix: '{clusterQueue}',
	createClient: () =>
		new Redis.Cluster([
			{ host: '127.0.0.1', port: 6379 },
			{ host: '127.0.0.1', port: 6380 },
			{ host: '127.0.0.1', port: 6381 }
		])
})

clusterQueue.process((job, done) => {
	console.log(job.data)
	done()
})

clusterQueue.add({ heroe: 'superman' })
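
The Bull example above sets `prefix: '{clusterQueue}'`. The braces form a Redis Cluster hash tag: only the text inside them is hashed, so every key that shares the tag maps to the same slot, which multi-key operations on a cluster require. The BullMQ code earlier in this post sets no such prefix, so that may be one thing to check; this is an assumption, not a confirmed diagnosis. The sketch below only illustrates the slot calculation from the Redis Cluster specification (CRC16 modulo 16384, with the spec's check value 0x31C3 for "123456789"). The helper names and the `wait`/`active` key suffixes are hypothetical, chosen for illustration.

```typescript
// CRC16-CCITT, XMODEM variant (polynomial 0x1021, initial value 0),
// the checksum Redis Cluster uses to map keys to slots.
function crc16(bytes: Uint8Array): number {
  let crc = 0;
  for (let i = 0; i < bytes.length; i++) {
    crc ^= bytes[i] << 8;
    for (let bit = 0; bit < 8; bit++) {
      crc = (crc & 0x8000) !== 0 ? ((crc << 1) ^ 0x1021) & 0xffff : (crc << 1) & 0xffff;
    }
  }
  return crc;
}

// Returns the part of the key that is actually hashed.
// If the key contains "{...}" with at least one character inside,
// only that inner text counts; otherwise the whole key is hashed.
function hashTagOf(key: string): string {
  const open = key.indexOf('{');
  if (open === -1) return key;
  const close = key.indexOf('}', open + 1);
  if (close === -1 || close === open + 1) return key;
  return key.slice(open + 1, close);
}

// Slot number in the range 0..16383 for a given key.
function keyHashSlot(key: string): number {
  return crc16(new TextEncoder().encode(hashTagOf(key))) % 16384;
}

// Hypothetical key names: with a hash-tagged prefix they share one slot.
const tagged = ['{clusterQueue}:Name:wait', '{clusterQueue}:Name:active'];
// Without a tag, each key is hashed in full and is placed independently.
const untagged = ['bull:Name:wait', 'bull:Name:active'];

console.log(tagged.map(keyHashSlot));
console.log(untagged.map(keyHashSlot));
```

If both tagged keys print the same slot number, the prefix is doing its job. BullMQ's queue and worker options also accept a `prefix`, so the same `'{...}'` value can be tried there.
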

@manast closed this as completed Feb 23, 2021
@taskforcesh locked and limited conversation to collaborators Feb 23, 2021
