Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Alternative to named processors? #448

Closed
prescience-data opened this issue Apr 3, 2021 · 11 comments
Closed

Alternative to named processors? #448

prescience-data opened this issue Apr 3, 2021 · 11 comments

Comments

@prescience-data
Copy link

prescience-data commented Apr 3, 2021

Currently migrating from Bull 3, and not certain I understand what the new/alternative implementation of named processors is.

In the existing codebase, there is a server that receives jobs over http, parses the payload and then dispatches the job on a named process ("channel").

// Setup job routing.
const { zone, hardware, action, data } = request
// Define channel, eg "us-west:c5.xlarge:process-data" / "us-east:process-data"
const channel = [ zone || "", hardware || "", action || "process-data" ].filter(part => part !== "").join(":")
const options = {  attempts: 1, timeout: 30000 }

// Dispatch job.
const job: BullJob<SocketPayload> = await bullQueue.add(
  channel,
  data,
  options
)

// Await result.
const result = await job.finished()

In this case, there are workers running on hot instances that subscribe to a channel stack that represent their capability and scope such as:

export class DataProcessor {
 
  public readonly queue: BullQueue = new Queue()
  public readonly channels: string[] = [
    "us-west:c5.xlarge:process-data", 
    "us-west:process-data", 
    "c5.xlarge:process-data", 
    "process-data"
  ]
  // ....
  public async subscribe(hook) {
    for(const channel of this.channels) {
      this.queue.process(channel, 1, hook)
     }
  }
  // ...
}

This means I can send a job to the "us-west:c5.xlarge:process-data" channel to target a specific region and hardware, or just "process-data" if I don't care how the processing is done, all of which run on the on the single "foo-queue" queue.

I can't see a way to use individual queues for this, as that would require an individual worker for each "channel as a queue" which breaks my "concurrency of 1" per instance.

Just want to see what the "new" way to achieve this would be with BullMQ?

PS: Note that the architecture described is just an abstract shorthand, in this above case there are probably ways to refactor it but for the sake of this problem assume that the use-case is very specific and needs this "one long-running worker, one queue, many channels, single concurrency per worker" pattern.)

Additionally, naturally the "named processors" in Bull 3 break the "single concurrency" requirement, but currently we immediately set a flag on the Processor class which rejects/returns any intermediate jobs while the worker pauses itself on the queue, so that might be useful for anyone kind enough to give feedback on how to achieve this.

@manast
Copy link
Contributor

manast commented Apr 6, 2021

Functionality wise, Bullv3 and BullMQ provides the same, it just that the syntax is different. In BullMQ you only define 1 processor, and then inside the processor you can choose different code paths depending on the "job.name" property. In your case this will mean a bit more of code, but named processors where causing a lot of confusion so we decided to remove the syntax sugar and made it more explicit.

@prescience-data
Copy link
Author

prescience-data commented Apr 6, 2021

Thanks so much for the reply!

So if a job comes in on queue foo-queue that matches none of the target job names ("channels"), wouldn't it just fail? Or is there a way to have all workers "check" the job prior to accepting it somehow?

factories.ts

// ./factories.ts
export const makeWorker = (
  processor,
  { name, connection }: QueueConfig
): BullWorker => {
  return new Worker(name, processor, {
    connection,
    concurrency: 1
  })
}

hooks.ts

This is the part I'm not sure I fully understand how to handle?

// ./hooks.ts
export const processData = (channels: string[]): JobProcessorHook => async (job: Job): Promise<SomeAsyncResult> => {
  if (!channels.includes(job.name)) {
    throw new Error(`Worker not assigned job name ${job.name}`)
  }

  // ... Handle job logic
  return await doSomeAsyncProcessing()
}

worker.ts

// ./worker.ts
import { channels } from "./channels"
import { processData } from "./hooks"
import { makeWorker } from "./factories"

const name: string = "foo-queue"
const channels: string[] = [
  "us-west:c5.xlarge:process-data",
  "us-west:process-data",
  "c5.xlarge:process-data",
  "process-data"
]

const myDataWorker = makeWorker(processData(channels), { name, connection })

@acro5piano
Copy link

acro5piano commented Apr 7, 2021

I had a similar confusion to @prescience-data .

In BullMQ you only define 1 processor, and then inside the processor you can choose different code paths depending on the "job.name" property

So single processor (worker) and single queue with full type-safety code should be like this?

import { Queue, Worker, Job } from 'bullmq'

type Job_A = { name: 'JOB_A', payload: { foo: string } }
type Job_B = { name: 'JOB_B', payload: { bar: number } }
type JobArgument = 
  | Job_A
  | Job_B

// Create main queue and main worker
const mainQueue = new Queue('main');
const mainWorker = new Worker('main', async (job: Job<JobArgument>) => {
  switch (job.name) {
    case 'JOB_A':
      return processJobA(job)
    case 'JOB_B':
      return processJobB(job)
    default:
      throw new Error(`Invalid job name: ${job.name}`)
  }
}, { concurrency: 50 })

// Create processors
async function processJobA(job: Job<Job_A>) {
  // process with the job
}

export function addTaskToQueue(arg: JobArgument) {
  return mainQueue.add(arg.name, arg.payload)
}

// Somewhere in code
await addTaskToQueue({ name: 'JOB_A', payload: { foo: 'pink' } });

@prescience-data
Copy link
Author

My concern is:

"If the worker throws an exception during its processing, the job will automatically be moved to the "failed" status."

I don't think that's what I actually want to happen in this case (moving the job to failed), it seems like I need the worker to simply not receive the job in the first place and let and let the correct worker assigned to that job name pick it up?

@manast
Copy link
Contributor

manast commented Apr 7, 2021

I don't think that's what I actually want to happen in this case (moving the job to failed), it seems like I need the worker to simply not receive the job in the first place and let and let the correct worker assigned to that job name pick it up?

Thats not possible, neither with BullMQ nor Bull v3.

@prescience-data
Copy link
Author

prescience-data commented Apr 7, 2021

It seemed to work in Bull 3?
But I'm definitely not clear on what was actually happening under the hood...
The flow I had was each worker "subscribed" to the named processors it was responsible for, then paused itself locally on the queue immediately after receiving a job.

@manast
Copy link
Contributor

manast commented Apr 7, 2021

It seemed to work in Bull 3?
But I'm definitely not clear on what was actually happening under the hood...
The flow I had was each worker "subscribed" to the named processors it was responsible for, then paused itself locally on the queue immediately after receiving a job.

I think in bull you get an exception but the job does not fail, so the job probably gets stalled and then picked up later. If this mechanism worked for you it was most of pure luck than by bull's design.

@prescience-data
Copy link
Author

prescience-data commented Apr 7, 2021

😂
Ok so is there a potential solution by adding more queues?

I've switched to manual job fetching, but is there a way to interrupt all the (await worker.getNextJob(token)) simultaneously when one worker (which would now be a cluster of workers I suppose?) accepts a job?

My only concern with this approach is it exponentially increases the number of queues, so if I'm understanding the Taskforce.sh pricing it will be $9.90/m * n queues (which might make it unfeasible very fast lol 😅)... (I just re-read and misunderstood the pricing)

@manast
Copy link
Contributor

manast commented Apr 7, 2021

@prescience-data not sure I have a complete picture of your use case. Basically you want to have 1 queue but depending on the type of job process it in one or another worker ? and the question that follows is why is it important that they run in different workers depending on types?

@prescience-data
Copy link
Author

prescience-data commented Apr 7, 2021

so the use case in a bit sharper detail is

  • a worker represents a specific device in a specific region with a specific role
  • (ideally) I can send a job to:
    • a) the fully qualified signature (ie "hardware-1:region-2:role-5")
    • b) a partial of the signature (ie "hardware-1:role-5" or "region-2:role-5")
    • c) the global queue (ie "all")
      and the worker should respond to any of those calls (but not "hardware-4" if that is not a match)

Currently, in Bull 3, I connect to the main "company-name" queue, then subscribe to all the named processors:

  /**
   * Connects to the all the required channels and sets a process listener on each queue to handle incoming jobs.
   * Subscriptions are logged as an array of objects showing the channel and the Queue processor.
   *
   * @return {Promise<number>}
   */
  public async subscribeToChannels(): Promise<number> {
    const channels: string[] = await this.getChannels()
    let counter = 0

    channels.forEach((channel) => {
      logger.info(
        `Subscribing device ${this.serial} ${this.instance} to ${channel}`
      )
      try {
        // Do not await.
        queue.process(channel, 1, this._handleJob.bind(this))
        counter++
      } catch (err) {
        logger.error(err.message)
      }
    })

    // Report the to administrator how many queues were connected.
    logger.success(`Subscribed to ${counter} queues.`)
    // Return the number of connect channels.
    return counter
  }

and then pause when a job is received:

  /**
   * Pauses all subscriptions while a worker is processing a job on another channel.
   * Usually called by the server via a service job request.
   *
   * @return {Promise<void>}
   */
  public async pause(): Promise<void> {
    // Immediately locks job acceptance to reject new jobs while attempting to pause.
    this.startWorking()
    // Only attempt a disconnect if subscriptions have been booted.
    if (!!queue) {
      logger.wait(`Pausing all subscriptions...`)
      // Pause locally (just this worker) by passing true. Do not await.
      queue.pause(true)
    } else {
      // Unlocks job acceptance.
      this.stopWorking()
      logger.warn(`No subscriptions to pause...`)
    }
  }

@prescience-data
Copy link
Author

Ok for future reference, I found the way to achieve the desired outcomes in this use case was with the "Manually Fetch Jobs" pattern.

This way you can control the worker "channel" listeners centrally without needing to pause and unpause locally, which turns out to be far cleaner and atomic than the flow I described above.

See this discussion for additional context.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants