Permalink
Browse files

refactor: introduce ResultHandlerBase to factor out common error processing logic
  • Loading branch information...
vlsi committed Aug 12, 2016
1 parent 0452e79 commit edcdccd658cbf3f2bfd677901f7bebac13a6259b
@@ -83,4 +83,16 @@ void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
* statements are executed successfully and which are not.
*/
void secureProgress();

/**
* Returns the first encountered exception. The rest are chained via {@link SQLException#setNextException(SQLException)}
* @return the first encountered exception
*/
SQLException getException();

/**
* Returns the first encountered warning. Subsequent warnings are chained to it and can be
* retrieved via {@link SQLWarning#getNextWarning()}.
* @return the first encountered warning, or {@code null} if none were reported
*/
SQLWarning getWarning();
}
@@ -0,0 +1,78 @@
/*-------------------------------------------------------------------------
*
* Copyright (c) 2016-2016, PostgreSQL Global Development Group
*
*
*-------------------------------------------------------------------------
*/

package org.postgresql.core;

import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.List;

/**
 * Empty implementation of {@link ResultHandler} interface.
 * <p>{@link SQLException#setNextException(SQLException)} is {@code O(N)} in the length of the
 * existing chain, so this class additionally tracks the tail of each chain to make appending
 * the next exception/warning {@code O(1)}.</p>
 */
public class ResultHandlerBase implements ResultHandler {
  // Head and tail of the exception chain. The tail is tracked to avoid O(N)
  // SQLException#setNextException in case there are lots of exceptions
  // (e.g. all batch rows fail with a constraint violation or so).
  private SQLException firstException;
  private SQLException lastException;

  // Head and tail of the warning chain, tracked for the same reason as above.
  private SQLWarning firstWarning;
  private SQLWarning lastWarning;

  @Override
  public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
      ResultCursor cursor) {
    // No-op by default; subclasses interested in result rows override this.
  }

  @Override
  public void handleCommandStatus(String status, int updateCount, long insertOID) {
    // No-op by default; subclasses interested in command status override this.
  }

  @Override
  public void secureProgress() {
    // No-op by default.
  }

  @Override
  public void handleWarning(SQLWarning warning) {
    if (firstWarning == null) {
      firstWarning = lastWarning = warning;
      return;
    }
    // Append at the tracked tail. setNextWarning is the idiomatic API for warning
    // chains (it delegates to the same chain as setNextException) and pairs with
    // SQLWarning#getNextWarning on the consumer side.
    lastWarning.setNextWarning(warning);
    lastWarning = warning;
  }

  @Override
  public void handleError(SQLException error) {
    if (firstException == null) {
      firstException = lastException = error;
      return;
    }
    // Append at the tracked tail so chaining stays O(1) per error.
    lastException.setNextException(error);
    lastException = error;
  }

  @Override
  public void handleCompletion() throws SQLException {
    // Surface the first error (with the rest chained behind it), if any occurred.
    if (firstException != null) {
      throw firstException;
    }
  }

  @Override
  public SQLException getException() {
    return firstException;
  }

  @Override
  public SQLWarning getWarning() {
    return firstWarning;
  }
}
@@ -59,4 +59,20 @@ public void secureProgress() {
delegate.secureProgress();
}
}

@Override
public SQLException getException() {
  // Without a delegate there is nothing to report.
  return delegate == null ? null : delegate.getException();
}

@Override
public SQLWarning getWarning() {
  // Without a delegate there is nothing to report.
  return delegate == null ? null : delegate.getWarning();
}
}
@@ -23,13 +23,9 @@
*/
public class SetupQueryRunner {

private static class SimpleResultHandler implements ResultHandler {
private SQLException error;
private static class SimpleResultHandler extends ResultHandlerBase {
private List<byte[][]> tuples;

SimpleResultHandler() {
}

List<byte[][]> getResults() {
return tuples;
}
@@ -39,31 +35,10 @@ public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tup
this.tuples = tuples;
}

public void handleCommandStatus(String status, int updateCount, long insertOID) {
}

public void handleWarning(SQLWarning warning) {
// We ignore warnings. We assume we know what we're
// doing in the setup queries.
}

public void handleError(SQLException newError) {
if (error == null) {
error = newError;
} else {
error.setNextException(newError);
}
}

public void handleCompletion() throws SQLException {
if (error != null) {
throw error;
}
}

@Override
public void secureProgress() {
}
}

public static byte[][] run(QueryExecutor executor, String queryString,
@@ -26,6 +26,7 @@
import org.postgresql.core.QueryExecutorBase;
import org.postgresql.core.ResultCursor;
import org.postgresql.core.ResultHandler;
import org.postgresql.core.ResultHandlerBase;
import org.postgresql.core.ResultHandlerDelegate;
import org.postgresql.core.SqlCommand;
import org.postgresql.core.SqlCommandType;
@@ -262,9 +263,8 @@ public synchronized void execute(Query query, ParameterList parameters, ResultHa
try {
try {
handler = sendQueryPreamble(handler, flags);
ErrorTrackingResultHandler trackingHandler = new ErrorTrackingResultHandler(handler);
sendQuery(query, (V3ParameterList) parameters, maxRows, fetchSize, flags,
trackingHandler, null);
handler, null);
if ((flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) != 0) {
// Sync message is not required for 'Q' execution as 'Q' ends with ReadyForQuery message
// on its own
@@ -355,24 +355,6 @@ public synchronized void execute(Query query, ParameterList parameters, ResultHa
private static final int MAX_BUFFERED_RECV_BYTES = 64000;
private static final int NODATA_QUERY_RESPONSE_SIZE_BYTES = 250;

// Helper handler that tracks error status.
private static class ErrorTrackingResultHandler extends ResultHandlerDelegate {
private boolean sawError = false;

ErrorTrackingResultHandler(ResultHandler delegateHandler) {
super(delegateHandler);
}

public void handleError(SQLException error) {
sawError = true;
super.handleError(error);
}

boolean hasErrors() {
return sawError;
}
}

public synchronized void execute(Query[] queries, ParameterList[] parameterLists,
BatchResultHandler batchHandler, int maxRows, int fetchSize, int flags) throws SQLException {
waitOnLock();
@@ -396,7 +378,6 @@ public synchronized void execute(Query[] queries, ParameterList[] parameterLists
ResultHandler handler = batchHandler;
try {
handler = sendQueryPreamble(batchHandler, flags);
ErrorTrackingResultHandler trackingHandler = new ErrorTrackingResultHandler(handler);
estimatedReceiveBufferBytes = 0;

for (int i = 0; i < queries.length; ++i) {
@@ -406,14 +387,14 @@ public synchronized void execute(Query[] queries, ParameterList[] parameterLists
parameters = SimpleQuery.NO_PARAMETERS;
}

sendQuery(query, parameters, maxRows, fetchSize, flags, trackingHandler, batchHandler);
sendQuery(query, parameters, maxRows, fetchSize, flags, handler, batchHandler);

if (trackingHandler.hasErrors()) {
if (handler.getException() != null) {
break;
}
}

if (!trackingHandler.hasErrors()) {
if (handler.getException() == null) {
if ((flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) != 0) {
// Sync message is not required for 'Q' execution as 'Q' ends with ReadyForQuery message
// on its own
@@ -508,13 +489,8 @@ public void doSubprotocolBegin() throws SQLException {
logger.debug("Issuing BEGIN before fastpath or copy call.");
}

ResultHandler handler = new ResultHandlerDelegate(null) {
ResultHandler handler = new ResultHandlerBase() {
private boolean sawBegin = false;
private SQLException sqle = null;

public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
ResultCursor cursor) {
}

public void handleCommandStatus(String status, int updateCount, long insertOID) {
if (!sawBegin) {
@@ -537,20 +513,6 @@ public void handleWarning(SQLWarning warning) {
// them errors.
handleError(warning);
}

public void handleError(SQLException error) {
if (sqle == null) {
sqle = error;
} else {
sqle.setNextException(error);
}
}

public void handleCompletion() throws SQLException {
if (sqle != null) {
throw sqle;
}
}
};

try {
@@ -1204,7 +1166,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
* See the comments above MAX_BUFFERED_RECV_BYTES's declaration for details.
*/
private void flushIfDeadlockRisk(Query query, boolean disallowBatching,
ErrorTrackingResultHandler trackingHandler,
ResultHandler resultHandler,
BatchResultHandler batchHandler,
final int flags) throws IOException {
// Assume all statements need at least this much reply buffer space,
@@ -1240,7 +1202,7 @@ private void flushIfDeadlockRisk(Query query, boolean disallowBatching,
if (disallowBatching || estimatedReceiveBufferBytes >= MAX_BUFFERED_RECV_BYTES) {
logger.debug("Forcing Sync, receive buffer full or batching disallowed");
sendSync();
processResults(trackingHandler, flags);
processResults(resultHandler, flags);
estimatedReceiveBufferBytes = 0;
if (batchHandler != null) {
batchHandler.secureProgress();
@@ -1253,7 +1215,7 @@ private void flushIfDeadlockRisk(Query query, boolean disallowBatching,
* Send a query to the backend.
*/
private void sendQuery(Query query, V3ParameterList parameters, int maxRows, int fetchSize,
int flags, ErrorTrackingResultHandler trackingHandler,
int flags, ResultHandler resultHandler,
BatchResultHandler batchHandler) throws IOException, SQLException {
// Now the query itself.
Query[] subqueries = query.getSubqueries();
@@ -1265,20 +1227,20 @@ private void sendQuery(Query query, V3ParameterList parameters, int maxRows, int
boolean disallowBatching = (flags & QueryExecutor.QUERY_DISALLOW_BATCHING) != 0;

if (subqueries == null) {
flushIfDeadlockRisk(query, disallowBatching, trackingHandler, batchHandler, flags);
flushIfDeadlockRisk(query, disallowBatching, resultHandler, batchHandler, flags);

// If we saw errors, don't send anything more.
if (!trackingHandler.hasErrors()) {
if (resultHandler.getException() == null) {
sendOneQuery((SimpleQuery) query, (SimpleParameterList) parameters, maxRows, fetchSize,
flags);
}
} else {
for (int i = 0; i < subqueries.length; ++i) {
final Query subquery = subqueries[i];
flushIfDeadlockRisk(subquery, disallowBatching, trackingHandler, batchHandler, flags);
flushIfDeadlockRisk(subquery, disallowBatching, resultHandler, batchHandler, flags);

// If we saw errors, don't send anything more.
if (trackingHandler.hasErrors()) {
if (resultHandler.getException() != null) {
break;
}

@@ -1,10 +1,18 @@
/*-------------------------------------------------------------------------
*
* Copyright (c) 2016-2016, PostgreSQL Global Development Group
*
*
*-------------------------------------------------------------------------
*/

package org.postgresql.jdbc;

import org.postgresql.core.Field;
import org.postgresql.core.ParameterList;
import org.postgresql.core.Query;
import org.postgresql.core.ResultCursor;
import org.postgresql.core.ResultHandler;
import org.postgresql.core.ResultHandlerBase;
import org.postgresql.core.v3.BatchedQuery;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
@@ -22,12 +30,8 @@
/**
* Internal class, it is not a part of public API.
*/
public class BatchResultHandler implements ResultHandler {
public class BatchResultHandler extends ResultHandlerBase {
private PgStatement pgStatement;
private BatchUpdateException batchException = null;
// Last exception is tracked to avoid O(N) SQLException#setNextException just in case there
// will be lots of exceptions (e.g. all batch rows fail with constraint violation or so)
private SQLException lastException;
private int resultIndex = 0;

private final Query[] queries;
@@ -78,7 +82,7 @@ public void handleCommandStatus(String status, int updateCount, long insertOID)
resultIndex--;
// If exception thrown, no need to collect generated keys
// Note: some generated keys might be secured in generatedKeys
if (updateCount > 0 && (batchException == null || isAutoCommit())) {
if (updateCount > 0 && (getException() == null || isAutoCommit())) {
allGeneratedRows.add(latestGeneratedRows);
if (generatedKeys == null) {
generatedKeys = latestGeneratedKeysRs;
@@ -129,7 +133,7 @@ public void handleWarning(SQLWarning warning) {
}

public void handleError(SQLException newError) {
if (batchException == null) {
if (getException() == null) {
Arrays.fill(updateCounts, committedRows, updateCounts.length, Statement.EXECUTE_FAILED);
if (allGeneratedRows != null) {
allGeneratedRows.clear();
@@ -140,20 +144,19 @@ public void handleError(SQLException newError) {
queryString = queries[resultIndex].toString(parameterLists[resultIndex]);
}

batchException = new BatchUpdateException(
super.handleError(new BatchUpdateException(
GT.tr("Batch entry {0} {1} was aborted: {2} Call getNextException to see the cause.",
new Object[]{resultIndex, queryString, newError.getMessage()}),
newError.getSQLState(), uncompressUpdateCount());
lastException = batchException;
newError.getSQLState(), uncompressUpdateCount()));
}
resultIndex++;

lastException.setNextException(newError);
lastException = newError;
super.handleError(newError);
}

public void handleCompletion() throws SQLException {
updateGeneratedKeys();
SQLException batchException = getException();
if (batchException != null) {
if (isAutoCommit()) {
// Re-create batch exception since rows after exception might indeed succeed.
Oops, something went wrong.

0 comments on commit edcdccd

Please sign in to comment.