Skip to content

Commit

Permalink
JDBC batch updates
Browse files Browse the repository at this point in the history
Add support for JDBC batch updates. It includes an implementation of
Statement.*Batch(...) as well as PreparedStatement.*Batch() methods.

Under the hood SQLConnection uses the pipelining sending requests one by
one asynchronously and awaiting all of them. There are some issues
regarding vinyl storage engine where execution order are not specified
and DDL statements which are not transactional.

Closes: #62
  • Loading branch information
nicktorwald committed Jul 6, 2019
1 parent b53e0ba commit 05bfde6
Show file tree
Hide file tree
Showing 10 changed files with 511 additions and 73 deletions.
26 changes: 26 additions & 0 deletions src/main/java/org/tarantool/jdbc/SQLBatchResultHolder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.tarantool.jdbc;

import java.util.List;

/**
* Wrapper for batch SQL query results.
*/
public class SQLBatchResultHolder {

private final List<SQLResultHolder> results;
private final Exception error;

public SQLBatchResultHolder(List<SQLResultHolder> results, Exception error) {
this.results = results;
this.error = error;
}

public List<SQLResultHolder> getResults() {
return results;
}

public Exception getError() {
return error;
}

}
117 changes: 91 additions & 26 deletions src/main/java/org/tarantool/jdbc/SQLConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,17 @@
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

/**
* Tarantool {@link Connection} implementation.
Expand Down Expand Up @@ -525,46 +527,59 @@ public int getNetworkTimeout() throws SQLException {
return (int) client.getOperationTimeout();
}

protected SQLResultHolder execute(long timeout, String sql, Object... args) throws SQLException {
protected SQLResultHolder execute(long timeout, SQLQueryHolder query) throws SQLException {
checkNotClosed();
return (useNetworkTimeout(timeout))
? executeWithNetworkTimeout(query)
: executeWithQueryTimeout(timeout, query);
}

protected SQLBatchResultHolder executeBatch(long timeout, List<SQLQueryHolder> queries) throws SQLException {
checkNotClosed();
SQLTarantoolClientImpl.SQLRawOps sqlOps = client.sqlRawOps();
SQLBatchResultHolder batchResult = useNetworkTimeout(timeout)
? sqlOps.executeBatch(queries)
: sqlOps.executeBatch(timeout, queries);

return batchResult;
}

private boolean useNetworkTimeout(long timeout) throws SQLException {
int networkTimeout = getNetworkTimeout();
return (timeout == 0 || (networkTimeout > 0 && networkTimeout < timeout))
? executeWithNetworkTimeout(sql, args)
: executeWithStatementTimeout(timeout, sql, args);
return timeout == 0 || (networkTimeout > 0 && networkTimeout < timeout);
}

private SQLResultHolder executeWithNetworkTimeout(String sql, Object... args) throws SQLException {
private SQLResultHolder executeWithNetworkTimeout(SQLQueryHolder query) throws SQLException {
try {
return client.sqlRawOps().execute(sql, args);
return client.sqlRawOps().execute(query);
} catch (Exception e) {
handleException(e);
throw new SQLException(formatError(sql, args), e);
throw new SQLException(formatError(query), e);
}
}

/**
* Executes a query using a custom timeout.
*
* @param timeout query timeout
* @param sql query
* @param args query bindings
* @param query query
*
* @return SQL result holder
*
* @throws StatementTimeoutException if query execution took more than query timeout
* @throws SQLException if any other errors occurred
*/
private SQLResultHolder executeWithStatementTimeout(long timeout, String sql, Object... args) throws SQLException {
private SQLResultHolder executeWithQueryTimeout(long timeout, SQLQueryHolder query) throws SQLException {
try {
return client.sqlRawOps().execute(timeout, sql, args);
return client.sqlRawOps().execute(timeout, query);
} catch (Exception e) {
// statement timeout should not affect the current connection
// but can be handled by the caller side
if (e.getCause() instanceof TimeoutException) {
throw new StatementTimeoutException(formatError(sql, args), e.getCause());
throw new StatementTimeoutException(formatError(query), e.getCause());
}
handleException(e);
throw new SQLException(formatError(sql, args), e);
throw new SQLException(formatError(query), e);
}
}

Expand Down Expand Up @@ -708,28 +723,74 @@ private void checkHoldabilitySupport(int holdability) throws SQLException {
/**
* Provides error message that contains parameters of failed SQL statement.
*
* @param sql SQL Text.
* @param params Parameters of the SQL statement.
* @param query SQL query
*
* @return Formatted error message.
*/
private static String formatError(String sql, Object... params) {
return "Failed to execute SQL: " + sql + ", params: " + Arrays.deepToString(params);
private static String formatError(SQLQueryHolder query) {
return "Failed to execute SQL: " + query.getQuery() + ", params: " + query.getParams();
}

static class SQLTarantoolClientImpl extends TarantoolClientImpl {

private Future<?> executeQuery(SQLQueryHolder queryHolder) {
return exec(Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams());
}

private Future<?> executeQuery(SQLQueryHolder queryHolder, long timeoutMillis) {
return exec(
timeoutMillis, Code.EXECUTE, Key.SQL_TEXT, queryHolder.getQuery(), Key.SQL_BIND, queryHolder.getParams()
);
}

final SQLRawOps sqlRawOps = new SQLRawOps() {
@Override
public SQLResultHolder execute(String sql, Object... binds) {
return (SQLResultHolder) syncGet(exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, binds));
public SQLResultHolder execute(SQLQueryHolder query) {
return (SQLResultHolder) syncGet(executeQuery(query));
}

@Override
public SQLResultHolder execute(long timeoutMillis, String sql, Object... binds) {
return (SQLResultHolder) syncGet(
exec(timeoutMillis, Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, binds)
);
public SQLResultHolder execute(long timeoutMillis, SQLQueryHolder query) {
return (SQLResultHolder) syncGet(executeQuery(query, timeoutMillis));
}

@Override
public SQLBatchResultHolder executeBatch(List<SQLQueryHolder> queries) {
return executeInternal(queries, (query) -> executeQuery(query));
}

@Override
public SQLBatchResultHolder executeBatch(long timeoutMillis, List<SQLQueryHolder> queries) {
return executeInternal(queries, (query) -> executeQuery(query, timeoutMillis));
}

private SQLBatchResultHolder executeInternal(List<SQLQueryHolder> queries,
Function<SQLQueryHolder, Future<?>> fetcher) {
List<Future<?>> sqlFutures = new ArrayList<>();
// using queries pipelining to emulate a batch request
for (SQLQueryHolder query : queries) {
sqlFutures.add(fetcher.apply(query));
}
// wait for all the results
Exception lastError = null;
List<SQLResultHolder> items = new ArrayList<>(queries.size());
for (Future<?> future : sqlFutures) {
try {
SQLResultHolder result = (SQLResultHolder) syncGet(future);
if (result.isQueryResult()) {
lastError = new SQLException(
"Result set is not allowed in the batch response",
SQLStates.TOO_MANY_RESULTS.getSqlState()
);
}
items.add(result);
} catch (RuntimeException e) {
// empty result set will be treated as a wrong result
items.add(SQLResultHolder.ofEmptyQuery());
lastError = e;
}
}
return new SQLBatchResultHolder(items, lastError);
}
};

Expand Down Expand Up @@ -758,9 +819,13 @@ protected void completeSql(TarantoolOp<?> future, TarantoolPacket pack) {

interface SQLRawOps {

SQLResultHolder execute(String sql, Object... binds);
SQLResultHolder execute(SQLQueryHolder query);

SQLResultHolder execute(long timeoutMillis, SQLQueryHolder query);

SQLBatchResultHolder executeBatch(List<SQLQueryHolder> queries);

SQLResultHolder execute(long timeoutMillis, String sql, Object... binds);
SQLBatchResultHolder executeBatch(long timeoutMillis, List<SQLQueryHolder> queries);

}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/tarantool/jdbc/SQLDatabaseMetadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ public boolean insertsAreDetected(int type) throws SQLException {

@Override
public boolean supportsBatchUpdates() throws SQLException {
return false;
return true;
}

@Override
Expand Down
79 changes: 50 additions & 29 deletions src/main/java/org/tarantool/jdbc/SQLPreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,25 @@
import java.sql.SQLXML;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SQLPreparedStatement extends SQLStatement implements PreparedStatement {

static final String INVALID_CALL_MSG = "The method cannot be called on a PreparedStatement.";
final String sql;
final Map<Integer, Object> params;
private static final String INVALID_CALL_MESSAGE = "The method cannot be called on a PreparedStatement.";

private final String sql;
private final Map<Integer, Object> parameters;

private List<Map<Integer, Object>> batchParameters = new ArrayList<>();

public SQLPreparedStatement(SQLConnection connection, String sql) throws SQLException {
super(connection);
this.sql = sql;
this.params = new HashMap<>();
this.parameters = new HashMap<>();
}

public SQLPreparedStatement(SQLConnection connection,
Expand All @@ -46,39 +50,28 @@ public SQLPreparedStatement(SQLConnection connection,
int resultSetHoldability) throws SQLException {
super(connection, resultSetType, resultSetConcurrency, resultSetHoldability);
this.sql = sql;
this.params = new HashMap<>();
this.parameters = new HashMap<>();
}

@Override
public ResultSet executeQuery() throws SQLException {
checkNotClosed();
if (!executeInternal(sql, getParams())) {
if (!executeInternal(sql, toParametersList(parameters))) {
throw new SQLException("No results were returned", SQLStates.NO_DATA.getSqlState());
}
return resultSet;
}

@Override
public ResultSet executeQuery(String sql) throws SQLException {
throw new SQLException(INVALID_CALL_MSG);
}

protected Object[] getParams() throws SQLException {
Object[] objects = new Object[params.size()];
for (int i = 1; i <= params.size(); i++) {
if (params.containsKey(i)) {
objects[i - 1] = params.get(i);
} else {
throw new SQLException("Parameter " + i + " is missing");
}
}
return objects;
checkNotClosed();
throw new SQLException(INVALID_CALL_MESSAGE);
}

@Override
public int executeUpdate() throws SQLException {
checkNotClosed();
if (executeInternal(sql, getParams())) {
if (executeInternal(sql, toParametersList(parameters))) {
throw new SQLException(
"Result was returned but nothing was expected",
SQLStates.TOO_MANY_RESULTS.getSqlState()
Expand All @@ -89,7 +82,8 @@ public int executeUpdate() throws SQLException {

@Override
public int executeUpdate(String sql) throws SQLException {
throw new SQLException(INVALID_CALL_MSG);
checkNotClosed();
throw new SQLException(INVALID_CALL_MESSAGE);
}

@Override
Expand Down Expand Up @@ -219,7 +213,7 @@ public void setBinaryStream(int parameterIndex, InputStream x) throws SQLExcepti

@Override
public void clearParameters() throws SQLException {
params.clear();
parameters.clear();
}

@Override
Expand All @@ -242,18 +236,19 @@ public void setObject(int parameterIndex,

private void setParameter(int parameterIndex, Object value) throws SQLException {
checkNotClosed();
params.put(parameterIndex, value);
parameters.put(parameterIndex, value);
}

@Override
public boolean execute() throws SQLException {
checkNotClosed();
return executeInternal(sql, getParams());
return executeInternal(sql, toParametersList(parameters));
}

@Override
public boolean execute(String sql) throws SQLException {
throw new SQLException(INVALID_CALL_MSG);
checkNotClosed();
throw new SQLException(INVALID_CALL_MESSAGE);
}

@Override
Expand Down Expand Up @@ -368,22 +363,48 @@ public void setSQLXML(int parameterIndex, SQLXML xmlObject) throws SQLException

@Override
public void addBatch(String sql) throws SQLException {
throw new SQLFeatureNotSupportedException();
checkNotClosed();
throw new SQLException(INVALID_CALL_MESSAGE);
}

@Override
public void addBatch() throws SQLException {
throw new SQLFeatureNotSupportedException();
checkNotClosed();
// shadow copy of the current parameters
batchParameters.add(new HashMap<>(parameters));
}

@Override
public int[] executeBatch() throws SQLException {
throw new SQLFeatureNotSupportedException();
checkNotClosed();
try {
List<SQLQueryHolder> queries = new ArrayList<>();
for (Map<Integer, Object> p : batchParameters) {
SQLQueryHolder of = SQLQueryHolder.of(sql, toParametersList(p));
queries.add(of);
}
return executeBatchInternal(queries);
} finally {
batchParameters.clear();
}
}

@Override
public void clearBatch() throws SQLException {
throw new SQLFeatureNotSupportedException();
checkNotClosed();
batchParameters.clear();
}

private Object[] toParametersList(Map<Integer, Object> parameters) throws SQLException {
Object[] objects = new Object[parameters.size()];
for (int i = 1; i <= parameters.size(); i++) {
if (parameters.containsKey(i)) {
objects[i - 1] = parameters.get(i);
} else {
throw new SQLException("Parameter " + i + " is missing");
}
}
return objects;
}

}

0 comments on commit 05bfde6

Please sign in to comment.