Create value property on io.smallrye.common.annotation.Blocking to allow the configuration of the worker thread pool to a specific ConsumeEvent #25259

Open
felipewind opened this issue Apr 29, 2022 · 7 comments
Labels
area/smallrye kind/enhancement New feature or request

Comments

@felipewind
Contributor

Description

My problem

I want to configure the thread pool size for a specific ConsumeEvent. In my current project I must respect the throttling rules of an API that my application calls, so I want to parallelize my processing while controlling how many threads run in parallel.

I posted this question on StackOverflow to find out whether this is possible.

I don't want to set the global worker thread pool, I only want to set the thread pool for this specific ConsumeEvent.

Setting the global size of the worker thread pool

If I understood correctly, today I can only set the global size of the worker thread pool, using:

quarkus.vertx.worker-pool-size=2

Similar solution on Smallrye Reactive Messaging to configure a specific thread pool

The SmallRye Reactive Messaging guide has an example of what I want to do.

There, I can use the Blocking annotation, give the worker pool a name, and configure that thread pool:

@Outgoing("Y")
@Incoming("X")
@Blocking("my-custom-pool")
public String process(String s) {
  return s.toUpperCase();
}

Specifying the concurrency for the above worker pool requires the following configuration property to be defined:

smallrye.messaging.worker.my-custom-pool.max-concurrency=3

In this example, I can configure the size of the thread pool that processes the messages assigned to my-custom-pool.

Implementation ideas

Based on tsegismont's answer to my StackOverflow post, the idea is to add a value attribute to @io.smallrye.common.annotation.Blocking that names a worker thread pool, and then to allow configuring the size of that specific pool.

Example of the ConsumeEvent:

@io.quarkus.vertx.ConsumeEvent("my-consume-event")
@io.smallrye.common.annotation.Blocking("my-custom-pool")
public void start(String value) {
  // do the work
}

Example of the property configuring the size of "my-custom-pool":

quarkus.vertx.worker."my-custom-pool"-pool-size=2
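A minimal sketch of how a named pool might be resolved from such an annotation, using only the JDK. Everything here is hypothetical and for illustration: `NamedBlocking` stands in for a `@Blocking` that accepts a value, `NamedPoolSketch` and its methods are made-up names, the fallback size is arbitrary, and the map stands in for whatever configuration source would hold the per-pool size. It is not how Quarkus or SmallRye implement worker pools.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a @Blocking annotation that carries a pool name.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface NamedBlocking {
    String value() default "";
}

class NamedPoolSketch {
    // Arbitrary fallback used when no size is configured for a pool name.
    static final int DEFAULT_POOL_SIZE = 4;

    // Stand-in for configuration: pool name -> configured size.
    private final Map<String, Integer> configuredSizes;
    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();

    NamedPoolSketch(Map<String, Integer> configuredSizes) {
        this.configuredSizes = configuredSizes;
    }

    // Returns the pool name declared on the first method with the given name,
    // or "" if that method is missing or not annotated.
    static String poolNameOf(Class<?> type, String methodName) {
        for (Method method : type.getDeclaredMethods()) {
            if (method.getName().equals(methodName)) {
                NamedBlocking annotation = method.getAnnotation(NamedBlocking.class);
                return annotation == null ? "" : annotation.value();
            }
        }
        return "";
    }

    int sizeFor(String poolName) {
        return configuredSizes.getOrDefault(poolName, DEFAULT_POOL_SIZE);
    }

    // Lazily creates one fixed-size executor per pool name and reuses it afterwards.
    ExecutorService poolFor(String poolName) {
        return pools.computeIfAbsent(poolName, name -> Executors.newFixedThreadPool(sizeFor(name)));
    }

    void shutdown() {
        pools.values().forEach(ExecutorService::shutdown);
    }

    // Example handler mirroring the consumer above.
    @NamedBlocking("my-custom-pool")
    public void start(String value) {
        // do the work
    }

    public static void main(String[] args) {
        NamedPoolSketch sketch = new NamedPoolSketch(Map.of("my-custom-pool", 2));
        String poolName = poolNameOf(NamedPoolSketch.class, "start");
        System.out.println(poolName + " -> " + sketch.sizeFor(poolName) + " threads");
        sketch.shutdown();
    }
}
```

The point of the sketch is only that the annotation value acts as a lookup key: the handler names a pool, and the size comes from configuration keyed by that name.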
@felipewind felipewind added the kind/enhancement New feature or request label Apr 29, 2022
@quarkus-bot

quarkus-bot bot commented Apr 29, 2022

@cescoffier
Member

@Ladicek @radcortez this would require extending the @Blocking annotation from SmallRye or adding another annotation.
WDYT?

@Ladicek
Contributor

Ladicek commented May 16, 2022

How many libraries/frameworks integrated in Quarkus use @Blocking? I remember RESTEasy Reactive, SmallRye Reactive Messaging, SmallRye GraphQL, SmallRye Fault Tolerance, apparently Quarkus's @ConsumeEvent, and I'm sure I'm missing some. Would all of them be able to use this configuration?

@radcortez
Member

If this is to be supported, it would need to be supported across the board to be consistent.

@cescoffier
Member

Yes, I totally agree, it will need to be consistent.

At the moment, the SmallRye Reactive Messaging variant of @Blocking allows configuring whether the order must be preserved and the name of the pool.

@phillip-kruger
Member

Why don't we introduce new annotations instead, such as @PreserveOrder and @ConnectionPool("bla")?

@stuartwdouglas
Member

Note that you can also easily work around this by manually delegating to a thread pool rather than relying on Quarkus to handle it for you. For example:

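// Imports use javax.* (Quarkus 2.x); on Quarkus 3 and later use jakarta.* instead.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import io.quarkus.vertx.ConsumeEvent;

// Owns a dedicated fixed-size pool, so concurrency is capped independently of the global worker pool.
@ApplicationScoped
class ThreadPoolManager {
    ExecutorService service;

    @PostConstruct
    void setup() {
        // At most 3 tasks run in parallel; further submissions wait in the executor's queue.
        service = Executors.newFixedThreadPool(3);
    }

    @PreDestroy
    void close() {
        service.shutdown();
    }

    public ExecutorService getService() {
        return service;
    }
}

@ApplicationScoped
public class EventBusConsumer {

    @Inject
    ThreadPoolManager manager;

    // Request/reply style: the reply completes once the pooled task finishes.
    // Pet is an application class that is not shown here.
    @ConsumeEvent("pets")
    public CompletionStage<String> sayHi(Pet pet) {
        return CompletableFuture.supplyAsync(
                () -> "Hello " + pet.getName() + " (" + pet.getKind() + ")",
                manager.getService());
    }

    // Fire-and-forget style: hand the work to the pool and return immediately.
    @ConsumeEvent("persons")
    public void name(String name) {
        manager.getService().submit(() -> {
            //do stuff
        });
    }

}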
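
To illustrate why this workaround addresses the throttling requirement, here is a small JDK-only sketch (no Quarkus involved) that submits more tasks than the pool has threads and records the highest number seen running at once. The class name `BoundedPoolDemo`, the method `maxObservedConcurrency`, and the 20 ms sleep that stands in for the throttled API call are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedPoolDemo {

    // Runs `tasks` short jobs on a fixed pool of `poolSize` threads and returns
    // the highest number of jobs observed running at the same time.
    static int maxObservedConcurrency(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(CompletableFuture.runAsync(() -> {
                    int now = running.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    try {
                        Thread.sleep(20); // stand-in for the throttled API call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }, pool));
            }
            // Wait for every job to finish before reading the peak.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return peak.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + maxObservedConcurrency(3, 12));
    }
}
```

Because each job runs on one of the pool's threads, the observed peak can never exceed the pool size, whatever the global worker pool is set to.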


7 participants