
Only run one job at a time per queue name (multiple worker instances) #274

Closed
phips28 opened this issue Oct 1, 2021 · 17 comments

@phips28
Contributor

phips28 commented Oct 1, 2021

Is it possible to only run one job at a time per queue name in a distributed worker environment (multiple servers subscribing/listening)?

For example:
We have a queue called "send-mail".
Now we push jobs to that queue.
We run multiple servers subscribing to that queue (and a lot more ofc).
Only one job of one queue should/must be active at a time.

Currently, when we start this, each server takes one job. teamConcurrency: 1 applies to each node process, not to a queue.

Can we somehow configure that? Any hint?

@timgit
Owner

timgit commented Oct 2, 2021

This is what I refer to as the "distributed mutex" problem. I have this use case in our containers for schema migrations during container bootstrapping, for example. I don't use pg-boss for that directly, but I did build it based on how pg-boss handles its own internal schema migrations.

I think this technique could find its way into a feature, since the building blocks are in place in postgres via transactional advisory locks.

I'm just brainstorming here, but I think we'd have to add a new table to track the mutexes and add monitoring to prevent a dead worker from blocking others.
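
For illustration, the advisory-lock building block mentioned above looks roughly like this in plain SQL (the queue name is illustrative; this is not pg-boss code):

```sql
-- Illustrative sketch only: a transactional advisory lock serializes
-- sessions per queue name for the duration of a transaction.
BEGIN;

-- hashtext() maps the queue name to a 32-bit int usable as a lock key
SELECT pg_advisory_xact_lock(hashtext('send-mail'));

-- ...critical section: at most one session per queue name gets here at a time...

COMMIT;  -- the lock is released automatically when the transaction ends
```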

@timgit
Owner

timgit commented Oct 2, 2021

Another idea would be to block fetching from a queue if the active count is > 0. I need to spend more time thinking about that as well

@phips28
Contributor Author

phips28 commented Oct 2, 2021

I also thought about changing the query to take the active count into account (but wanted to ask first in case I'd missed something).
I'll try that tomorrow in a quick and dirty way to test this 🙂

@phips28
Contributor Author

phips28 commented Oct 2, 2021

A quickfix would be the following in the fetchNextJob query:

WITH nextJob as (
    SELECT id
    FROM pgboss.job
    WHERE state < 'active'
      AND name LIKE 'fetch_ftx_16'
      AND startAfter < now()
+     AND (SELECT count(*) FROM pgboss.job WHERE name LIKE 'fetch_ftx_16' AND state = 'active') = 0
    ORDER BY priority desc, createdOn, id
    LIMIT 1 FOR UPDATE SKIP LOCKED
)
UPDATE pgboss.job j
SET state      = 'active',
    startedOn  = now(),
    retryCount = CASE WHEN state = 'retry' THEN retryCount + 1 ELSE retryCount END
FROM nextJob
WHERE j.id = nextJob.id
RETURNING j.*, EXTRACT(epoch FROM expireIn) as expire_in_seconds;

With some options to enable this.

Not sure about performance impacts on larger tables.

Costs are going from:

+------------------------------------------------------------------------------------------------------------------------+
|QUERY PLAN                                                                                                              |
+------------------------------------------------------------------------------------------------------------------------+
|Limit  (cost=6.47..6.47 rows=1 width=28)                                                                                |
|  ->  Sort  (cost=6.47..6.47 rows=1 width=28)                                                                           |
|        Sort Key: priority DESC, createdon, id                                                                          |
|        ->  Seq Scan on job  (cost=0.00..6.46 rows=1 width=28)                                                          |
|              Filter: ((state < 'active'::pgboss.job_state) AND (name ~~ 'fetch_ftx_16'::text) AND (startafter < now()))|
+------------------------------------------------------------------------------------------------------------------------+

to

+------------------------------------------------------------------------------------------------------------------------------+
|QUERY PLAN                                                                                                                    |
+------------------------------------------------------------------------------------------------------------------------------+
|Limit  (cost=12.83..12.84 rows=1 width=28)                                                                                    |
|  InitPlan 1 (returns $0)                                                                                                     |
|    ->  Aggregate  (cost=6.35..6.36 rows=1 width=8)                                                                           |
|          ->  Seq Scan on job job_1  (cost=0.00..6.34 rows=1 width=0)                                                         |
|                Filter: ((name ~~ 'fetch_ftx_16'::text) AND (state = 'active'::pgboss.job_state))                             |
|  ->  Sort  (cost=6.47..6.48 rows=1 width=28)                                                                                 |
|        Sort Key: job.priority DESC, job.createdon, job.id                                                                    |
|        ->  Result  (cost=0.00..6.46 rows=1 width=28)                                                                         |
|              One-Time Filter: ($0 = 0)                                                                                       |
|              ->  Seq Scan on job  (cost=0.00..6.46 rows=1 width=28)                                                          |
|                    Filter: ((state < 'active'::pgboss.job_state) AND (name ~~ 'fetch_ftx_16'::text) AND (startafter < now()))|
+------------------------------------------------------------------------------------------------------------------------------+

@timgit
Owner

timgit commented Oct 2, 2021

Based on my understanding of SKIP LOCKED, it won't be as simple as excluding the active state during fetch, since 2 workers may arrive at the same instant and select 2 different jobs. I don't think we can avoid adding a new mutex tracking table for this yet.

Regarding your sample above, adding an aggregation subquery to the fetch logic would likely harm performance too much on large tables.

@phips28
Contributor Author

phips28 commented Oct 2, 2021

We have now been testing this in production for a few hours. We have 147 queues, but "only" < 5000 jobs in the job table.
And it works. We fetch every 2 sec (newJobCheckInterval: 2000), at least when the worker is idle.
So no performance impact on our side, and also no workers checking in at the same time.

But I have no idea how to make it safe enough to really get only one job with that subquery. When a second worker occasionally picks up a second job from a queue, though, it doesn't really matter for our system. It just shouldn't be 10 jobs running at once :D

@phips28
Contributor Author

phips28 commented Oct 2, 2021

Another approach I am testing is the advisory lock:

pg_advisory_xact_lock(key bigint) obtains exclusive transaction level advisory lock. It works the same as pg_advisory_lock, except the lock is automatically released at the end of the current transaction and cannot be released explicitly.

Use of string/queuename for the lock func: https://stackoverflow.com/a/29360766

So we could obtain a lock in fetchNextJob.

One question I have here: does pg-boss start a new transaction before each new fetch?
like:

  1. BEGIN transaction_fetch
  2. FetchNextJob()…
  3. If job found, execute it and wait for done
  4. update state
  5. COMMIT (saves everything)

Then we could acquire the advisory lock after step 1, and it would get released automatically on commit.

@timgit
Owner

timgit commented Oct 3, 2021

Advisory locks are currently used only for schema migrations and internal maintenance commands. See locked() in plans.js.

The architecture of a queuing system is scalable if concurrency is not limited and merely a factor of available workers. SKIP LOCKED was added to postgres specifically to address this need. At the time, only advisory locks were available, and they have their own limitations and challenges. I refer to this post to help remind me sometimes.

Where I'm coming from in my thinking about this is "Distributed mutexes are not distributed queues". The goal is not to distribute work, but rather guarantee concurrency=1 on a queue by queue basis.

Given all of this, I don't yet feel like the standard job fetch command should be responsible for both use cases. This may be a good reason to finally create a queue table to represent the mutex.

@phips28
Contributor Author

phips28 commented Oct 3, 2021

Yes, I understand 🙂 What about the transaction? Does it create one for each fetch, or is it autocommit?

I will create an MVP today for us. Let's see how far we get 🚀

@phips28
Contributor Author

phips28 commented Oct 3, 2021

My first try was to get an advisory lock before executing the nextJob query.

    BEGIN;
    SELECT pg_advisory_xact_lock(hashtext($1));

    WITH nextJob as (
      SELECT id
      FROM pgboss.job
      WHERE state < 'active'
        AND name LIKE $1
        AND startAfter < now()
        AND (SELECT count(*) FROM pgboss.job WHERE name LIKE $1 AND state = 'active') = 0
      ORDER BY priority desc, createdOn, id
      LIMIT $2
      FOR UPDATE SKIP LOCKED
    )
    UPDATE pgboss.job j SET
      state = 'active',
      startedOn = now(),
      retryCount = CASE WHEN state = 'retry' THEN retryCount + 1 ELSE retryCount END
    FROM nextJob
    WHERE j.id = nextJob.id
    RETURNING j.id, name, data, EXTRACT(epoch FROM expireIn) as expire_in_seconds;

    COMMIT;

But that results in an error: cannot insert multiple commands into a prepared statement.
As I've read in the docs, pg_prepare does not support this (as the error says).

Now comes an ugly part 🙈 (just for MVP)

I changed the fetch function to do my own variable replacement (I know, without a prepared statement the db is at risk; it's just for the sake of testing. It's not really risky here, because the values aren't user input, just my own config of queue names, so it should be safe. Just ugly). Then I am able to execute the advisory lock + query inside a transaction. I also need to pick the correct result (the UPDATE) out of the multi-statement response.
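
For the record, a slightly safer variant of that raw regex substitution would be to quote the value as a SQL string literal first. This helper is hypothetical (not pg-boss API) and still no substitute for real parameterization, but it at least applies the standard Postgres quoting rule:

```javascript
// Hypothetical helper (not pg-boss API): quote a value as a SQL string
// literal by doubling embedded single quotes, per Postgres quoting rules.
// Only for values that cannot be parameterized (multi-statement scripts).
function quoteLiteral (value) {
  return "'" + String(value).replace(/'/g, "''") + "'"
}

console.log(quoteLiteral('fetch_ftx_16'))  // → 'fetch_ftx_16'
console.log(quoteLiteral("it's"))          // → 'it''s'
```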

  async fetch (name, batchSize, options = {}) {
    const values = Attorney.checkFetchArgs(name, batchSize, options)

    let command = this.nextJobCommand(options.includeMetadata || false, options.onlyOneJobActivePerQueue || false);
    let preparedValues = [values.name, batchSize || 1];
    if(options.onlyOneJobActivePerQueue) {
      command = command.replace(/\$1/g, '\'' + values.name + '\'')
      command = command.replace(/\$2/g, batchSize || 1)
      console.log(command);
      // remove values, otherwise we would get `cannot insert multiple commands into a prepared statement`
      preparedValues = undefined;
    }

    let result = await this.db.executeSql(
      command,
      preparedValues
    )

    // in case of onlyOneJobActivePerQueue=true, we will receive an array of results (BEGIN, SELECT, UPDATE)
    // the UPDATE is the result we want
    if (options.onlyOneJobActivePerQueue) {
      result = result.find((r) => r.command === 'UPDATE');
    }

    if (!result || result.rows.length === 0) {
      return null
    }

    const jobs = result.rows.map(job => {
      job.done = async (error, response) => {
        if (error) {
          await this.fail(job.id, error)
        } else {
          await this.complete(job.id, response)
        }
      }
      return job
    })

    return jobs.length === 1 && !batchSize ? jobs[0] : jobs
  }

This proof of concept works for me. Now the question, how do we make it not-ugly :D ?

@timgit
Owner

timgit commented Oct 3, 2021

I like what you're thinking with hashtext(). Since advisory locks exist in global scope, we'd need to add both the database and schema name as a prefix to guarantee isolation just in case multiple instances are running on the same server.
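
The prefixing could look something like this (the schema and queue names here are illustrative, and this is a sketch, not pg-boss code):

```sql
-- Illustrative sketch: prefix the hashed key with database and schema so
-- advisory locks from separate pg-boss instances on one Postgres server
-- cannot collide. Run inside the fetch transaction.
SELECT pg_advisory_xact_lock(
  hashtext(current_database() || '.' || 'pgboss' || '.' || 'send-mail')
);
```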

A subquery for counting the job table during each fetch is going to perform poorly for large tables. I want to avoid this. I'm still leaning towards a new table to hold this state. Having an opt-in config means we could conditionally look this up at fetch time and it would solve the aggregation perf issue. We have to trade in more state management and monitoring of course, but it can be easily added to the current maintenance steps.

For example, a queue table would have id, last job id, job started timestamp, job completed timestamp. If started > completed, a job is currently active and we'd immediately return null and skip the job fetch. The advisory lock would basically limit concurrency of mutating this table to 1 at a time.
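
The started/completed comparison described above is simple enough to sketch. The shape and field names here are hypothetical, mirroring the proposal rather than any actual pg-boss table:

```javascript
// Hypothetical sketch of the proposed check: a queue row records when its
// last job started and completed; the queue counts as busy while the start
// timestamp is more recent than the completion timestamp.
function isQueueBusy ({ jobStartedOn, jobCompletedOn }) {
  if (!jobStartedOn) return false       // nothing has ever run on this queue
  if (!jobCompletedOn) return true      // a job started and hasn't finished
  return jobStartedOn > jobCompletedOn  // busy if the last job is still running
}

// fetch would return null immediately when the queue is busy:
isQueueBusy({ jobStartedOn: new Date('2021-10-03T10:00:00Z'), jobCompletedOn: null })  // → true
```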

@phips28
Contributor Author

phips28 commented Oct 4, 2021

You are right, I added the schema to the hashtext(). How can I access the database name in plans.js?

@phips28
Contributor Author

phips28 commented Oct 4, 2021

I did another test on our system. Added all possible jobs and queues.
Queues: 674
Jobs Overall: 25110

Original Query:

EXPLAIN SELECT id
FROM pgboss.job
WHERE state < 'active'
  AND name LIKE 'fetch_ftx_16'
  AND startAfter < now()
ORDER BY priority desc, createdOn, id
LIMIT 1;

[screenshot: EXPLAIN output]

With Count:

EXPLAIN SELECT id
FROM pgboss.job
WHERE state < 'active'
  AND name LIKE 'fetch_ftx_16'
  AND startAfter < now()
  AND (SELECT count(*) FROM pgboss.job WHERE name LIKE 'fetch_ftx_16' AND state = 'active') = 0
ORDER BY priority desc, createdOn, id
LIMIT 1;

[screenshot: EXPLAIN output]

Cost doubles because we have no index for it.

create index job_name_state_index
  on pgboss.job (name) where state = 'active';

That brings us down to a cost of 2217 (when no job is in the active state):

[screenshot: EXPLAIN output]

Then I updated all jobs to state = active. Of course this has a similar cost to the version without the index, because it needs to check all rows in the index.

[screenshot: EXPLAIN output]


We can live with this possible performance issue, as our table is pretty small.

@williamoliveira

Maybe instead of choosing between unlimited concurrency (bounded only by the number of workers) and 1 (serial), there could be a way to set a concurrency number per queue?
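
That suggestion would only change the count predicate in the fetch query sketched earlier in this thread (the queue name and the limit of 2 here are illustrative):

```sql
-- Illustrative variant of the earlier fetch CTE: allow up to N active jobs
-- per queue by comparing the active count against a configurable limit.
WITH nextJob as (
    SELECT id
    FROM pgboss.job
    WHERE state < 'active'
      AND name = 'send-mail'
      AND startAfter < now()
      AND (SELECT count(*) FROM pgboss.job
           WHERE name = 'send-mail' AND state = 'active') < 2  -- N = 2
    ORDER BY priority desc, createdOn, id
    LIMIT 1 FOR UPDATE SKIP LOCKED
)
UPDATE pgboss.job j
SET state     = 'active',
    startedOn = now()
FROM nextJob
WHERE j.id = nextJob.id
RETURNING j.*;
```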

@phips28
Copy link
Contributor Author

phips28 commented Nov 4, 2021

We have now changed how we subscribe to queues, and we introduced a locking mechanism with Redis. I think we have a really special use case 🙈

@phips28 phips28 closed this as completed Nov 4, 2021
@mziwisky

Hi @timgit, earlier in this thread it sounded like you were interested in adding this feature, but it seems to have hit a dead end without an official solution. Is this still something you're thinking about and/or working on?

@timgit
Owner

timgit commented Jul 14, 2022

This popped up in discussions as well in #334 (comment)

Yes, I'm thinking about it, but not currently working on it.
