Skip to content
Permalink
Browse files

feat: read only transactions (#1252)

* feat: read only transactions

If autocommit is set to false, read only will be set on begin
transaction.
If autocommit is true, it will continue to be managed at session level.
The queries to change session have been cached to avoid re-parsing each
time readonly value changes.

#1228
#848

* feat: read only transactions

* checkstyle and hamcrest test import

* add connection property with 3 options to control read only behavior

* fix missing property methods on BaseDataSource 
* avoid redundant static modifier

* more loosely couple read only hints to backend

* return default read only mode from data source.

* avoid case conversion
  • Loading branch information
bokken authored and davecramer committed Nov 25, 2019
1 parent 69320c7 commit 050797934a8a9c0ce2dff068eba14931988370ca
@@ -10,6 +10,7 @@
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;

import java.sql.Connection;
import java.sql.DriverPropertyInfo;
import java.util.Properties;

@@ -443,7 +444,19 @@
+ "to the database specified in the dbname parameter, "
+ "which will allow the connection to be used for logical replication "
+ "from that database. "
+ "(backend >= 9.4)");
+ "(backend >= 9.4)"),

/**
* Connection parameter to control behavior when
* {@link Connection#setReadOnly(boolean)} is set to {@code true}.
*/
READ_ONLY_MODE("readOnlyMode", "transaction",
"Controls the behavior when a connection is set to be read only, one of 'ignore', 'transaction', or 'always' "
+ "When 'ignore', setting readOnly has no effect. "
+ "When 'transaction' setting readOnly to 'true' will cause transactions to BEGIN READ ONLY if autocommit is 'false'. "
+ "When 'always' setting readOnly to 'true' will set the session to READ ONLY if autoCommit is 'true' "
+ "and the transaction to BEGIN READ ONLY if autocommit is 'false'.",
false, "ignore", "transaction", "always");

private final String name;
private final String defaultValue;
@@ -6,6 +6,7 @@
package org.postgresql.core;

import org.postgresql.PGConnection;
import org.postgresql.PGProperty;
import org.postgresql.jdbc.FieldMetadata;
import org.postgresql.jdbc.TimestampUtils;
import org.postgresql.util.LruCache;
@@ -202,4 +203,13 @@ CachedQuery createQuery(String sql, boolean escapeProcessing, boolean isParamete
* @param flushCacheOnDeallocate true if statement cache should be reset when "deallocate/discard" message observed
*/
void setFlushCacheOnDeallocate(boolean flushCacheOnDeallocate);

/**
* Indicates if statements to backend should be hinted as read only.
*
* @return Indication if hints to backend (such as when transaction begins)
* should be read only.
* @see PGProperty#READ_ONLY_MODE
*/
boolean hintReadOnly();
}
@@ -121,6 +121,11 @@

int MAX_SAVE_POINTS = 1000;

/**
* Flag indicating that when beginning a transaction, it should be read only.
*/
int QUERY_READ_ONLY_HINT = 2048;

/**
* Execute a Query, passing results to a provided ResultHandler.
*
@@ -371,9 +371,7 @@ private boolean sendAutomaticSavepoint(Query query, int flags) throws IOExceptio
// PostgreSQL does not support bind, exec, simple, sync message flow,
// so we force autosavepoint to use simple if the main query is using simple
| QUERY_EXECUTE_AS_SIMPLE);

return true;

}
return false;
}
@@ -548,7 +546,9 @@ private ResultHandler sendQueryPreamble(final ResultHandler delegateHandler, int

beginFlags = updateQueryMode(beginFlags);

sendOneQuery(beginTransactionQuery, SimpleQuery.NO_PARAMETERS, 0, 0, beginFlags);
final SimpleQuery beginQuery = ((flags & QueryExecutor.QUERY_READ_ONLY_HINT) == 0) ? beginTransactionQuery : beginReadOnlyTransactionQuery;

sendOneQuery(beginQuery, SimpleQuery.NO_PARAMETERS, 0, 0, beginFlags);

// Insert a handler that intercepts the BEGIN.
return new ResultHandlerDelegate(delegateHandler) {
@@ -1145,8 +1145,8 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
try {
if (op == null) {
throw new PSQLException(GT
.tr("Received CommandComplete ''{0}'' without an active copy operation", status),
PSQLState.OBJECT_NOT_IN_STATE);
.tr("Received CommandComplete ''{0}'' without an active copy operation", status),
PSQLState.OBJECT_NOT_IN_STATE);
}
op.handleCommandStatus(status);
} catch (SQLException se) {
@@ -1171,7 +1171,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)

if (op != null) {
error = new PSQLException(GT.tr("Got CopyInResponse from server during an active {0}",
op.getClass().getName()), PSQLState.OBJECT_NOT_IN_STATE);
op.getClass().getName()), PSQLState.OBJECT_NOT_IN_STATE);
}

op = new CopyInImpl();
@@ -1184,8 +1184,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
LOGGER.log(Level.FINEST, " <=BE CopyOutResponse");

if (op != null) {
error =
new PSQLException(GT.tr("Got CopyOutResponse from server during an active {0}",
error = new PSQLException(GT.tr("Got CopyOutResponse from server during an active {0}",
op.getClass().getName()), PSQLState.OBJECT_NOT_IN_STATE);
}

@@ -1199,8 +1198,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
LOGGER.log(Level.FINEST, " <=BE CopyBothResponse");

if (op != null) {
error =
new PSQLException(GT.tr("Got CopyBothResponse from server during an active {0}",
error = new PSQLException(GT.tr("Got CopyBothResponse from server during an active {0}",
op.getClass().getName()), PSQLState.OBJECT_NOT_IN_STATE);
}

@@ -1220,11 +1218,11 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)
byte[] buf = pgStream.receive(len);
if (op == null) {
error = new PSQLException(GT.tr("Got CopyData without an active copy operation"),
PSQLState.OBJECT_NOT_IN_STATE);
PSQLState.OBJECT_NOT_IN_STATE);
} else if (!(op instanceof CopyOut)) {
error = new PSQLException(
GT.tr("Unexpected copydata from server for {0}", op.getClass().getName()),
PSQLState.COMMUNICATION_ERROR);
GT.tr("Unexpected copydata from server for {0}", op.getClass().getName()),
PSQLState.COMMUNICATION_ERROR);
} else {
op.handleCopydata(buf);
}
@@ -1242,7 +1240,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)

if (!(op instanceof CopyOut)) {
error = new PSQLException("Got CopyDone while not copying from server",
PSQLState.OBJECT_NOT_IN_STATE);
PSQLState.OBJECT_NOT_IN_STATE);
}

// keep receiving since we expect a CommandComplete
@@ -1283,7 +1281,7 @@ CopyOperationImpl processCopyResults(CopyOperationImpl op, boolean block)

default:
throw new IOException(
GT.tr("Unexpected packet type during copy: {0}", Integer.toString(c)));
GT.tr("Unexpected packet type during copy: {0}", Integer.toString(c)));
}

// Collect errors into a neat chain for completeness
@@ -2792,6 +2790,11 @@ public boolean getIntegerDateTimes() {
new NativeQuery("BEGIN", new int[0], false, SqlCommand.BLANK),
null, false);

private final SimpleQuery beginReadOnlyTransactionQuery =
new SimpleQuery(
new NativeQuery("BEGIN READ ONLY", new int[0], false, SqlCommand.BLANK),
null, false);

private final SimpleQuery emptyQuery =
new SimpleQuery(
new NativeQuery("", new int[0], false,
@@ -2815,7 +2818,4 @@ public boolean getIntegerDateTimes() {
new SimpleQuery(
new NativeQuery("ROLLBACK TO SAVEPOINT PGJDBC_AUTOSAVE", new int[0], false, SqlCommand.BLANK),
null, false);



}
@@ -912,6 +912,22 @@ public void setReadOnly(boolean readOnly) {
PGProperty.READ_ONLY.set(properties, readOnly);
}

/**
* @return The behavior when set read only
* @see PGProperty#READ_ONLY_MODE
*/
public String getReadOnlyMode() {
return PGProperty.READ_ONLY_MODE.get(properties);
}

/**
* @param mode the behavior when set read only
* @see PGProperty#READ_ONLY_MODE
*/
public void setReadOnlyMode(String mode) {
PGProperty.READ_ONLY_MODE.set(properties, mode);
}

/**
* @return true if driver should log unclosed connections
* @see PGProperty#LOG_UNCLOSED_CONNECTIONS
@@ -81,6 +81,12 @@
private static final SQLPermission SQL_PERMISSION_ABORT = new SQLPermission("callAbort");
private static final SQLPermission SQL_PERMISSION_NETWORK_TIMEOUT = new SQLPermission("setNetworkTimeout");

private enum ReadOnlyBehavior {
ignore,
transaction,
always;
}

//
// Data initialized on construction:
//
@@ -89,6 +95,8 @@
/* URL we were created via */
private final String creatingURL;

private final ReadOnlyBehavior readOnlyBehavior;

private Throwable openStackTrace;

/* Actual network handler */
@@ -99,6 +107,10 @@
/* Query that runs ROLLBACK */
private final Query rollbackQuery;

private final CachedQuery setSessionReadOnly;

private final CachedQuery setSessionNotReadOnly;

private final TypeInfo typeCache;

private boolean disableColumnSanitiser = false;
@@ -184,6 +196,8 @@ public PgConnection(HostSpec[] hostSpecs,

this.creatingURL = url;

this.readOnlyBehavior = getReadOnlyBehavior(PGProperty.READ_ONLY_MODE.get(info));

setDefaultFetchSize(PGProperty.DEFAULT_ROW_FETCH_SIZE.getInt(info));

setPrepareThreshold(PGProperty.PREPARE_THRESHOLD.getInt(info));
@@ -199,6 +213,9 @@ public PgConnection(HostSpec[] hostSpecs,
LOGGER.log(Level.WARNING, "Unsupported Server Version: {0}", queryExecutor.getServerVersion());
}

setSessionReadOnly = createQuery("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY", false, true);
setSessionNotReadOnly = createQuery("SET SESSION CHARACTERISTICS AS TRANSACTION READ WRITE", false, true);

// Set read-only early if requested
if (PGProperty.READ_ONLY.getBoolean(info)) {
setReadOnly(true);
@@ -292,6 +309,18 @@ public TimeZone get() {
replicationConnection = PGProperty.REPLICATION.get(info) != null;
}

private static ReadOnlyBehavior getReadOnlyBehavior(String property) {
try {
return ReadOnlyBehavior.valueOf(property);
} catch (IllegalArgumentException e) {
try {
return ReadOnlyBehavior.valueOf(property.toLowerCase(Locale.US));
} catch (IllegalArgumentException e2) {
return ReadOnlyBehavior.transaction;
}
}
}

private static Set<Integer> getBinaryOids(Properties info) throws PSQLException {
boolean binaryTransfer = PGProperty.BINARY_TRANSFER.getBoolean(info);
// Formats that currently have binary protocol support
@@ -453,6 +482,24 @@ public void execSQLUpdate(String s) throws SQLException {
stmt.close();
}

void execSQLUpdate(CachedQuery query) throws SQLException {
BaseStatement stmt = (BaseStatement) createStatement();
if (stmt.executeWithFlags(query, QueryExecutor.QUERY_NO_METADATA | QueryExecutor.QUERY_NO_RESULTS
| QueryExecutor.QUERY_SUPPRESS_BEGIN)) {
throw new PSQLException(GT.tr("A result was returned when none was expected."),
PSQLState.TOO_MANY_RESULTS);
}

// Transfer warnings to the connection, since the user never
// has a chance to see the statement itself.
SQLWarning warnings = stmt.getWarnings();
if (warnings != null) {
addWarning(warnings);
}

stmt.close();
}

/**
* <p>In SQL, a result table can be retrieved through a cursor that is named. The current row of a
* result can be updated or deleted using a positioned update/delete statement that references the
@@ -705,10 +752,8 @@ public void setReadOnly(boolean readOnly) throws SQLException {
PSQLState.ACTIVE_SQL_TRANSACTION);
}

if (readOnly != this.readOnly) {
String readOnlySql
= "SET SESSION CHARACTERISTICS AS TRANSACTION " + (readOnly ? "READ ONLY" : "READ WRITE");
execSQLUpdate(readOnlySql); // nb: no BEGIN triggered.
if (readOnly != this.readOnly && autoCommit && this.readOnlyBehavior == ReadOnlyBehavior.always) {
execSQLUpdate(readOnly ? setSessionReadOnly : setSessionNotReadOnly);
}

this.readOnly = readOnly;
@@ -721,6 +766,11 @@ public boolean isReadOnly() throws SQLException {
return readOnly;
}

@Override
public boolean hintReadOnly() {
return readOnly && readOnlyBehavior != ReadOnlyBehavior.ignore;
}

@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
checkClosed();
@@ -733,6 +783,21 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
commit();
}

// if the connection is read only, we need to make sure session settings are
// correct when autocommit status changed
if (this.readOnly && readOnlyBehavior == ReadOnlyBehavior.always) {
// if we are turning on autocommit, we need to set session
// to read only
if (autoCommit) {
this.autoCommit = true;
execSQLUpdate(setSessionReadOnly);
} else {
// if we are turning auto commit off, we need to
// disable session
execSQLUpdate(setSessionNotReadOnly);
}
}

this.autoCommit = autoCommit;
LOGGER.log(Level.FINE, " setAutoCommit = {0}", autoCommit);
}
@@ -408,6 +408,9 @@ private void executeInternal(CachedQuery cachedQuery, ParameterList queryParamet
if (connection.getAutoCommit()) {
flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
}
if (connection.hintReadOnly()) {
flags |= QueryExecutor.QUERY_READ_ONLY_HINT;
}

// updateable result sets do not yet support binary updates
if (concurrency != ResultSet.CONCUR_READ_ONLY) {
@@ -810,6 +813,9 @@ private BatchResultHandler internalExecuteBatch() throws SQLException {
if (connection.getAutoCommit()) {
flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
}
if (connection.hintReadOnly()) {
flags |= QueryExecutor.QUERY_READ_ONLY_HINT;
}

BatchResultHandler handler;
handler = createBatchHandler(queries, parameterLists);

0 comments on commit 0507979

Please sign in to comment.
You can’t perform that action at this time.