Skip to content

Commit

Permalink
Feature: Saga state language support "Retry" a service when error occ…
Browse files Browse the repository at this point in the history
…urred apache#1899
  • Loading branch information
long187 committed Nov 14, 2019
1 parent ab9a24c commit d9b5af8
Show file tree
Hide file tree
Showing 22 changed files with 621 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,12 @@
*/
public interface ServiceInvoker {

Object invoke(ServiceTaskState serviceTaskState, Object... input);
/**
* invoke service
* @param serviceTaskState
* @param input
* @return
* @throws Throwable
*/
Object invoke(ServiceTaskState serviceTaskState, Object... input) throws Throwable;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,23 @@
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.Feature;

import io.seata.common.exception.FrameworkErrorCode;
import io.seata.saga.engine.exception.EngineExecutionException;
import io.seata.saga.engine.invoker.ServiceInvoker;
import io.seata.saga.engine.pcext.handlers.ServiceTaskStateHandler;
import io.seata.saga.engine.utils.ExceptionUtils;
import io.seata.saga.statelang.domain.ServiceTaskState;
import io.seata.saga.statelang.domain.TaskState.Retry;
import io.seata.saga.statelang.domain.impl.ServiceTaskStateImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -50,27 +57,31 @@ public class SpringBeanServiceInvoker implements ServiceInvoker, ApplicationCont
private ThreadPoolExecutor threadPoolExecutor;

@Override
public Object invoke(ServiceTaskState serviceTaskState, Object... input) {
ServiceTaskStateImpl state = (ServiceTaskStateImpl)serviceTaskState;
public Object invoke(ServiceTaskState serviceTaskState, Object... input) throws Throwable {
ServiceTaskStateImpl state = (ServiceTaskStateImpl) serviceTaskState;
if (state.isAsync()) {
if (threadPoolExecutor == null) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
"threadPoolExecutor is null, Service[{}.{}] cannot execute asynchronously, executing "
+ "synchronously now. stateName: {}",
state.getServiceName(), state.getServiceMethod(), state.getName());
"threadPoolExecutor is null, Service[{}.{}] cannot execute asynchronously, executing "
+ "synchronously now. stateName: {}",
state.getServiceName(), state.getServiceMethod(), state.getName());
}
return doInvoke(state, input);
}

if (LOGGER.isInfoEnabled()) {
LOGGER.info("Submit Service[{}.{}] to asynchronously executing. stateName: {}", state.getServiceName(),
state.getServiceMethod(), state.getName());
state.getServiceMethod(), state.getName());
}
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
doInvoke(state, input);
try {
doInvoke(state, input);
} catch (Throwable e) {
LOGGER.error("Invoke Service[" + state.getServiceName() + "." + state.getServiceMethod() + "] failed.", e);
}
}
});
return null;
Expand All @@ -79,7 +90,7 @@ public void run() {
}
}

protected Object doInvoke(ServiceTaskStateImpl state, Object[] input) {
protected Object doInvoke(ServiceTaskStateImpl state, Object[] input) throws Throwable {

Object bean = applicationContext.getBean(state.getServiceName());

Expand All @@ -98,8 +109,8 @@ protected Object doInvoke(ServiceTaskStateImpl state, Object[] input) {

if (method == null) {
throw new EngineExecutionException(
"No such method[" + state.getServiceMethod() + "] on BeanClass[" + bean.getClass() + "]",
FrameworkErrorCode.NoSuchMethod);
"No such method[" + state.getServiceMethod() + "] on BeanClass[" + bean.getClass() + "]",
FrameworkErrorCode.NoSuchMethod);

}

Expand All @@ -114,11 +125,115 @@ protected Object doInvoke(ServiceTaskStateImpl state, Object[] input) {
}
} catch (Exception e) {
throw new EngineExecutionException(e,
"Input to java object error, Method[" + state.getServiceMethod() + "] on BeanClass[" + bean.getClass()
+ "]", FrameworkErrorCode.InvalidParameter);
"Input to java object error, Method[" + state.getServiceMethod() + "] on BeanClass[" + bean.getClass()
+ "]", FrameworkErrorCode.InvalidParameter);
}

if (!Modifier.isPublic(method.getModifiers())) {
throw new EngineExecutionException("Method[" + method.getName() + "] must be public",
FrameworkErrorCode.MethodNotPublic);
}

Map<Retry, AtomicInteger> retryCountMap = new HashMap<>();
while (true) {

try {
return invokeMethod(bean, method, args);
} catch (Throwable e) {

Retry matchedRetryConfig = matchRetryConfig(state.getRetry(), e);
if (matchedRetryConfig == null) {
throw e;
}

if (!retryCountMap.containsKey(matchedRetryConfig)) {
retryCountMap.put(matchedRetryConfig, new AtomicInteger(0));
}

AtomicInteger retryCount = retryCountMap.get(matchedRetryConfig);
if (retryCount.intValue() >= matchedRetryConfig.getMaxAttempts()) {
throw e;
}

double intervalSeconds = matchedRetryConfig.getIntervalSeconds();
double backoffRate = matchedRetryConfig.getBackoffRate();
long currentInterval = (long) (retryCount.intValue() > 0 ?
(intervalSeconds * backoffRate * retryCount.intValue() * 1000) : (intervalSeconds * 1000));

if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Invoke Service[" + state.getServiceName() + "." + state.getServiceMethod() + "] failed, will retry after "
+ currentInterval + " millis, current retry count: " + retryCount.intValue(), e);
}
try {
Thread.sleep(currentInterval);
} catch (InterruptedException e1) {
LOGGER.warn("Retry interval sleep error", e1);
}
retryCount.incrementAndGet();
}
}
}

return invokeMethod(bean, method, args);
private Retry matchRetryConfig(List<Retry> retryList, Throwable e) {

if (retryList != null && retryList.size() > 0) {
for (Retry retryConfig : retryList) {

List<String> exceptions = retryConfig.getExceptions();
if (exceptions == null || exceptions.size() == 0) {
// Exceptions not configured, Match current exception if it is NetException.
if (ExceptionUtils.isNetException(e)) {
return retryConfig;
}
} else {

List<Class<? extends Exception>> exceptionClasses = retryConfig.getExceptionClasses();
if (exceptionClasses == null) {
synchronized (retryConfig) {
exceptionClasses = retryConfig.getExceptionClasses();
if (exceptionClasses == null) {

exceptionClasses = new ArrayList<>(exceptions.size());
for (String expStr : exceptions) {

Class<? extends Exception> expClass = null;
try {
expClass = (Class<? extends Exception>) ServiceTaskStateHandler.class
.getClassLoader().loadClass(expStr);
} catch (Exception e1) {

LOGGER.warn("Cannot Load Exception Class by getClass().getClassLoader()", e1);

try {
expClass = (Class<? extends Exception>) Thread.currentThread()
.getContextClassLoader().loadClass(expStr);
} catch (Exception e2) {
LOGGER.warn(
"Cannot Load Exception Class by Thread.currentThread()"
+ ".getContextClassLoader()",
e2);
}
}

if (expClass != null) {
exceptionClasses.add(expClass);
}
}
retryConfig.setExceptionClasses(exceptionClasses);
}
}
}

for (Class<? extends Exception> expClass : exceptionClasses) {
if (expClass.isAssignableFrom(e.getClass())) {
return retryConfig;
}
}

}
}
}
return null;
}

@Override
Expand Down Expand Up @@ -161,17 +276,12 @@ protected Class classForName(String className) {
}
if (clazz == null) {
throw new EngineExecutionException("Parameter class not found [" + className + "]",
FrameworkErrorCode.ObjectNotExists);
FrameworkErrorCode.ObjectNotExists);
}
return clazz;
}

protected Object invokeMethod(Object serviceBean, Method method, Object... input) {

if (!Modifier.isPublic(method.getModifiers())) {
throw new EngineExecutionException("Method[" + method.getName() + "] must be public",
FrameworkErrorCode.MethodNotPublic);
}
protected Object invokeMethod(Object serviceBean, Method method, Object... input) throws Throwable {
try {
return method.invoke(serviceBean, input);
} catch (InvocationTargetException e) {
Expand All @@ -180,14 +290,7 @@ protected Object invokeMethod(Object serviceBean, Method method, Object... input
throw new EngineExecutionException(e, e.getMessage(), FrameworkErrorCode.MethodInvokeError);
}

if (targetExp instanceof RuntimeException) {
throw (RuntimeException)targetExp;
} else {
throw new EngineExecutionException(targetExp, targetExp.getMessage(),
FrameworkErrorCode.MethodInvokeError);
}
} catch (Exception e) {
throw new EngineExecutionException(e, e.getMessage(), FrameworkErrorCode.MethodInvokeError);
throw targetExp;
}
}

Expand All @@ -208,23 +311,23 @@ protected Object toJavaObject(Object value, Class paramType) {

protected boolean isPrimitive(Class<?> clazz) {
return clazz.isPrimitive() //
|| clazz == Boolean.class //
|| clazz == Character.class //
|| clazz == Byte.class //
|| clazz == Short.class //
|| clazz == Integer.class //
|| clazz == Long.class //
|| clazz == Float.class //
|| clazz == Double.class //
|| clazz == BigInteger.class //
|| clazz == BigDecimal.class //
|| clazz == String.class //
|| clazz == java.util.Date.class //
|| clazz == java.sql.Date.class //
|| clazz == java.sql.Time.class //
|| clazz == java.sql.Timestamp.class //
|| clazz.isEnum() //
;
|| clazz == Boolean.class //
|| clazz == Character.class //
|| clazz == Byte.class //
|| clazz == Short.class //
|| clazz == Integer.class //
|| clazz == Long.class //
|| clazz == Float.class //
|| clazz == Double.class //
|| clazz == BigInteger.class //
|| clazz == BigDecimal.class //
|| clazz == String.class //
|| clazz == java.util.Date.class //
|| clazz == java.sql.Date.class //
|| clazz == java.sql.Time.class //
|| clazz == java.sql.Timestamp.class //
|| clazz.isEnum() //
;
}

protected Class getPrimitiveClass(String className) {
Expand Down

0 comments on commit d9b5af8

Please sign in to comment.