Skip to content

Commit

Permalink
Fix:save points causing server to run out of resources (#1409)
Browse files Browse the repository at this point in the history
* Savepoints created by autosavepoint logic will be cleaned up after 1000 successful transactions

* if setSavePoint is called on the connection any automatically created savepoints will be released and autosave disabled.
  • Loading branch information
davecramer committed Feb 24, 2019
1 parent 7ae1e83 commit af8f883
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 25 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ In addition to the standard connection parameters the driver supports a number o
| socketFactory | String | null | Specify a socket factory for socket creation |
| socketFactoryArg (deprecated) | String | null | Argument forwarded to constructor of SocketFactory class. |
| autosave | String | never | Specifies what the driver should do if a query fails, possible values: always, never, conservative |
| cleanupSavePoints | Boolean | false | In Autosave mode the driver sets a SAVEPOINT for every query. It is possible to exhaust the server shared buffers. Setting this to true will release each SAVEPOINT at the cost of an additional round trip. |
| preferQueryMode | String | extended | Specifies which mode is used to execute queries to database, possible values: extended, extendedForPrepared, extendedCacheEverything, simple |
| reWriteBatchedInserts | Boolean | false | Enable optimization to rewrite and collapse compatible INSERT statements that are batched. |

Expand Down
8 changes: 8 additions & 0 deletions docs/documentation/head/connect.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ Connection conn = DriverManager.getConnection(url);

The default is `never`

* **cleanupSavePoints** = boolean

Determines if the SAVEPOINT created in autosave mode is released prior to the statement. This is
done to avoid running out of shared buffers on the server in the case where 1000's of queries are
performed.

The default is 'false'

* **binaryTransferEnable** = String

A comma separated list of types to enable binary transfer. Either OID numbers or names.
Expand Down
5 changes: 5 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/PGProperty.java
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,11 @@ public enum PGProperty {
+ " like 'cached statement cannot change return type' or 'statement XXX is not valid' so JDBC driver rollsback and retries", false,
"always", "never", "conservative"),

/**
*
*/
CLEANUP_SAVEPOINTS("cleanupSavepoints", "false","Determine whether SAVEPOINTS used in AUTOSAVE will be released per query or not",
false, "true", "false"),
/**
* Configure optimization to enable batch insert re-writing.
*/
Expand Down
2 changes: 2 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/QueryExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ public interface QueryExecutor extends TypeTransferModeRegistry {
*/
int QUERY_EXECUTE_AS_SIMPLE = 1024;

int MAX_SAVE_POINTS = 1000;

This comment has been minimized.

Copy link
@fb-datax

fb-datax Feb 25, 2019

I believe this variable isn't necessary anymore

This comment has been minimized.

Copy link
@davecramer

davecramer Feb 25, 2019

Author Member

Pretty sure I use that instead of 1000


/**
* Execute a Query, passing results to a provided ResultHandler.
*
Expand Down
50 changes: 47 additions & 3 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public QueryExecutorImpl(PGStream pgStream, String user, String database,
super(pgStream, user, database, cancelSignalTimeout, info);

this.allowEncodingChanges = PGProperty.ALLOW_ENCODING_CHANGES.getBoolean(info);
this.cleanupSavePoints = PGProperty.CLEANUP_SAVEPOINTS.getBoolean(info);
this.replicationProtocol = new V3ReplicationProtocol(this, pgStream);
readStartupMessages();
}
Expand Down Expand Up @@ -338,6 +339,9 @@ public synchronized void execute(Query query, ParameterList parameters, ResultHa

try {
handler.handleCompletion();
if (cleanupSavePoints) {
releaseSavePoint(autosave, flags);
}
} catch (SQLException e) {
rollbackIfRequired(autosave, e);
}
Expand All @@ -347,23 +351,47 @@ private boolean sendAutomaticSavepoint(Query query, int flags) throws IOExceptio
if (((flags & QueryExecutor.QUERY_SUPPRESS_BEGIN) == 0
|| getTransactionState() == TransactionState.OPEN)
&& query != restoreToAutoSave
&& !query.getNativeSql().equalsIgnoreCase("COMMIT")
&& getAutoSave() != AutoSave.NEVER
// If query has no resulting fields, it cannot fail with 'cached plan must not change result type'
// thus no need to set a safepoint before such query
// thus no need to set a savepoint before such query
&& (getAutoSave() == AutoSave.ALWAYS
// If CompositeQuery is observed, just assume it might fail and set the savepoint
|| !(query instanceof SimpleQuery)
|| ((SimpleQuery) query).getFields() != null)) {

/*
create a different SAVEPOINT the first time so that all subsequent SAVEPOINTS can be released
easily. There have been reports of server resources running out if there are too many
SAVEPOINTS.
*/
sendOneQuery(autoSaveQuery, SimpleQuery.NO_PARAMETERS, 1, 0,
QUERY_NO_RESULTS | QUERY_NO_METADATA
// PostgreSQL does not support bind, exec, simple, sync message flow,
// so we force autosavepoint to use simple if the main query is using simple
| QUERY_EXECUTE_AS_SIMPLE);

return true;

}
return false;
}

private void releaseSavePoint(boolean autosave, int flags) throws SQLException {
if ( autosave
&& getAutoSave() == AutoSave.ALWAYS
&& getTransactionState() == TransactionState.OPEN) {
try {
sendOneQuery(releaseAutoSave, SimpleQuery.NO_PARAMETERS, 1, 0,
QUERY_NO_RESULTS | QUERY_NO_METADATA
| QUERY_EXECUTE_AS_SIMPLE);

} catch (IOException ex) {
throw new PSQLException(GT.tr("Error releasing savepoint"), PSQLState.IO_ERROR);
}
}
}

private void rollbackIfRequired(boolean autosave, SQLException e) throws SQLException {
if (autosave
&& getTransactionState() == TransactionState.FAILED
Expand Down Expand Up @@ -2095,8 +2123,10 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
// For simple 'Q' queries, executeQueue is cleared via ReadyForQuery message
}

if (currentQuery == autoSaveQuery) {
// ignore "SAVEPOINT" status from autosave query
// we want to make sure we do not add any results from these queries to the result set
if (currentQuery == autoSaveQuery
|| currentQuery == releaseAutoSave) {
// ignore "SAVEPOINT" or RELEASE SAVEPOINT status from autosave query
break;
}

Expand Down Expand Up @@ -2714,6 +2744,7 @@ public boolean getIntegerDateTimes() {

private long nextUniqueID = 1;
private final boolean allowEncodingChanges;
private final boolean cleanupSavePoints;


/**
Expand All @@ -2737,13 +2768,26 @@ public boolean getIntegerDateTimes() {
SqlCommand.createStatementTypeInfo(SqlCommandType.BLANK)
), null, false);


private final SimpleQuery autoSaveQuery =
new SimpleQuery(
new NativeQuery("SAVEPOINT PGJDBC_AUTOSAVE", new int[0], false, SqlCommand.BLANK),
null, false);


private final SimpleQuery releaseAutoSave =
new SimpleQuery(
new NativeQuery("RELEASE SAVEPOINT PGJDBC_AUTOSAVE", new int[0], false, SqlCommand.BLANK),
null, false);

/*
In autosave mode we use this query to roll back errored transactions
*/
private final SimpleQuery restoreToAutoSave =
new SimpleQuery(
new NativeQuery("ROLLBACK TO SAVEPOINT PGJDBC_AUTOSAVE", new int[0], false, SqlCommand.BLANK),
null, false);



}
16 changes: 16 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/ds/common/BaseDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -1312,6 +1312,22 @@ public void setAutosave(AutoSave autoSave) {
PGProperty.AUTOSAVE.set(properties, autoSave.value());
}

/**
* see PGProperty#CLEANUP_SAVEPOINTS
* @return boolean indicating property set
*/
public boolean getCleanupSavepoints() {
return PGProperty.CLEANUP_SAVEPOINTS.getBoolean(properties);
}

/**
* see PGProperty#CLEANUP_SAVEPOINTS
* @param cleanupSavepoints will cleanup savepoints after a successful transaction
*/
public void setCleanupSavepoints(boolean cleanupSavepoints) {
PGProperty.CLEANUP_SAVEPOINTS.set(properties, cleanupSavepoints);
}

/**
* @return boolean indicating property is enabled or not.
* @see PGProperty#REWRITE_BATCHED_INSERTS
Expand Down
1 change: 1 addition & 0 deletions pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ public boolean executeWithFlags(int flags) throws SQLException {
}

protected void closeForNextExecution() throws SQLException {

// Every statement execution clears any previous warnings.
clearWarnings();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
public class AutoRollbackTestSuite extends BaseTest4 {
private static final AtomicInteger counter = new AtomicInteger();

private enum CleanSavePoint {
TRUE,
FALSE
}

private enum FailMode {
/**
* Executes "select 1/0" and causes transaction failure (if autocommit=no).
Expand Down Expand Up @@ -111,6 +116,7 @@ private enum ContinueMode {
}

private final AutoSave autoSave;
private final CleanSavePoint cleanSavePoint;
private final AutoCommit autoCommit;
private final FailMode failMode;
private final ContinueMode continueMode;
Expand All @@ -119,10 +125,11 @@ private enum ContinueMode {
private final TestStatement testSql;
private final ReturnColumns cols;

public AutoRollbackTestSuite(AutoSave autoSave, AutoCommit autoCommit,
public AutoRollbackTestSuite(AutoSave autoSave, CleanSavePoint cleanSavePoint, AutoCommit autoCommit,
FailMode failMode, ContinueMode continueMode, boolean flushCacheOnDeallocate,
boolean trans, TestStatement testSql, ReturnColumns cols) {
this.autoSave = autoSave;
this.cleanSavePoint = cleanSavePoint;
this.autoCommit = autoCommit;
this.failMode = failMode;
this.continueMode = continueMode;
Expand Down Expand Up @@ -170,35 +177,39 @@ protected void updateProperties(Properties props) {
}


@Parameterized.Parameters(name = "{index}: autorollback(autoSave={0}, autoCommit={1}, failMode={2}, continueMode={3}, flushOnDeallocate={4}, hastransaction={5}, sql={6}, columns={7})")
@Parameterized.Parameters(name = "{index}: autorollback(autoSave={0}, cleanSavePoint={1}, autoCommit={2}, failMode={3}, continueMode={4}, flushOnDeallocate={5}, hastransaction={6}, sql={7}, columns={8})")
public static Iterable<Object[]> data() {
Collection<Object[]> ids = new ArrayList<Object[]>();
boolean[] booleans = new boolean[] {true, false};
for (AutoSave autoSave : AutoSave.values()) {
for (AutoCommit autoCommit : AutoCommit.values()) {
for (FailMode failMode : FailMode.values()) {
// ERROR: DISCARD ALL cannot run inside a transaction block
if (failMode == FailMode.DISCARD && autoCommit == AutoCommit.NO) {
continue;
}
for (ContinueMode continueMode : ContinueMode.values()) {
if (failMode == FailMode.ALTER && continueMode != ContinueMode.SELECT) {
for (CleanSavePoint cleanSavePoint:CleanSavePoint.values()) {
for (AutoCommit autoCommit : AutoCommit.values()) {
for (FailMode failMode : FailMode.values()) {
// ERROR: DISCARD ALL cannot run inside a transaction block
if (failMode == FailMode.DISCARD && autoCommit == AutoCommit.NO) {
continue;
}
for (boolean flushCacheOnDeallocate : booleans) {
if (!(flushCacheOnDeallocate || DEALLOCATES.contains(failMode))) {
for (ContinueMode continueMode : ContinueMode.values()) {
if (failMode == FailMode.ALTER && continueMode != ContinueMode.SELECT) {
continue;
}

for (boolean trans : new boolean[]{true, false}) {
// continueMode would commit, and autoCommit=YES would commit,
// so it does not make sense to test trans=true for those cases
if (trans && (continueMode == ContinueMode.COMMIT || autoCommit != AutoCommit.NO)) {
for (boolean flushCacheOnDeallocate : booleans) {
if (!(flushCacheOnDeallocate || DEALLOCATES.contains(failMode))) {
continue;
}
for (TestStatement statement : TestStatement.values()) {
for (ReturnColumns columns : ReturnColumns.values()) {
ids.add(new Object[]{autoSave, autoCommit, failMode, continueMode, flushCacheOnDeallocate, trans, statement, columns});

for (boolean trans : new boolean[]{true, false}) {
// continueMode would commit, and autoCommit=YES would commit,
// so it does not make sense to test trans=true for those cases
if (trans && (continueMode == ContinueMode.COMMIT
|| autoCommit != AutoCommit.NO)) {
continue;
}
for (TestStatement statement : TestStatement.values()) {
for (ReturnColumns columns : ReturnColumns.values()) {
ids.add(new Object[]{autoSave, cleanSavePoint, autoCommit, failMode, continueMode,
flushCacheOnDeallocate, trans, statement, columns});
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,11 @@ public void testDataSourceProperties() throws Exception {
assertTrue("Missing getter/setter for property [" + property.getName() + "] in ["
+ BaseDataSource.class + "]", propertyDescriptors.containsKey(property.getName()));

assertNotNull("Not getter for property [" + property.getName() + "] in ["
assertNotNull("No getter for property [" + property.getName() + "] in ["
+ BaseDataSource.class + "]",
propertyDescriptors.get(property.getName()).getReadMethod());

assertNotNull("Not setter for property [" + property.getName() + "] in ["
assertNotNull("No setter for property [" + property.getName() + "] in ["
+ BaseDataSource.class + "]",
propertyDescriptors.get(property.getName()).getWriteMethod());
}
Expand Down

0 comments on commit af8f883

Please sign in to comment.