Skip to content

Commit

Permalink
feat: implementation of adaptive fetching (#1707)
Browse files Browse the repository at this point in the history
* feat: implementation of adaptive fetching

Implementation of adaptive fetching logic during process of fetching result set.
Change adds three new properties used with adaptive fetching:
- adaptiveFetch - use to enable/disable work of adaptive fetching
- adaptiveFetchMinimum - use to define minimum size computed during adaptive fetching
- adaptiveFetchMaximum - use to define maximum size computed during adaptive fetching
Adaptive fetching is used to compute fetch size to fully use size defined by maxResultBuffer.
Computing is made by dividing maxResultBuffer size by max row result size noticed so far.
Each query have separate adaptive fetch size computed, but same queries have it shared.
If adaptive fetch is turned on, first fetch is going to be made with defaultRowFetchSize,
next fetching of resultSet will be made with computed adaptive fetch size.
If adaptive fetch is turned on during fetching, then first fetching made by ResultSet will
be made with defaultRowFetchSize, next will use computed adaptive fetch size.
Additionally, resultSet have new method - getLastUsedFetchSize to get fetch size used during
last fetching. If adaptive fetching is turned on, then getFetchSize will return lastUsedFetchSize.
Property adaptiveFetch need properties defaultRowFetchSize and maxResultBuffer to work.

Commit to add:
- unit tests for AdaptiveFetchQueryMonitoring class;
- some style fixes (removing extra spaces, simpler code in setMaxRowSize method in PGStream class).

Commit to add:
- integration tests for adaptive fetching process;
- change casting to from primitive types to Wrapper classes in AdaptiveFetchQueryMonitoringTest.

Change in AdaptiveFetchSizeTest:
- is using PreparedStatement during tests now;
- added assume to check if preferQueryMode is not Simple;
- new method to open connection and creating table to remove code repetitiveness.

Commit to add explanation about adaptive fetching process in AdaptiveFetchQueryMonitoring class.
  • Loading branch information
adrklos committed Feb 24, 2020
1 parent 630692d commit 5bb5f40
Show file tree
Hide file tree
Showing 18 changed files with 2,100 additions and 17 deletions.
3 changes: 3 additions & 0 deletions README.md
Expand Up @@ -153,6 +153,9 @@ In addition to the standard connection parameters the driver supports a number o
| reWriteBatchedInserts | Boolean | false | Enable optimization to rewrite and collapse compatible INSERT statements that are batched. |
| escapeSyntaxCallMode | String | select | Specifies how JDBC escape call syntax is transformed into underlying SQL (CALL/SELECT), for invoking procedures or functions (requires server version >= 11), possible values: select, callIfNoReturn, call |
| maxResultBuffer | String | null | Specifies size of result buffer in bytes, which can't be exceeded during reading result set. Can be specified as particular size (i.e. "100", "200M" "2G") or as percent of max heap memory (i.e. "10p", "20pct", "50percent") |
| adaptiveFetch | Boolean | false | Specifies if number of rows fetched in ResultSet by each fetch iteration should be dynamic. Number of rows will be calculated by dividing maxResultBuffer size into max row size observed so far. Requires declaring maxResultBuffer and defaultRowFetchSize for first iteration.
| adaptiveFetchMinimum | Integer | 0 | Specifies minimum number of rows, which can be calculated by adaptiveFetch. Number of rows used by adaptiveFetch cannot go below this value.
| adaptiveFetchMaximum | Integer | -1 | Specifies maximum number of rows, which can be calculated by adaptiveFetch. Number of rows used by adaptiveFetch cannot go above this value. Any negative number set as adaptiveFetchMaximum is used by adaptiveFetch as infinity number of rows.

## Contributing
For information on how to contribute to the project see the [Contributing Guidelines](CONTRIBUTING.md)
Expand Down
25 changes: 25 additions & 0 deletions docs/documentation/head/connect.md
Expand Up @@ -508,6 +508,31 @@ Connection conn = DriverManager.getConnection(url);

By default, maxResultBuffer is not set (is null), what means that reading of results gonna be performed without limits.

* **adaptiveFetch** = boolean

Specifies if number of rows, fetched in `ResultSet` by one fetch with trip to the database, should be dynamic.
Using dynamic number of rows, computed by adaptive fetch, allows to use most of the buffer declared in `maxResultBuffer` property.
Number of rows would be calculated by dividing `maxResultBuffer` size into max row size observed so far, rounded down.
First fetch will have number of rows declared in `defaultRowFetchSize`.
Number of rows can be limited by `adaptiveFetchMinimum` and `adaptiveFetchMaximum`.
Requires declaring of `maxResultBuffer` and `defaultRowFetchSize` to work.

By default, adaptiveFetch is false.

* **adaptiveFetchMinimum** = int

Specifies the lowest number of rows which can be calculated by `adaptiveFetch`.
Requires `adaptiveFetch` set to true to work.

By default, minimum of rows calculated by `adaptiveFetch` is 0.

* **adaptiveFetchMaximum** = int

Specifies the highest number of rows which can be calculated by `adaptiveFetch`.
Requires `adaptiveFetch` set to true to work.
By default, maximum of rows calculated by `adaptiveFetch` is -1, which is understood as infinite.
<a name="unix sockets"></a>
## Unix sockets

Expand Down
15 changes: 15 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/PGConnection.java
Expand Up @@ -313,4 +313,19 @@ public interface PGConnection {
* @since 42.2.6
*/
String getParameterStatus(String parameterName);

/**
* Method to turn on/off adaptive fetch for connection. Existing statements and resultSets won't
* be affected by change here.
*
* @param adaptiveFetch desired state of adaptive fetch.
*/
void setAdaptiveFetch(boolean adaptiveFetch);

/**
* Method to get state of adaptive fetch for connection.
*
* @return state of adaptive fetch (turned on or off)
*/
boolean getAdaptiveFetch();
}
31 changes: 31 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/PGProperty.java
Expand Up @@ -22,6 +22,37 @@
*/
public enum PGProperty {

/**
* Specifies if number of rows fetched in ResultSet by one fetch with trip to the database should
* be dynamic. Number of rows would be calculated by dividing maxResultBuffer size into max row
* size observed so far, rounded down. First fetch will have number of rows declared in
* defaultRowFetchSize. Number of rows can be limited by adaptiveFetchMinimum and
* adaptiveFetchMaximum. Requires declaring of maxResultBuffer and defaultRowFetchSize to work.
* Default value is false.
*/
ADAPTIVE_FETCH(
"adaptiveFetch",
"false",
"Specifies if number of rows fetched in ResultSet should be adaptive to maxResultBuffer and max row size."),

/**
* Specifies the highest number of rows which can be calculated by adaptiveFetch. Requires
* adaptiveFetch set to true to work. Default value is -1 (used as infinity).
*/
ADAPTIVE_FETCH_MAXIMUM(
"adaptiveFetchMaximum",
"-1",
"Specifies minimum number of rows used by adaptive fetch."),

/**
* Specifies the lowest number of rows which can be calculated by adaptiveFetch. Requires
* adaptiveFetch set to true to work. Default value is 0.
*/
ADAPTIVE_FETCH_MINIMUM(
"adaptiveFetchMinimum",
"0",
"Specifies maximum number of rows used by adaptive fetch."),

/**
* When using the V3 protocol the driver monitors changes in certain server configuration
* parameters that should not be touched by end users. The {@code client_encoding} setting is set
Expand Down
15 changes: 15 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/PGStatement.java
Expand Up @@ -79,4 +79,19 @@ public interface PGStatement {
* @since build 302
*/
int getPrepareThreshold();

/**
* Method to turn on/off adaptive fetch for statement. Existing resultSets won't be affected by
* change here.
*
* @param adaptiveFetch desired state of adaptive fetch.
*/
void setAdaptiveFetch(boolean adaptiveFetch);

/**
* Method to get state of adaptive fetch for statement.
*
* @return state of adaptive fetch (turned on or off)
*/
boolean getAdaptiveFetch();
}
44 changes: 44 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/PGStream.java
Expand Up @@ -58,6 +58,8 @@ public class PGStream implements Closeable, Flushable {
private long maxResultBuffer = -1;
private long resultBufferByteCount = 0;

private int maxRowSize = -1;

/**
* Constructor: Connect to the PostgreSQL back end and return a stream connection.
*
Expand Down Expand Up @@ -477,6 +479,8 @@ public Tuple receiveTupleV3() throws IOException, OutOfMemoryError, SQLException
int nf = receiveInteger2();
//size = messageSize - 4 bytes of message size - 2 bytes of field count - 4 bytes for each column length
int dataToReadSize = messageSize - 4 - 2 - 4 * nf;
setMaxRowSize(dataToReadSize);

byte[][] answer = new byte[nf][];

increaseByteCounter(dataToReadSize);
Expand Down Expand Up @@ -643,6 +647,46 @@ public void setMaxResultBuffer(String value) throws PSQLException {
maxResultBuffer = PGPropertyMaxResultBufferParser.parseProperty(value);
}

/**
* Method to get MaxResultBuffer from PGStream.
*
* @return size of MaxResultBuffer
*/
public long getMaxResultBuffer() {
return maxResultBuffer;
}

/**
* The idea behind this method is to keep in maxRowSize the size of biggest read data row. As
* there may be many data rows send after each other for a query, then value in maxRowSize would
* contain value noticed so far, because next data rows and their sizes are not read for that
* moment. We want it increasing, because the size of the biggest among data rows will be used
* during computing new adaptive fetch size for the query.
*
* @param rowSize new value to be set as maxRowSize
*/
public void setMaxRowSize(int rowSize) {
if (rowSize > maxRowSize) {
maxRowSize = rowSize;
}
}

/**
* Method to get actual max row size noticed so far.
*
* @return value of max row size
*/
public int getMaxRowSize() {
return maxRowSize;
}

/**
* Method to clear value of max row size noticed so far.
*/
public void clearMaxRowSize() {
maxRowSize = -1;
}

/**
* Method to clear count of byte buffer.
*/
Expand Down
85 changes: 84 additions & 1 deletion pgjdbc/src/main/java/org/postgresql/core/QueryExecutor.java
Expand Up @@ -145,6 +145,25 @@ public interface QueryExecutor extends TypeTransferModeRegistry {
void execute(Query query, ParameterList parameters, ResultHandler handler, int maxRows,
int fetchSize, int flags) throws SQLException;

/**
* Execute a Query with adaptive fetch, passing results to a provided ResultHandler.
*
* @param query the query to execute; must be a query returned from calling
* {@link #wrap(List)} on this QueryExecutor object.
* @param parameters the parameters for the query. Must be non-<code>null</code> if the query
* takes parameters. Must be a parameter object returned by
* {@link org.postgresql.core.Query#createParameterList()}.
* @param handler a ResultHandler responsible for handling results generated by this query
* @param maxRows the maximum number of rows to retrieve
* @param fetchSize if QUERY_FORWARD_CURSOR is set, the preferred number of rows to retrieve
* before suspending
* @param flags a combination of QUERY_* flags indicating how to handle the query.
* @param adaptiveFetch state of adaptiveFetch to use during execution
* @throws SQLException if query execution fails
*/
void execute(Query query, ParameterList parameters, ResultHandler handler, int maxRows,
int fetchSize, int flags, boolean adaptiveFetch) throws SQLException;

/**
* Execute several Query, passing results to a provided ResultHandler.
*
Expand All @@ -165,15 +184,37 @@ void execute(Query query, ParameterList parameters, ResultHandler handler, int m
void execute(Query[] queries, ParameterList[] parameterLists, BatchResultHandler handler, int maxRows,
int fetchSize, int flags) throws SQLException;

/**
* Execute several Query with adaptive fetch, passing results to a provided ResultHandler.
*
* @param queries the queries to execute; each must be a query returned from calling
* {@link #wrap(List)} on this QueryExecutor object.
* @param parameterLists the parameter lists for the queries. The parameter lists correspond 1:1
* to the queries passed in the <code>queries</code> array. Each must be non-
* <code>null</code> if the corresponding query takes parameters, and must be a parameter
* object returned by {@link org.postgresql.core.Query#createParameterList()} created by
* the corresponding query.
* @param handler a ResultHandler responsible for handling results generated by this query
* @param maxRows the maximum number of rows to retrieve
* @param fetchSize if QUERY_FORWARD_CURSOR is set, the preferred number of rows to retrieve
* before suspending
* @param flags a combination of QUERY_* flags indicating how to handle the query.
* @param adaptiveFetch state of adaptiveFetch to use during execution
* @throws SQLException if query execution fails
*/
void execute(Query[] queries, ParameterList[] parameterLists, BatchResultHandler handler, int maxRows,
int fetchSize, int flags, boolean adaptiveFetch) throws SQLException;

/**
* Fetch additional rows from a cursor.
*
* @param cursor the cursor to fetch from
* @param handler the handler to feed results to
* @param fetchSize the preferred number of rows to retrieve before suspending
* @param adaptiveFetch state of adaptiveFetch to use during fetching
* @throws SQLException if query execution fails
*/
void fetch(ResultCursor cursor, ResultHandler handler, int fetchSize) throws SQLException;
void fetch(ResultCursor cursor, ResultHandler handler, int fetchSize, boolean adaptiveFetch) throws SQLException;

/**
* Create an unparameterized Query object suitable for execution by this QueryExecutor. The
Expand Down Expand Up @@ -466,4 +507,46 @@ Object createQueryKey(String sql, boolean escapeProcessing, boolean isParameteri
Map<String,String> getParameterStatuses();

String getParameterStatus(String parameterName);

/**
* Method to get fetch size computed by adaptive fetch size for given query.
*
* @param adaptiveFetch state of adaptive fetch, which should be used during retrieving
* @param cursor Cursor used by resultSet, containing query, have to be able to cast to
* Portal class.
* @return fetch size computed by adaptive fetch size for given query passed inside cursor
*/
int getAdaptiveFetchSize(boolean adaptiveFetch, ResultCursor cursor);

/**
* Method to get state of adaptive fetch inside QueryExecutor.
*
* @return state of adaptive fetch inside QueryExecutor
*/
boolean getAdaptiveFetch();

/**
* Method to set state of adaptive fetch inside QueryExecutor.
*
* @param adaptiveFetch desired state of adaptive fetch
*/
void setAdaptiveFetch(boolean adaptiveFetch);

/**
* Method to add query to adaptive fetch monitoring inside QueryExecutor.
*
* @param adaptiveFetch state of adaptive fetch used during adding query
* @param cursor Cursor used by resultSet, containing query, have to be able to cast to
* Portal class.
*/
void addQueryToAdaptiveFetchMonitoring(boolean adaptiveFetch, ResultCursor cursor);

/**
* Method to remove query from adaptive fetch monitoring inside QueryExecutor
*
* @param adaptiveFetch state of adaptive fetch used during removing query
* @param cursor Cursor used by resultSet, containing query, have to be able to cast to
* Portal class.
*/
void removeQueryFromAdaptiveFetchMonitoring(boolean adaptiveFetch, ResultCursor cursor);
}

0 comments on commit 5bb5f40

Please sign in to comment.