Skip to content

Commit

Permalink
[CONJ-525] merge PR to handle DELETE batching with bulk command
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Sep 20, 2017
1 parent 90da571 commit ab8e666
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 37 deletions.
Expand Up @@ -71,7 +71,7 @@ public interface CmdInformation {

void addErrorStat();

void clearErrorStat();
void reset();

void addResultSetStat();

Expand Down
Expand Up @@ -64,8 +64,8 @@

public class CmdInformationBatch implements CmdInformation {

private final Queue<Long> insertIds;
private final Queue<Long> updateCounts;
private final Queue<Long> insertIds = new ConcurrentLinkedQueue<>();
private final Queue<Long> updateCounts = new ConcurrentLinkedQueue<>();
private final int expectedSize;
private final int autoIncrement;
private int insertIdNumber = 0;
Expand All @@ -84,25 +84,26 @@ public class CmdInformationBatch implements CmdInformation {
*/
public CmdInformationBatch(int expectedSize, int autoIncrement) {
this.expectedSize = expectedSize;
this.insertIds = new ConcurrentLinkedQueue<>();
this.updateCounts = new ConcurrentLinkedQueue<>();
this.autoIncrement = autoIncrement;
}

@Override
public void addErrorStat() {
hasException = true;
this.updateCounts.add((long) Statement.EXECUTE_FAILED);
updateCounts.add((long) Statement.EXECUTE_FAILED);
}

/**
* Clear error state, used for clear exception after first batch query, when fall back to per-query execution.
*
*/
@Override
public void clearErrorStat() {
public void reset() {
insertIds.clear();
updateCounts.clear();
insertIdNumber = 0;
hasException = false;
this.updateCounts.remove((long) Statement.EXECUTE_FAILED);
rewritten = false;
}

public void addResultSetStat() {
Expand All @@ -111,9 +112,9 @@ public void addResultSetStat() {

@Override
public void addSuccessStat(long updateCount, long insertId) {
this.insertIds.add(insertId);
insertIds.add(insertId);
insertIdNumber += updateCount;
this.updateCounts.add(updateCount);
updateCounts.add(updateCount);
}

@Override
Expand Down
Expand Up @@ -80,38 +80,42 @@ public class CmdInformationMultiple implements CmdInformation {
* @param autoIncrement connection auto increment value.
*/
public CmdInformationMultiple(int expectedSize, int autoIncrement) {
insertIds = new ArrayList<>(expectedSize);
updateCounts = new ArrayList<>(expectedSize);
this.expectedSize = expectedSize;
this.insertIds = new ArrayList<>(expectedSize);
this.updateCounts = new ArrayList<>(expectedSize);
this.autoIncrement = autoIncrement;
}

@Override
public void addErrorStat() {
hasException = true;
this.updateCounts.add((long) Statement.EXECUTE_FAILED);
updateCounts.add((long) Statement.EXECUTE_FAILED);
}

/**
* Clear error state, used for clear exception after first batch query, when fall back to per-query execution.
*
*/
@Override
public void clearErrorStat() {
public void reset() {
insertIds.clear();
updateCounts.clear();
insertIdNumber = 0;
moreResults = 0;
hasException = false;
this.updateCounts.remove((long) Statement.EXECUTE_FAILED);
rewritten = false;
}


public void addResultSetStat() {
this.updateCounts.add((long) RESULT_SET_VALUE);
updateCounts.add((long) RESULT_SET_VALUE);
}

@Override
public void addSuccessStat(long updateCount, long insertId) {
this.insertIds.add(insertId);
insertIds.add(insertId);
insertIdNumber += updateCount;
this.updateCounts.add(updateCount);
updateCounts.add(updateCount);
}

@Override
Expand Down
Expand Up @@ -103,7 +103,7 @@ public void addErrorStat() {
}

@Override
public void clearErrorStat() {
public void reset() {
//not expected
}

Expand Down
Expand Up @@ -430,10 +430,10 @@ private boolean executeBulkBatch(Results results, String sql, ServerPrepareResul
getResult(results);
} catch (SQLException sqle) {
if ("HY000".equals(sqle.getSQLState()) && sqle.getErrorCode() == 1295) {
//query contain SELECT or DELETE. cannot be handle by BULK protocol
//query contain commands that cannot be handled by BULK protocol
// clear error and special error code, so it won't leak anywhere
// and wouldn't be misinterpreted as an additional update count
results.getCmdInformation().clearErrorStat();
results.getCmdInformation().reset();
return false;
}
if (exception == null) {
Expand Down
67 changes: 51 additions & 16 deletions src/test/java/org/mariadb/jdbc/StatementTest.java
Expand Up @@ -367,35 +367,70 @@ public void testFallbackBatchUpdate() throws SQLException {
Assume.assumeTrue(doPrecisionTest);

createTable("testFallbackBatchUpdate", "col int");
int[] results;
int queriesInBatch = 2;
Statement statement = sharedConnection.createStatement();

//add 100 data
StringBuilder sb = new StringBuilder("INSERT INTO testFallbackBatchUpdate(col) VALUES (0)");
for (int i = 1; i < 100; i++) sb.append(",(").append(i).append(")");
statement.execute(sb.toString());

try (PreparedStatement preparedStatement = sharedConnection.prepareStatement(
"DELETE FROM testFallbackBatchUpdate WHERE col = ? ")) {
for (int i = 0; i < queriesInBatch; i++) {
preparedStatement.setInt(1, 0);
preparedStatement.addBatch();
"DELETE FROM testFallbackBatchUpdate WHERE col = ?")) {
preparedStatement.setInt(1, 10);
preparedStatement.addBatch();

preparedStatement.setInt(1, 15);
preparedStatement.addBatch();

int[] results = preparedStatement.executeBatch();
assertEquals(2, results.length);
}

//check results
try (ResultSet rs = statement.executeQuery("SELECT * FROM testFallbackBatchUpdate")) {
for (int i = 0; i < 100; i++) {
if (i == 10 || i == 15) continue;
assertTrue(rs.next());
assertEquals(i, rs.getInt(1));
}
results = preparedStatement.executeBatch();
assertFalse(rs.next());
}
assertEquals(results.length, queriesInBatch);
}

@Test
public void testProperBatchUpdate() throws SQLException {
Assume.assumeTrue(doPrecisionTest);

createTable("testProperBatchUpdate", "col int, col2 int");
int[] results;
int queriesInBatch = 3;
Statement statement = sharedConnection.createStatement();

//add 100 data
StringBuilder sb = new StringBuilder("INSERT INTO testProperBatchUpdate(col, col2) VALUES (0,0)");
for (int i = 1; i < 100; i++) sb.append(",(").append(i).append(",0)");
statement.execute(sb.toString());

try (PreparedStatement preparedStatement = sharedConnection.prepareStatement(
"UPDATE testProperBatchUpdate set col2 = ? WHERE col = ? ")) {
for (int i = 0; i < queriesInBatch; i++) {
preparedStatement.setInt(1, i);
preparedStatement.setInt(2, i);
preparedStatement.addBatch();
preparedStatement.setInt(1, 10);
preparedStatement.setInt(2, 10);
preparedStatement.addBatch();

preparedStatement.setInt(1, 15);
preparedStatement.setInt(2, 15);
preparedStatement.addBatch();

int[] results = preparedStatement.executeBatch();
assertEquals(2, results.length);
}

//check results
try (ResultSet rs = statement.executeQuery("SELECT * FROM testProperBatchUpdate")) {
for (int i = 0; i < 100; i++) {
assertTrue(rs.next());
assertEquals(i, rs.getInt(1));
assertEquals((i == 10 || i == 15) ? i : 0, rs.getInt(2));
}
results = preparedStatement.executeBatch();
assertFalse(rs.next());
}
assertEquals(results.length, queriesInBatch);
}
}

0 comments on commit ab8e666

Please sign in to comment.