Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package oracle.nosql.driver.httpclient;

import static oracle.nosql.driver.util.LogUtil.isFineEnabled;
import static oracle.nosql.driver.util.LogUtil.logFine;
import static oracle.nosql.driver.util.LogUtil.logInfo;
import static oracle.nosql.driver.util.LogUtil.logWarning;

Expand Down Expand Up @@ -309,6 +311,18 @@ public Channel getChannel(int timeoutMs)
* Ensure that the channel is in good shape
*/
if (fut.isSuccess() && retChan.isActive()) {
/*
* Clear out any previous state. The channel should not
* have any state associated with it, but this code is here
* just in case it does.
*/
if (retChan.attr(STATE_KEY).get() != null) {
if (isFineEnabled(logger)) {
logFine(logger, "HttpClient acquired a channel with " +
"a still-active state: clearing.");
}
retChan.attr(STATE_KEY).set(null);
}
return retChan;
}
logInfo(logger,
Expand All @@ -319,6 +333,9 @@ public Channel getChannel(int timeoutMs)
}

public void releaseChannel(Channel channel) {
/* Clear any response handler state from channel before releasing it */
channel.attr(STATE_KEY).set(null);

/*
* If channel is not healthy/active it will be closed and removed
* from the pool. Don't wait for completion.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@

package oracle.nosql.driver.httpclient;

import static oracle.nosql.driver.util.HttpConstants.REQUEST_ID_HEADER;
import static oracle.nosql.driver.util.LogUtil.isFineEnabled;
import static oracle.nosql.driver.util.LogUtil.logFine;
import static oracle.nosql.driver.util.LogUtil.logWarning;

import java.io.IOException;
Expand Down Expand Up @@ -43,6 +46,26 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {

if (msg instanceof FullHttpResponse) {
FullHttpResponse fhr = (FullHttpResponse) msg;

if (state == null) {
/*
* This message came in after the client was done processing
* a request in a different thread.
* The client may have timed out waiting for this message.
* Discard the message by releasing it and not calling receive().
*/
if (isFineEnabled(logger)) {
String requestId = fhr.headers().get(REQUEST_ID_HEADER);
if (requestId == null) {
requestId = "(none)";
}
logFine(logger, "Discarding message with no response " +
"handler. requestId=" + requestId);
}
fhr.release();
return;
}

state.setResponse(fhr);

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,14 +180,24 @@ void receive(RequestState requestState) {
String resReqId = requestState.getHeaders().get(REQUEST_ID_HEADER);
if (resReqId == null || !resReqId.equals(requestId)) {
logFine(logger,
"Discards unpaired response: expect for request " +
"Discarding unpaired response: expect for request " +
requestId + ", but for request " + resReqId);
if (requestState.getResponse() != null) {
ReferenceCountUtil.release(requestState.getResponse());
}
return;
}
}

/*
* We got a valid message: don't accept any more for this handler.
* This logic may change if we enable full async and allow multiple
* messages to be processed by the same channel for the same client.
* This clears the response handler from this channel so that any
* additional messages on this channel will be properly discarded.
*/
channel.attr(HttpClient.STATE_KEY).set(null);

state = requestState;
try {
responseReceived(state.getStatus(),
Expand Down