Skip to content

Latest commit

 

History

History
599 lines (467 loc) · 26.1 KB

细节知多少 - spring boot transaction解析之阶段二 访问service类的@Transactional方法实现事务的阶段.adoc

File metadata and controls

599 lines (467 loc) · 26.1 KB

细节知多少 - spring boot transaction解析之阶段二 访问service类的@Transactional方法实现事务的阶段

Doc writer yaoyihao1@gmail.com

概述

spring boot transaction我们平时用着非常简单,只需要在service方法上声明@Transactional就可以了。但是要知道,简单的背后是不简单,只是很多东西spring框架帮我们做好了。如果想进阶,想进步,想弄懂,想学习,需要接近它,了解它,弄懂它,最好的方法莫过于debug它的源码了。

笔者把spring boot transaction的整个过程分为两个阶段

  1. 带有@Transactional方法的service类生成代理类的阶段

  2. 访问service类的@Transactional方法实现事务的阶段 本文我们产出第二阶段:访问service类的@Transactional方法实现事务的阶段

技术栈

spring boot 1.5.10
spring framework 4.3.14

带着问题学习

带着问题学习往往起到更好的效果

问题:

1. 平时总是spring默认的事务只有运行时异常才回滚,为什么
2. 事务的savepoint是什么
3. 如何做到多个数据操作的事务提交和回滚的

调用流程图

20201129090652

这是spring官网doc上关于spring transaction的调用流程图。我们大致先有个印象,每个节点的细节正是本文的重点内容。随着本文细节的逐一阐述完成,再回来看下这个图,会有更深的理解和掌握

http 访问

当从controller访问service的方法时,因为controller引用的service指向的代理类,所以进去代理类的方法,如下

代理类的方法

这里我们以UserService的updateSchoolName为例,下面代码为UserService的updateSchoolName方法生成的代理类的方法,var10000即是DynamicAdvisedInterceptor类

final void CGLIB$updateSchoolName$4(String var1, long var2) {
	super.updateSchoolName(var1, var2);
}

public final void updateSchoolName(String var1, long var2) {
	try {
		MethodInterceptor var10000 = this.CGLIB$CALLBACK_0;
		if (var10000 == null) {
			CGLIB$BIND_CALLBACKS(this);
			var10000 = this.CGLIB$CALLBACK_0;
		}

		if (var10000 != null) {
			var10000.intercept(this, CGLIB$updateSchoolName$4$Method, new Object[]{var1, new Long(var2)}, CGLIB$updateSchoolName$4$Proxy);
		} else {
			super.updateSchoolName(var1, var2);
		}
	} catch (Error | RuntimeException var4) {
		throw var4;
	} catch (Throwable var5) {
		throw new UndeclaredThrowableException(var5);
	}
}

从这个方法我们知道,程序会进去var10000.intercept,即DynamicAdvisedInterceptor.intercept方法,我们看看这个方法内部代码

public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
	Object oldProxy = null;
	Class<?> targetClass = null;
	Object target = null;
	try {
		if (this.advised.exposeProxy) {
			oldProxy = AopContext.setCurrentProxy(proxy);
		}

		List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
		Object retVal;

		if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
			······
		}
		else {
			retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed(); // (1)
		}
		retVal = processReturnType(proxy, target, method, retVal);
		return retVal;
	}
	finally {
		······
	}
}

在这个方法中,会创建CglibMethodInvocation实例,proxy, target,chain等参数传给它,然后调动proceed()方法。这里运用了一个设计模式: 责任链模式的变种 ,CglibMethodInvocation拥有一个MethodInterceptor集合,通过proceed()方法执行interceptor chain。proceed()会遍历interceptorsAndDynamicMethodMatchers(其实就是interceptor集合)从而执行每个interceptor.invoke(MethodInvocation)自身的逻辑,因为这个时候MethodInvocation会传递给invoke方法,所以每一个invoke方法内部都会执行MethodInvocation.proceed(),从而这样形成了一个链式的调用关系。我们看下链式代码结构

MethodInvocation类
public Object proceed() throws Throwable {
    // 链式调用终结点
    if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
        return invokeJoinpoint();
    }
    // 从interceptor集合中获取一个interceptor
    Object interceptorOrInterceptionAdvice =
            this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
    // 调用这个interceptor.invoke方法,开始走链了
    return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
MethodInterceptor类
public interface MethodInterceptor extends Interceptor {

    Object invoke(MethodInvocation invocation) throws Throwable;
}

MethodInterceptor实现类
public Object invoke(MethodInvocation mi) throws Throwable {
    // 又调回MethodInvocation.proceed()方法
    return mi.proceed();
}

对我们事务代理而言,这个链上的interceptor就是TransactionInterceptor,程序进入TransactionInterceptor.invoke(this),可以看到这个方法的参数是this, 一般的链式调用模型都会传this作为参数,我们应该学会这种高级操作

从这个方法才开始进行spring transaction事务有关的操作,看下内部

public Object invoke(final MethodInvocation invocation) throws Throwable {
	Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

	// Adapt to TransactionAspectSupport's invokeWithinTransaction...
	return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() {
		@Override
		public Object proceedWithInvocation() throws Throwable {
			return invocation.proceed();
		}
	});
}

invokeWithinTransaction方法逻辑为开始操作Transaction事务相关的操作,所以这个方法就很重要了,同时提供一个回调操作:invocation.proceed()以达到链式调用。

事务入口点

从invokeWithinTransaction方法的代码我们可以看到这时候开始运用事务逻辑,事务包裹着方法的调用。 可以说这里是事务的核心和入口。看其代码

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
		throws Throwable {

	// If the transaction attribute is null, the method is non-transactional.
	final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); (1)
	final PlatformTransactionManager tm = determineTransactionManager(txAttr); (2)
	final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); (3)

	if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
		// Standard transaction demarcation with getTransaction and commit/rollback calls.
	    // 获取事务信息,包含这些属性:事务管理器:ransactionManager;事务属性:transactionAttribute; 事务状态:transactionStatus;老事务:oldTransactionInfo;
		TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); (4)
		Object retVal = null;
		try {
			// This is an around advice: Invoke the next interceptor in the chain.
			// This will normally result in a target object being invoked.
			retVal = invocation.proceedWithInvocation(); (5)
		}
		catch (Throwable ex) {
			completeTransactionAfterThrowing(txInfo, ex); (6)
			throw ex;
		}
		finally {
			cleanupTransactionInfo(txInfo);
		}
		commitTransactionAfterReturning(txInfo); (7)
		return retVal;
	}else{
		······
	}
}

从代码我们知道,整个方法的核心逻辑和我们平时对事务的理解一样, 分为三步:

  1. 调方法前开始事务[即代码(4)]

  2. 开始调用方法[即代码(5)]

  3. 方法报错就回滚[即代码(6)],正常结束提交事务[即代码(7)]

只是在开始事务前我们要获取到事务属性(TransactionAttribute)[即代码(1)]和事务管理器(PlatformTransactionManager)[即代码(2)]。所以首先是从TransactionAttributeSource(其实是AnnotationTransactionAttributeSource)获取TransactionAttribute,这个就是程序初始化时解析UserServiceImpl方法时每个方法生成的TransactionAttribute,现在只是从缓存中取出来。接着获取TransactionManager,其实通过 defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class); ,即beanFactory容器获取的。我们使用的是默认的PlatformTransactionManager,所以获取到的是DataSourcePlatformTransactionManager。

下面我们看第一步:获取事务。获取事务是根据事务管理器,事务属性,方法全限定名,看下createTransactionIfNecessary内部逻辑

protected TransactionInfo createTransactionIfNecessary(
			PlatformTransactionManager tm, TransactionAttribute txAttr, final String joinpointIdentification) {

	// If no name specified, apply method identification as transaction name.
	if (txAttr != null && txAttr.getName() == null) {
		txAttr = new DelegatingTransactionAttribute(txAttr) {
			@Override
			public String getName() {
				return joinpointIdentification;
			}
		};
	}
    // 创建事务状态对象
	TransactionStatus status = tm.getTransaction(txAttr);
	// 准备事务信息对象,并把事务信息对象绑定到当前线程上
	return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

方法中通过事务管理器和事务属性获取事务状态,所谓的事务状态保存了一些属性,这些属性值导向了事务的特性。我们详细说下

DefaultTransactionStatus

	private final Object transaction;

	private final boolean newTransaction;

	private final boolean newSynchronization;

	private final boolean readOnly;

	private final Object suspendedResources;
  1. Object transaction表示事务对象,实际是DataSourceTransactionObject,拥有ConnectionHolder、previousIsolationLevel等属性, 事务实现的本质是多个数据库的操作使用通过连接:Connection 。ConnectionHolder包装了Connection对象,TransactionManager通过一个ThreadLocal属性以datasource为key,ConnectionHolder为value的方式持有ConnectionHolder,

  2. newTransaction事务有传播特性,如果是嵌套事务,那么第一个事务就是新的(newTransaction=true),嵌套内的事务就是旧的(newTransaction=false)

  3. newSynchronization 用于在事务commit/complete前后进行回调操作的标识

  4. suspendedResources 翻译中文就是暂停资源,用于当有两个以上事务场景的时候,保存上一个事务信息,从而当当前事务完成后可以回到上一个事务

我们现在回到程序中,看下事务是怎样获取的,事务状态属性值是怎样赋值和获取的,事务状态是根据事务管理器和事务属性获取的。看其tm.getTransaction内部代码

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
	// spring默认使用spring jdbc的事务,所以这里会创建DataSourceTransactionObject对象,并从事务管理器TransactionManager根据dataSource从ThreadLocal获取ConnectioinHolder赋值给DataSourceTransactionObject对象的属性
	Object transaction = doGetTransaction();(a)

	if (definition == null) {
		definition = new DefaultTransactionDefinition();
	}

	// 判断是否已存在事务:判断依据就是DataSourceTransactionObject.connectionHolder值是否为null
	if (isExistingTransaction(transaction)) {
		// Existing transaction found -> check propagation behavior to find out how to behave.
		// 如果存在事务,走已存在事务的逻辑,使用同上一个事务去处理操作
		return handleExistingTransaction(definition, transaction, debugEnabled);(b)
	}

	// 程序走到这里,说明没有已存在的事务
	if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
			definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
			definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
		// 此方法为暂停事务,由于没有已存在的事务,所以这里传的是null,表示无需暂停事务
		SuspendedResourcesHolder suspendedResources = suspend(null);(c)
		try {
			boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
			// 创建事务状态对象
			DefaultTransactionStatus status = newTransactionStatus(
					definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
			// 给transaction属性赋值,包括ConnectionHolder(Connection的AutoCommit设置为false),previousIsolationLevel
			// 同时将transaction.ConnectionHolder和DataSourece绑定到事务管理器TransactionSynchronizationManager的ThreadLocal属性上
			doBegin(transaction, definition);(d)
			// TransactionStatus.newSynchronization==true,实例化transaction synchronization并绑定到TransactionSynchronizationManager
			prepareSynchronization(status, definition);(e)
			return status;
		}
		catch (RuntimeException ex) {
			resume(null, suspendedResources);
			throw ex;
		}
	}else {
		// Create "empty" transaction: no actual transaction, but potentially synchronization.
		boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
		return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
	}
}

从这个方法的逻辑可以看到,先获取事务对象(此时很多属性还没有赋值),然后判断这个事务是否是已存在的,是走已存在的逻辑;不是的话,使用这个事务对象创建一个新的事务状态对象TransactionStatus,然后根据TransactionDefinition(持有隔离级别和传播特性值)的个属性值给事务对象DataSourceTransactionObject的各个属性赋值。同时将ConnectionHolder和DataSourece和绑定到TransactionStatus和TransactionSynchronizationManager对象上。接着根据事务状态和事务定义实例化transaction synchronization并赋值给事务管理器。

这样事务状态就确定了,即事务状态TransactionStatus对象创建好了。然后根据创建好的事务对象和事务属性创建事务信息对象TransactionInfo。这个对象囊括了事务相关的所有信息,包括事务管理器,事务属性,事务状态,老事务。属性如下:

TransactionInfo class
	private final PlatformTransactionManager transactionManager;

	private final TransactionAttribute transactionAttribute;

	private final String joinpointIdentification;

	private TransactionStatus transactionStatus;

	private TransactionInfo oldTransactionInfo;

到这,事务信息就都设置完了,第一步也执行完了。下面走第二步,调用我们实际的业务方法,即userService.updateSchoolName方法。这里就不展开了。

下面如果业务方法执行正常结果,就提交事务,即commitTransactionAfterReturning方法;如果出现异常,则执行completeTransactionAfterThrowing方法逻辑。两种情况走完的后,都会调用cleanupTransactionInfo方法清除对象,释放内存。 下面看第一种情况,正常commit提交的逻辑,即commitTransactionAfterReturning方法,我们看下这个方法的内部逻辑

protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
	txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
AbstractPlatformTransactoinManager
public final void commit(TransactionStatus status) throws TransactionException {
	······
	processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
	try {
		boolean beforeCompletionInvoked = false;
		try {
			prepareForCommit(status);
			// 提交前回调,这时事务状态对象的newSynchronization开始起作用,Transaction synchronization做些事情
			triggerBeforeCommit(status);
			triggerBeforeCompletion(status);
			beforeCompletionInvoked = true;
			boolean globalRollbackOnly = false;
			// NewTransaction是否是新事务,只有一个事务那就是新事务,嵌套事务的时候,只是最外层的事务是新事务(即NewTransaction=true)
			if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
				globalRollbackOnly = status.isGlobalRollbackOnly();
			}
			if (status.hasSavepoint()) {
				status.releaseHeldSavepoint();
			}
			// 事务都执行完才做事务提交doCommit操作
			else if (status.isNewTransaction()) {
				doCommit(status);
			}
		catch (UnexpectedRollbackException ex) {
			// can only be caused by doCommit
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
			throw ex;
		}
		catch (TransactionException ex) {
			// can only be caused by doCommit
			if (isRollbackOnCommitFailure()) {
				doRollbackOnCommitException(status, ex);
			}
			else {
				triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
			}
			throw ex;
		}
		catch (RuntimeException ex) {
			if (!beforeCompletionInvoked) {
				triggerBeforeCompletion(status);
			}
			doRollbackOnCommitException(status, ex);
			throw ex;
		}
		catch (Error err) {
			if (!beforeCompletionInvoked) {
				triggerBeforeCompletion(status);
			}
			doRollbackOnCommitException(status, err);
			throw err;
		}

		// Trigger afterCommit callbacks, with an exception thrown there
		// propagated to callers but the transaction still considered as committed.
		try {
			triggerAfterCommit(status);
		}
		finally {
			triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
		}

	}
	finally {
		cleanupAfterCompletion(status);
	}
}

对于processCommit方法,核心是doCommit操作,在这前后根据执行newSynchronization和NewTransaction触发回调操作

接着看第二种情况,即出现异常执行completeTransactionAfterThrowing方法逻辑。看其代码实现

protected void completeTransactionAfterThrowing(TransactionInfo txInfo, Throwable ex) {
	if (txInfo != null && txInfo.hasTransaction()) {
		// 判断对这个异常是否回滚,这个逻辑很重要了
		if (txInfo.transactionAttribute.rollbackOn(ex)) {
			try {
				// 执行回滚操作,最终调用Connection.rollback()方法
				txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
			}catch (Error err) {
				throw err;
			}
		}
		else {
			// 不回滚了,直接提交commit
			try {
				txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
			}catch (Error err) {
				logger.error("Application exception overridden by commit error", ex);
				throw err;
			}
		}
	}
}

我们平时可能会听到其他人说spring事务默认只有运行时已成RuntimeException才回滚,像非运行时异常不回滚,我看下这里回滚的逻辑,从而知道这个现象的本质原因,看代码

RuleBasedTransactionAttribute 类
public boolean rollbackOn(Throwable ex) {
	RollbackRuleAttribute winner = null;
	int deepest = Integer.MAX_VALUE;

    // 只有我们业务方法中配置的@Transactional.rollbackfor的值,rollbackRules才只有这个值,否则rollbackRules为空
	if (this.rollbackRules != null) {
		for (RollbackRuleAttribute rule : this.rollbackRules) {
			int depth = rule.getDepth(ex);
			if (depth >= 0 && depth < deepest) {
				deepest = depth;
				winner = rule;
			}
		}
	}
    // 我们没有配置业务方法中的@Transactional.rollbackfor的值时,走这里的逻辑,即spring默认的回滚逻辑
	if (winner == null) {
		return super.rollbackOn(ex);
	}

	return !(winner instanceof NoRollbackRuleAttribute);
}

这个方法的返回值决定了事务回滚,而返回值由两种情况决定 1. 情况一,业务方法中配置了@Transactional.rollbackfor的值 从而List<RollbackRuleAttribute>类型的属性rollbackRules有值,所以RollbackRuleAttribute.exceptionName属性有值,值为我们在业务方法上配置的@Transactional.rollbackfor的exception值,对比如下图,更清晰

20201129091020
20201129091044

我们看下整个RuleBasedTransactionAttribute.rollbackOn方法的逻辑为根据RollbackRuleAttribute.getDepth(ex)方法返回的int值决定winner属性的值,判断winner的类型是否为NoRollbackRuleAttribute,从而决定方法的返回值true or false,进而决定是否执行回滚操作。重点就在RollbackRuleAttribute.getDepth(ex)。我们看下这个方法的逻辑

RollbackRuleAttribute 类
private int getDepth(Class<?> exceptionClass, int depth) {
	if (exceptionClass.getName().contains(this.exceptionName)) {
		return depth;
	}
	if (exceptionClass == Throwable.class) {
		return -1;
	}
	return getDepth(exceptionClass.getSuperclass(), depth + 1);
}

这是一个递归方法,首先比较我们配置的ExceptionName和程序产生的异常name,如果直接包含,返回默认的depth值(默认为0,表示精准匹配),如果产生的异常时Throwable,表示不匹配,两种情况都不符合,进入递归操作,拿到程序产生异常类的父类进行以上逻辑的判断。

  1. 情况二 我们没有配置业务方法中的@Transactional.rollbackfor的值时,情形如下图。

20201129091218

没有配置@Transactional.rollbackfor的值,winner == null,所以程序走super.rollbackOn(ex),即spring事务默认的回滚逻辑。看下这个代码逻辑

DefaultTransactionAttribute 类
public boolean rollbackOn(Throwable ex) {
	return (ex instanceof RuntimeException || ex instanceof Error);
}

方法逻辑为是RuntimeException才回滚。

到这里我们就明白了, 为什么说spring事务默认只有运行时异常RuntimeException才回滚,像非运行时异常不回滚,所以平时在实际的开发过程中,写事务时记得配置@Transactional.rollbackfor,这样任何异常都会执行回滚操作了

问题回答

问题:

1. 平时总是spring默认的事务只有运行时异常才回滚,为什么
2. 事务的savepoint是什么
3. 如何做到多个数据操作的事务提交和回滚的

回答:

1. RuleBasedTransactionAttribute.rollbackOn(Throwable ex)方法解释了这个问题的本质,文中也有详细的分析和阐述
2. 事务的savepoint是什么 TODO
3. 事务具有传播特性,同一个事务对数据库的连接Connection的是相同的,本质的通过Connection.commit和Connection.rollback实现的

spring transaction 关键词

0.AbstractPlatformTransactionManager

spring标准事务工作流的基础类,作为具体事务管理器的基础。这个基础类提供以下的工作流的处理:

  1. 判断是否有一个存在的事务

  2. 应用合适的传播行为

  3. 暂停suspend和重新使用resume事务if necessary

  4. 在commit提交是检查回滚rollback-only标识

  5. 在回滚rollback时应用合适的修改动作 actual rollback or setting rollback-only

  6. triggers registered synchronization callbacks (if transaction synchronization is active). 子类必须为事务特定状态实现特定的template methods,像: begin, suspend, resume, commit, rollback. 他们都是很重要的,必须被具体的实现类实现和提供 事务同步(Transaction synchronization)是当事务完成时提供回调的一般机制。这主要是被用于内部的数据 the data access support classes for JDBC, Hibernate, JPA

    1. ConnectionHolder 持有Connection,savepoint属性,savepoint是jdbc的特性。ConnectionHolder创建Connection对象的savepoint

    2. TransactionAspectSupport 事务切面的基础类(Base class for transactional aspects),是TransactionInterceptor的父类,拥有transactionInfoHolder属性,他的方法invokeWithinTransaction是事务和业务方法结合的入口处,事务操作几乎在这个类中完成

    3. TransactionDefinition Spring transaction属性类,定义了事务传播特性,事务隔离级别,可读性,事务名称等属性

    4. TransactionAttribute TransactionDefinition的子类,比TransactionDefinition多的部分是增加了rollback功能

    5. DataSourceTransactionObject 事务本身对象。DataSource的事务对象(transaction object),持有ConnectionHolder等

    6. TransactionStatus

DefaultTransactionStatus

	private final Object transaction; 事务对象

	private final boolean newTransaction; 是否为新事物,事务嵌套和多个事务时,只有第一个事务是新事务(newTransaction=true)

	private final boolean newSynchronization; 是否为新同步, newSynchronization=true时触发事务回调

	private Object savepoint; 保存点,用于嵌套事务时

	private final Object suspendedResources; 暂停的资源:事务,当多个事务时起作用

7.TransactionInfo

拥有PlatformTransactionManager,TransactionAttribute,TransactionStatus,oldTransactionInfo等属性

这几个对象间的关系通过图的方法

20201129091326

调用链图

20201129091348

扩展

  1. Transaction Synchronization含义 (触发提交操作回调的)Trigger xxxCommit callbacks.

Transaction Synchronization. The Java Transaction API includes a javax.transaction.Synchronization interface, which issues notifications before and after a transaction is completed. ... After the transaction is committed or rolled back, the TransactionManager calls the Synchronization object's afterCompletion() method.