Skip to content

Commit

Permalink
[CONJ-672] batch timeout correction to avoid hanging when using multi…
Browse files Browse the repository at this point in the history
…-send with query time
  • Loading branch information
rusher committed Jan 10, 2019
1 parent b36423e commit 0f9661c
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 28 deletions.
14 changes: 7 additions & 7 deletions src/main/java/org/mariadb/jdbc/MariaDbStatement.java
Expand Up @@ -159,13 +159,14 @@ public MariaDbStatement clone(MariaDbConnection connection) throws CloneNotSuppo
}

// Part of query prolog - setup timeout timer
protected void setTimerTask() {
protected void setTimerTask(boolean isBatch) {
assert (timerTaskFuture == null);

timerTaskFuture = timeoutScheduler.schedule(() -> {
try {
isTimedout = true;
protocol.cancelCurrentQuery();
if (!isBatch) protocol.cancelCurrentQuery();
protocol.interrupt();
} catch (Throwable e) {
//eat
}
Expand All @@ -179,18 +180,17 @@ protected void setTimerTask() {
* <li>launch timeout timer if needed</li>
* </ol>
*
* @param forceUseOfTimer even if query timeout if possible on server using max_statement_time,
* force using timer (for batch)
* @param isBatch is batch
* @throws SQLException if statement is closed
*/
protected void executeQueryPrologue(boolean forceUseOfTimer) throws SQLException {
protected void executeQueryPrologue(boolean isBatch) throws SQLException {
executing = true;
if (closed) {
throw new SQLException("execute() is called on closed statement");
}
protocol.prolog(maxRows, protocol.getProxy() != null, connection, this);
if (queryTimeout != 0 && (!canUseServerTimeout || forceUseOfTimer)) {
setTimerTask();
if (queryTimeout != 0 && (!canUseServerTimeout || isBatch)) {
setTimerTask(isBatch);
}
}

Expand Down
Expand Up @@ -271,6 +271,7 @@ private void executeBatchInternal(int queryParameterSize) throws SQLException {
executing = true;
try {
executeQueryPrologue(serverPrepareResult);
if (queryTimeout != 0) setTimerTask(true);

results = new Results(this,
0,
Expand Down Expand Up @@ -304,7 +305,7 @@ private void executeBatchInternal(int queryParameterSize) throws SQLException {
protocol.executePreparedQuery(mustExecuteOnMaster, serverPrepareResult, results,
parameterHolder);
} catch (SQLException queryException) {
if (options.continueBatchOnError) {
if (options.continueBatchOnError && protocol.isConnected() && !protocol.isInterrupted()) {
if (exception == null) {
exception = queryException;
}
Expand Down Expand Up @@ -357,9 +358,7 @@ private void executeQueryPrologue(ServerPrepareResult serverPrepareResult) throw
}
protocol
.prologProxy(serverPrepareResult, maxRows, protocol.getProxy() != null, connection, this);
if (queryTimeout != 0) {
setTimerTask();
}

}

@Override
Expand Down Expand Up @@ -405,6 +404,8 @@ protected boolean executeInternal(int fetchSize) throws SQLException {
lock.lock();
try {
executeQueryPrologue(serverPrepareResult);
if (queryTimeout != 0) setTimerTask(false);

ParameterHolder[] parameterHolders = currentParameterHolder.values()
.toArray(new ParameterHolder[0]);

Expand Down
Expand Up @@ -313,13 +313,12 @@ private PrepareResult executeBatchStandard(int estimatedParameterCount) throws S
}
}

futureReadTask = null;

if (protocol.isInterrupted()) {
//interrupted during read, must throw an exception manually
futureReadTask.cancel(true);
throw new SQLTimeoutException("Timeout during batch execution");
}

futureReadTask = null;

} while (status.sendCmdCounter < totalExecutionNumber);

Expand Down
Expand Up @@ -82,13 +82,13 @@
import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.sql.SQLTimeoutException;
import java.sql.SQLTransientConnectionException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -340,7 +340,8 @@ public void executeQuery(boolean mustExecuteOnMaster, Results results,
*/
public boolean executeBatchClient(boolean mustExecuteOnMaster, Results results,
final ClientPrepareResult prepareResult,
final List<ParameterHolder[]> parametersList, boolean hasLongData) throws SQLException {
final List<ParameterHolder[]> parametersList, boolean hasLongData)
throws SQLException {

//***********************************************************************************************************
// Multiple solution for batching :
Expand Down Expand Up @@ -1881,14 +1882,14 @@ public SQLException handleIoException(IOException initialException) {
UNDEFINED_SQLSTATE.getSqlState() + getTraces(), initialException);
}

return new SQLException("Could not send query: query size is >= to max_allowed_packet ("
return new SQLTransientConnectionException("Could not send query: query size is >= to max_allowed_packet ("
+ writer.getMaxAllowedPacket() + ")" + getTraces(), UNDEFINED_SQLSTATE.getSqlState(),
initialException);
}
if (!driverPreventError) {
connected = false;
}
return new SQLException(initialException.getMessage() + getTraces(),
return new SQLNonTransientConnectionException(initialException.getMessage() + getTraces(),
driverPreventError ? UNDEFINED_SQLSTATE.getSqlState() : CONNECTION_EXCEPTION.getSqlState(),
initialException);

Expand All @@ -1898,6 +1899,10 @@ public void setActiveFutureTask(FutureTask activeFutureTask) {
this.activeFutureTask = activeFutureTask;
}

public void interrupt() {
interrupted = true;
}

public boolean isInterrupted() {
return interrupted;
}
Expand Down
Expand Up @@ -55,6 +55,8 @@
import static org.mariadb.jdbc.internal.util.SqlStates.INTERRUPTED_EXCEPTION;

import java.sql.SQLException;
import java.sql.SQLNonTransientConnectionException;
import java.sql.SQLTransientConnectionException;
import java.util.List;
import java.util.concurrent.Callable;
import org.mariadb.jdbc.internal.com.read.dao.Results;
Expand Down Expand Up @@ -132,12 +134,17 @@ public AsyncMultiReadResult call() throws Exception {
int counter = 0;

//ensure to not finished loop while all bulk has not been send
outerloop:
while (!status.sendEnded || counter < status.sendSubCmdCounter) {
//read results for each send data
while (counter < status.sendSubCmdCounter) {
try {
protocol.getResult(results);
} catch (SQLException qex) {
if (qex instanceof SQLNonTransientConnectionException
|| qex instanceof SQLTransientConnectionException) {
break outerloop;
}
if (asyncMultiReadResult.getException() == null) {
asyncMultiReadResult.setException(bulkSend.handleResultException(qex, results,
parametersList, queries, counter, sendCmdInitialCounter, paramCount,
Expand Down
Expand Up @@ -180,6 +180,8 @@ boolean executeBatchServer(boolean mustExecuteOnMaster, ServerPrepareResult serv

void cancelCurrentQuery() throws SQLException;

void interrupt();

void skip() throws SQLException;

boolean checkIfMaster() throws SQLException;
Expand Down
31 changes: 21 additions & 10 deletions src/test/java/org/mariadb/jdbc/CancelTest.java
Expand Up @@ -54,6 +54,7 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.sql.BatchUpdateException;
import java.sql.Connection;
Expand Down Expand Up @@ -124,21 +125,31 @@ public void timeoutPrepareSleep() throws Exception {
}
}

@Test(timeout = 5000, expected = BatchUpdateException.class)
@Test(timeout = 5000)
public void timeoutBatch() throws Exception {
Assume.assumeFalse(sharedIsAurora());
Assume.assumeTrue(!sharedOptions().allowMultiQueries && !sharedIsRewrite());
createTable("timeoutBatch", "aa text");
createTable("timeoutBatch", "id int not null primary key auto_increment, aa text");

Statement stmt = sharedConnection.createStatement();
char[] arr = new char[1000];
Arrays.fill(arr, 'a');
String str = String.valueOf(arr);
for (int i = 0; i < 20000; i++) {
stmt.addBatch("INSERT INTO timeoutBatch VALUES ('" + str + "')");
try (Connection connection = setConnection("&maxQuerySizeToLog=92")) {
Statement stmt = connection.createStatement();

char[] arr = new char[1000];
Arrays.fill(arr, 'a');
String str = String.valueOf(arr);
for (int i = 0; i < 20000; i++) {
stmt.addBatch("INSERT INTO timeoutBatch (aa) VALUES ('" + str + "')");
}
stmt.setQueryTimeout(1);
try {
stmt.executeBatch();
fail();
} catch (BatchUpdateException b) {
ResultSet rs2 = stmt.executeQuery("SELECT 2");
assertTrue(rs2.next());
assertEquals("2", rs2.getString(1));
}
}
stmt.setQueryTimeout(1);
stmt.executeBatch();
}

@Test(timeout = 5000)
Expand Down

0 comments on commit 0f9661c

Please sign in to comment.