Skip to content

Commit

Permalink
Add onError callback to DeferredResult
Browse files Browse the repository at this point in the history
Issue: SPR-15614
  • Loading branch information
violetagg authored and rstoyanchev committed Jun 26, 2017
1 parent 140542e commit e0678ba
Show file tree
Hide file tree
Showing 27 changed files with 769 additions and 39 deletions.
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -36,7 +36,7 @@
*
* Ensures the following:
* 1) The session is bound/unbound when "callable processing" is started
* 2) The session is closed if an async request times out
* 2) The session is closed if an async request times out or an error occurred
*
* @author Rossen Stoyanchev
* @since 4.2
Expand All @@ -51,6 +51,8 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple

private volatile boolean timeoutInProgress;

private volatile boolean errorInProgress;


public AsyncRequestInterceptor(SessionFactory sessionFactory, SessionHolder sessionHolder) {
this.sessionFactory = sessionFactory;
Expand All @@ -65,6 +67,7 @@ public <T> void preProcess(NativeWebRequest request, Callable<T> task) {

public void bindSession() {
this.timeoutInProgress = false;
this.errorInProgress = false;
TransactionSynchronizationManager.bindResource(this.sessionFactory, this.sessionHolder);
}

Expand All @@ -79,14 +82,20 @@ public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) {
return RESULT_NONE; // give other interceptors a chance to handle the timeout
}

@Override
public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) {
this.errorInProgress = true;
return RESULT_NONE; // give other interceptors a chance to handle the error
}

@Override
public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
closeAfterTimeout();
closeSession();
}

private void closeAfterTimeout() {
if (this.timeoutInProgress) {
logger.debug("Closing Hibernate Session after async request timeout");
private void closeSession() {
if (this.timeoutInProgress || this.errorInProgress) {
logger.debug("Closing Hibernate Session after async request timeout/error");
SessionFactoryUtils.closeSession(this.sessionHolder.getSession());
}
}
Expand All @@ -112,9 +121,15 @@ public <T> boolean handleTimeout(NativeWebRequest request, DeferredResult<T> def
return true; // give other interceptors a chance to handle the timeout
}

@Override
public <T> boolean handleError(NativeWebRequest request, DeferredResult<T> deferredResult, Throwable t) {
this.errorInProgress = true;
return true; // give other interceptors a chance to handle the error
}

@Override
public <T> void afterCompletion(NativeWebRequest request, DeferredResult<T> deferredResult) {
closeAfterTimeout();
closeSession();
}

}
Expand Up @@ -36,7 +36,7 @@
*
* Ensures the following:
* 1) The session is bound/unbound when "callable processing" is started
* 2) The session is closed if an async request times out
* 2) The session is closed if an async request times out or an error occurred
*
* @author Rossen Stoyanchev
* @since 3.2.5
Expand All @@ -51,6 +51,8 @@ class AsyncRequestInterceptor extends CallableProcessingInterceptorAdapter imple

private volatile boolean timeoutInProgress;

private volatile boolean errorInProgress;


public AsyncRequestInterceptor(EntityManagerFactory emFactory, EntityManagerHolder emHolder) {
this.emFactory = emFactory;
Expand All @@ -65,6 +67,7 @@ public <T> void preProcess(NativeWebRequest request, Callable<T> task) {

public void bindEntityManager() {
this.timeoutInProgress = false;
this.errorInProgress = false;
TransactionSynchronizationManager.bindResource(this.emFactory, this.emHolder);
}

Expand All @@ -79,14 +82,20 @@ public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) {
return RESULT_NONE; // give other interceptors a chance to handle the timeout
}

@Override
public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) {
this.errorInProgress = true;
return RESULT_NONE; // give other interceptors a chance to handle the error
}

@Override
public <T> void afterCompletion(NativeWebRequest request, Callable<T> task) throws Exception {
closeAfterTimeout();
closeEntityManager();
}

private void closeAfterTimeout() {
if (this.timeoutInProgress) {
logger.debug("Closing JPA EntityManager after async request timeout");
private void closeEntityManager() {
if (this.timeoutInProgress || this.errorInProgress) {
logger.debug("Closing JPA EntityManager after async request timeout/error");
EntityManagerFactoryUtils.closeEntityManager(emHolder.getEntityManager());
}
}
Expand All @@ -112,9 +121,15 @@ public <T> boolean handleTimeout(NativeWebRequest request, DeferredResult<T> def
return true; // give other interceptors a chance to handle the timeout
}

@Override
public <T> boolean handleError(NativeWebRequest request, DeferredResult<T> deferredResult, Throwable t) {
this.errorInProgress = true;
return true; // give other interceptors a chance to handle the error
}

@Override
public <T> void afterCompletion(NativeWebRequest request, DeferredResult<T> deferredResult) {
closeAfterTimeout();
closeEntityManager();
}

}
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -228,6 +228,48 @@ public String call() throws Exception {
verify(this.manager).close();
}

@Test
public void testOpenEntityManagerInViewInterceptorAsyncErrorScenario() throws Exception {

// Initial request thread

OpenEntityManagerInViewInterceptor interceptor = new OpenEntityManagerInViewInterceptor();
interceptor.setEntityManagerFactory(factory);

given(this.factory.createEntityManager()).willReturn(this.manager);

interceptor.preHandle(this.webRequest);
assertTrue(TransactionSynchronizationManager.hasResource(this.factory));

AsyncWebRequest asyncWebRequest = new StandardServletAsyncWebRequest(this.request, this.response);
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(this.request);
asyncManager.setTaskExecutor(new SyncTaskExecutor());
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.startCallableProcessing(new Callable<String>() {
@Override
public String call() throws Exception {
return "anything";
}
});

interceptor.afterConcurrentHandlingStarted(this.webRequest);
assertFalse(TransactionSynchronizationManager.hasResource(this.factory));

// Async request timeout

given(this.manager.isOpen()).willReturn(true);

MockAsyncContext asyncContext = (MockAsyncContext) this.request.getAsyncContext();
for (AsyncListener listener : asyncContext.getListeners()) {
listener.onError(new AsyncEvent(asyncContext, new Exception()));
}
for (AsyncListener listener : asyncContext.getListeners()) {
listener.onComplete(new AsyncEvent(asyncContext));
}

verify(this.manager).close();
}

@Test
public void testOpenEntityManagerInViewFilter() throws Exception {
given(manager.isOpen()).willReturn(true);
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.web.context.request.async;

import java.util.function.Consumer;

import org.springframework.lang.Nullable;
import org.springframework.web.context.request.NativeWebRequest;

Expand All @@ -42,7 +44,13 @@ public interface AsyncWebRequest extends NativeWebRequest {
void addTimeoutHandler(Runnable runnable);

/**
* Add a handle to invoke when request processing completes.
* Add a handler to invoke when an error occurred while concurrent
* handling of a request.
*/
void addErrorHandler(Consumer<Throwable> exceptionHandler);

/**
* Add a handler to invoke when request processing completes.
*/
void addCompletionHandler(Runnable runnable);

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -94,6 +94,24 @@ else if (result != CallableProcessingInterceptor.RESULT_NONE) {
return CallableProcessingInterceptor.RESULT_NONE;
}

public Object triggerAfterError(NativeWebRequest request, Callable<?> task, Throwable throwable) {
for (CallableProcessingInterceptor interceptor : this.interceptors) {
try {
Object result = interceptor.handleError(request, task, throwable);
if (result == CallableProcessingInterceptor.RESPONSE_HANDLED) {
break;
}
else if (result != CallableProcessingInterceptor.RESULT_NONE) {
return result;
}
}
catch (Throwable t) {
return t;
}
}
return CallableProcessingInterceptor.RESULT_NONE;
}

public void triggerAfterCompletion(NativeWebRequest request, Callable<?> task) {
for (int i = this.interceptors.size()-1; i >= 0; i--) {
try {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,15 +28,15 @@
*
* <p>A {@code CallableProcessingInterceptor} is invoked before and after the
* invocation of the {@code Callable} task in the asynchronous thread, as well
* as on timeout from a container thread, or after completing for any reason
* as on timeout/error from a container thread, or after completing for any reason
* including a timeout or network error.
*
* <p>As a general rule exceptions raised by interceptor methods will cause
* async processing to resume by dispatching back to the container and using
* the Exception instance as the concurrent result. Such exceptions will then
* be processed through the {@code HandlerExceptionResolver} mechanism.
*
* <p>The {@link #handleTimeout(NativeWebRequest, Callable) afterTimeout} method
* <p>The {@link #handleTimeout(NativeWebRequest, Callable) handleTimeout} method
* can select a value to be used to resume processing.
*
* @author Rossen Stoyanchev
Expand Down Expand Up @@ -101,6 +101,21 @@ public interface CallableProcessingInterceptor {
*/
<T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws Exception;

/**
* Invoked from a container thread when an error occurred while processing the async request
* before the {@code Callable} task completes. Implementations may return a value,
* including an {@link Exception}, to use instead of the value the
* {@link Callable} did not return in time.
* @param request the current request
* @param task the task for the current async request
* @paramt t the error that occurred while request processing
* @return a concurrent result value; if the value is anything other than
* {@link #RESULT_NONE} or {@link #RESPONSE_HANDLED}, concurrent processing
* is resumed and subsequent interceptors are not invoked
* @throws Exception in case of errors
*/
<T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception;

/**
* Invoked from a container thread when async processing completes for any
* reason including timeout or network error.
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2015 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,6 +60,15 @@ public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) thro
return RESULT_NONE;
}

/**
* This implementation always returns
* {@link CallableProcessingInterceptor#RESULT_NONE RESULT_NONE}.
*/
@Override
public <T> Object handleError(NativeWebRequest request, Callable<T> task, Throwable t) throws Exception {
return RESULT_NONE;
}

/**
* This implementation is empty.
*/
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.util.PriorityQueue;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -61,6 +62,8 @@ public class DeferredResult<T> {

private Runnable timeoutCallback;

private Consumer<Throwable> errorCallback;

private Runnable completionCallback;

private DeferredResultHandler resultHandler;
Expand Down Expand Up @@ -150,6 +153,17 @@ public void onTimeout(Runnable callback) {
this.timeoutCallback = callback;
}

/**
* Register code to invoke when an error occurred while processing the async request.
* <p>This method is called from a container thread when an error occurred while
* processing an async request before the {@code DeferredResult} has been populated.
* It may invoke {@link DeferredResult#setResult setResult} or
* {@link DeferredResult#setErrorResult setErrorResult} to resume processing.
*/
public void onError(Consumer<Throwable> callback) {
this.errorCallback = callback;
}

/**
* Register code to invoke when the async request completes.
* <p>This method is called from a container thread when an async request
Expand Down Expand Up @@ -275,6 +289,25 @@ public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> def
return continueProcessing;
}
@Override
public <S> boolean handleError(NativeWebRequest request, DeferredResult<S> deferredResult, Throwable t) {
boolean continueProcessing = true;
try {
if (errorCallback != null) {
errorCallback.accept(t);
}
}
finally {
continueProcessing = false;
try {
setResultInternal(t);
}
catch (Throwable ex) {
logger.debug("Failed to handle error result", ex);
}
}
return continueProcessing;
}
@Override
public <S> void afterCompletion(NativeWebRequest request, DeferredResult<S> deferredResult) {
expired = true;
if (completionCallback != null) {
Expand Down

0 comments on commit e0678ba

Please sign in to comment.