body content sometimes (!) missing in async content handler #299

Closed
afiller opened this issue Aug 14, 2023 · 9 comments

@afiller

afiller commented Aug 14, 2023

What happens:

I have a Java content handler implementing "NginxJavaRingHandler" with an asynchronous setup using a thread, a channel and hijack (source below). When the NGINX endpoint is called with a POST body from some sources (the problem does not occur with all sources), the body (InputStream) is sometimes (!) empty when I access it from the thread. It is always populated when I read the body before starting the thread with the channel. The content of the input stream seems to be cleared too early and I don't know why.

Of course I know that an input stream cannot be read twice. The source below only shows the two places where I read the input stream (using Commons IO), but I also tried it with plain Java.

Thanks for helping!!!

PS: "val" and "@Cleanup" are from Lombok.

My configuration:

Environment:

  • Linux (Ubuntu)
  • NGINX-clojure-0.5.3
  • GraalVM

NGINX:

        location ~ "^/PMCP/api/v00/external/(.*)$" {
            limit_req zone=external burst=20 delay=10;
            limit_req_log_level warn;
            limit_req_status 503;
            content_handler_type 'java';
            content_handler_name 'com.pathmate.nginx.handler.ExternalRESTCallHandler';
        }

JAVA (excerpt, relevant parts):

	/**
	 * The invocation handler for the asynchronous calls
	 */
	private class InvocationHandler implements Runnable {
		private final NginxJavaRequest			nginxRequest;

		private final NginxHttpServerChannel	channel;

		public InvocationHandler(final NginxJavaRequest nginxRequest,
				final NginxHttpServerChannel channel) {
			this.nginxRequest = nginxRequest;
			this.channel = channel;
		}

		@Override
		public void run() {
			val requestURI = ((String) nginxRequest.get("uri"))
					.substring(uriPrefixLength);

			log.info("Processing new request: " + requestURI);

			// Read body if available
			val body = nginxRequest.get(MiniConstants.BODY);

			String bodyData = null;
			if (body instanceof InputStream) {
				// run() cannot throw IOException, so reading and closing are wrapped
				try {
					@Cleanup
					val bodyInputStream = (InputStream) body;

					bodyData = IOUtils.toString(bodyInputStream,
							StandardCharsets.UTF_8);

					// ==> WHEN READING THE BODY (bodyData) HERE FOR THE FIRST TIME
					//         IT'S EMPTY ("") WHEN CALLING FROM SOME (!) SERVERS
				} catch (final IOException e) {
					bodyData = null;
				}
			}

			try {
				channel.sendResponse(200);
			} catch (final IOException e1) {
				// Do nothing
			}
		}
	}

	@Override
	public Object[] invoke(final Map<String, Object> request)
			throws IOException {
		val nginxRequest = (NginxJavaRequest) request;
		nginxRequest.prefetchAll();

		// Read body if available
		val body = request.get(MiniConstants.BODY);

		String bodyData;
		if ((body != null) && (body instanceof InputStream)) {
			@Cleanup
			val bodyInputStream = (InputStream) body;

			bodyData = IOUtils.toString(bodyInputStream,
					StandardCharsets.UTF_8);

			// ==> WHEN READING THE BODY (bodyData) HERE FOR THE FIRST TIME
			//         IT'S ALWAYS AVAILABLE
		} else {
			bodyData = null;
		}

		val channel = nginxRequest.handler().hijack(nginxRequest, false);

		// Run thread in thread pool executor
		executorService.execute(new InvocationHandler(nginxRequest, channel));

		return null;
	}
@xfeep
Member

xfeep commented Aug 16, 2023

Hi, Filler.
Could you please tell us what bodyInputStream.getClass() returns when the body is missing?
There are three kinds of bodyInputStream: FileInputStream when Nginx buffers the body in a temporary file, NativeInputStream, and SequenceInputStream, which wraps a list of NativeInputStreams.
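Because all three variants are ordinary java.io.InputStream subclasses, the requested diagnostic (class name plus available()) and the draining logic can be written once against InputStream. The following is a minimal JDK-only sketch; the class and method names (BodyDiagnostics, describe, drainToString) are hypothetical, and in-memory ByteArrayInputStream chunks plus a temporary file stand in for the streams nginx-clojure would actually supply.

```java
import java.io.ByteArrayInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class BodyDiagnostics {

    /** Hypothetical helper: "<concrete class> available=<n>" for logging; consumes nothing. */
    static String describe(InputStream body) {
        try {
            return body.getClass().getName() + " available=" + body.available();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hypothetical helper: reads the whole body as UTF-8 and closes the stream afterwards. */
    static String drainToString(InputStream body) {
        try (InputStream in = body) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a body held in memory as several chunks.
        InputStream chunked = new SequenceInputStream(
                new ByteArrayInputStream("{\"event\":".getBytes(StandardCharsets.UTF_8)),
                new ByteArrayInputStream("1}".getBytes(StandardCharsets.UTF_8)));
        System.out.println(describe(chunked));
        System.out.println(drainToString(chunked));

        // Stand-in for a body that nginx buffered to a temporary file.
        Path tmp = Files.createTempFile("body", ".tmp");
        try {
            Files.write(tmp, "{\"event\":1}".getBytes(StandardCharsets.UTF_8));
            InputStream fileBody = new FileInputStream(tmp.toFile());
            System.out.println(describe(fileBody));
            System.out.println(drainToString(fileBody));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

Logging describe(...) once in invoke() and once in run() gives exactly the class and available() values asked for, without consuming the stream.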

@afiller
Author

afiller commented Aug 17, 2023

Hi xfeep,
thank you for your reply and, by the way, for your great project. The library is amazing and I love using it; it fills an important gap by realising dynamic NGINX redirects.

I just tested it: in both cases (body present / body missing) it is a SequenceInputStream:

2023-08-17 09:59:00[warn][21190][pool-1-thread-1]>>>java.io.SequenceInputStream
2023-08-17 09:59:00[warn][21190][pool-1-thread-1]<<<<

2023-08-17 09:59:02[warn][21186][pool-1-thread-1]>>>java.io.SequenceInputStream
2023-08-17 09:59:02[warn][21186][pool-1-thread-1]<<<<{"event": {"event_timestamp_ms": 1692259142008, "product_id":....

Best regards,

Andreas

@xfeep
Member

xfeep commented Aug 20, 2023

Hi, Andreas,
I'm glad to see this library is useful to you.
Please post the two values of bodyInputStream.available(): the first obtained in ExternalRESTCallHandler.invoke(request), the second in InvocationHandler.run().
BTW, it looks like thread pool mode (i.e. jvm_workers > 0) is used together with request.hijack(..) / request.handler().hijack(..). In most cases request.hijack(..) is used in plain mode to avoid blocking nginx.

@afiller
Author

afiller commented Aug 24, 2023

Hi xfeep,

We have never had problems with thread pool mode and NGINX, and we have been using this configuration for quite a long time. But I will keep an eye on it after your input.

The result is interesting:

In unsuccessful cases, .available() is 401 in invoke() and 0 later in run().

In successful cases (where the content can also be read in run()) it is 1241 in both methods, i.e. both before starting the thread and inside the thread.

In summary: when available() already returns 1241 before the thread starts, the body can also be read later. When it returns 401, the stream is empty later (0).

Hope it helps a bit in investigating. I keep my setup as it currently is for further testing.

@afiller
Author

afiller commented Aug 24, 2023

PS:

We are not using "jvm_workers" (so the default of 0 should apply). The thread name comes from the thread pool executor that my NginxJavaRingHandler implementation uses to handle parallel requests:

Constructor:
ExecutorService executorService = Executors.newCachedThreadPool();

Usage:
// Run thread in thread pool executor
executorService.execute(new InvocationHandler(nginxRequest, channel));

...so this is purely internal to my class, to handle the hijacked requests in parallel.

@xfeep
Member

xfeep commented Aug 27, 2023

Hi Andreas,
Thank you very much. You provided very useful information.
Now I can confirm that reading the body outside the scope of handler.invoke(req) is a problem, because the body is closed at the end of handler.invoke(req).
So if your thread runs quickly, before handler.invoke(req) ends, you can read the body successfully; otherwise you get a closed InputStream.
In a future version I will consider the tradeoff between resource-leak risk and flexibility.
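The timing dependence described above can be reproduced deterministically with plain JDK types. In this sketch FrameworkBody is a hypothetical stand-in for the framework's body stream, and the assumption that a closed body simply reads as empty (available() 0, read() -1) is taken from the symptom reported in this issue, not from nginx-clojure's NativeInputStream source. A CountDownLatch pins the worker to run either before or after the simulated end of invoke().

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ClosedBodyRace {

    /** Hypothetical stand-in: after close() it reads as empty (assumption, see lead-in). */
    static final class FrameworkBody extends InputStream {
        private final ByteArrayInputStream data;
        private volatile boolean closed;

        FrameworkBody(String payload) {
            this.data = new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8));
        }

        @Override public int read() { return closed ? -1 : data.read(); }
        @Override public int available() { return closed ? 0 : data.available(); }
        @Override public void close() { closed = true; }
    }

    /** Simulates invoke(): submit a worker, "return", then the framework closes the body. */
    static String simulate(String payload, boolean workerRunsFirst) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            FrameworkBody body = new FrameworkBody(payload);
            CountDownLatch bodyClosed = new CountDownLatch(1);
            Future<String> result = pool.submit(() -> {
                if (!workerRunsFirst) {
                    bodyClosed.await();      // slow worker: starts after invoke() returned
                }
                return new String(body.readAllBytes(), StandardCharsets.UTF_8);
            });
            if (workerRunsFirst) {
                result.get();                // fast worker: done before invoke() returned
            }
            body.close();                    // end of invoke(): framework closes the body
            bodyClosed.countDown();
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("fast worker: '" + simulate("{\"a\":1}", true) + "'");
        System.out.println("slow worker: '" + simulate("{\"a\":1}", false) + "'");
    }
}
```

Without the latch, which branch occurs depends on thread scheduling, which fits the "sometimes (!)" behaviour reported here.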

@xfeep
Member

xfeep commented Aug 27, 2023

PS: java.io.SequenceInputStream.available() only returns available() of the current underlying stream, which is initially the first one. If the body arrives in several chunks, the value available() returns is therefore not always the total length of the body. So both values, 401 and 1241, are OK.
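This behaviour can be checked with the JDK alone: a SequenceInputStream over several chunks reports only the current chunk from available(), while reading to end-of-stream returns every byte. The class and helper names are hypothetical, and the chunk sizes (4 and 6) are arbitrary; they are not the sizes of the body in this issue.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SequenceAvailableDemo {

    /** Builds a body out of separate in-memory chunks of the given sizes. */
    static SequenceInputStream chunked(int... chunkSizes) {
        List<InputStream> parts = new ArrayList<>();
        for (int size : chunkSizes) {
            parts.add(new ByteArrayInputStream(new byte[size]));
        }
        return new SequenceInputStream(Collections.enumeration(parts));
    }

    /** available(): delegates to the current underlying stream only. */
    static int availableNow(InputStream in) {
        try {
            return in.available();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads to end-of-stream across all chunks and returns the total byte count. */
    static int totalLength(InputStream in) {
        try {
            return in.readAllBytes().length;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("available()  = " + availableNow(chunked(4, 6)));  // 4
        System.out.println("total length = " + totalLength(chunked(4, 6)));   // 10
    }
}
```

So available() is not a reliable measure of body size here; reading to end-of-stream is.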

@afiller
Author

afiller commented Aug 30, 2023

Thank you for your explanation and clarification. I have now adjusted my implementation so that it reads the body before starting the thread and passes it to the thread's constructor. It's less parallel in the end, but it works reliably. Thanks again!
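This fix can be sketched with plain JDK types: read the body while invoke() is still running, then pass the resulting String (not the stream) to the worker's constructor. NginxJavaRequest and NginxHttpServerChannel are replaced by hypothetical stand-ins (an InputStream parameter and a CompletableFuture&lt;String&gt; reply), so this shows only the hand-off pattern, not the nginx-clojure API.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EagerBodyHandoff {

    /** Worker that receives the body as a String, never the live InputStream. */
    static final class InvocationHandler implements Runnable {
        private final String bodyData;                 // null when the request had no body
        private final CompletableFuture<String> reply; // hypothetical stand-in for the channel

        InvocationHandler(String bodyData, CompletableFuture<String> reply) {
            this.bodyData = bodyData;
            this.reply = reply;
        }

        @Override
        public void run() {
            // Safe on any thread: a String cannot be closed by the framework.
            reply.complete(bodyData == null ? "" : bodyData);
        }
    }

    /** Reads and closes the body on the calling thread, i.e. while invoke() is running. */
    static String readBody(InputStream body) {
        if (body == null) {
            return null;
        }
        try (InputStream in = body) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Simplified invoke(): read first, then hand the String to the pool and return. */
    static CompletableFuture<String> invoke(InputStream body, ExecutorService pool) {
        String bodyData = readBody(body);
        CompletableFuture<String> reply = new CompletableFuture<>();
        pool.execute(new InvocationHandler(bodyData, reply));
        return reply;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            InputStream body = new ByteArrayInputStream("{\"a\":1}".getBytes(StandardCharsets.UTF_8));
            System.out.println(invoke(body, pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

The cost is that the body is read on the thread that runs invoke(); in exchange, nothing the worker touches can be closed underneath it.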

@xfeep
Member

xfeep commented Aug 31, 2023

@afiller Hi Andreas,

Please try this workaround for better performance:

// get a user-managed body reference
InputStream umbody = (InputStream) MiniConstants.BODY_FETCHER.fetch(nginxRequest.nativeRequest(), null);
// access umbody in another thread and then close it.
...

I've also created a related issue for a future enhancement of the hijack API.
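The ownership rule behind this workaround, as described in the comment above, is that the worker receives a stream nobody else will close and is responsible for closing it itself. The sketch below does not call nginx-clojure: TrackedStream is a hypothetical stand-in for the fetched umbody, and the assumption that the framework leaves such a stream open is taken from the maintainer's comment, not verified here.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class OwnedBodyHandoff {

    /** Hypothetical stand-in for a user-managed body stream; records whether it was closed. */
    static final class TrackedStream extends ByteArrayInputStream {
        final AtomicBoolean closed = new AtomicBoolean();

        TrackedStream(String payload) {
            super(payload.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public void close() throws IOException {
            closed.set(true);
            super.close();
        }
    }

    /** The worker owns the stream: it reads it and closes it via try-with-resources. */
    static CompletableFuture<String> consumeAsync(InputStream owned, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            try (InputStream in = owned) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, pool);
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newCachedThreadPool();
        try {
            TrackedStream umbody = new TrackedStream("{\"a\":1}");
            // In a real handler invoke() would return here; this demo just waits for the worker.
            System.out.println(consumeAsync(umbody, pool).join());
            System.out.println("closed by worker: " + umbody.closed.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

try-with-resources guarantees the close once the task runs, even if reading fails; a task that is never submitted, however, would leave the stream open.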
