Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make sure executeBatch returns error response for rows that would not get into database #503

Merged
merged 1 commit into from Feb 7, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pgjdbc/src/main/java/org/postgresql/core/PGStream.java
Expand Up @@ -72,6 +72,7 @@ public PGStream(SocketFactory socketFactory, HostSpec hostSpec, int timeout) thr
/**
* Constructor: Connect to the PostgreSQL back end and return a stream connection.
*
* @param socketFactory socket factory
* @param hostSpec the host and port to connect to
* @throws IOException if an IOException occurs below it.
* @deprecated use {@link #PGStream(SocketFactory, org.postgresql.util.HostSpec, int)}
Expand Down
5 changes: 3 additions & 2 deletions pgjdbc/src/main/java/org/postgresql/core/QueryExecutor.java
Expand Up @@ -10,6 +10,7 @@
package org.postgresql.core;

import org.postgresql.copy.CopyOperation;
import org.postgresql.jdbc.BatchResultHandler;

import java.sql.SQLException;

Expand All @@ -24,7 +25,7 @@
* {@link #createParameterizedQuery})
* <li>execution methods for created Query objects (
* {@link #execute(Query, ParameterList, ResultHandler, int, int, int)} for single queries and
* {@link #execute(Query[], ParameterList[], ResultHandler, int, int, int)} for batches of queries)
* {@link #execute(Query[], ParameterList[], BatchResultHandler, int, int, int)} for batches of queries)
* <li>a fastpath call interface ({@link #createFastpathParameters} and {@link #fastpathCall}).
* </ul>
*
Expand Down Expand Up @@ -142,7 +143,7 @@ void execute(Query query, ParameterList parameters, ResultHandler handler, int m
* @param flags a combination of QUERY_* flags indicating how to handle the query.
* @throws SQLException if query execution fails
*/
void execute(Query[] queries, ParameterList[] parameterLists, ResultHandler handler, int maxRows,
void execute(Query[] queries, ParameterList[] parameterLists, BatchResultHandler handler, int maxRows,
int fetchSize, int flags) throws SQLException;

/**
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.postgresql.core.QueryExecutor;
import org.postgresql.core.ResultCursor;
import org.postgresql.core.ResultHandler;
import org.postgresql.jdbc.BatchResultHandler;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand Down Expand Up @@ -271,9 +272,9 @@ public synchronized void execute(Query query, ParameterList parameters, ResultHa

// Nothing special yet, just run the queries one at a time.
public synchronized void execute(Query[] queries, ParameterList[] parameters,
ResultHandler handler, int maxRows, int fetchSize, int flags) throws SQLException {
final ResultHandler delegateHandler = handler;
handler = new ResultHandler() {
BatchResultHandler batchHandler, int maxRows, int fetchSize, int flags) throws SQLException {
final ResultHandler delegateHandler = batchHandler;
ResultHandler handler = new ResultHandler() {
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
ResultCursor cursor) {
delegateHandler.handleResultRows(fromQuery, fields, tuples, cursor);
Expand All @@ -297,6 +298,7 @@ public void handleCompletion() throws SQLException {

for (int i = 0; i < queries.length; ++i) {
execute((V2Query) queries[i], (SimpleParameterList) parameters[i], handler, maxRows, flags);
batchHandler.secureProgress();
}

delegateHandler.handleCompletion();
Expand Down
26 changes: 17 additions & 9 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Expand Up @@ -25,6 +25,7 @@
import org.postgresql.core.ResultCursor;
import org.postgresql.core.ResultHandler;
import org.postgresql.core.Utils;
import org.postgresql.jdbc.BatchResultHandler;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand Down Expand Up @@ -194,7 +195,7 @@ public synchronized void execute(Query query, ParameterList parameters, ResultHa
handler = sendQueryPreamble(handler, flags);
ErrorTrackingResultHandler trackingHandler = new ErrorTrackingResultHandler(handler);
sendQuery((V3Query) query, (V3ParameterList) parameters, maxRows, fetchSize, flags,
trackingHandler);
trackingHandler, null);
sendSync();
processResults(handler, flags);
estimatedReceiveBufferBytes = 0;
Expand Down Expand Up @@ -317,10 +318,10 @@ boolean hasErrors() {
}

public synchronized void execute(Query[] queries, ParameterList[] parameterLists,
ResultHandler handler, int maxRows, int fetchSize, int flags) throws SQLException {
BatchResultHandler batchHandler, int maxRows, int fetchSize, int flags) throws SQLException {
waitOnLock();
if (logger.logDebug()) {
logger.debug("batch execute " + queries.length + " queries, handler=" + handler + ", maxRows="
logger.debug("batch execute " + queries.length + " queries, handler=" + batchHandler + ", maxRows="
+ maxRows + ", fetchSize=" + fetchSize + ", flags=" + flags);
}

Expand All @@ -334,8 +335,9 @@ public synchronized void execute(Query[] queries, ParameterList[] parameterLists
}
}

ResultHandler handler = batchHandler;
try {
handler = sendQueryPreamble(handler, flags);
handler = sendQueryPreamble(batchHandler, flags);
ErrorTrackingResultHandler trackingHandler = new ErrorTrackingResultHandler(handler);
estimatedReceiveBufferBytes = 0;

Expand All @@ -346,7 +348,7 @@ public synchronized void execute(Query[] queries, ParameterList[] parameterLists
parameters = SimpleQuery.NO_PARAMETERS;
}

sendQuery(query, parameters, maxRows, fetchSize, flags, trackingHandler);
sendQuery(query, parameters, maxRows, fetchSize, flags, trackingHandler, batchHandler);

if (trackingHandler.hasErrors()) {
break;
Expand Down Expand Up @@ -1146,7 +1148,9 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
* See the comments above MAX_BUFFERED_RECV_BYTES's declaration for details.
*/
private void flushIfDeadlockRisk(Query query, boolean disallowBatching,
ErrorTrackingResultHandler trackingHandler, final int flags) throws IOException {
ErrorTrackingResultHandler trackingHandler,
BatchResultHandler batchHandler,
final int flags) throws IOException {
// Assume all statements need at least this much reply buffer space,
// plus params
estimatedReceiveBufferBytes += NODATA_QUERY_RESPONSE_SIZE_BYTES;
Expand Down Expand Up @@ -1182,6 +1186,9 @@ private void flushIfDeadlockRisk(Query query, boolean disallowBatching,
sendSync();
processResults(trackingHandler, flags);
estimatedReceiveBufferBytes = 0;
if (batchHandler != null) {
batchHandler.secureProgress();
}
}

}
Expand All @@ -1190,7 +1197,8 @@ private void flushIfDeadlockRisk(Query query, boolean disallowBatching,
* Send a query to the backend.
*/
private void sendQuery(V3Query query, V3ParameterList parameters, int maxRows, int fetchSize,
int flags, ErrorTrackingResultHandler trackingHandler) throws IOException, SQLException {
int flags, ErrorTrackingResultHandler trackingHandler,
BatchResultHandler batchHandler) throws IOException, SQLException {
// Now the query itself.
SimpleQuery[] subqueries = query.getSubqueries();
SimpleParameterList[] subparams = parameters.getSubparams();
Expand All @@ -1201,7 +1209,7 @@ private void sendQuery(V3Query query, V3ParameterList parameters, int maxRows, i
boolean disallowBatching = (flags & QueryExecutor.QUERY_DISALLOW_BATCHING) != 0;

if (subqueries == null) {
flushIfDeadlockRisk(query, disallowBatching, trackingHandler, flags);
flushIfDeadlockRisk(query, disallowBatching, trackingHandler, batchHandler, flags);

// If we saw errors, don't send anything more.
if (!trackingHandler.hasErrors()) {
Expand All @@ -1211,7 +1219,7 @@ private void sendQuery(V3Query query, V3ParameterList parameters, int maxRows, i
} else {
for (int i = 0; i < subqueries.length; ++i) {
final SimpleQuery subquery = subqueries[i];
flushIfDeadlockRisk(subquery, disallowBatching, trackingHandler, flags);
flushIfDeadlockRisk(subquery, disallowBatching, trackingHandler, batchHandler, flags);

// If we saw errors, don't send anything more.
if (trackingHandler.hasErrors()) {
Expand Down
Expand Up @@ -954,27 +954,31 @@ public void setAllowEncodingChanges(boolean allow) {
}

/**
* @return socket factory class name
* @see PGProperty#SOCKET_FACTORY
*/
public String getSocketFactory() {
return PGProperty.SOCKET_FACTORY.get(properties);
}

/**
* @param socketFactoryClassName socket factory class name
* @see PGProperty#SOCKET_FACTORY
*/
public void setSocketFactory(String socketFactoryClassName) {
PGProperty.SOCKET_FACTORY.set(properties, socketFactoryClassName);
}

/**
* @return socket factory argument
* @see PGProperty#SOCKET_FACTORY_ARG
*/
public String getSocketFactoryArg() {
return PGProperty.SOCKET_FACTORY_ARG.get(properties);
}

/**
* @param socketFactoryArg socket factory argument
* @see PGProperty#SOCKET_FACTORY_ARG
*/
public void setSocketFactoryArg(String socketFactoryArg) {
Expand Down
91 changes: 69 additions & 22 deletions pgjdbc/src/main/java/org/postgresql/jdbc/BatchResultHandler.java
Expand Up @@ -13,9 +13,15 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BatchResultHandler implements ResultHandler {
/**
* Internal class, it is not a part of public API.
*/
public class BatchResultHandler implements ResultHandler {
private PgStatement pgStatement;
private BatchUpdateException batchException = null;
private int resultIndex = 0;
Expand All @@ -24,7 +30,11 @@ class BatchResultHandler implements ResultHandler {
private final ParameterList[] parameterLists;
private final int[] updateCounts;
private final boolean expectGeneratedKeys;
private ResultSet generatedKeys;
private PgResultSet generatedKeys;
private int committedRows; // 0 means no rows committed. 1 means row 0 was committed, and so on
private List<List<byte[][]>> allGeneratedRows;
private List<byte[][]> latestGeneratedRows;
private PgResultSet latestGeneratedKeysRs;

BatchResultHandler(PgStatement pgStatement, Query[] queries, ParameterList[] parameterLists,
int[] updateCounts, boolean expectGeneratedKeys) {
Expand All @@ -33,50 +43,86 @@ class BatchResultHandler implements ResultHandler {
this.parameterLists = parameterLists;
this.updateCounts = updateCounts;
this.expectGeneratedKeys = expectGeneratedKeys;
this.allGeneratedRows = !expectGeneratedKeys ? null : new ArrayList<List<byte[][]>>();
}

public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
ResultCursor cursor) {
// If SELECT, then handleCommandStatus call would just be missing
resultIndex++;
if (!expectGeneratedKeys) {
handleError(new PSQLException(GT.tr("A result was returned when none was expected."),
PSQLState.TOO_MANY_RESULTS));
} else {
if (generatedKeys == null) {
try {
generatedKeys = pgStatement.createResultSet(fromQuery, fields, tuples, cursor);
} catch (SQLException e) {
handleError(e);

}
} else {
((PgResultSet) generatedKeys).addRows(tuples);
// No rows expected -> just ignore rows
return;
}
if (generatedKeys == null) {
try {
// If SELECT, the resulting ResultSet is not valid
// Thus it is up to handleCommandStatus to decide if resultSet is good enough
latestGeneratedKeysRs =
(PgResultSet) pgStatement.createResultSet(fromQuery, fields,
new ArrayList<byte[][]>(), cursor);
} catch (SQLException e) {
handleError(e);
}
}
latestGeneratedRows = tuples;
}

public void handleCommandStatus(String status, int updateCount, long insertOID) {
if (latestGeneratedRows != null) {
// We have DML. Decrease resultIndex that was just increased in handleResultRows
resultIndex--;
// If exception thrown, no need to collect generated keys
// Note: some generated keys might be secured in generatedKeys
if (updateCount > 0 && batchException == null) {
allGeneratedRows.add(latestGeneratedRows);
if (generatedKeys == null) {
generatedKeys = latestGeneratedKeysRs;
}
}
latestGeneratedRows = null;
}

if (resultIndex >= updateCounts.length) {
handleError(new PSQLException(GT.tr("Too many update results were returned."),
PSQLState.TOO_MANY_RESULTS));
return;
}
latestGeneratedKeysRs = null;

updateCounts[resultIndex++] = updateCount;
}

public void secureProgress() {
try {
if (batchException == null && pgStatement.getConnection().getAutoCommit()) {
committedRows = resultIndex;
updateGeneratedKeys();
}
} catch (SQLException e) {
/* Should not get here */
}
}

private void updateGeneratedKeys() {
if (allGeneratedRows == null || allGeneratedRows.isEmpty()) {
return;
}
for (List<byte[][]> rows : allGeneratedRows) {
generatedKeys.addRows(rows);
}
allGeneratedRows.clear();
}

public void handleWarning(SQLWarning warning) {
pgStatement.addWarning(warning);
}

public void handleError(SQLException newError) {
if (batchException == null) {
int[] successCounts;

if (resultIndex >= updateCounts.length) {
successCounts = updateCounts;
} else {
successCounts = new int[resultIndex];
System.arraycopy(updateCounts, 0, successCounts, 0, resultIndex);
Arrays.fill(updateCounts, committedRows, updateCounts.length, Statement.EXECUTE_FAILED);
if (allGeneratedRows != null) {
allGeneratedRows.clear();
}

String queryString = "<unknown>";
Expand All @@ -87,7 +133,7 @@ public void handleError(SQLException newError) {
batchException = new BatchUpdateException(
GT.tr("Batch entry {0} {1} was aborted. Call getNextException to see the cause.",
new Object[]{resultIndex, queryString}),
newError.getSQLState(), successCounts);
newError.getSQLState(), updateCounts);
}

batchException.setNextException(newError);
Expand All @@ -97,6 +143,7 @@ public void handleCompletion() throws SQLException {
if (batchException != null) {
throw batchException;
}
updateGeneratedKeys();
}

public ResultSet getGeneratedKeys() {
Expand Down
Expand Up @@ -11,7 +11,6 @@
import org.postgresql.Driver;
import org.postgresql.core.ParameterList;
import org.postgresql.core.Query;
import org.postgresql.core.ResultHandler;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand Down Expand Up @@ -435,7 +434,7 @@ private void checkIndex(int parameterIndex, boolean fetchingData) throws SQLExce
}

@Override
protected ResultHandler createBatchHandler(int[] updateCounts, Query[] queries,
protected BatchResultHandler createBatchHandler(int[] updateCounts, Query[] queries,
ParameterList[] parameterLists) {
return new CallableBatchResultHandler(this, queries, parameterLists, updateCounts);
}
Expand Down