Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JBTM-2982 Drive LRA participants from a separate thread #1332

Merged
merged 1 commit into from Jun 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -21,6 +21,8 @@
*/
package io.narayana.lra.client;

import javax.ws.rs.client.ClientRequestContext;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.core.MultivaluedMap;
import java.net.URL;
import java.util.HashMap;
Expand Down Expand Up @@ -123,13 +125,45 @@ public static void push(URL lraId) {
}
}

public static void updateLRAContext(MultivaluedMap<String, Object> headers) {
/**
* If there is an LRA context on the calling thread then add it to the provided headers
*
* @param responseContext the header map to add the KRA context to
*/
public static void updateLRAContext(ContainerResponseContext responseContext) {
URL lraId = Current.peek();

if (lraId != null)
headers.putSingle(LRA_HTTP_HEADER, lraId);
responseContext.getHeaders().putSingle(LRA_HTTP_HEADER, lraId);
else
headers.remove(LRA_HTTP_HEADER);
responseContext.getHeaders().remove(LRA_HTTP_HEADER);
}

public static void updateLRAContext(URL lraId, MultivaluedMap<String, String> headers) {
headers.putSingle(LRA_HTTP_HEADER, lraId.toString());
push(lraId);
}

/**
* If there is an LRA context on the calling thread then make it available as
* a header on outgoing JAX-RS invocations
*
* @param context the context for the JAX-RS request
*/
public static void updateLRAContext(ClientRequestContext context) {
URL lraId = Current.peek();

if (lraId != null) {
context.getHeaders().putSingle(LRA_HTTP_HEADER, lraId);
} else {
Object lraContext = context.getProperty(LRA_HTTP_HEADER);

if (lraContext != null) {
context.getHeaders().putSingle(LRA_HTTP_HEADER, lraContext);
} else {
context.getHeaders().remove(LRA_HTTP_HEADER);
}
}
}

public static void popAll() {
Expand All @@ -140,10 +174,4 @@ public static void clearContext(MultivaluedMap<String, String> headers) {
headers.remove(LRA_HTTP_HEADER);
popAll();
}

public static void updateLRAContext(URL lraId, MultivaluedMap<String, String> headers) {
headers.putSingle(LRA_HTTP_HEADER, lraId.toString());
push(lraId);
}

}
Expand Up @@ -239,8 +239,6 @@ private int doEnd(boolean compensate) {

private int tryDoEnd(boolean compensate) {
URL endPath;
boolean isRecovering = status != null &&
(status.equals(CompensatorStatus.Compensating) || status.equals(CompensatorStatus.Completing));

// cancel any timer associated with this participant
if (scheduledAbort != null) {
Expand Down Expand Up @@ -271,8 +269,6 @@ private int tryDoEnd(boolean compensate) {

// NB trying to compensate when already completed is allowed (for nested LRAs)

Current.push(lraId); // make sure the lra id is set so that it can be included in the headers

int httpStatus = -1;

if (accepted) {
Expand All @@ -293,23 +289,15 @@ private int tryDoEnd(boolean compensate) {
WebTarget target = client.target(URI.create(endPath.toExternalForm()));

try {
Response response;

// ask the participant to complete or compensate
if (isRecovering) {
Future<Response> asyncResponse = target.request()
.header(LRA_HTTP_HEADER, lraId.toExternalForm())
.header(LRA_HTTP_RECOVERY_HEADER, recoveryURL.toExternalForm())
.async()
.put(Entity.entity(compensatorData, MediaType.APPLICATION_JSON));
// the catch block below catches any Timeout exception
response = asyncResponse.get(PARTICIPANT_TIMEOUT, TimeUnit.SECONDS);
} else {
response = target.request()
.header(LRA_HTTP_HEADER, lraId.toExternalForm())
.header(LRA_HTTP_RECOVERY_HEADER, recoveryURL.toExternalForm())
.put(Entity.entity(compensatorData, MediaType.APPLICATION_JSON));
}
Future<Response> asyncResponse = target.request()
.header(LRA_HTTP_HEADER, lraId.toExternalForm())
.header(LRA_HTTP_RECOVERY_HEADER, recoveryURL.toExternalForm())
.property(LRA_HTTP_HEADER, lraId) // make the context available to the jaxrs filters
.async()
.put(Entity.entity(compensatorData, MediaType.APPLICATION_JSON));
// the catch block below catches any Timeout exception
Response response = asyncResponse.get(PARTICIPANT_TIMEOUT, TimeUnit.SECONDS);

httpStatus = response.getStatus();

Expand All @@ -332,8 +320,10 @@ private int tryDoEnd(boolean compensate) {
}

if (httpStatus == Response.Status.NOT_FOUND.getStatusCode()) {
updateStatus(compensate);
return TwoPhaseOutcome.FINISH_OK; // the participant must have finished ok but we lost the response
}

if (response.hasEntity())
responseData = response.readEntity(String.class);
} catch (Exception e) {
Expand Down Expand Up @@ -379,13 +369,17 @@ private int tryDoEnd(boolean compensate) {
return TwoPhaseOutcome.FINISH_ERROR;
}

updateStatus(compensate);

// if the the request is still in progress (ie accepted is true) let recovery finish it
return accepted ? TwoPhaseOutcome.HEURISTIC_HAZARD : TwoPhaseOutcome.FINISH_OK;
}

private void updateStatus(boolean compensate) {
if (compensate)
status = accepted ? CompensatorStatus.Compensating : CompensatorStatus.Compensated;
else
status = accepted ? CompensatorStatus.Completing : CompensatorStatus.Completed;

// if the the request is still in progress (ie accepted is true) let recovery finish it
return accepted ? TwoPhaseOutcome.HEURISTIC_HAZARD : TwoPhaseOutcome.FINISH_OK;
}

private int reportFailure(boolean compensate, String endPath) {
Expand Down Expand Up @@ -438,12 +432,12 @@ private int retryGetEndStatus(URL endPath, boolean compensate) {

try {
WebTarget target = client.target(statusURI.toURI());
Current.push(lraId);

// since this method is called from the recovery thread do not block
Future<Response> asyncResponse = target.request()
.header(LRA_HTTP_HEADER, lraId.toExternalForm())
.header(LRA_HTTP_RECOVERY_HEADER, recoveryURL.toExternalForm())
.property(LRA_HTTP_HEADER, lraId) // make the context available to the jaxrs filters
.async()
.get();

Expand Down Expand Up @@ -482,15 +476,17 @@ private int retryGetEndStatus(URL endPath, boolean compensate) {

if (forgetURI != null) {
try {
Current.push(lraId);
// let the participant know he can clean up
WebTarget target2 = client.target(forgetURI.toURI());
Response response2 = target2.request()
Future<Response> asyncResponse2 = target2.request()
.header(LRA_HTTP_HEADER, lraId.toExternalForm())
.header(LRA_HTTP_RECOVERY_HEADER, recoveryURL.toExternalForm())
.property(LRA_HTTP_HEADER, lraId) // make the context available to the jaxrs filters
.async()
.delete();

if (response2.getStatus() == Response.Status.OK.getStatusCode())
if (asyncResponse2.get(PARTICIPANT_TIMEOUT, TimeUnit.SECONDS).getStatus() ==
Response.Status.OK.getStatusCode())
return TwoPhaseOutcome.FINISH_OK;
} catch (Exception e) {
if (LRALogger.logger.isInfoEnabled()) {
Expand Down
Expand Up @@ -31,6 +31,6 @@ public class ClientLRARequestFilter implements ClientRequestFilter {
@Override
public void filter(ClientRequestContext context) throws IOException {
// NB the following overrides what the caller did with the LRA context header
Current.updateLRAContext(context.getHeaders());
Current.updateLRAContext(context);
}
}
Expand Up @@ -35,7 +35,10 @@ public class ClientLRAResponseFilter implements ClientResponseFilter {
@Override
public void filter(ClientRequestContext requestContext, ClientResponseContext responseContext) throws IOException {
Object incomingLRA = responseContext.getHeaders().getFirst(LRA_HTTP_HEADER);
// Object outgoingLRA = requestContext.getHeaders().getFirst(LRA_HTTP_HEADER);

if (incomingLRA == null) {
incomingLRA = requestContext.getProperty(LRA_HTTP_HEADER);
}

/*
* if the incoming response contains a context make it the current one
Expand Down
Expand Up @@ -118,6 +118,7 @@ private void checkForTx(LRA.Type type, URL lraId, boolean shouldNotBeNull) {

@Override
public void filter(ContainerRequestContext containerRequestContext) throws IOException {
// TODO filters for asynchronous JAX-RS motheods should not throw exceptions
Method method = resourceInfo.getResourceMethod();
MultivaluedMap<String, String> headers = containerRequestContext.getHeaders();
LRA.Type type = null;
Expand Down Expand Up @@ -164,8 +165,16 @@ public void filter(ContainerRequestContext containerRequestContext) throws IOExc
return; // not transactional
}

if (headers.containsKey(LRA_HTTP_HEADER))
incommingLRA = new URL(headers.getFirst(LRA_HTTP_HEADER)); // TODO filters for asynchronous JAX-RS motheods should not throw exceptions
// check the incomming request for an LRA context
if (headers.containsKey(LRA_HTTP_HEADER)) {
incommingLRA = new URL(headers.getFirst(LRA_HTTP_HEADER));
} else {
Object lraContext = containerRequestContext.getProperty(LRA_HTTP_HEADER);

if (lraContext != null) {
incommingLRA = (URL) lraContext;
}
}

if (endAnnotation && incommingLRA == null)
return;
Expand Down Expand Up @@ -391,7 +400,7 @@ public void filter(ContainerRequestContext requestContext, ContainerResponseCont
if (suspendedLRA != null)
Current.push((URL) suspendedLRA);

Current.updateLRAContext(responseContext.getHeaders());
Current.updateLRAContext(responseContext);

Current.popAll();
}
Expand Down
8 changes: 4 additions & 4 deletions rts/lra/lra-test/pom.xml
Expand Up @@ -32,17 +32,17 @@
<service.http.port>${swarm.http.port}</service.http.port>

<!-- config for the coordinator, for exampe to enable debugging use
-Xrunjdwp:transport=dt_socket,address=${coordinator.debug.port},server=y,suspend=y
<coordinator.debug.params>${coordinator.debug.jdwp}</coordinator.debug.params>
-->
<coordinator.debug.port>8787</coordinator.debug.port>
<coordinator.debug.jdwp>-Xrunjdwp:transport=dt_socket,address=${coordinator.debug.port},server=y,suspend=y</coordinator.debug.jdwp>
<coordinator.debug.jdwp>-Xrunjdwp:transport=dt_socket,address=${coordinator.debug.port},server=y,suspend=n</coordinator.debug.jdwp>
<coordinator.debug.params></coordinator.debug.params>

<!-- config for the test JAX-RS resource, for exampe to enable debugging use
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need here this comment when the swarm.debug.jdwp attribute covers this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you mean, do you want me to add an explanation of how the debugging config works?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. I'm sorry for not being clear. I just meant that if there is declared coordinator.debug.jdwp with debug parameters the comment could be removed completely. The information is doubled.

-Xrunjdwp:transport=dt_socket,address=${swarm.debug.port},server=y,suspend=y
<swarm.debug.params>${swarm.debug.jdwp}</swarm.debug.params>
-->
<swarm.debug.port>5006</swarm.debug.port>
<swarm.debug.jdwp>-Xrunjdwp:transport=dt_socket,address=${swarm.debug.port},server=y,suspend=y</swarm.debug.jdwp>
<swarm.debug.jdwp>-Xrunjdwp:transport=dt_socket,address=${swarm.debug.port},server=y,suspend=n</swarm.debug.jdwp>
<swarm.debug.params></swarm.debug.params>
<swarm.logging.params></swarm.logging.params>

Expand Down
Expand Up @@ -560,8 +560,31 @@ private void endCheck(Activity activity) {
activity.setHow(null);
activity.setArg(null);

if ("wait".equals(how) && arg != null && "recovery".equals(arg)) {
lraClient.getRecoveringLRAs();
if ("wait".equals(how)) {
if (arg != null) {
if ("recovery".equals(arg)) {
/*
* during end processing we delay the response by triggering a recovery scan
* which tests that the coordinator can handle slow participants and is able
* to tolerate recovery running when there are outstanding participant
* completion calls.
*/
lraClient.getRecoveringLRAs(); // run a recovery scan
} else {
int ms = 0;

try {
ms = Integer.getInteger(arg, 0);
} catch (Exception ignore) {
}

try {
Thread.sleep(ms <= 0 ? Long.MAX_VALUE : ms); // delay the end call
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to understand the rationalization of giving to user a way of delaying end call?
It's seems as some workaround to something. I can be a wrong in understanding but letting the client to drive timing from outside sounds to me supportive of race conditions.
I know we discussed some timeouts to be involved but I can't connect this with the discussion.

Copy link
Contributor Author

@mmusgrov mmusgrov Jun 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a test. The intent is to show that LRA can cope with participants that are slow in responding to the complete/compensate calls or, worse, participants that hang.

See the following tests that inject problems in participant end processing:

SpecIT#closeLRAWaitForRecovery
SpecIT#closeLRAWaitIndefinitely
SpecIT#connectionHangup

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ouch, I see. Even I was checking the package I missed that. I'm sorry, this is fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I push a commit for the indentation and remove the jdwp debug comment will that fix your issues with this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notice that I also added a comment to indicate what the recovery scan is testing here.

} catch (InterruptedException e) {
LRALogger.logger.info("endCheck wait interrupted");
}
}
}
} else if ("exception".equals(how)) {
Exception cause = null;

Expand Down