Skip to content

Commit

Permalink
Merge pull request #204 from 2ndQuadrant/batch-returning-support
Browse files Browse the repository at this point in the history
Add limited support for returning generated columns in batches

See individual commits for notes on the limitations. This isn't a perfect solution to issues #194 and #195, but it'll work well for most people.
  • Loading branch information
ringerc committed Dec 1, 2014
2 parents 5600ae0 + 1471bd9 commit 83c8c3b
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 37 deletions.
3 changes: 3 additions & 0 deletions org/postgresql/core/QueryExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ public interface QueryExecutor {
/**
* Flag to disable batch execution when we expect results (generated keys)
* from a statement.
*
* @deprecated in PgJDBC 9.4 as we now auto-size batches.
*/
@Deprecated
static int QUERY_DISALLOW_BATCHING = 128;

/**
Expand Down
132 changes: 99 additions & 33 deletions org/postgresql/core/v3/QueryExecutorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,10 @@ public synchronized void execute(Query query,
{
handler = sendQueryPreamble(handler, flags);
ErrorTrackingResultHandler trackingHandler = new ErrorTrackingResultHandler(handler);
queryCount = 0;
sendQuery((V3Query)query, (V3ParameterList)parameters, maxRows, fetchSize, flags, trackingHandler);
sendSync();
processResults(handler, flags);
estimatedReceiveBufferBytes = 0;
}
catch (PGBindException se)
{
Expand All @@ -272,6 +272,7 @@ public synchronized void execute(Query query,
//
sendSync();
processResults(handler, flags);
estimatedReceiveBufferBytes = 0;
handler.handleError(new PSQLException(GT.tr("Unable to bind parameter values for statement."), PSQLState.INVALID_PARAMETER_VALUE, se.getIOException()));
}
}
Expand Down Expand Up @@ -304,21 +305,35 @@ public synchronized void execute(Query query,
// for the server to read some more data, and the server is blocked on write()
// waiting for the driver to read some more data.
//
// To avoid this, we guess at how many queries we can send before the server ->
// driver stream's buffer is full (MAX_BUFFERED_QUERIES). This is the point where
// the server blocks on write and stops reading data. If we reach this point, we
// force a Sync message and read pending data from the server until ReadyForQuery,
// then go back to writing more queries unless we saw an error.
// To avoid this, we guess at how much response data we can request from the
// server before the server -> driver stream's buffer is full (MAX_BUFFERED_RECV_BYTES).
// This is the point where the server blocks on write and stops reading data. If we
// reach this point, we force a Sync message and read pending data from the server
// until ReadyForQuery, then go back to writing more queries unless we saw an error.
//
// This is not 100% reliable -- it's only done in the batch-query case and only
// at a reasonably high level (per query, not per message), and it's only an estimate
// -- so it might break. To do it correctly in all cases would seem to require a
// separate send or receive thread as we can only do the Sync-and-read-results
// operation at particular points, and also as we don't really know how much data
// the server is sending.

// Assume 64k server->client buffering and 250 bytes response per query (conservative).
private static final int MAX_BUFFERED_QUERIES = (64000 / 250);
//
// Our message size estimation is coarse, and disregards asynchronous
// notifications, warnings/info/debug messages, etc, so the response size may be
// quite different from the 250 bytes assumed here even for queries that don't
// return data.
//
// See github issue #194 and #195 .
//
// Assume 64k server->client buffering, which is extremely conservative. A typical
// system will have 200kb or more of buffers for its receive buffers, and the sending
system will typically have the same on the send side, giving us 400kb or more to work
// with. (We could check Java's receive buffer size, but prefer to assume a very
// conservative buffer instead, and we don't know how big the server's send
// buffer is.)
//
private static final int MAX_BUFFERED_RECV_BYTES = 64000;
private static final int NODATA_QUERY_RESPONSE_SIZE_BYTES = 250;

// Helper handler that tracks error status.
private static class ErrorTrackingResultHandler implements ResultHandler {
Expand Down Expand Up @@ -384,7 +399,7 @@ public synchronized void execute(Query[] queries,
{
handler = sendQueryPreamble(handler, flags);
ErrorTrackingResultHandler trackingHandler = new ErrorTrackingResultHandler(handler);
queryCount = 0;
estimatedReceiveBufferBytes = 0;

for (int i = 0; i < queries.length; ++i)
{
Expand All @@ -403,6 +418,7 @@ public synchronized void execute(Query[] queries,
{
sendSync();
processResults(handler, flags);
estimatedReceiveBufferBytes = 0;
}
}
catch (IOException e)
Expand Down Expand Up @@ -549,6 +565,7 @@ public void handleCompletion() throws SQLException{
sendOneQuery(beginTransactionQuery, SimpleQuery.NO_PARAMETERS, 0, 0, QueryExecutor.QUERY_NO_METADATA);
sendSync();
processResults(handler, 0);
estimatedReceiveBufferBytes = 0;
}
catch (IOException ioe)
{
Expand Down Expand Up @@ -1112,25 +1129,75 @@ else if (value.equals("off"))
return op;
}

/*
 * Deadlock avoidance for batched execution.
 *
 * We keep a running estimate of how many bytes of response data the server
 * owes us for queries sent since the last Sync. If this query could push the
 * estimate past MAX_BUFFERED_RECV_BYTES -- or batching is disallowed
 * outright -- we emit a Sync and drain the server's pending replies before
 * sending anything more.
 *
 * See the comments above MAX_BUFFERED_RECV_BYTES's declaration for details.
 */
private void flushIfDeadlockRisk(Query query, boolean disallowBatching,
        ErrorTrackingResultHandler trackingHandler, final int flags)
        throws IOException {
    // Baseline charge: every statement's reply needs at least this much
    // space, before accounting for any result-row data.
    estimatedReceiveBufferBytes += NODATA_QUERY_RESPONSE_SIZE_BYTES;

    boolean mustSync = disallowBatching;

    SimpleQuery sq = (SimpleQuery) query;
    if (sq.isStatementDescribed()) {
        /*
         * The statement has been described, so estimate the per-row response
         * size from its result fields and add it to the expected response
         * size.
         *
         * The row count is impossible to estimate in advance; assume one
         * row, the common case for batches, leaving plenty of breathing
         * room. This is still not deadlock-proof though; see pgjdbc github
         * issues #194 and #195.
         */
        final int maxResultRowSize = sq.getMaxResultRowSize();
        if (maxResultRowSize < 0) {
            // Unknown or unbounded result width: no safe estimate is
            // possible, so fall back to unbatched execution for this query.
            logger.debug("Couldn't estimate result size or result size unbounded, "
                    + "disabling batching for this query.");
            mustSync = true;
        } else {
            estimatedReceiveBufferBytes += maxResultRowSize;
        }
    }
    // An undescribed statement is one we expect no result rows from (we only
    // describe statements whose results we need), so the flat
    // NODATA_QUERY_RESPONSE_SIZE_BYTES charge above has to cover it; if it
    // unexpectedly returns rows we abort later anyway.

    if (mustSync || estimatedReceiveBufferBytes >= MAX_BUFFERED_RECV_BYTES) {
        logger.debug("Forcing Sync, receive buffer full or batching disallowed");
        sendSync();
        processResults(trackingHandler, flags);
        estimatedReceiveBufferBytes = 0;
    }
}

/*
* Send a query to the backend.
*/
private void sendQuery(V3Query query, V3ParameterList parameters, int maxRows, int fetchSize, int flags, ErrorTrackingResultHandler trackingHandler) throws IOException, SQLException {
// Now the query itself.
SimpleQuery[] subqueries = query.getSubqueries();
SimpleParameterList[] subparams = parameters.getSubparams();

// We know this is deprecated, but still respect it in case anyone's using it.
// PgJDBC itself no longer does.
@SuppressWarnings("deprecation")
boolean disallowBatching = (flags & QueryExecutor.QUERY_DISALLOW_BATCHING) != 0;

if (subqueries == null)
{
++queryCount;
if (disallowBatching || queryCount >= MAX_BUFFERED_QUERIES)
{
sendSync();
processResults(trackingHandler, flags);

queryCount = 0;
}
flushIfDeadlockRisk(query, disallowBatching, trackingHandler, flags);

// If we saw errors, don't send anything more.
if (!trackingHandler.hasErrors())
Expand All @@ -1140,18 +1207,12 @@ private void sendQuery(V3Query query, V3ParameterList parameters, int maxRows, i
{
for (int i = 0; i < subqueries.length; ++i)
{
++queryCount;
if (disallowBatching || queryCount >= MAX_BUFFERED_QUERIES)
{
sendSync();
processResults(trackingHandler, flags);
final SimpleQuery subquery = subqueries[i];
flushIfDeadlockRisk(subquery, disallowBatching, trackingHandler, flags);

// If we saw errors, don't send anything more.
if (trackingHandler.hasErrors())
break;

queryCount = 0;
}
// If we saw errors, don't send anything more.
if (trackingHandler.hasErrors())
break;

// In the situation where parameters is already
// NO_PARAMETERS it cannot know the correct
Expand All @@ -1164,7 +1225,7 @@ private void sendQuery(V3Query query, V3ParameterList parameters, int maxRows, i
{
subparam = subparams[i];
}
sendOneQuery(subqueries[i], subparam, maxRows, fetchSize, flags);
sendOneQuery(subquery, subparam, maxRows, fetchSize, flags);
}
}
}
Expand Down Expand Up @@ -2128,6 +2189,7 @@ public void handleCompletion() throws SQLException{
sendSync();

processResults(handler, 0);
estimatedReceiveBufferBytes = 0;
}
catch (IOException e)
{
Expand Down Expand Up @@ -2290,10 +2352,14 @@ private void receiveRFQ() throws IOException {
private final boolean allowEncodingChanges;

/**
* The number of queries executed so far without processing any results.
* Used to avoid deadlocks, see MAX_BUFFERED_QUERIES.
* The estimated server response size since we last consumed the input stream
* from the server, in bytes.
*
* Starts at zero, reset by every Sync message. Mainly used for batches.
*
* Used to avoid deadlocks, see MAX_BUFFERED_RECV_BYTES.
*/
private int queryCount;
private int estimatedReceiveBufferBytes = 0;

private final SimpleQuery beginTransactionQuery = new SimpleQuery(new String[] { "BEGIN" }, null);

Expand Down
51 changes: 49 additions & 2 deletions org/postgresql/core/v3/SimpleQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.postgresql.core.v3;

import org.postgresql.core.*;

import java.lang.ref.PhantomReference;

/**
Expand Down Expand Up @@ -63,6 +64,46 @@ public SimpleQuery[] getSubqueries() {
return null;
}

/**
 * Estimate the maximum size in bytes of a single result row of this query,
 * based on the described result fields. Mainly used for batches that return
 * results.
 *
 * The computed value is cached until/unless the statement is re-described.
 *
 * @return max size of result data in bytes according to the returned
 *         fields, 0 if the query returns no data, or -1 if the result size
 *         is unknown or unbounded
 * @throws IllegalStateException if the statement has not been described
 */
public int getMaxResultRowSize() {
    if (cachedMaxResultRowSize != null) {
        return cachedMaxResultRowSize.intValue();
    }
    if (!this.statementDescribed) {
        throw new IllegalStateException(
                "Cannot estimate result row size on a statement that is not described");
    }
    int rowSizeEstimate = 0;
    if (fields != null) {
        for (Field field : fields) {
            final int fieldLength = field.getLength();
            if (fieldLength < 1 || fieldLength >= 65535) {
                // Field length unknown or very large: no safe estimate of
                // the result size is possible, so report "unbounded" and
                // let the caller fall back to sending queries individually.
                rowSizeEstimate = -1;
                break;
            }
            rowSizeEstimate += fieldLength;
        }
    }
    cachedMaxResultRowSize = rowSizeEstimate;
    return rowSizeEstimate;
}

//
// Implementation guts
//
Expand Down Expand Up @@ -125,6 +166,7 @@ byte[] getEncodedStatementName() {
*/
void setFields(Field[] fields) {
    this.fields = fields;
    // New field metadata invalidates any cached result-row size estimate.
    this.cachedMaxResultRowSize = null;
}

/**
Expand All @@ -143,6 +185,7 @@ boolean isPortalDescribed() {
}
void setPortalDescribed(boolean portalDescribed) {
    this.portalDescribed = portalDescribed;
    // A (re)Describe may change the result fields, so drop the cached
    // maximum result-row size estimate.
    this.cachedMaxResultRowSize = null;
}

// Have we sent a Describe Statement message for this query yet?
Expand All @@ -152,6 +195,7 @@ public boolean isStatementDescribed() {
}
void setStatementDescribed(boolean statementDescribed) {
    this.statementDescribed = statementDescribed;
    // Describe state changed, so any cached result-row size estimate is
    // stale and must be recomputed on next use.
    this.cachedMaxResultRowSize = null;
}

public boolean isEmpty()
Expand Down Expand Up @@ -180,22 +224,25 @@ void unprepare() {
fields = null;
portalDescribed = false;
statementDescribed = false;
cachedMaxResultRowSize = null;
}

private final String[] fragments;
private final ProtocolConnectionImpl protoConnection;
private String statementName;
private byte[] encodedStatementName;
/**
* The stored fields from previous query of a prepared statement,
* if executed before. Always null for non-prepared statements.
* The stored fields from previous execution or describe of a prepared
* statement. Always null for non-prepared statements.
*/
private Field[] fields;
private boolean portalDescribed;
private boolean statementDescribed;
private PhantomReference cleanupRef;
private int[] preparedTypes;

private Integer cachedMaxResultRowSize;

final static SimpleParameterList NO_PARAMETERS = new SimpleParameterList(0, null);
}

Expand Down
22 changes: 20 additions & 2 deletions org/postgresql/jdbc2/AbstractJdbc2Statement.java
Original file line number Diff line number Diff line change
Expand Up @@ -2855,12 +2855,21 @@ public int[] executeBatch() throws SQLException
batchStatements.clear();
batchParameters.clear();

int flags;
int flags = 0;

// Force a Describe before any execution? We need to do this if we're going
// to send anything dependent on the Describe results, e.g. binary parameters.
boolean preDescribe = false;

if (wantsGeneratedKeysAlways) {
flags = QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS | QueryExecutor.QUERY_DISALLOW_BATCHING;
/*
* This batch will return generated keys, tell the executor to
* expect result rows. We force a Describe later, too.
*/
flags = QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS;
} else {
// If a batch hasn't specified that it wants generated keys, using the appropriate
// Connection.createStatement(...) interfaces, disallow any result set.
flags = QueryExecutor.QUERY_NO_RESULTS;
}

Expand All @@ -2872,13 +2881,22 @@ public int[] executeBatch() throws SQLException
if (m_prepareThreshold == 0 || m_useCount < m_prepareThreshold) {
flags |= QueryExecutor.QUERY_ONESHOT;
} else {
// If a batch requests generated keys and isn't already described,
// force a Describe of the query before proceeding. That way we can
// determine the appropriate size of each batch by estimating the
// maximum data returned. Without that, we don't know how many queries
// we'll be able to queue up before we risk a deadlock.
// (see v3.QueryExecutorImpl's MAX_BUFFERED_RECV_BYTES)
preDescribe = wantsGeneratedKeysAlways && !queries[0].isStatementDescribed();
}

if (connection.getAutoCommit())
flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;

if (preDescribe || forceBinaryTransfers) {
// Do a client-server round trip, parsing and describing the query so we
// can determine its result types for use in binary parameters, batch sizing,
// etc.
int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY;
StatementResultHandler handler2 = new StatementResultHandler();
connection.getQueryExecutor().execute(queries[0], parameterLists[0], handler2, 0, 0, flags2);
Expand Down

0 comments on commit 83c8c3b

Please sign in to comment.