From 575ac12355e8747d426e900eb6b4cc58b8210354 Mon Sep 17 00:00:00 2001 From: xingfudeshi Date: Fri, 1 Mar 2024 13:37:11 +0800 Subject: [PATCH] optimize: compatible with tcc (#6345) --- changes/en-us/2.x.md | 1 + changes/zh-cn/2.x.md | 2 + compatible/pom.xml | 15 ++ .../tx/api/interceptor/ActionContextUtil.java | 30 ++- .../java/io/seata/rm/tcc/api/LocalTCC.java | 30 +++ .../rm/tcc/api/TwoPhaseBusinessAction.java | 79 ++++++ .../TccActionInterceptorHandler.java | 224 ++++++++++++++++++ .../parser/TccActionInterceptorParser.java | 71 ++++++ .../parser/LocalTCCRemotingParser.java | 85 +++++++ .../parser/TccRegisterResourceParser.java | 78 ++++++ .../spring/tcc/TccAnnotationProcessor.java | 50 ++++ .../TccActionInterceptorHandler.java | 9 +- 12 files changed, 669 insertions(+), 5 deletions(-) create mode 100644 compatible/src/main/java/io/seata/rm/tcc/api/LocalTCC.java create mode 100644 compatible/src/main/java/io/seata/rm/tcc/api/TwoPhaseBusinessAction.java create mode 100644 compatible/src/main/java/io/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java create mode 100644 compatible/src/main/java/io/seata/rm/tcc/interceptor/parser/TccActionInterceptorParser.java create mode 100644 compatible/src/main/java/io/seata/rm/tcc/remoting/parser/LocalTCCRemotingParser.java create mode 100644 compatible/src/main/java/io/seata/rm/tcc/resource/parser/TccRegisterResourceParser.java create mode 100644 compatible/src/main/java/io/seata/spring/tcc/TccAnnotationProcessor.java diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 908fd92e84b..b831f723a45 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -95,6 +95,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6254](https://github.com/apache/incubator-seata/pull/6254)] optimize Hessian Serialize - [[#6332](https://github.com/apache/incubator-seata/pull/6332)] remove mysql dependency from the distribution package - [[#6343](https://github.com/apache/incubator-seata/pull/6343)] compatible with tm module and rm-datasource module +- [[#6345](https://github.com/apache/incubator-seata/pull/6345)] compatible with tcc module - [[#6356](https://github.com/apache/incubator-seata/pull/6356)] remove authentication from the health check page - [[#6360](https://github.com/apache/incubator-seata/pull/6360)] optimize 401 issues for some links - [[#6369](https://github.com/apache/incubator-seata/pull/6369)] optimize arm64 ci diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index c9e3ca17aa0..2997bbf7dde 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -94,6 +94,8 @@ - [[#6330](https://github.com/apache/incubator-seata/pull/6330)] 去除 mariadb API - [[#6329](https://github.com/apache/incubator-seata/pull/6312)] 添加 saga 子组件的 io.seata 兼容性 API - [[#6254](https://github.com/apache/incubator-seata/pull/6254)] 优化Hessian 序列化 +- [[#6343](https://github.com/apache/incubator-seata/pull/6343)] 兼容tm 模块和rm-datasource模块 +- [[#6345](https://github.com/apache/incubator-seata/pull/6345)] 兼容tcc模块 - [[#6332](https://github.com/apache/incubator-seata/pull/6332)] 分发包中移除 mysql 依赖 - [[#6343](https://github.com/apache/incubator-seata/pull/6343)] 兼容 TM 模块和 rm-datasource 模块 - [[#6349](https://github.com/apache/incubator-seata/pull/6349)] 迁移 dockerhub 仓库 diff --git a/compatible/pom.xml b/compatible/pom.xml index 42f961df6e8..05d84e4eaee 100644 --- a/compatible/pom.xml +++ b/compatible/pom.xml @@ -32,6 +32,16 @@ 8 + + org.apache.seata + seata-saga-engine + ${project.version} + + + org.apache.seata + seata-saga-engine-store + ${project.version} + org.apache.seata seata-saga-spring @@ -101,5 +111,10 @@ 1.27.1 provided + + org.apache.seata + seata-tcc + ${project.version} + diff --git a/compatible/src/main/java/io/seata/integration/tx/api/interceptor/ActionContextUtil.java b/compatible/src/main/java/io/seata/integration/tx/api/interceptor/ActionContextUtil.java index ac90af80be8..478fc222047 100644 --- a/compatible/src/main/java/io/seata/integration/tx/api/interceptor/ActionContextUtil.java +++ b/compatible/src/main/java/io/seata/integration/tx/api/interceptor/ActionContextUtil.java @@ -21,13 +21,14 @@ import org.apache.seata.common.util.CollectionUtils; import org.apache.seata.common.util.StringUtils; import org.apache.seata.rm.tcc.api.ParamType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.List; import java.util.Map; -import static org.apache.seata.integration.tx.api.interceptor.ActionContextUtil.getByIndex; - /** * Extracting TCC Context from Method */ @@ -36,6 +37,8 @@ public final class ActionContextUtil { private ActionContextUtil() { } + private static final Logger LOGGER = LoggerFactory.getLogger(ActionContextUtil.class); + /** * Extracting context data from parameters * @@ -86,6 +89,29 @@ public static void loadParamByAnnotationAndPutToContext(@Nonnull final ParamType } } + @Nullable + private static Object getByIndex(@Nonnull ParamType paramType, @Nonnull String paramName, @Nonnull Object paramValue, int index) { + if (paramValue instanceof List) { + @SuppressWarnings("unchecked") + List list = (List) paramValue; + if (list.isEmpty()) { + return null; + } + if (list.size() <= index) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("The index '{}' is out of bounds for the list {} named '{}'," + + " whose size is '{}', so pass this {}", index, paramType.getCode(), paramName, list.size(), paramType.getCode()); + } + return null; + } + paramValue = list.get(index); + } else { + LOGGER.warn("the {} named '{}' is not a `List`, so the 'index' field of '@{}' cannot be used on it", + paramType.getCode(), paramName, BusinessActionContextParameter.class.getSimpleName()); + } + + return paramValue; + } public static String getParamNameFromAnnotation(@Nonnull BusinessActionContextParameter annotation) { String paramName = annotation.paramName(); diff --git a/compatible/src/main/java/io/seata/rm/tcc/api/LocalTCC.java b/compatible/src/main/java/io/seata/rm/tcc/api/LocalTCC.java new file mode 100644 index 00000000000..a28f6bea2c2 --- /dev/null +++ b/compatible/src/main/java/io/seata/rm/tcc/api/LocalTCC.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.tcc.api; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +@Inherited +public @interface LocalTCC { +} diff --git a/compatible/src/main/java/io/seata/rm/tcc/api/TwoPhaseBusinessAction.java b/compatible/src/main/java/io/seata/rm/tcc/api/TwoPhaseBusinessAction.java new file mode 100644 index 00000000000..1d1e86377a6 --- /dev/null +++ b/compatible/src/main/java/io/seata/rm/tcc/api/TwoPhaseBusinessAction.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.tcc.api; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD}) +@Inherited +public @interface TwoPhaseBusinessAction { + + /** + * TCC bean name, must be unique + * + * @return the string + */ + String name(); + + /** + * commit method name + * + * @return the string + */ + String commitMethod() default "commit"; + + /** + * rollback method name + * + * @return the string + */ + String rollbackMethod() default "rollback"; + + /** + * delay branch report while sharing params to tcc phase 2 to enhance performance + * + * @return isDelayReport + */ + boolean isDelayReport() default false; + + /** + * whether use TCC fence (idempotent,non_rollback,suspend) + * + * @return the boolean + */ + boolean useTCCFence() default false; + + /** + * commit method's args + * + * @return the Class[] + */ + Class[] commitArgsClasses() default {BusinessActionContext.class}; + + /** + * rollback method's args + * + * @return the Class[] + */ + Class[] rollbackArgsClasses() default {BusinessActionContext.class}; +} diff --git a/compatible/src/main/java/io/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java b/compatible/src/main/java/io/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java new file mode 100644 index 00000000000..25d19a588a7 --- /dev/null +++ b/compatible/src/main/java/io/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.tcc.interceptor; + +import io.seata.core.context.RootContext; +import io.seata.core.model.BranchType; +import io.seata.rm.tcc.api.TwoPhaseBusinessAction; +import org.apache.seata.common.Constants; +import org.apache.seata.common.holder.ObjectHolder; +import org.apache.seata.common.util.ReflectionUtil; +import org.apache.seata.integration.tx.api.fence.config.CommonFenceConfig; +import org.apache.seata.integration.tx.api.interceptor.InvocationWrapper; +import org.apache.seata.integration.tx.api.interceptor.TwoPhaseBusinessActionParam; +import org.slf4j.MDC; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.seata.common.Constants.BEAN_NAME_SPRING_FENCE_CONFIG; + +public class TccActionInterceptorHandler extends org.apache.seata.rm.tcc.interceptor.TccActionInterceptorHandler { + + public TccActionInterceptorHandler(Object targetBean, Set methodsToProxy) { + super(targetBean, methodsToProxy); + } + + @Override + protected Object doInvoke(InvocationWrapper invocation) throws Throwable { + if (!RootContext.inGlobalTransaction() || RootContext.inSagaBranch()) { + //not in transaction, or this interceptor is disabled + return invocation.proceed(); + } + Method method = invocation.getMethod(); + TwoPhaseBusinessAction businessAction = parseAnnotation(method); + + //try method + if (businessAction != null) { + //save the xid + String xid = RootContext.getXID(); + //save the previous branchType + BranchType previousBranchType = RootContext.getBranchType(); + //if not TCC, bind TCC branchType + if (BranchType.TCC != previousBranchType) { + RootContext.bindBranchType(BranchType.TCC); + } + try { + TwoPhaseBusinessActionParam businessActionParam = new TwoPhaseBusinessActionParam(); + businessActionParam.setActionName(businessAction.name()); + businessActionParam.setDelayReport(businessAction.isDelayReport()); + businessActionParam.setUseCommonFence(businessAction.useTCCFence()); + businessActionParam.setBranchType(org.apache.seata.core.model.BranchType.TCC); + Map businessActionContextMap = new HashMap<>(4); + //the phase two method name + businessActionContextMap.put(Constants.COMMIT_METHOD, businessAction.commitMethod()); + businessActionContextMap.put(Constants.ROLLBACK_METHOD, businessAction.rollbackMethod()); + businessActionContextMap.put(Constants.ACTION_NAME, businessAction.name()); + businessActionContextMap.put(Constants.USE_COMMON_FENCE, businessAction.useTCCFence()); + businessActionParam.setBusinessActionContext(businessActionContextMap); + //Handler the TCC Aspect, and return the business result + return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessActionParam, + invocation::proceed); + } finally { + //if not TCC, unbind branchType + if (BranchType.TCC != previousBranchType) { + RootContext.unbindBranchType(); + } + //MDC remove branchId + MDC.remove(org.apache.seata.core.context.RootContext.MDC_KEY_BRANCH_ID); + } + } + + //not TCC try method + return invocation.proceed(); + } + + private TwoPhaseBusinessAction parseAnnotation(Method methodKey) throws NoSuchMethodException { + TwoPhaseBusinessAction result = convertIoSeata(parseAnnotationCache.computeIfAbsent(methodKey, method -> { + TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class); + if (businessAction == null && targetBean.getClass() != null) { + Set> interfaceClasses = ReflectionUtil.getInterfaces(targetBean.getClass()); + if (interfaceClasses != null) { + for (Class interClass : interfaceClasses) { + try { + Method m = interClass.getMethod(method.getName(), method.getParameterTypes()); + businessAction = m.getAnnotation(TwoPhaseBusinessAction.class); + if (businessAction != null) { + // init common fence clean task if enable useTccFence + initCommonFenceCleanTask(businessAction); + break; + } + } catch (NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + } + } + return convertApacheSeata(businessAction); + })); + return result; + } + + private org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction convertApacheSeata(TwoPhaseBusinessAction twoPhaseBusinessAction) { + org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction result = new org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction() { + + @Override + public Class annotationType() { + return org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction.class; + } + + @Override + public String name() { + return twoPhaseBusinessAction.name(); + } + + @Override + public String commitMethod() { + return twoPhaseBusinessAction.commitMethod(); + } + + @Override + public String rollbackMethod() { + return twoPhaseBusinessAction.rollbackMethod(); + } + + @Override + public boolean isDelayReport() { + return twoPhaseBusinessAction.isDelayReport(); + } + + @Override + public boolean useTCCFence() { + return twoPhaseBusinessAction.useTCCFence(); + } + + @Override + public Class[] commitArgsClasses() { + return twoPhaseBusinessAction.commitArgsClasses(); + } + + @Override + public Class[] rollbackArgsClasses() { + return twoPhaseBusinessAction.rollbackArgsClasses(); + } + }; + return result; + } + + private TwoPhaseBusinessAction convertIoSeata(org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction twoPhaseBusinessAction) { + TwoPhaseBusinessAction result = new TwoPhaseBusinessAction() { + @Override + public Class annotationType() { + return TwoPhaseBusinessAction.class; + } + + @Override + public String name() { + return twoPhaseBusinessAction.name(); + } + + @Override + public String commitMethod() { + return twoPhaseBusinessAction.commitMethod(); + } + + @Override + public String rollbackMethod() { + return twoPhaseBusinessAction.rollbackMethod(); + } + + @Override + public boolean isDelayReport() { + return twoPhaseBusinessAction.isDelayReport(); + } + + @Override + public boolean useTCCFence() { + return twoPhaseBusinessAction.useTCCFence(); + } + + @Override + public Class[] commitArgsClasses() { + return twoPhaseBusinessAction.commitArgsClasses(); + } + + @Override + public Class[] rollbackArgsClasses() { + return twoPhaseBusinessAction.rollbackArgsClasses(); + } + }; + return result; + } + + + private void initCommonFenceCleanTask(TwoPhaseBusinessAction twoPhaseBusinessAction) { + CommonFenceConfig commonFenceConfig = (CommonFenceConfig) ObjectHolder.INSTANCE.getObject(BEAN_NAME_SPRING_FENCE_CONFIG); + if (commonFenceConfig == null || commonFenceConfig.getInitialized().get()) { + return; + } + if (twoPhaseBusinessAction != null && twoPhaseBusinessAction.useTCCFence()) { + if (commonFenceConfig.getInitialized().compareAndSet(false, true)) { + // init common fence clean task if enable useTccFence + commonFenceConfig.init(); + } + } + } + +} diff --git a/compatible/src/main/java/io/seata/rm/tcc/interceptor/parser/TccActionInterceptorParser.java b/compatible/src/main/java/io/seata/rm/tcc/interceptor/parser/TccActionInterceptorParser.java new file mode 100644 index 00000000000..dc6b68b28ed --- /dev/null +++ b/compatible/src/main/java/io/seata/rm/tcc/interceptor/parser/TccActionInterceptorParser.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.tcc.interceptor.parser; + +import io.seata.rm.tcc.api.TwoPhaseBusinessAction; +import io.seata.rm.tcc.interceptor.TccActionInterceptorHandler; +import org.apache.seata.common.util.ReflectionUtil; +import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler; +import org.apache.seata.integration.tx.api.interceptor.parser.DefaultResourceRegisterParser; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +public class TccActionInterceptorParser extends org.apache.seata.rm.tcc.interceptor.parser.TccActionInterceptorParser { + + @Override + public ProxyInvocationHandler parserInterfaceToProxy(Object target, String objectName) { + // eliminate the bean without two phase annotation. + Set methodsToProxy = this.tccProxyTargetMethod(target); + if (methodsToProxy.isEmpty()) { + return null; + } + // register resource and enhance with interceptor + DefaultResourceRegisterParser.get().registerResource(target, objectName); + return new TccActionInterceptorHandler(target, methodsToProxy); + } + + private Set tccProxyTargetMethod(Object target) { + Set methodsToProxy = new HashSet<>(); + //check if it is TCC bean + Class tccServiceClazz = target.getClass(); + Set methods = new HashSet<>(Arrays.asList(tccServiceClazz.getMethods())); + Set> interfaceClasses = ReflectionUtil.getInterfaces(tccServiceClazz); + if (interfaceClasses != null) { + for (Class interClass : interfaceClasses) { + methods.addAll(Arrays.asList(interClass.getMethods())); + } + } + + TwoPhaseBusinessAction twoPhaseBusinessAction; + for (Method method : methods) { + twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class); + if (twoPhaseBusinessAction != null) { + methodsToProxy.add(method.getName()); + } + } + + if (methodsToProxy.isEmpty()) { + return Collections.emptySet(); + } + // sofa:reference / dubbo:reference, AOP + return methodsToProxy; + } +} diff --git a/compatible/src/main/java/io/seata/rm/tcc/remoting/parser/LocalTCCRemotingParser.java b/compatible/src/main/java/io/seata/rm/tcc/remoting/parser/LocalTCCRemotingParser.java new file mode 100644 index 00000000000..884796e497b --- /dev/null +++ b/compatible/src/main/java/io/seata/rm/tcc/remoting/parser/LocalTCCRemotingParser.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.tcc.remoting.parser; + +import io.seata.rm.tcc.api.LocalTCC; +import org.apache.seata.common.exception.FrameworkException; +import org.apache.seata.common.util.ReflectionUtil; +import org.apache.seata.integration.tx.api.remoting.Protocols; +import org.apache.seata.integration.tx.api.remoting.RemotingDesc; +import org.springframework.aop.framework.AopProxyUtils; + +import java.util.Set; + +public class LocalTCCRemotingParser extends org.apache.seata.rm.tcc.remoting.parser.LocalTCCRemotingParser { + + @Override + public RemotingDesc getServiceDesc(Object bean, String beanName) throws FrameworkException { + if (!this.isRemoting(bean, beanName)) { + return null; + } + RemotingDesc remotingDesc = new RemotingDesc(); + remotingDesc.setReference(this.isReference(bean, beanName)); + remotingDesc.setService(this.isService(bean, beanName)); + remotingDesc.setProtocol(Protocols.IN_JVM); + Class classType = bean.getClass(); + // check if LocalTCC annotation is marked on the implementation class + if (classType.isAnnotationPresent(LocalTCC.class)) { + remotingDesc.setServiceClass(AopProxyUtils.ultimateTargetClass(bean)); + remotingDesc.setServiceClassName(remotingDesc.getServiceClass().getName()); + remotingDesc.setTargetBean(bean); + return remotingDesc; + } + // check if LocalTCC annotation is marked on the interface + Set> interfaceClasses = ReflectionUtil.getInterfaces(classType); + for (Class interClass : interfaceClasses) { + if (interClass.isAnnotationPresent(LocalTCC.class)) { + remotingDesc.setServiceClassName(interClass.getName()); + remotingDesc.setServiceClass(interClass); + remotingDesc.setTargetBean(bean); + return remotingDesc; + } + } + throw new FrameworkException("Couldn't parser any Remoting info"); + } + + @Override + public boolean isService(Class beanClass) throws FrameworkException { + return isLocalTCC(beanClass); + } + + @Override + public boolean isReference(Object bean, String beanName) { + return isLocalTCC(bean); + } + + private boolean isLocalTCC(Object bean) { + Class classType = bean.getClass(); + return isLocalTCC(classType); + } + + + private boolean isLocalTCC(Class classType) { + Set> interfaceClasses = ReflectionUtil.getInterfaces(classType); + for (Class interClass : interfaceClasses) { + if (interClass.isAnnotationPresent(LocalTCC.class)) { + return true; + } + } + return classType.isAnnotationPresent(LocalTCC.class); + } +} diff --git a/compatible/src/main/java/io/seata/rm/tcc/resource/parser/TccRegisterResourceParser.java b/compatible/src/main/java/io/seata/rm/tcc/resource/parser/TccRegisterResourceParser.java new file mode 100644 index 00000000000..b1570ec4193 --- /dev/null +++ b/compatible/src/main/java/io/seata/rm/tcc/resource/parser/TccRegisterResourceParser.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.rm.tcc.resource.parser; + +import io.seata.rm.tcc.api.TwoPhaseBusinessAction; +import org.apache.seata.common.exception.FrameworkException; +import org.apache.seata.common.util.ReflectionUtil; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.rm.DefaultResourceManager; +import org.apache.seata.rm.tcc.TCCResource; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +public class TccRegisterResourceParser extends org.apache.seata.rm.tcc.resource.parser.TccRegisterResourceParser { + + @Override + public void registerResource(Object target, String beanName) { + try { + //service bean, registry resource + Class serviceClass = target.getClass(); + this.executeRegisterResource(target, new HashSet<>(Arrays.asList(serviceClass.getMethods())), target.getClass()); + Set> interfaceClasses = ReflectionUtil.getInterfaces(serviceClass); + for (Class interClass : interfaceClasses) { + this.executeRegisterResource(target, new HashSet<>(Arrays.asList(interClass.getMethods())), interClass); + } + } catch (Throwable t) { + throw new FrameworkException(t, "parser remoting service error"); + } + } + + private void executeRegisterResource(Object target, Set methods, Class targetServiceClass) throws NoSuchMethodException { + for (Method m : methods) { + TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class); + if (twoPhaseBusinessAction != null) { + TCCResource tccResource = new TCCResource(); + if (StringUtils.isBlank(twoPhaseBusinessAction.name())) { + throw new FrameworkException("TCC bean name cannot be null or empty"); + } + tccResource.setActionName(twoPhaseBusinessAction.name()); + tccResource.setTargetBean(target); + tccResource.setPrepareMethod(m); + tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod()); + tccResource.setCommitMethod(targetServiceClass.getMethod(twoPhaseBusinessAction.commitMethod(), + twoPhaseBusinessAction.commitArgsClasses())); + tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod()); + tccResource.setRollbackMethod(targetServiceClass.getMethod(twoPhaseBusinessAction.rollbackMethod(), + twoPhaseBusinessAction.rollbackArgsClasses())); + // set argsClasses + tccResource.setCommitArgsClasses(twoPhaseBusinessAction.commitArgsClasses()); + tccResource.setRollbackArgsClasses(twoPhaseBusinessAction.rollbackArgsClasses()); + // set phase two method's keys + tccResource.setPhaseTwoCommitKeys(this.getTwoPhaseArgs(tccResource.getCommitMethod(), + twoPhaseBusinessAction.commitArgsClasses())); + tccResource.setPhaseTwoRollbackKeys(this.getTwoPhaseArgs(tccResource.getRollbackMethod(), + twoPhaseBusinessAction.rollbackArgsClasses())); + //registry tcc resource + DefaultResourceManager.get().registerResource(tccResource); + } + } + } +} diff --git a/compatible/src/main/java/io/seata/spring/tcc/TccAnnotationProcessor.java b/compatible/src/main/java/io/seata/spring/tcc/TccAnnotationProcessor.java new file mode 100644 index 00000000000..9e7744928ab --- /dev/null +++ b/compatible/src/main/java/io/seata/spring/tcc/TccAnnotationProcessor.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.seata.spring.tcc; + +import io.seata.rm.tcc.api.TwoPhaseBusinessAction; +import org.apache.seata.integration.tx.api.remoting.RemotingDesc; +import org.apache.seata.integration.tx.api.util.ProxyUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; + +public class TccAnnotationProcessor extends org.apache.seata.spring.tcc.TccAnnotationProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(org.apache.seata.spring.tcc.TccAnnotationProcessor.class); + + @Override + public void addTccAdvise(Object bean, String beanName, Field field, Class serviceClass) throws IllegalAccessException { + Object fieldValue = field.get(bean); + if (fieldValue == null) { + return; + } + for (Method method : field.getType().getMethods()) { + if (!Modifier.isStatic(method.getModifiers()) && (method.isAnnotationPresent(TwoPhaseBusinessAction.class))) { + RemotingDesc remotingDesc = new RemotingDesc(); + remotingDesc.setServiceClass(serviceClass); + + Object proxyBean = ProxyUtil.createProxy(bean, beanName); + field.setAccessible(true); + field.set(bean, proxyBean); + LOGGER.info("Bean[" + bean.getClass().getName() + "] with name [" + field.getName() + "] would use proxy"); + } + } + } +} diff --git a/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java b/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java index 9b50104c9e0..82a7d6a7dc0 100644 --- a/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java +++ b/tcc/src/main/java/org/apache/seata/rm/tcc/interceptor/TccActionInterceptorHandler.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; + import org.apache.seata.common.Constants; import org.apache.seata.common.DefaultValues; import org.apache.seata.common.holder.ObjectHolder; @@ -37,6 +38,8 @@ import org.apache.seata.integration.tx.api.interceptor.handler.AbstractProxyInvocationHandler; import org.apache.seata.rm.tcc.api.TwoPhaseBusinessAction; import org.slf4j.MDC; + + import static org.apache.seata.common.ConfigurationKeys.TCC_ACTION_INTERCEPTOR_ORDER; import static org.apache.seata.common.Constants.BEAN_NAME_SPRING_FENCE_CONFIG; @@ -45,12 +48,12 @@ public class TccActionInterceptorHandler extends AbstractProxyInvocationHandler private static final int ORDER_NUM = ConfigurationFactory.getInstance().getInt(TCC_ACTION_INTERCEPTOR_ORDER, DefaultValues.TCC_ACTION_INTERCEPTOR_ORDER); - private ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler(); + protected ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler(); private Set methodsToProxy; - private Object targetBean; + protected Object targetBean; - private Map parseAnnotationCache = new ConcurrentHashMap<>(); + protected Map parseAnnotationCache = new ConcurrentHashMap<>(); public TccActionInterceptorHandler(Object targetBean, Set methodsToProxy) { this.targetBean = targetBean;