Permalink
Browse files

fix: make sure executeBatch returns error response for rows that woul…

…d not get into database

Previously, executeBatch did not consider that under error condition, a part of the batch might get rolled back.

closes #502, closes #503
  • Loading branch information...
vlsi committed Feb 3, 2016
1 parent 8c9898a commit d6e3b17e41ded02bd111a7644c0b47b936862e6e
@@ -72,6 +72,7 @@ public PGStream(SocketFactory socketFactory, HostSpec hostSpec, int timeout) thr
/**
* Constructor: Connect to the PostgreSQL back end and return a stream connection.
*
* @param socketFactory socket factory
* @param hostSpec the host and port to connect to
* @throws IOException if an IOException occurs below it.
* @deprecated use {@link #PGStream(SocketFactory, org.postgresql.util.HostSpec, int)}
@@ -10,6 +10,7 @@
package org.postgresql.core;

import org.postgresql.copy.CopyOperation;
import org.postgresql.jdbc.BatchResultHandler;

import java.sql.SQLException;

@@ -24,7 +25,7 @@
* {@link #createParameterizedQuery})
* <li>execution methods for created Query objects (
* {@link #execute(Query, ParameterList, ResultHandler, int, int, int)} for single queries and
* {@link #execute(Query[], ParameterList[], ResultHandler, int, int, int)} for batches of queries)
* {@link #execute(Query[], ParameterList[], BatchResultHandler, int, int, int)} for batches of queries)
* <li>a fastpath call interface ({@link #createFastpathParameters} and {@link #fastpathCall}).
* </ul>
*
@@ -142,7 +143,7 @@ void execute(Query query, ParameterList parameters, ResultHandler handler, int m
* @param flags a combination of QUERY_* flags indicating how to handle the query.
* @throws SQLException if query execution fails
*/
void execute(Query[] queries, ParameterList[] parameterLists, ResultHandler handler, int maxRows,
void execute(Query[] queries, ParameterList[] parameterLists, BatchResultHandler handler, int maxRows,
int fetchSize, int flags) throws SQLException;

/**
@@ -20,6 +20,7 @@
import org.postgresql.core.QueryExecutor;
import org.postgresql.core.ResultCursor;
import org.postgresql.core.ResultHandler;
import org.postgresql.jdbc.BatchResultHandler;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
@@ -271,9 +272,9 @@ public synchronized void execute(Query query, ParameterList parameters, ResultHa

// Nothing special yet, just run the queries one at a time.
public synchronized void execute(Query[] queries, ParameterList[] parameters,
ResultHandler handler, int maxRows, int fetchSize, int flags) throws SQLException {
final ResultHandler delegateHandler = handler;
handler = new ResultHandler() {
BatchResultHandler batchHandler, int maxRows, int fetchSize, int flags) throws SQLException {
final ResultHandler delegateHandler = batchHandler;
ResultHandler handler = new ResultHandler() {
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
ResultCursor cursor) {
delegateHandler.handleResultRows(fromQuery, fields, tuples, cursor);
@@ -297,6 +298,7 @@ public void handleCompletion() throws SQLException {

for (int i = 0; i < queries.length; ++i) {
execute((V2Query) queries[i], (SimpleParameterList) parameters[i], handler, maxRows, flags);
batchHandler.secureProgress();
}

delegateHandler.handleCompletion();
@@ -25,6 +25,7 @@
import org.postgresql.core.ResultCursor;
import org.postgresql.core.ResultHandler;
import org.postgresql.core.Utils;
import org.postgresql.jdbc.BatchResultHandler;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
@@ -194,7 +195,7 @@ public synchronized void execute(Query query, ParameterList parameters, ResultHa
handler = sendQueryPreamble(handler, flags);
ErrorTrackingResultHandler trackingHandler = new ErrorTrackingResultHandler(handler);
sendQuery((V3Query) query, (V3ParameterList) parameters, maxRows, fetchSize, flags,
trackingHandler);
trackingHandler, null);
sendSync();
processResults(handler, flags);
estimatedReceiveBufferBytes = 0;
@@ -317,10 +318,10 @@ boolean hasErrors() {
}

public synchronized void execute(Query[] queries, ParameterList[] parameterLists,
ResultHandler handler, int maxRows, int fetchSize, int flags) throws SQLException {
BatchResultHandler batchHandler, int maxRows, int fetchSize, int flags) throws SQLException {
waitOnLock();
if (logger.logDebug()) {
logger.debug("batch execute " + queries.length + " queries, handler=" + handler + ", maxRows="
logger.debug("batch execute " + queries.length + " queries, handler=" + batchHandler + ", maxRows="
+ maxRows + ", fetchSize=" + fetchSize + ", flags=" + flags);
}

@@ -334,8 +335,9 @@ public synchronized void execute(Query[] queries, ParameterList[] parameterLists
}
}

ResultHandler handler = batchHandler;
try {
handler = sendQueryPreamble(handler, flags);
handler = sendQueryPreamble(batchHandler, flags);
ErrorTrackingResultHandler trackingHandler = new ErrorTrackingResultHandler(handler);
estimatedReceiveBufferBytes = 0;

@@ -346,7 +348,7 @@ public synchronized void execute(Query[] queries, ParameterList[] parameterLists
parameters = SimpleQuery.NO_PARAMETERS;
}

sendQuery(query, parameters, maxRows, fetchSize, flags, trackingHandler);
sendQuery(query, parameters, maxRows, fetchSize, flags, trackingHandler, batchHandler);

if (trackingHandler.hasErrors()) {
break;
@@ -1146,7 +1148,9 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
* See the comments above MAX_BUFFERED_RECV_BYTES's declaration for details.
*/
private void flushIfDeadlockRisk(Query query, boolean disallowBatching,
ErrorTrackingResultHandler trackingHandler, final int flags) throws IOException {
ErrorTrackingResultHandler trackingHandler,
BatchResultHandler batchHandler,
final int flags) throws IOException {
// Assume all statements need at least this much reply buffer space,
// plus params
estimatedReceiveBufferBytes += NODATA_QUERY_RESPONSE_SIZE_BYTES;
@@ -1182,6 +1186,9 @@ private void flushIfDeadlockRisk(Query query, boolean disallowBatching,
sendSync();
processResults(trackingHandler, flags);
estimatedReceiveBufferBytes = 0;
if (batchHandler != null) {
batchHandler.secureProgress();
}
}

}
@@ -1190,7 +1197,8 @@ private void flushIfDeadlockRisk(Query query, boolean disallowBatching,
* Send a query to the backend.
*/
private void sendQuery(V3Query query, V3ParameterList parameters, int maxRows, int fetchSize,
int flags, ErrorTrackingResultHandler trackingHandler) throws IOException, SQLException {
int flags, ErrorTrackingResultHandler trackingHandler,
BatchResultHandler batchHandler) throws IOException, SQLException {
// Now the query itself.
SimpleQuery[] subqueries = query.getSubqueries();
SimpleParameterList[] subparams = parameters.getSubparams();
@@ -1201,7 +1209,7 @@ private void sendQuery(V3Query query, V3ParameterList parameters, int maxRows, i
boolean disallowBatching = (flags & QueryExecutor.QUERY_DISALLOW_BATCHING) != 0;

if (subqueries == null) {
flushIfDeadlockRisk(query, disallowBatching, trackingHandler, flags);
flushIfDeadlockRisk(query, disallowBatching, trackingHandler, batchHandler, flags);

// If we saw errors, don't send anything more.
if (!trackingHandler.hasErrors()) {
@@ -1211,7 +1219,7 @@ private void sendQuery(V3Query query, V3ParameterList parameters, int maxRows, i
} else {
for (int i = 0; i < subqueries.length; ++i) {
final SimpleQuery subquery = subqueries[i];
flushIfDeadlockRisk(subquery, disallowBatching, trackingHandler, flags);
flushIfDeadlockRisk(subquery, disallowBatching, trackingHandler, batchHandler, flags);

// If we saw errors, don't send anything more.
if (trackingHandler.hasErrors()) {
@@ -954,27 +954,31 @@ public void setAllowEncodingChanges(boolean allow) {
}

/**
* @return socket factory class name
* @see PGProperty#SOCKET_FACTORY
*/
public String getSocketFactory() {
return PGProperty.SOCKET_FACTORY.get(properties);
}

/**
* @param socketFactoryClassName socket factory class name
* @see PGProperty#SOCKET_FACTORY
*/
public void setSocketFactory(String socketFactoryClassName) {
PGProperty.SOCKET_FACTORY.set(properties, socketFactoryClassName);
}

/**
* @return socket factory argument
* @see PGProperty#SOCKET_FACTORY_ARG
*/
public String getSocketFactoryArg() {
return PGProperty.SOCKET_FACTORY_ARG.get(properties);
}

/**
* @param socketFactoryArg socket factory argument
* @see PGProperty#SOCKET_FACTORY_ARG
*/
public void setSocketFactoryArg(String socketFactoryArg) {
@@ -13,9 +13,15 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchResultHandler implements ResultHandler {
/**
* Internal class, it is not a part of public API.
*/
public class BatchResultHandler implements ResultHandler {
private PgStatement pgStatement;
private BatchUpdateException batchException = null;
private int resultIndex = 0;
@@ -24,7 +30,11 @@
private final ParameterList[] parameterLists;
private final int[] updateCounts;
private final boolean expectGeneratedKeys;
private ResultSet generatedKeys;
private PgResultSet generatedKeys;
private int committedRows; // 0 means no rows committed. 1 means row 0 was committed, and so on
private List<List<byte[][]>> allGeneratedRows;
private List<byte[][]> latestGeneratedRows;
private PgResultSet latestGeneratedKeysRs;

BatchResultHandler(PgStatement pgStatement, Query[] queries, ParameterList[] parameterLists,
int[] updateCounts, boolean expectGeneratedKeys) {
@@ -33,50 +43,86 @@
this.parameterLists = parameterLists;
this.updateCounts = updateCounts;
this.expectGeneratedKeys = expectGeneratedKeys;
this.allGeneratedRows = !expectGeneratedKeys ? null : new ArrayList<List<byte[][]>>();
}

public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
ResultCursor cursor) {
// If SELECT, then handleCommandStatus call would just be missing
resultIndex++;
if (!expectGeneratedKeys) {
handleError(new PSQLException(GT.tr("A result was returned when none was expected."),
PSQLState.TOO_MANY_RESULTS));
} else {
if (generatedKeys == null) {
try {
generatedKeys = pgStatement.createResultSet(fromQuery, fields, tuples, cursor);
} catch (SQLException e) {
handleError(e);

}
} else {
((PgResultSet) generatedKeys).addRows(tuples);
// No rows expected -> just ignore rows
return;
}
if (generatedKeys == null) {
try {
// If SELECT, the resulting ResultSet is not valid
// Thus it is up to handleCommandStatus to decide if resultSet is good enough
latestGeneratedKeysRs =
(PgResultSet) pgStatement.createResultSet(fromQuery, fields,
new ArrayList<byte[][]>(), cursor);
} catch (SQLException e) {
handleError(e);
}
}
latestGeneratedRows = tuples;
}

public void handleCommandStatus(String status, int updateCount, long insertOID) {
if (latestGeneratedRows != null) {
// We have DML. Decrease resultIndex that was just increased in handleResultRows
resultIndex--;
// If exception thrown, no need to collect generated keys
// Note: some generated keys might be secured in generatedKeys
if (updateCount > 0 && batchException == null) {
allGeneratedRows.add(latestGeneratedRows);
if (generatedKeys == null) {
generatedKeys = latestGeneratedKeysRs;
}
}
latestGeneratedRows = null;
}

if (resultIndex >= updateCounts.length) {
handleError(new PSQLException(GT.tr("Too many update results were returned."),
PSQLState.TOO_MANY_RESULTS));
return;
}
latestGeneratedKeysRs = null;

updateCounts[resultIndex++] = updateCount;
}

public void secureProgress() {
try {
if (batchException == null && pgStatement.getConnection().getAutoCommit()) {
committedRows = resultIndex;
updateGeneratedKeys();
}
} catch (SQLException e) {
/* Should not get here */
}
}

private void updateGeneratedKeys() {
if (allGeneratedRows == null || allGeneratedRows.isEmpty()) {
return;
}
for (List<byte[][]> rows : allGeneratedRows) {
generatedKeys.addRows(rows);
}
allGeneratedRows.clear();
}

public void handleWarning(SQLWarning warning) {
pgStatement.addWarning(warning);
}

public void handleError(SQLException newError) {
if (batchException == null) {
int[] successCounts;

if (resultIndex >= updateCounts.length) {
successCounts = updateCounts;
} else {
successCounts = new int[resultIndex];
System.arraycopy(updateCounts, 0, successCounts, 0, resultIndex);
Arrays.fill(updateCounts, committedRows, updateCounts.length, Statement.EXECUTE_FAILED);
if (allGeneratedRows != null) {
allGeneratedRows.clear();
}

String queryString = "<unknown>";
@@ -87,7 +133,7 @@ public void handleError(SQLException newError) {
batchException = new BatchUpdateException(
GT.tr("Batch entry {0} {1} was aborted. Call getNextException to see the cause.",
new Object[]{resultIndex, queryString}),
newError.getSQLState(), successCounts);
newError.getSQLState(), updateCounts);
}

batchException.setNextException(newError);
@@ -97,6 +143,7 @@ public void handleCompletion() throws SQLException {
if (batchException != null) {
throw batchException;
}
updateGeneratedKeys();
}

public ResultSet getGeneratedKeys() {
@@ -11,7 +11,6 @@
import org.postgresql.Driver;
import org.postgresql.core.ParameterList;
import org.postgresql.core.Query;
import org.postgresql.core.ResultHandler;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
@@ -435,7 +434,7 @@ private void checkIndex(int parameterIndex, boolean fetchingData) throws SQLExce
}

@Override
protected ResultHandler createBatchHandler(int[] updateCounts, Query[] queries,
protected BatchResultHandler createBatchHandler(int[] updateCounts, Query[] queries,
ParameterList[] parameterLists) {
return new CallableBatchResultHandler(this, queries, parameterLists, updateCounts);
}
Oops, something went wrong.

0 comments on commit d6e3b17

Please sign in to comment.