-
Notifications
You must be signed in to change notification settings - Fork 15
Async message processing logic #59
Async message processing logic #59
Conversation
| * @return | ||
| */ | ||
| HttpProxyServerBootstrap withMessageProcessingExecutor( | ||
| ExecutorService messageProcessorExecutor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will be provided from outside, possibly in a wrapper with added metrics
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i like the idea of a specialized wrapper.
| Channel channel) { | ||
| super(AWAITING_INITIAL, proxyServer, false); | ||
|
|
||
| this.channel = channel; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
initial the channel is assigned on channelRegistered which happens a bit later but we need the channel right now to create global state wrapper event loop
| } | ||
|
|
||
| @Sharable | ||
| protected class ClientToProxyMessageProcessor extends ChannelInboundHandlerAdapter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this handler contains authentication and payload transformation logic which is executed in a separate executor service
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could we move it to a separate class / java file?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this class encapsulate some code of client to proxy connection.. it is easier to have it here..
| } | ||
|
|
||
| if (ProxyUtils.isChunked(httpRequest)) { | ||
| process(ctx, httpRequest); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can not do async call when chunks are used. It is due to https://github.com/netty/netty/blob/6fdd7fcddbe964b2f30d7492a926f4f0bf0f083f/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java#L321.
The app does not use chunks as they are aggregated by https://github.com/verygoodsecurity/LittleProxy/blob/vgs-edition/src/main/java/org/littleshoot/proxy/impl/ClientToProxyConnection.java#L817
|
|
||
| pipeline.addLast("bytesReadMonitor", bytesReadMonitor); | ||
| pipeline.addLast("bytesWrittenMonitor", bytesWrittenMonitor); | ||
| EventExecutorGroup globalStateWrapperEvenLoop = new GlobalStateWrapperEvenLoop(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this event loop will not always be used. Only when a call to fireChannel.. will be executed from another thread (when message processing thread was used). The main context is still handler by InboundGlobalStateHandler .
The reason is that in the same loop fireChannel.. calls are executed like recursion and this recursion starts before we have data in the context (from the request) so GlobalStateWrapperEvenLoop is not executed
| import io.netty.util.concurrent.Promise; | ||
| import io.netty.util.concurrent.ScheduledFuture; | ||
|
|
||
| public class GlobalStateWrapperEvenLoop implements EventExecutor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is very simple wrapper without any modification to the logic of the main event loop. Any attempt to do that made the proxy unstable
| import io.netty.handler.codec.http.HttpRequest; | ||
| import io.netty.handler.codec.http.HttpResponse; | ||
|
|
||
| public class UpstreamConnectionHandler extends SimpleChannelInboundHandler<Object> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this handler uses SimpleChannelInboundHandler which automatically releases the msg. I am still testing that
| try { | ||
| payloadProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS); | ||
| } catch (InterruptedException e) { | ||
| log.warn("Failed to shutdown payload processing executor properly", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you must restore the interruption flag here. missing to do so will have have an adverse effect on the further shutdown code below
try {
payloadProcessingExecutor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.warn("Failed to shutdown payload processing executor properly", e);
}
for (EventLoopGroup group : allEventLoopGroups) {
if (graceful) {
group.shutdownGracefully();
} else {
group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
}
}
if (graceful) {
for (EventLoopGroup group : allEventLoopGroups) {
try {
group.awaitTermination(60, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while shutting down event loop");
}
}
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also, this shutdown sequence violates the recommended approach, as described in https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html
void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
src/main/java/org/littleshoot/proxy/impl/DefaultHttpProxyServer.java
Outdated
Show resolved
Hide resolved
|
|
||
| @Override | ||
| protected ConnectionState readHTTPInitial(HttpRequest httpRequest) { | ||
| protected ConnectionState readHTTPInitial(ChannelHandlerContext ctx, Object httpRequestObj) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why did we change HttpRequest httpRequest to Object httpRequestObj ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, I had some weird behavior with generic here..
osklyarenko
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job 👍
Fixes https://app.clubhouse.io/vgs/story/12753/refactor-littleproxy
Problem:
Solutions:
Chosen solution: 3
Comments:
In Netty all the tasks related to the same channel must be executed strictly in order. Event loop executes tasks which are any possible work done by Netty. For example executing
fireChannelReadfrom a separate thread will create a task and put it into main event loop task queue.Also event loop is shared between channels and all the tasks (for different channels) are in one queue and executed in one loop iteration.
Hacking event loop by adding some sort of cached executor makes LP unstable in some cases. A task may be executed in incorrect order leading to issues which are hard to debug.
And the main issue with that is that it goes opposite to Netty principles.