Skip to content

Commit

Permalink
Do data validation when undo. (apache#1060)
Browse files Browse the repository at this point in the history
* Do data validation when undo.

* Run test case use h2 database.

* fix as review.

* Add log.
  • Loading branch information
ujjboy authored and nick-tan committed Jul 12, 2019
1 parent b20ec97 commit 9677601
Show file tree
Hide file tree
Showing 5 changed files with 502 additions and 31 deletions.
12 changes: 12 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
<httpcore.version>4.4.11</httpcore.version>
<druid.version>1.1.12</druid.version>
<caffeine.version>2.7.0</caffeine.version>
<commons-dbcp.version>1.3</commons-dbcp.version>
<h2.version>1.4.181</h2.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -270,6 +272,16 @@
<artifactId>caffeine</artifactId>
<version>${caffeine.version}</version>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>${commons-dbcp.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
</dependency>

</dependencies>
</dependencyManagement>
Expand Down
11 changes: 11 additions & 0 deletions rm-datasource/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,17 @@
<artifactId>caffeine</artifactId>
</dependency>

<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>

</dependencies>


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@
*/
package io.seata.rm.datasource.undo;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;

import com.alibaba.fastjson.JSON;
import io.seata.common.util.StringUtils;
import io.seata.rm.datasource.DataCompareUtils;
import io.seata.rm.datasource.sql.struct.Field;
import io.seata.rm.datasource.sql.struct.KeyType;
import io.seata.rm.datasource.sql.struct.Row;
import io.seata.rm.datasource.sql.struct.TableMeta;
import io.seata.rm.datasource.sql.struct.TableRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

/**
* The type Abstract undo executor.
Expand All @@ -34,6 +41,18 @@
*/
public abstract class AbstractUndoExecutor {

/**
* Logger for AbstractUndoExecutor
**/
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractUndoExecutor.class);

/**
* template of check sql
*
* TODO support multiple primary key
*/
private static final String CHECK_SQL_TEMPLATE = "SELECT * FROM %s WHERE %s in (%s)";

/**
* The Sql undo log.
*/
Expand Down Expand Up @@ -72,11 +91,10 @@ public SQLUndoLog getSqlUndoLog() {
*/
public void executeOn(Connection conn) throws SQLException {

// no need undo if the before data snapshot is equivalent to the after data snapshot.
if (DataCompareUtils.isRecordsEquals(sqlUndoLog.getBeforeImage(), sqlUndoLog.getAfterImage())) {
if (!dataValidationAndGoOn(conn)) {
return;
}
dataValidation(conn);

try {
String undoSQL = buildUndoSQL();

Expand Down Expand Up @@ -145,9 +163,125 @@ protected void undoPrepare(PreparedStatement undoPST, ArrayList<Field> undoValue
* Data validation.
*
* @param conn the conn
* @throws SQLException the sql exception
* @return return true if data validation is ok and need continue undo, and return false if no need continue undo.
* @throws SQLException the sql exception such as has dirty data
*/
protected void dataValidation(Connection conn) throws SQLException {
protected boolean dataValidationAndGoOn(Connection conn) throws SQLException {

TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
TableRecords afterRecords = sqlUndoLog.getAfterImage();

// Compare current data with before data
// No need undo if the before data snapshot is equivalent to the after data snapshot.
if (DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Stop rollback because there is no data change " +
"between the before data snapshot and the after data snapshot.");
}
// no need continue undo.
return false;
}

// Validate if data is dirty.
TableRecords currentRecords = queryCurrentRecords(conn);
// compare with current data and after image.
if (!DataCompareUtils.isRecordsEquals(afterRecords, currentRecords)) {

// If current data is not equivalent to the after data, then compare the current data with the before
// data, too. No need continue to undo if current data is equivalent to the before data snapshot
if (DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Stop rollback because there is no data change " +
"between the before data snapshot and the current data snapshot.");
}
// no need continue undo.
return false;
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("check dirty datas failed, old and new data are not equal," +
"tableName:[" + sqlUndoLog.getTableName() + "]," +
"oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +
"newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");
}
throw new SQLException("Has dirty records when undo.");
}
}
return true;
}

/**
* Query current records.
*
* @param conn the conn
* @return the table records
* @throws SQLException the sql exception
*/
protected TableRecords queryCurrentRecords(Connection conn) throws SQLException {
TableRecords undoRecords = getUndoRows();
TableMeta tableMeta = undoRecords.getTableMeta();
String pkName = tableMeta.getPkName();
int pkType = tableMeta.getColumnMeta(pkName).getDataType();

// pares pk values
Object[] pkValues = parsePkValues(getUndoRows());
if (pkValues.length == 0) {
return TableRecords.empty(tableMeta);
}
StringBuffer replace = new StringBuffer();
for (int i = 0; i < pkValues.length; i++) {
replace.append("?,");
}
// build check sql
String checkSQL = String.format(CHECK_SQL_TEMPLATE, sqlUndoLog.getTableName(), pkName,
replace.substring(0, replace.length() - 1));

PreparedStatement statement = null;
ResultSet checkSet = null;
TableRecords currentRecords;
try {
statement = conn.prepareStatement(checkSQL);
for (int i = 1; i <= pkValues.length; i++) {
statement.setObject(i, pkValues[i - 1], pkType);
}
checkSet = statement.executeQuery();
currentRecords = TableRecords.buildRecords(tableMeta, checkSet);
} finally {
if (checkSet != null) {
try {
checkSet.close();
} catch (Exception e) {
}
}
if (statement != null) {
try {
statement.close();
} catch (Exception e) {
}
}
}
return currentRecords;
}

/**
* Parse pk values object [ ].
*
* @param records the records
* @return the object [ ]
*/
protected Object[] parsePkValues(TableRecords records) {
String pkName = records.getTableMeta().getPkName();
List<Row> undoRows = records.getRows();
Object[] pkValues = new Object[undoRows.size()];
for (int i = 0; i < undoRows.size(); i++) {
List<Field> fields = undoRows.get(i).getFields();
if (fields != null) {
for (Field field : fields) {
if (StringUtils.equalsIgnoreCase(pkName, field.getName())) {
pkValues[i] = field.getValue();
}
}
}
}
return pkValues;
}
}
Loading

0 comments on commit 9677601

Please sign in to comment.