Skip to content

Commit

Permalink
feat: synchronize statement executions (e.g. avoid deadlock when Conn…
Browse files Browse the repository at this point in the history
…ection.isValid is executed from concurrent threads)

This does not bring full thread safety to the driver, however,
it should reduce the number of issues when connections and statements
are accidentally executed concurrently in several threads.

fixes #1768
  • Loading branch information
vlsi committed Jul 27, 2022
1 parent fd31a06 commit 4673fd2
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 120 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ test: update JUnit to 5.8.2
chore: added Gradle Wrapper Validation for verifying gradle-wrapper.jar
chore: added "permissions: contents: read" for GitHub Actions to avoid unintentional modifications by the CI
chore: support building pgjdbc with Java 17
feat: synchronize statement executions (e.g. avoid deadlock when Connection.isValid is executed from concurrent threads)

### Fixed

Expand Down
124 changes: 62 additions & 62 deletions pgjdbc/src/main/java/org/postgresql/jdbc/PgCallableStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,79 +80,79 @@ public int executeUpdate() throws SQLException {

@Override
public boolean executeWithFlags(int flags) throws SQLException {
boolean hasResultSet = super.executeWithFlags(flags);
int[] functionReturnType = this.functionReturnType;
if (!isFunction || !returnTypeSet || functionReturnType == null) {
return hasResultSet;
}

// If we are executing and there are out parameters
// callable statement function set the return data
if (!hasResultSet) {
throw new PSQLException(GT.tr("A CallableStatement was executed with nothing returned."),
PSQLState.NO_DATA);
}
synchronized (this) {
boolean hasResultSet = super.executeWithFlags(flags);
int[] functionReturnType = this.functionReturnType;
if (!isFunction || !returnTypeSet || functionReturnType == null) {
return hasResultSet;
}

ResultSet rs = castNonNull(getResultSet());
if (!rs.next()) {
throw new PSQLException(GT.tr("A CallableStatement was executed with nothing returned."),
PSQLState.NO_DATA);
}
// If we are executing and there are out parameters
// callable statement function set the return data
if (!hasResultSet) {
throw new PSQLException(GT.tr("A CallableStatement was executed with nothing returned."),
PSQLState.NO_DATA);
}

// figure out how many columns
int cols = rs.getMetaData().getColumnCount();
ResultSet rs = castNonNull(getResultSet());
if (!rs.next()) {
throw new PSQLException(GT.tr("A CallableStatement was executed with nothing returned."),
PSQLState.NO_DATA);
}

int outParameterCount = preparedParameters.getOutParameterCount();
// figure out how many columns
int cols = rs.getMetaData().getColumnCount();

if (cols != outParameterCount) {
throw new PSQLException(
GT.tr("A CallableStatement was executed with an invalid number of parameters"),
PSQLState.SYNTAX_ERROR);
}
int outParameterCount = preparedParameters.getOutParameterCount();

// reset last result fetched (for wasNull)
lastIndex = 0;

// allocate enough space for all possible parameters without regard to in/out
@Nullable Object[] callResult = new Object[preparedParameters.getParameterCount() + 1];
this.callResult = callResult;

// move them into the result set
for (int i = 0, j = 0; i < cols; i++, j++) {
// find the next out parameter, the assumption is that the functionReturnType
// array will be initialized with 0 and only out parameters will have values
// other than 0. 0 is the value for java.sql.Types.NULL, which should not
// conflict
while (j < functionReturnType.length && functionReturnType[j] == 0) {
j++;
if (cols != outParameterCount) {
throw new PSQLException(
GT.tr("A CallableStatement was executed with an invalid number of parameters"),
PSQLState.SYNTAX_ERROR);
}

callResult[j] = rs.getObject(i + 1);
int columnType = rs.getMetaData().getColumnType(i + 1);
// reset last result fetched (for wasNull)
lastIndex = 0;

// allocate enough space for all possible parameters without regard to in/out
@Nullable Object[] callResult = new Object[preparedParameters.getParameterCount() + 1];
this.callResult = callResult;

// move them into the result set
for (int i = 0, j = 0; i < cols; i++, j++) {
// find the next out parameter, the assumption is that the functionReturnType
// array will be initialized with 0 and only out parameters will have values
// other than 0. 0 is the value for java.sql.Types.NULL, which should not
// conflict
while (j < functionReturnType.length && functionReturnType[j] == 0) {
j++;
}

if (columnType != functionReturnType[j]) {
// this is here for the sole purpose of passing the cts
if (columnType == Types.DOUBLE && functionReturnType[j] == Types.REAL) {
// return it as a float
Object result = callResult[j];
if (result != null) {
callResult[j] = ((Double) result).floatValue();
callResult[j] = rs.getObject(i + 1);
int columnType = rs.getMetaData().getColumnType(i + 1);

if (columnType != functionReturnType[j]) {
// this is here for the sole purpose of passing the cts
if (columnType == Types.DOUBLE && functionReturnType[j] == Types.REAL) {
// return it as a float
Object result = callResult[j];
if (result != null) {
callResult[j] = ((Double) result).floatValue();
}
} else if (columnType == Types.REF_CURSOR && functionReturnType[j] == Types.OTHER) {
// For backwards compatibility reasons we support that ref cursors can be
// registered with both Types.OTHER and Types.REF_CURSOR so we allow
// this specific mismatch
} else {
throw new PSQLException(GT.tr(
"A CallableStatement function was executed and the out parameter {0} was of type {1} however type {2} was registered.",
i + 1, "java.sql.Types=" + columnType, "java.sql.Types=" + functionReturnType[j]),
PSQLState.DATA_TYPE_MISMATCH);
}
} else if (columnType == Types.REF_CURSOR && functionReturnType[j] == Types.OTHER) {
// For backwards compatibility reasons we support that ref cursors can be
// registered with both Types.OTHER and Types.REF_CURSOR so we allow
// this specific mismatch
} else {
throw new PSQLException(GT.tr(
"A CallableStatement function was executed and the out parameter {0} was of type {1} however type {2} was registered.",
i + 1, "java.sql.Types=" + columnType, "java.sql.Types=" + functionReturnType[j]),
PSQLState.DATA_TYPE_MISMATCH);
}
}

}
rs.close();
synchronized (this) {
}
rs.close();
result = null;
}
return false;
Expand Down
9 changes: 7 additions & 2 deletions pgjdbc/src/main/java/org/postgresql/jdbc/PgConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -1454,8 +1454,13 @@ public boolean isValid(int timeout) throws SQLException {
statement.execute("IDENTIFY_SYSTEM");
statement.close();
} else {
if (checkConnectionQuery == null) {
checkConnectionQuery = prepareStatement("");
PreparedStatement checkConnectionQuery;
synchronized (this) {
checkConnectionQuery = this.checkConnectionQuery;
if (checkConnectionQuery == null) {
checkConnectionQuery = prepareStatement("");
this.checkConnectionQuery = checkConnectionQuery;
}
}
checkConnectionQuery.executeUpdate();
}
Expand Down
42 changes: 25 additions & 17 deletions pgjdbc/src/main/java/org/postgresql/jdbc/PgPreparedStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ public ResultSet executeQuery(String sql) throws SQLException {
*/
@Override
public ResultSet executeQuery() throws SQLException {
if (!executeWithFlags(0)) {
throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA);
}
synchronized (this) {
if (!executeWithFlags(0)) {
throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA);
}

return getSingleResultSet();
return getSingleResultSet();
}
}

@Override
Expand All @@ -146,16 +148,20 @@ public int executeUpdate(String sql) throws SQLException {

@Override
public int executeUpdate() throws SQLException {
executeWithFlags(QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getUpdateCount();
synchronized (this) {
executeWithFlags(QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getUpdateCount();
}
}

@Override
public long executeLargeUpdate() throws SQLException {
executeWithFlags(QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getLargeUpdateCount();
synchronized (this) {
executeWithFlags(QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getLargeUpdateCount();
}
}

@Override
Expand All @@ -167,20 +173,22 @@ public boolean execute(String sql) throws SQLException {

@Override
public boolean execute() throws SQLException {
return executeWithFlags(0);
synchronized (this) {
return executeWithFlags(0);
}
}

public boolean executeWithFlags(int flags) throws SQLException {
try {
checkClosed();
synchronized (this) {
checkClosed();

if (connection.getPreferQueryMode() == PreferQueryMode.SIMPLE) {
flags |= QueryExecutor.QUERY_EXECUTE_AS_SIMPLE;
}
if (connection.getPreferQueryMode() == PreferQueryMode.SIMPLE) {
flags |= QueryExecutor.QUERY_EXECUTE_AS_SIMPLE;
}

execute(preparedQuery, preparedParameters, flags);
execute(preparedQuery, preparedParameters, flags);

synchronized (this) {
checkClosed();
return (result != null && result.getResultSet() != null);
}
Expand Down
92 changes: 53 additions & 39 deletions pgjdbc/src/main/java/org/postgresql/jdbc/PgStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,13 @@ public void handleWarning(SQLWarning warning) {

@Override
public ResultSet executeQuery(String sql) throws SQLException {
if (!executeWithFlags(sql, 0)) {
throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA);
}
synchronized (this) {
if (!executeWithFlags(sql, 0)) {
throw new PSQLException(GT.tr("No results were returned by the query."), PSQLState.NO_DATA);
}

return getSingleResultSet();
return getSingleResultSet();
}
}

protected ResultSet getSingleResultSet() throws SQLException {
Expand All @@ -262,9 +264,11 @@ protected ResultSet getSingleResultSet() throws SQLException {

@Override
public int executeUpdate(String sql) throws SQLException {
executeWithFlags(sql, QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getUpdateCount();
synchronized (this) {
executeWithFlags(sql, QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getUpdateCount();
}
}

protected final void checkNoResultUpdate() throws SQLException {
Expand Down Expand Up @@ -404,17 +408,19 @@ protected boolean isOneShotQuery(@Nullable CachedQuery cachedQuery) {
protected final void execute(CachedQuery cachedQuery,
@Nullable ParameterList queryParameters, int flags)
throws SQLException {
try {
executeInternal(cachedQuery, queryParameters, flags);
} catch (SQLException e) {
// Don't retry composite queries as it might get partially executed
if (cachedQuery.query.getSubqueries() != null
|| !connection.getQueryExecutor().willHealOnRetry(e)) {
throw e;
synchronized (this) {
try {
executeInternal(cachedQuery, queryParameters, flags);
} catch (SQLException e) {
// Don't retry composite queries as it might get partially executed
if (cachedQuery.query.getSubqueries() != null
|| !connection.getQueryExecutor().willHealOnRetry(e)) {
throw e;
}
cachedQuery.query.close();
// Execute the query one more time
executeInternal(cachedQuery, queryParameters, flags);
}
cachedQuery.query.close();
// Execute the query one more time
executeInternal(cachedQuery, queryParameters, flags);
}
}

Expand Down Expand Up @@ -1090,9 +1096,11 @@ public long[] executeLargeBatch() throws SQLException {

@Override
public long executeLargeUpdate(String sql) throws SQLException {
executeWithFlags(sql, QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getLargeUpdateCount();
synchronized (this) {
executeWithFlags(sql, QueryExecutor.QUERY_NO_RESULTS);
checkNoResultUpdate();
return getLargeUpdateCount();
}
}

@Override
Expand All @@ -1116,15 +1124,17 @@ public long executeLargeUpdate(String sql, int[] columnIndexes) throws SQLExcept

@Override
public long executeLargeUpdate(String sql, String @Nullable [] columnNames) throws SQLException {
if (columnNames != null && columnNames.length == 0) {
return executeLargeUpdate(sql);
}
synchronized (this) {
if (columnNames != null && columnNames.length == 0) {
return executeLargeUpdate(sql);
}

wantsGeneratedKeysOnce = true;
if (!executeCachedSql(sql, 0, columnNames)) {
// no resultset returned. What's a pity!
wantsGeneratedKeysOnce = true;
if (!executeCachedSql(sql, 0, columnNames)) {
// no resultset returned. What's a pity!
}
return getLargeUpdateCount();
}
return getLargeUpdateCount();
}

public boolean isClosed() throws SQLException {
Expand Down Expand Up @@ -1240,15 +1250,17 @@ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException {
}

public int executeUpdate(String sql, String @Nullable [] columnNames) throws SQLException {
if (columnNames != null && columnNames.length == 0) {
return executeUpdate(sql);
}
synchronized (this) {
if (columnNames != null && columnNames.length == 0) {
return executeUpdate(sql);
}

wantsGeneratedKeysOnce = true;
if (!executeCachedSql(sql, 0, columnNames)) {
// no resultset returned. What's a pity!
wantsGeneratedKeysOnce = true;
if (!executeCachedSql(sql, 0, columnNames)) {
// no resultset returned. What's a pity!
}
return getUpdateCount();
}
return getUpdateCount();
}

public boolean execute(String sql, int autoGeneratedKeys) throws SQLException {
Expand All @@ -1268,12 +1280,14 @@ public boolean execute(String sql, int @Nullable [] columnIndexes) throws SQLExc
}

public boolean execute(String sql, String @Nullable [] columnNames) throws SQLException {
if (columnNames != null && columnNames.length == 0) {
return execute(sql);
}
synchronized (this) {
if (columnNames != null && columnNames.length == 0) {
return execute(sql);
}

wantsGeneratedKeysOnce = true;
return executeCachedSql(sql, 0, columnNames);
wantsGeneratedKeysOnce = true;
return executeCachedSql(sql, 0, columnNames);
}
}

public int getResultSetHoldability() throws SQLException {
Expand Down
Loading

0 comments on commit 4673fd2

Please sign in to comment.