include priority ? #1

Closed
kennywest opened this issue Sep 30, 2019 · 10 comments

Comments

@kennywest

Would you be interested in including priority in tasks (JMS also has this)? I have created a preliminary branch for this that seems to do what I need: https://github.com/kennywest/db-queue/tree/add_priority

Thanks for considering.

@f0y
Contributor

f0y commented Oct 6, 2019

I think it's not a good idea, because in some cases low-priority tasks won't be processed at all. Did you consider storing high-priority tasks in another queue?
Could you describe your use case?

@everplays
Contributor

I am also interested in having a priority-based scheduler for db-queue (willing to make a PR if the idea is accepted). However, the sorting approach can definitely become a bottleneck for lower-priority jobs if higher-priority tasks are continuously added to the queue.
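To make the starvation concern concrete, here is a minimal sketch, not db-queue code. `StrictPriorityDemo`, `Task`, and `run` are hypothetical names, and the in-memory `PriorityQueue` merely stands in for a table sorted by priority. One low-priority task waits from the start while one new high-priority task arrives per cycle:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Illustration only: strict priority ordering with a steady stream of high-priority arrivals. */
class StrictPriorityDemo {

    /** Hypothetical task type: a lower number means a higher priority. */
    static final class Task {
        final String name;
        final int priority;

        Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
    }

    /**
     * Runs the given number of cycles. In each cycle one new high-priority task
     * arrives and the worker processes exactly one task, always the one with the
     * highest priority. Returns the names of the processed tasks in order.
     */
    static List<String> run(int cycles) {
        PriorityQueue<Task> queue = new PriorityQueue<>(Comparator.comparingInt((Task t) -> t.priority));
        queue.add(new Task("low-0", 2)); // waiting since before the first cycle

        List<String> processed = new ArrayList<>();
        for (int i = 0; i < cycles; i++) {
            queue.add(new Task("high-" + i, 0));
            processed.add(queue.poll().name);
        }
        return processed;
    }
}
```

However many cycles are run, `low-0` never appears in the result as long as the arrival rate of high-priority tasks matches the processing rate.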

I think the solution here is a quota-based scheduler. Imagine the following queues, each with a per-cycle quota:

  • High priority: 10
  • Medium priority: 5
  • Low priority: 1

Now, assuming that we have an infinite number of tasks in each queue, 16 tasks will be picked in each cycle: 10 high priority, 5 medium, and 1 low. To make it more efficient, if the scheduler tries to pick a high-priority task and there is none, it can jump straight to medium priority instead of trying 9 more times.

Obviously, a simpler implementation of this is to start 10 threads for high, 5 threads for medium, and 1 thread for low priority. However, I prefer the scheduler approach.
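The cycle described above could be sketched roughly as follows. This is an in-memory illustration under stated assumptions, not a proposal for the actual db-queue API: `QuotaScheduler`, `addQueue`, `submit`, and `pickCycle` are hypothetical names, and tasks are plain strings held in `ArrayDeque`s rather than database rows.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustration only: picks up to a fixed quota of tasks from each queue per cycle. */
class QuotaScheduler {

    // Insertion order doubles as priority order: queues added first are served first.
    private final Map<String, Integer> quotas = new LinkedHashMap<>();
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    /** Registers a queue with the maximum number of tasks to pick from it per cycle. */
    QuotaScheduler addQueue(String name, int quota) {
        if (quota < 1) {
            throw new IllegalArgumentException("quota must be at least 1");
        }
        quotas.put(name, quota);
        queues.put(name, new ArrayDeque<>());
        return this;
    }

    /** Appends a task to the named queue. */
    void submit(String queueName, String task) {
        Deque<String> queue = queues.get(queueName);
        if (queue == null) {
            throw new IllegalArgumentException("unknown queue: " + queueName);
        }
        queue.add(task);
    }

    /** Runs one cycle and returns the picked tasks in pick order. */
    List<String> pickCycle() {
        List<String> picked = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : quotas.entrySet()) {
            Deque<String> queue = queues.get(entry.getKey());
            for (int i = 0; i < entry.getValue(); i++) {
                String task = queue.poll();
                if (task == null) {
                    break; // queue is empty: skip the rest of its quota and move on
                }
                picked.add(task);
            }
        }
        return picked;
    }
}
```

With quotas of 10, 5, and 1 and enough tasks in every queue, each call to `pickCycle()` returns 16 tasks. When a queue runs dry, the `break` moves on to the next priority immediately, so a lower-priority queue still gets its share in every cycle.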

But why?

Not sure about @kennywest's case, but we have a few background tasks that have to be processed as soon as possible, a majority of tasks with medium priority, and a relatively small number of low-priority tasks. In theory, we could start three processes (or containers, VMs, ...) to handle the incoming tasks and potentially scale them up and down depending on how many tasks we have for each queue. However, as you can guess, most of the time the high-priority and low-priority processes will not do anything. Yet we cannot turn them off, because a task may come in at any time. Hence, it makes more sense to have one process that can execute tasks from different queues according to the configured priority.

Not to mention that scaling up/down will be a lot easier, as it only depends on the total number of unprocessed tasks (instead of the per-queue number).

@f0y
Contributor

f0y commented Oct 9, 2019

I like your idea of the quota-based scheduler, so I will implement scheduler functionality. It will be included in the next major release, which I plan to publish at the end of October. Thank you for your contribution.

@everplays
Contributor

Thanks @f0y. I am not sure whether you want to plug it into the execution pool somehow or you have a better idea, but I would like to contribute if you tell me what you have in mind. I don't wanna make something that is unacceptable to you.

If you still want to implement it yourself, I at least want to thank you guys for the great work on the library.

@kennywest
Author

@f0y thank you for considering this. @everplays' reasoning is correct: strict priority could lead to a service processing only high-prio messages while the backlog of unprocessed low-prio messages keeps growing. So implementing a quota-based scheduler makes sense.

@everplays
Contributor

@f0y as you've been busy, I can happily implement this feature if we discuss the implementation beforehand (I really don't wanna spend my time on something that's not gonna be merged).

So far, we have managed the priorities as follows:

  • scaling the high, medium, and low queues differently via containers.
  • adding CLI arguments to start the worker process with the necessary number of threads. For example: worker -q high -q high -q high -q medium -q medium -q low would start 6 threads (3 high, 2 medium, and 1 low). To achieve this, we created a QueueService instance per queue so we could change the number of threads independently of other queues.

@f0y
Contributor

f0y commented Jan 13, 2020

I've tried to figure out how to do that and I have no idea. Do you have one?
The idea to change thread count is not a bad one.
Is your current solution unacceptable for some reason?

@f0y
Contributor

f0y commented Jan 13, 2020

> I really don't wanna spend my time on something that's not gonna be merged

You can implement a POC without tests, documentation, a successful build, etc.

@everplays
Contributor

The two solutions that we are using at the moment work fine with a low number of workers. So maybe I am just trying to optimize something that doesn't need optimization. However, I'm gonna give it a go over the weekend and see what I can come up with.

@everplays everplays mentioned this issue Jan 25, 2020
@everplays
Contributor

Just to inform anyone who's potentially waiting for me to implement this: for our use case, we've concluded that the thread-based and container-based scaling options give us enough flexibility that a priority-based scheduler has little to no benefit for us.

However, if anyone's interested in implementing this, from what I gathered, it can be implemented with:

  • QueueLoop
  • LoopPolicy

Once the "core" functionality is ready, QueueExecutionPool and QueueService would need to be modified to let users of the library create a PriorityQueueLoop (or a better name if you have one).

Finally, if you want to be able to control how many threads run per queue, this is what we did:

public class Worker implements AutoCloseable {

	private static final Logger LOGGER = LoggerFactory.getLogger(Worker.class);

	private boolean started = false;

	private JobQueueProducer jobQueueProducer;
	private QueueCoreElements queueCoreElements;
	private Map<String, Long> queuesToRun;

	private Map<String, QueueService> queueToService = new HashMap<>();

	@Inject
	public Worker(
		QueueCoreElements queueCoreElements,
		JobQueueProducer jobQueueProducer
	) {
		this.queueCoreElements = queueCoreElements;
		this.jobQueueProducer = jobQueueProducer;
	}

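	public void start(List<String> suggestedQueuesToRun) {
		this.setQueuesToRun(suggestedQueuesToRun);
		// One QueueService per queue; the thread count itself is applied via withThreadCount in registerQueueToService.
		this.queuesToRun.forEach((queue, numberOfThreads) -> this.getQueueService(queue).start());
		this.started = true;
	}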

	public void shutdown() {
		if (this.started) {
			this.queuesToRun.forEach((queue, numberOfThreads) -> this.getQueueService(queue).shutdown());
		}
	}

	@Override
	public void close() throws Exception {
		this.shutdown();
	}

	private QueueService getQueueService(String queue) {
		if (this.queueToService.containsKey(queue)) {
			return this.queueToService.get(queue);
		}

		QueueService queueService = new QueueService(
			List.of(this.getQueueShard(queue)),
			new ThreadListener(),
			new TaskListener()
		);
		this.registerQueueToService(queue, queueService);
		this.queueToService.put(queue, queueService);

		return queueService;
	}

	private void setQueuesToRun(List<String> suggestedQueuesToRun) {
		if (this.queuesToRun != null) {
			return;
		}
		if (suggestedQueuesToRun.isEmpty()) {
			suggestedQueuesToRun = List.of("high", "medium");
		}
		this.queuesToRun = suggestedQueuesToRun.stream().collect(Collectors.groupingBy(e -> e, Collectors.counting()));
	}

	private QueueShard getQueueShard(String queue) {
		QueueShard queueShardTemplate = this.queueCoreElements.getQueueShard();
		return new QueueShard(
			queueShardTemplate.getDatabaseDialect(),
			queueShardTemplate.getQueueTableSchema(),
			new QueueShardId(queue),
			queueShardTemplate.getJdbcTemplate(),
			queueShardTemplate.getTransactionTemplate()
		);
	}

	private void registerQueueToService(String queue, QueueService queueService) {
		QueueConsumer<JsonNode> queueConsumer = new RunnableQueueConsumer(
			new QueueConfig(
				this.queueCoreElements.getQueueLocationBuilder().withQueueId(new QueueId(queue)).build(),
				this.queueCoreElements.getQueueSettingsBuilder().withThreadCount(this.queuesToRun.get(queue).intValue()).build()
			),
			jobQueueProducer
		);
		queueService.registerQueue(queueConsumer);
	}

}

Then, if the suggestedQueuesToRun parameter is set to "high", "high", "high", "medium", "medium", "low", you'll have six threads: 3 high, 2 medium, 1 low. If the list is empty, the worker falls back to one thread each for "high" and "medium".
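For readers who want to check the counting step on its own, here is a small standalone sketch of how the repeated queue names turn into per-queue thread counts. It uses the same `Collectors.groupingBy` / `Collectors.counting` combination as `setQueuesToRun` above; `ThreadCounts` and `fromArguments` are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

/** Illustration only: derives a thread count per queue from repeated queue names. */
class ThreadCounts {

    /** Each occurrence of a queue name in the argument list adds one thread for that queue. */
    static Map<String, Long> fromArguments(List<String> queueArgs) {
        return queueArgs.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
    }
}
```

Calling `ThreadCounts.fromArguments(List.of("high", "high", "high", "medium", "medium", "low"))` yields a map with 3 for "high", 2 for "medium", and 1 for "low". The iteration order of the returned map is unspecified.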

3 participants