Adding "priority ordering" feature to allow users specifying the order of precedence in a queue #183

Open
wants to merge 2 commits into
base: master
from

Conversation

FreCap

@FreCap FreCap commented Feb 20, 2021

#181

Among executions that are due (execution_time <= CURRENT_TIMESTAMP), the one with the highest priority is always executed first, regardless of its execution_time.

e.g.

scheduler.schedule(onetimeTask.instanceBuilder("1").setPriority(100), Instant.now());
scheduler.schedule(onetimeTask.instanceBuilder("2").setPriority(200), Instant.now());
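The intended ordering can be illustrated with a minimal in-memory sketch. It assumes that a larger number means higher priority, as in the example above, and that ties are broken by the earlier execution time. The class and method names are hypothetical and are not part of db-scheduler.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory model of the proposed ordering; not db-scheduler code.
class PriorityOrderSketch {

    static final class Execution {
        final String id;
        final int priority;
        final Instant executionTime;

        Execution(String id, int priority, Instant executionTime) {
            this.id = id;
            this.priority = priority;
            this.executionTime = executionTime;
        }
    }

    // Keeps only due executions, then sorts by priority (highest first)
    // and by execution time (earliest first) within the same priority.
    static List<String> dueIdsInOrder(List<Execution> all, Instant now) {
        List<Execution> due = new ArrayList<>();
        for (Execution e : all) {
            if (!e.executionTime.isAfter(now)) {
                due.add(e);
            }
        }
        due.sort(Comparator.comparingInt((Execution e) -> e.priority).reversed()
                .thenComparing((Execution e) -> e.executionTime));
        List<String> ids = new ArrayList<>();
        for (Execution e : due) {
            ids.add(e.id);
        }
        return ids;
    }

    // Small demo: "3" has the highest priority but is not due yet, so it is skipped.
    static List<String> demo() {
        Instant now = Instant.parse("2021-02-20T12:00:00Z");
        List<Execution> all = new ArrayList<>();
        all.add(new Execution("1", 100, now.minusSeconds(60)));
        all.add(new Execution("2", 200, now));
        all.add(new Execution("3", 300, now.plusSeconds(60)));
        return dueIdsInOrder(all, now);
    }
}
```

In this sketch, execution "2" is returned before "1" even though "1" became due earlier, because its priority is higher.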

@kagkarlsson I wasn't sure how to bump a major version.
Please let me know if you have any feedback!

@kagkarlsson
Owner

That was fast! I don't think I have fully wrapped my head around this feature yet.

One thing I still have not found a good solution for is how to index this design effectively. Do you have any thoughts there?

Fetch-due query:

select * from scheduled_tasks where picked = ? and execution_time <= ? order by priority desc, execution_time asc

For high-volume use-cases I have created an index like

create index on scheduled_tasks(execution_time asc)

However, adding priority to the mix, I don't think this index will work (since it now contains two "ranges"):

create index on scheduled_tasks(priority desc, execution_time asc)

I suspect the current solution will be a bit bad for high-throughput use-cases since they will not be able to index effectively. Hypothetically, let's say we have extreme cases where you have 1M executions due... 🤔

I am trying to think of a good solution here. One variant is to allow users to disable priority-sorting for high-volume cases. Another is to go for a very basic priority feature (LOW, NORMAL, HIGH) and execute three different fetches, one per priority, from HIGH to LOW.

@kagkarlsson
Owner

Just to explore some other thoughts.

  • Do you need to be able to set priority at the instance level, or would task level work? I.e., for a given task, all instances have the same priority.
  • What do you need in terms of cardinality for the priority? Would, for example, 3, 5 or 10 different values be sufficient? (I am considering whether there are any interesting query/index optimizations that can be done if the cardinality of priority is low.)

@FreCap
Author

FreCap commented Feb 22, 2021

> Do you require to be able to set priority on instance-level, or would task-level work? I.e., for a given task, all instances have the same priority

Both are useful.

Cardinality could be as low as 10, but it is really up to the user. Do you think it makes a difference whether it is 10 or 100?

I've seen that you have a pretty nice framework for testing changes in #175. We might want to use that to evaluate this.

@kagkarlsson
Owner

Yeah, that could be used to evaluate this. Though I think we could simply create a table, populate it with a couple of million rows, create the indices and run the selects to see what query-times we are getting.

@kagkarlsson
Owner

If cardinality were as low as 3 (high, normal, low), then we might just issue 3 queries

select * from scheduled_tasks where ... execution_time <= ? and priority = HIGH order by execution_time asc;
select * from scheduled_tasks where ... execution_time <= ? and priority = NORMAL order by execution_time asc;
select * from scheduled_tasks where ... execution_time <= ? and priority = LOW order by execution_time asc;

In this case, this index would work well, since priority is locked to a single value for each query:

create index on scheduled_tasks(priority, execution_time asc)

The downside is that we need to issue 3 selects each time we poll, but if we poll for 50+ executions each time, the overhead will still be pretty small, and all queries will be fast thanks to the "perfect" index.
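The three-fetch idea can be sketched in memory as follows. This is a minimal model under stated assumptions: one sorted map per priority level stands in for the index on (priority, execution_time), timestamps are plain millisecond values, and all names are hypothetical rather than db-scheduler's API.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model: one sorted map per priority stands in for
// an index on (priority, execution_time). Not db-scheduler code.
class ThreeFetchSketch {

    // Declaration order is the fetch order: HIGH first, LOW last.
    enum Priority { HIGH, NORMAL, LOW }

    // Keyed by execution time; this demo assumes times are unique per priority.
    private final Map<Priority, TreeMap<Long, String>> index = new EnumMap<>(Priority.class);

    void schedule(String id, Priority priority, long executionTimeMillis) {
        index.computeIfAbsent(priority, p -> new TreeMap<>()).put(executionTimeMillis, id);
    }

    // Runs one "fetch" per priority level, each ordered by execution time,
    // and stops as soon as `limit` executions have been collected.
    List<String> fetchDue(long nowMillis, int limit) {
        List<String> picked = new ArrayList<>();
        for (Priority p : Priority.values()) {
            TreeMap<Long, String> byTime = index.get(p);
            if (byTime == null) {
                continue;
            }
            // headMap(now, true) is the range "execution_time <= now".
            for (String id : byTime.headMap(nowMillis, true).values()) {
                if (picked.size() == limit) {
                    return picked;
                }
                picked.add(id);
            }
        }
        return picked;
    }

    static List<String> demo() {
        ThreeFetchSketch s = new ThreeFetchSketch();
        s.schedule("low-early", Priority.LOW, 10);
        s.schedule("high-late", Priority.HIGH, 90);
        s.schedule("normal", Priority.NORMAL, 50);
        s.schedule("high-future", Priority.HIGH, 500); // not due at now = 100
        s.schedule("high-early", Priority.HIGH, 20);
        return s.fetchDue(100, 3);
    }
}
```

Because each fetch is restricted to a single priority value, every fetch is a simple range read in execution-time order, which is the property the index above relies on.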

@FreCap
Author

FreCap commented Feb 22, 2021

I'm pretty sure that the index on two fields should work pretty well. Queries on two ranges shouldn't be a problem.

Multiple queries are for sure less performant than one query with an index, and they would also be a strong limitation for users.

An index with create index on scheduled_tasks(execution_time asc, priority) would likely be better since the cardinality of execution_time is higher

@kagkarlsson
Owner

For your use-case, what volumes are you expecting to use?

Also, I think we should do a local performance test by inserting, say, 5M records with slightly randomized execution-time and priority (max cardinality 10 for now) and see what query-times we get when fetching, say, 100 executions. Then we can try to optimize them by creating ideal indices.

@FreCap
Author

FreCap commented Feb 23, 2021

Sure, in theory the highest-cardinality column should always come first in the index, which in this case is execution_time.

If you imagine an index on (execution_time, priority) where priority has cardinality 1, its performance should exactly match an index on execution_time alone.
At that point, the difference between cardinality 1 and 10, with a couple of million messages, should be nonexistent.

At the moment I don't have the capacity to create a full-fledged test system. Would you have time to experiment? Or could we optimize later if needed?

@kagkarlsson
Owner

I may get some time to run a couple of tests. I want to be sure that this feature will not make performance worse for those using it for high-throughput cases, or at least that there is an escape-hatch should they experience lower throughput.

> Sure, in theory the highest cardinality one should always be first in the index, which in this case is execution_time.

I think for perfect results the index should match the order by (pre-sorted), but that will not work for us, since we additionally have a where-condition on execution_time <= now.
If you put priority as the last column in the index, the database will have to read all "due" rows in order to sort by priority. So if 5M rows are due, it has to read them all before returning <limit> rows.

The nice thing about the current (master) select and index is that the database only has to read <limit> rows from the index
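The difference in rows read can be illustrated with a rough cost model. This is only a sketch under simplifying assumptions: a list sorted by execution time stands in for the index on (execution_time), "rows read" is just a count of list entries visited, and real query planners may behave differently. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical cost model that counts index entries visited; real planners differ.
class RowsReadSketch {

    static final class Row {
        final long executionTime;
        final int priority;

        Row(long executionTime, int priority) {
            this.executionTime = executionTime;
            this.priority = priority;
        }
    }

    // Master-style query: "order by execution_time asc limit N".
    // The index is already in the requested order, so reading stops after `limit` rows.
    static int rowsReadWithoutPriority(List<Row> indexByTime, long now, int limit) {
        int read = 0;
        for (Row r : indexByTime) {
            if (r.executionTime > now || read == limit) {
                break;
            }
            read++;
        }
        return read;
    }

    // Priority query on the same index: every due row has to be read before
    // the sort by priority can run, so a limit applied afterwards saves no reads.
    static int rowsReadWithPriority(List<Row> indexByTime, long now) {
        List<Row> due = new ArrayList<>();
        for (Row r : indexByTime) {
            if (r.executionTime > now) {
                break;
            }
            due.add(r);
        }
        due.sort(Comparator.comparingInt((Row r) -> r.priority).reversed());
        return due.size();
    }

    // 10,000 rows, of which 5,000 are due; the limit is 100.
    static int[] demo() {
        List<Row> index = new ArrayList<>();
        for (long t = 0; t < 10_000; t++) {
            index.add(new Row(t, (int) (t % 10)));
        }
        long now = 4_999;
        return new int[] {
            rowsReadWithoutPriority(index, now, 100),
            rowsReadWithPriority(index, now)
        };
    }
}
```

In this model the query without priority visits 100 entries, while the priority-sorted variant visits all 5,000 due entries before it can return anything.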

@kagkarlsson
Owner

Should the results show that this feature may affect performance, then I think we could add it as an opt-in feature, where you explicitly enable it on the scheduler (e.g. enablePriority()). The getDue method would only add order by priority desc if it is enabled.
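A minimal sketch of what such an opt-in switch could look like at the query-building level is shown below. The class and method names are hypothetical and do not reflect db-scheduler's actual internals. The two SQL strings follow the fetch-due query quoted earlier in this thread.

```java
// Hypothetical sketch of an opt-in flag; names are illustrative,
// not db-scheduler's actual API or internals.
class DueQuerySketch {

    // Adds "priority desc" to the ordering only when the feature is enabled,
    // so the default query stays identical to the one that suits the existing index.
    static String dueQuery(boolean priorityEnabled) {
        String orderBy = priorityEnabled
                ? "order by priority desc, execution_time asc"
                : "order by execution_time asc";
        return "select * from scheduled_tasks where picked = ? and execution_time <= ? " + orderBy;
    }
}
```

With the flag off, high-throughput users would keep the current query shape and index; only users who enable priorities would take on the additional sort.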

@FreCap
Author

FreCap commented Feb 24, 2021

Ok, so you would be comparing the current version with the new branch without using priorities (everything 0 by default), correct?

@kagkarlsson
Owner

Yeah I suppose so.

master vs this branch where all priorities = null/0

@kagkarlsson
Owner

Sorry, I haven't had time to give this PR attention yet. I really need to finish PR #175 and time is limited unfortunately

@tonyhkly

Hi, I wanted to check whether this feature is still going forward. It's something that my team would also find really useful 🙂

@kagkarlsson
Owner

I think it is an interesting feature, but there are a couple of things higher up on the list.

Could you describe your use-case?

@tonyhkly

Sure, the scenario is that I am using db-scheduler to publish to a few Kafka topics. Some of these are just auditing topics and others are for actual feature code, so I would like to prioritise feature code over auditing.

@kagkarlsson
Owner

Are you anticipating high volumes (how high?) such that executions will queue up?

@maths22

maths22 commented Feb 2, 2023

At my company we run https://github.com/instructure/inst-jobs for Rails applications, which has a similar priority feature. Its pop query is (simplified) "SELECT * FROM jobs WHERE run_at < now() ORDER BY priority, run_at, id", which uses an index on (priority, run_at, id) and generates the following query plan on Postgres:

                                              QUERY PLAN                                               
-------------------------------------------------------------------------------------------------------
 Index Scan using get_delayed_jobs_index on delayed_jobs
   Index Cond: (run_at <= '2023-02-02 01:34:59.958217'::timestamp without time zone)

so at least on postgres such an index is usable without additional filtering. This queue has scaled to millions of jobs in queue and pop-able without performance issues on the pop query, so I'm reasonably confident that such an index works right.
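One way to picture why an index on (priority, run_at, id) can serve this query without a sort is the following simplified model. It assumes the index is walked in its own order, the run_at condition is checked on each entry, and the walk stops once the limit is reached. How Postgres actually evaluates the condition inside the btree is not modeled here, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified, hypothetical model of walking an index on (priority, run_at, id).
// It does not reproduce Postgres internals.
class CompositeIndexWalkSketch {

    static final class Job {
        final int priority; // lower number = popped first, as in ORDER BY priority ASC
        final long runAt;
        final long id;

        Job(int priority, long runAt, long id) {
            this.priority = priority;
            this.runAt = runAt;
            this.id = id;
        }
    }

    static final class Result {
        final List<Long> ids;
        final int entriesExamined;

        Result(List<Long> ids, int entriesExamined) {
            this.ids = ids;
            this.entriesExamined = entriesExamined;
        }
    }

    static Result pop(List<Job> jobs, long now, int limit) {
        // Build the "index": entries sorted by (priority, run_at, id).
        List<Job> index = new ArrayList<>(jobs);
        index.sort(Comparator.comparingInt((Job j) -> j.priority)
                .thenComparingLong(j -> j.runAt)
                .thenComparingLong(j -> j.id));

        List<Long> ids = new ArrayList<>();
        int examined = 0;
        for (Job j : index) {
            if (ids.size() == limit) {
                break; // results are already in ORDER BY order, so no sort is needed
            }
            examined++;
            if (j.runAt <= now) {
                ids.add(j.id);
            }
        }
        return new Result(ids, examined);
    }

    static Result demo() {
        List<Job> jobs = new ArrayList<>();
        jobs.add(new Job(5, 20, 4));
        jobs.add(new Job(0, 500, 2)); // not due at now = 100, examined but skipped
        jobs.add(new Job(9, 5, 5));
        jobs.add(new Job(0, 50, 1));
        jobs.add(new Job(5, 10, 3));
        return pop(jobs, 100, 3);
    }
}
```

In this model the cost depends on how many not-yet-due entries sit in front of the due ones in index order. When most queued jobs are due, the walk stops after roughly `limit` entries.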

@kagkarlsson
Owner

Hi Jacob, thanks for the input! Would you please post the index definition and the full query plan? I am very sceptical about Postgres being able to use an index on (priority, run_at, id) here.

@maths22

maths22 commented Feb 2, 2023

Here's the actual pop query we run in prod. Note that we also have concepts of multiple queues in one table and of stranded jobs, both of which aren't really relevant here:

WITH limited_jobs AS (
  SELECT
    id,
    ROW_NUMBER() OVER () AS row_number
  FROM
    (
      SELECT
        "delayed_jobs".*
      FROM
        "canvas"."delayed_jobs"
      WHERE
        (
          run_at <= '2023-02-02 16:23:24.112691'
          AND locked_at IS NULL
          AND next_in_strand = TRUE
        )
        AND "delayed_jobs"."priority" BETWEEN 0 AND 1000000
        AND "delayed_jobs"."queue" = 'canvas_queue'
      ORDER BY
        "delayed_jobs"."priority" ASC,
        "delayed_jobs"."run_at" ASC,
        "delayed_jobs"."id" ASC
      LIMIT 160
      FOR UPDATE SKIP LOCKED
    ) subquery
)
UPDATE
  "canvas"."delayed_jobs"
SET
  locked_by = CASE row_number WHEN 1 THEN '<hostname>:106379' ELSE 'prefetch:<hostname>' END,
  locked_at = '2023-02-02 16:23:24.113589'
FROM
  limited_jobs
WHERE
  limited_jobs.id = "canvas"."delayed_jobs".id
RETURNING
  "canvas"."delayed_jobs".*;

And here's the corresponding full plan (The innermost index scan is the real select from the jobs queue; as evidenced by the rows=160 on the other stuff that's all just operating on subset of jobs plucked by the subquery):

Update on delayed_jobs  (cost=78.78..525.03 rows=160 width=570)
   CTE limited_jobs
     ->  WindowAgg  (cost=0.42..78.23 rows=160 width=16)
           ->  Subquery Scan on subquery  (cost=0.42..76.23 rows=160 width=8)
                 ->  Limit  (cost=0.42..74.63 rows=160 width=2775)
                       ->  LockRows  (cost=0.42..79565.10 rows=171541 width=2775)
                             ->  Index Scan using get_delayed_jobs_index on delayed_jobs delayed_jobs_1  (cost=0.42..77849.69 rows=171541 width=2775)
                                   Index Cond: ((priority >= 0) AND (priority <= 1000000) AND (run_at <= '2023-02-02 16:23:24.112691'::timestamp without time zone))
                                   Filter: ((locked_at IS NULL) AND next_in_strand AND ((queue)::text = 'canvas_queue'::text))
   ->  Nested Loop  (cost=0.55..446.80 rows=160 width=570)
         ->  CTE Scan on limited_jobs  (cost=0.00..3.20 rows=160 width=56)
         ->  Index Scan using delayed_jobs_pkey on delayed_jobs  (cost=0.55..2.77 rows=1 width=14)
               Index Cond: (id = limited_jobs.id)

The index definition of the get_delayed_jobs_index:

"get_delayed_jobs_index" btree (priority, run_at, id) WHERE queue::text = 'canvas_queue'::text AND locked_at IS NULL AND next_in_strand

@amitonlentra

@kagkarlsson - is it possible to plan a merge of this feature soon and release it? We need this capability in our application.

@kagkarlsson
Owner

I might be able to pick this up soon, but cannot give an ETA for it

@kagkarlsson kagkarlsson added pri2 and removed pri2 labels May 8, 2024
5 participants