Mutiny and Resteasy integration sometimes fails with "Attempted to do blocking IO from the IO thread" #8152

Closed
zeljkot opened this issue Mar 25, 2020 · 32 comments
Labels
kind/bug Something isn't working triage/out-of-date This issue/PR is no longer valid or relevant

Comments

@zeljkot

zeljkot commented Mar 25, 2020

Describe the bug

A Multi pipeline returned to Resteasy often causes an exception, although both the database and Kafka operations are executed on a worker thread.

Code:

@POST
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
@RolesAllowed("file-upload")
public Uni<List<Long>> upload(
        @Nonnull @Context SecurityContext securityContext,
        Map<String, byte[]> files
) {
    return Multi
            .createFrom().iterable(files.entrySet())
            .onItem().produceMulti(entry -> extractFiles(securityContext.getUserPrincipal().getName(), entry))
            .concatenate()
            .onItem().invoke(fileEntity -> persist(fileEntity)).subscribeOn(Infrastructure.getDefaultWorkerPool())
            .emitOn(Infrastructure.getDefaultWorkerPool())
            .onItem().produceCompletionStage(fileEntity -> sendMessage(fileEntity)).concatenate()
            .collectItems().asList()
            .ifNoItem().after(Duration.ofSeconds(200)).failWith(
                    () -> new ServerErrorException("Kafka not responding", Response.Status.GATEWAY_TIMEOUT));
}

sendMessage returns CompletableFuture:

final CompletableFuture<Void> voidCompletableFuture = new CompletableFuture<>();
OutgoingKafkaRecord<Long, ProcessingStatus> message =
    KafkaRecord.of(fileEntity.getId(), status)
        .withAck(() -> {
          voidCompletableFuture.complete(null);
          return voidCompletableFuture;
        });
emitter.send(message);
return voidCompletableFuture.thenApply(ignored -> status.getId());

Exception:

[vert.x-eventloop-thread-2] [SynchronousDispatcher.java:545] - RESTEASY002020: Unhandled asynchronous exception, sending back 500: javax.ws.rs.ProcessingException: RESTEASY008205: JSON Binding serialization error java.lang.IllegalStateException: UT000126: Attempted to do blocking IO from the IO thread. This is prohibited as it may result in deadlocks

It seems that the error does not happen when emitter.send is used without the future.

Expected behavior

Resteasy should just execute the request as all blocking operations are moved to worker threads.

Actual behavior

Resteasy complains about a blocking operation.

To Reproduce
Steps to reproduce the behavior:

  1. Return a Multi with emitter.send waiting for the future to complete

Configuration

# Add your application.properties here, if applicable.

Screenshots
(If applicable, add screenshots to help explain your problem.)

Environment (please complete the following information):

  • Output of uname -a or ver: Microsoft Windows [Version 10.0.19041.153]
  • Output of java -version: 1.8
  • GraalVM version (if different from Java):
  • Quarkus version or git rev: 1.3.0.Final
  • Build tool (ie. output of mvnw --version or gradlew --version): Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)

Additional context
@cescoffier https://groups.google.com/forum/#!topic/smallrye/J4fEeQYfM5w

@zeljkot zeljkot added the kind/bug Something isn't working label Mar 25, 2020
@gsmet
Member

gsmet commented Mar 25, 2020

@cescoffier
Member

First, I would simplify the code:

@POST
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Produces(MediaType.APPLICATION_JSON)
@RolesAllowed("file-upload")
public Uni<List<Long>> upload(
        @Nonnull @Context SecurityContext securityContext,
        Map<String, byte[]> files
) {
    return Multi
            .createFrom().iterable(files.entrySet())
            .emitOn(Infrastructure.getDefaultWorkerPool())
            .onItem().produceMulti(entry -> extractFiles(securityContext.getUserPrincipal().getName(), entry))
            .concatenate()
            .onItem().invoke(fileEntity -> persist(fileEntity))
            .onItem().produceCompletionStage(fileEntity -> sendMessage(fileEntity)).concatenate()
            .emitOn(Infrastructure.getDefaultWorkerPool()) // The ack is on the event loop.
            .collectItems().asList()
            .ifNoItem().after(Duration.ofSeconds(200)).failWith(
                    () -> new ServerErrorException("Kafka not responding", Response.Status.GATEWAY_TIMEOUT));
}
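
The comment in the pipeline above says the ack arrives on the event loop. A minimal JDK-only sketch of why that matters, assuming the ack behaves like a plain CompletableFuture completed from another thread: a dependent stage attached before completion runs on the thread that calls complete(). The class name and the "fake-eventloop" thread are hypothetical stand-ins, not Quarkus or Kafka connector code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AckThreadDemo {
    // Returns the name of the thread that ran the stage attached to the ack future.
    static String continuationThread() {
        // Hypothetical stand-in for the event loop thread that delivers the ack.
        ExecutorService fakeEventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-eventloop"));
        try {
            CompletableFuture<Void> ack = new CompletableFuture<>();
            // Attached before completion, so it runs on the thread that completes the future.
            CompletableFuture<String> downstream =
                    ack.thenApply(ignored -> Thread.currentThread().getName());
            // The "ack" is completed from the fake event loop thread.
            fakeEventLoop.execute(() -> ack.complete(null));
            return downstream.join();
        } finally {
            fakeEventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(continuationThread()); // prints "fake-eventloop"
    }
}
```

Under that assumption, everything chained after the ack (including response serialization) would continue on the event loop unless something switches threads again.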

@FroMage
Member

FroMage commented Mar 25, 2020

If we do blocking IO it means we must have a filter that disabled async IO. But I was pretty sure I would detect and reject it. Is this with quarkus-undertow or without?

@FroMage
Member

FroMage commented Mar 25, 2020

Can we get the full stack trace?

@zeljkot
Author

zeljkot commented Mar 25, 2020

Sorry, I forgot to attach it. Here it is.

trace.txt

@zeljkot
Author

zeljkot commented Mar 25, 2020

First, I would simplify the code:

@cescoffier onItem instead of subscribeOn? Does it matter where I put it because in your example it is not immediately before persist?

@zeljkot
Author

zeljkot commented Mar 25, 2020

If we do blocking IO it means we must have a filter that disabled async IO. But I was pretty sure I would detect and reject it. Is this with quarkus-undertow or without?

I have this dependency for WebSockets; it seems that it is used for REST too, although I do not need it there.

<dependency>
  <groupId>io.quarkus</groupId>
  <artifactId>quarkus-undertow-websockets</artifactId>
</dependency>

@cescoffier
Member

cescoffier commented Mar 25, 2020

onItem is about receiving items.
subscribeOn changes the thread used during the subscription.

I removed a few subscribeOn and emitOn.

And yes, location matters. emitOn constrains the thread used to dispatch the event downstream (next line).
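
A rough JDK-only analogy for "location matters", not Mutiny code: with CompletableFuture, a stage placed before an async hop runs on the thread that completed the source, while a stage placed after the hop runs on the given executor. This sketch assumes thenRunAsync(..., executor) is a fair stand-in for emitOn(executor); the class and thread names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EmitPositionDemo {
    // Records which thread runs a stage placed before and after the executor hop.
    static List<String> stageThreads() {
        ExecutorService fakeEventLoop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-eventloop"));
        ExecutorService workers =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-worker"));
        try {
            List<String> seen = new CopyOnWriteArrayList<>();
            CompletableFuture<Void> source = new CompletableFuture<>();
            CompletableFuture<Void> done = source
                    // Before the hop: runs on the thread that completes the source.
                    .thenRun(() -> seen.add("before hop: " + Thread.currentThread().getName()))
                    // The hop: this stage is dispatched on the worker executor.
                    .thenRunAsync(
                            () -> seen.add("after hop: " + Thread.currentThread().getName()),
                            workers);
            fakeEventLoop.execute(() -> source.complete(null));
            done.join();
            return new ArrayList<>(seen);
        } finally {
            fakeEventLoop.shutdown();
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        stageThreads().forEach(System.out::println);
    }
}
```

Moving the hop earlier or later in the chain changes which stages land on the worker, which is the same reason the position of emitOn in a pipeline is significant.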

@FroMage
Member

FroMage commented Mar 25, 2020

Yeah, I see the gzip interceptor, which forces blocking IO.

@zeljkot
Author

zeljkot commented Mar 25, 2020

onItem is about receiving items.
subscribeOn changes the thread used during the subscription.

I removed a few subscribeOn and emitOn.

And yes, location matters. emitOn constrains the thread used to dispatch the event downstream (next line).

Isn't it needed for "persist" (database operation)? "extractFiles" just unzips from byte[] so it is probably not needed there.

@zeljkot
Author

zeljkot commented Mar 27, 2020

Yeah, I see the gzip interceptor, which forces blocking IO.

I removed @GZIP, but it still happens (without the gzip part in the stack trace).

@FroMage
Member

FroMage commented Mar 30, 2020

Can you show me the new stack trace then?

@zeljkot
Author

zeljkot commented Mar 30, 2020

I already changed it because of the issues, but I will try to reproduce it again, maybe later today.

@zeljkot
Author

zeljkot commented Apr 5, 2020

@FroMage here is the log.
log.txt

@FroMage
Member

FroMage commented Apr 6, 2020

I still see GZIP:

	at org.jboss.resteasy.plugins.interceptors.GZIPEncodingInterceptor.aroundWriteTo(GZIPEncodingInterceptor.java:103)

@zeljkot
Author

zeljkot commented Apr 6, 2020

You are right, it only disappeared from the cause "java.lang.IllegalStateException" (java.util.zip.GZIPOutputStream).

@akoufa

akoufa commented May 31, 2020

I am experiencing the same issue when I define the following in application.properties: quarkus.resteasy.gzip.enabled=true and quarkus.resteasy.gzip.max-input=10M. The stack trace is attached:
stacktrace.txt

My question is: shouldn't the request be handled by a worker thread when using JAX-RS? Or is a Mutiny JAX-RS route handled on an IO thread, like Reactive Routes?

@cescoffier
Member

JAX-RS methods returning Mutiny types are still called on the worker thread pool. However, depending on what you do in the method, you may end up back on an event loop (typically if you use a reactive client).

@FroMage
Member

FroMage commented Jun 2, 2020

@stuartwdouglas can you remind me what you told me about being able to run gzip in an async fashion?

@akoufa

akoufa commented Jun 2, 2020

@cescoffier Ok then, correct me if I am wrong, I have to use an emitOn to go back to a worker thread. Which executor do I have to use to get back to a JaxRS worker thread?

@stuartwdouglas
Member

You can use java.util.zip.Deflater and java.util.zip.Inflater.
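
A minimal sketch of what using those two classes directly could look like, assuming the payload is already in memory as a byte array. Both classes work on buffers and perform no stream IO themselves. Note that the default Deflater settings produce zlib framing, not the gzip header and trailer that a Content-Encoding: gzip response needs, so this only illustrates the buffer-based API. The class and method names are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class InMemoryDeflate {
    // Compresses a byte array entirely in memory (zlib framing by default).
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish(); // no more input will be supplied
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end(); // release native resources
        return out.toByteArray();
    }

    // Reverses compress(); rejects truncated or malformed input.
    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("Truncated or unsupported input");
                }
                out.write(buffer, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("Not valid zlib data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] original = "{\"id\":1},{\"id\":2},{\"id\":3}".getBytes(StandardCharsets.UTF_8);
        byte[] roundTrip = decompress(compress(original));
        System.out.println(new String(roundTrip, StandardCharsets.UTF_8));
    }
}
```

Because each call only transforms the bytes it is handed, the compressed chunks could be passed to an asynchronous write, whereas GZIPOutputStream writes to the underlying stream itself.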

@cescoffier
Member

@akoufa you can use Infrastructure.getDefaultExecutor()

@akoufa

akoufa commented Jun 3, 2020

@FroMage @stuartwdouglas Can't the Quarkus gzip support that is enabled through application.properties be changed to be async instead of blocking?

@andreas-eberle
Contributor

Shouldn't Quarkus make sure that gzip is run on the correct thread? I don't think this should be something users need to know about.

@FroMage
Member

FroMage commented Jun 3, 2020

@stuartwdouglas thanks.

Shouldn't Quarkus make sure that gzip is run on the correct thread? I don't think this should be something users need to know about.

Well yeah, if we can make gzip work with async IO, it will be automatic.

@andreas-eberle
Contributor

Maybe I don't get all the stuff that comes together here. But as far as I understood, it would be fixable by changing the thread to the jaxrs worker thread again, am I right? If so, couldn't quarkus switch the thread of the Uni/Multi back to the jaxrs thread before doing any more jaxrs/response handling/gzip stuff with it?

@vishalgoel1988

Hi, is there any update on this, please? I also hit the same issue. Is it possible to enhance Quarkus so that gzip also works with async IO?

@FroMage
Member

FroMage commented Aug 21, 2020

No update so far, I haven't had time to look at it. If you want to contribute I could help you?

@frankie1984

We are running into the same problem. Has anyone found a workaround yet? We're trying to stream about 9,000,000 JSON objects and gzip would be nice :D

@stuartwdouglas
Member

Have you tried quarkus.http.enable-compression=true ? This will enable vert.x level compression, which is async.

@frankie1984

We added gzip on the nginx layer

@cescoffier
Member

Closing - out of date.

@cescoffier cescoffier added triage/out-of-date This issue/PR is no longer valid or relevant and removed priority/blocker labels Mar 17, 2022