-
-
Notifications
You must be signed in to change notification settings - Fork 15.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support handshake timeout in websocket handlers #8856
Merged
Merged
Changes from 7 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
d7ed167
Add handshake timeout options to websocket client protocol hand shake…
qeesung 233100d
Add handshake timeout options to websocket server protocol hand shake…
qeesung 3a2d018
Add handshake future getter in handshake handler
qeesung 0c1619d
Add the handshake timeout testcase and clean the code
qeesung b510819
Fixed the issue that handle user trigger events incorrectly in testcase
qeesung 478e3f3
Add optional constructor parameter handshakeTimeout
qeesung 2592097
Rename DEFAULT_HANDSHAKE_TIME to DEFAULT_HANDSHAKE_TIME_MS and remove…
qeesung c08e387
Use Future<?> instead of ScheduledFuture<?>
qeesung 9646430
Remove unnecessary checks
qeesung File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,13 +19,43 @@ | |
import io.netty.channel.ChannelFutureListener; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.channel.ChannelInboundHandlerAdapter; | ||
import io.netty.channel.ChannelPromise; | ||
import io.netty.handler.codec.http.FullHttpResponse; | ||
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler.ClientHandshakeStateEvent; | ||
import io.netty.util.concurrent.Future; | ||
import io.netty.util.concurrent.FutureListener; | ||
import io.netty.util.concurrent.ScheduledFuture; | ||
import io.netty.util.internal.ThrowableUtil; | ||
|
||
import java.util.concurrent.TimeUnit; | ||
|
||
import static io.netty.util.internal.ObjectUtil.*; | ||
|
||
class WebSocketClientProtocolHandshakeHandler extends ChannelInboundHandlerAdapter { | ||
private static final WebSocketHandshakeException HANDSHAKE_TIMED_OUT_EXCEPTION = ThrowableUtil.unknownStackTrace( | ||
new WebSocketHandshakeException("handshake timed out"), | ||
WebSocketClientProtocolHandshakeHandler.class, | ||
"channelActive(...)"); | ||
private static final long DEFAULT_HANDSHAKE_TIMEOUT_MS = 10000L; | ||
|
||
private final WebSocketClientHandshaker handshaker; | ||
private final long handshakeTimeoutMillis; | ||
private ChannelHandlerContext ctx; | ||
private ChannelPromise handshakePromise; | ||
|
||
WebSocketClientProtocolHandshakeHandler(WebSocketClientHandshaker handshaker) { | ||
this(handshaker, DEFAULT_HANDSHAKE_TIMEOUT_MS); | ||
} | ||
|
||
WebSocketClientProtocolHandshakeHandler(WebSocketClientHandshaker handshaker, long handshakeTimeoutMillis) { | ||
this.handshaker = handshaker; | ||
this.handshakeTimeoutMillis = checkPositive(handshakeTimeoutMillis, "handshakeTimeoutMillis"); | ||
} | ||
|
||
@Override | ||
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { | ||
this.ctx = ctx; | ||
handshakePromise = ctx.newPromise(); | ||
} | ||
|
||
@Override | ||
|
@@ -35,13 +65,15 @@ public void channelActive(final ChannelHandlerContext ctx) throws Exception { | |
@Override | ||
public void operationComplete(ChannelFuture future) throws Exception { | ||
if (!future.isSuccess()) { | ||
handshakePromise.tryFailure(future.cause()); | ||
ctx.fireExceptionCaught(future.cause()); | ||
} else { | ||
ctx.fireUserEventTriggered( | ||
WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_ISSUED); | ||
} | ||
} | ||
}); | ||
applyHandshakeTimeout(); | ||
} | ||
|
||
@Override | ||
|
@@ -55,6 +87,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception | |
try { | ||
if (!handshaker.isHandshakeComplete()) { | ||
handshaker.finishHandshake(ctx.channel(), response); | ||
handshakePromise.trySuccess(); | ||
ctx.fireUserEventTriggered( | ||
WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE); | ||
ctx.pipeline().remove(this); | ||
|
@@ -65,4 +98,43 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception | |
response.release(); | ||
} | ||
} | ||
|
||
private void applyHandshakeTimeout() { | ||
final ChannelPromise localHandshakePromise = handshakePromise; | ||
if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) { | ||
return; | ||
} | ||
|
||
final ScheduledFuture<?> timeoutFuture = ctx.executor().schedule(new Runnable() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this could be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done! |
||
@Override | ||
public void run() { | ||
if (localHandshakePromise.isDone()) { | ||
return; | ||
} | ||
|
||
if (localHandshakePromise.tryFailure(HANDSHAKE_TIMED_OUT_EXCEPTION)) { | ||
ctx.flush() | ||
.fireUserEventTriggered(ClientHandshakeStateEvent.HANDSHAKE_TIMEOUT) | ||
.close(); | ||
} | ||
} | ||
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS); | ||
|
||
// Cancel the handshake timeout when handshake is finished. | ||
localHandshakePromise.addListener(new FutureListener<Void>() { | ||
@Override | ||
public void operationComplete(Future<Void> f) throws Exception { | ||
timeoutFuture.cancel(false); | ||
} | ||
}); | ||
} | ||
|
||
/** | ||
* This method is visible for testing. | ||
* | ||
* @return current handshake future | ||
*/ | ||
ChannelFuture getHandshakeFuture() { | ||
return handshakePromise; | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we also need to check if
localHandshakePromise == null
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
handshakePromise
is initialized when the handle is added. Is there a case wherehandshakePromise
is empty?