Permalink
Browse files

fix: prevent statement hang in case close() called when query is in p…

…rogress

Note: Statement#close() is still not thread-safe, however it is much more robust with the fix

fixes #1022
  • Loading branch information...
vlsi committed Nov 26, 2017
1 parent 8ba5841 commit 04c5dbb5058008a8ddad0194156af9819595c315
@@ -182,24 +182,10 @@ protected boolean isOneShotQuery(CachedQuery cachedQuery) {
}
@Override
public void close() throws SQLException {
if (isClosed) {
return;
}
public void closeImpl() throws SQLException {
if (preparedQuery != null) {
// See #368. We need to prevent closing the same statement twice
// Otherwise we might "release" a query that someone else is already using
// In other words, client does .close() as usual, however cleanup thread might fail to observe
// isClosed=true
synchronized (preparedQuery) {
if (!isClosed) {
((PgConnection) connection).releaseQuery(preparedQuery);
}
}
((PgConnection) connection).releaseQuery(preparedQuery);
}
super.close();
}
public void setNull(int parameterIndex, int sqlType) throws SQLException {
@@ -441,9 +441,7 @@ public void setCursorName(String name) throws SQLException {
// No-op.
}
// This is intentionally non-volatile to avoid performance hit in isClosed checks
// see #close()
protected boolean isClosed = false;
private volatile boolean isClosed = false;
public int getUpdateCount() throws SQLException {
checkClosed();
@@ -595,17 +593,27 @@ public void clearWarnings() throws SQLException {
*
* {@inheritDoc}
*/
public void close() throws SQLException {
public final void close() throws SQLException {
// closing an already closed Statement is a no-op.
if (isClosed) {
return;
synchronized (this) {
if (isClosed) {
return;
}
isClosed = true;
}
cleanupTimer();
cancel();
closeForNextExecution();
isClosed = true;
closeImpl();
}
/**
* This is guaranteed to be called exactly once even in case of concurrent {@link #close()} calls.
* @throws SQLException in case of error
*/
protected void closeImpl() throws SQLException {
}
/*
@@ -646,7 +654,7 @@ public boolean isUseServerPrepare() {
}
protected void checkClosed() throws SQLException {
if (isClosed) {
if (isClosed()) {
throw new PSQLException(GT.tr("This statement has been closed."),
PSQLState.OBJECT_NOT_IN_STATE);
}
@@ -805,18 +813,20 @@ protected BatchResultHandler createBatchHandler(Query[] queries,
}
public void cancel() throws SQLException {
if (!STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.CANCELING)) {
if (statementState == StatementCancelState.IDLE) {
return;
}
if (!STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY,
StatementCancelState.CANCELING)) {
// Not in query, there's nothing to cancel
return;
}
try {
// Synchronize on connection to avoid spinning in killTimerTask
synchronized (connection) {
// Synchronize on connection to avoid spinning in killTimerTask
synchronized (connection) {
try {
connection.cancelQuery();
}
} finally {
STATE_UPDATER.set(this, StatementCancelState.CANCELLED);
synchronized (connection) {
} finally {
STATE_UPDATER.set(this, StatementCancelState.CANCELLED);
connection.notifyAll(); // wake-up killTimerTask
}
}
@@ -925,8 +935,10 @@ private void killTimerTask() {
// "timeout error"
// We wait till state becomes "cancelled"
boolean interrupted = false;
while (!STATE_UPDATER.compareAndSet(this, StatementCancelState.CANCELLED, StatementCancelState.IDLE)) {
synchronized (connection) {
synchronized (connection) {
// state check is performed under synchronized so it detects "cancelled" state faster
// In other words, it prevents unnecessary ".wait()" call
while (!STATE_UPDATER.compareAndSet(this, StatementCancelState.CANCELLED, StatementCancelState.IDLE)) {
try {
// Note: wait timeout here is irrelevant since synchronized(connection) would block until
// .cancel finishes
@@ -87,6 +87,7 @@
OBJECT_NOT_IN_STATE("55000"),
OBJECT_IN_USE("55006"),
QUERY_CANCELED("57014"),
SYSTEM_ERROR("60000"),
IO_ERROR("58030"),
@@ -18,6 +18,7 @@
import org.postgresql.util.PSQLState;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -27,7 +28,9 @@
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -792,6 +795,118 @@ public void testMultipleCancels() throws Exception {
assertEquals(0, sharedTimer.getRefCount());
}
@Test(timeout = 10000)
public void testCloseInProgressStatement() throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
final Connection outerLockCon = TestUtil.openDB();
outerLockCon.setAutoCommit(false);
//Acquire an exclusive lock so we can block the notice generating statement
outerLockCon.createStatement().execute("LOCK TABLE test_lock IN ACCESS EXCLUSIVE MODE;");
try {
con.createStatement().execute("SET SESSION client_min_messages = 'NOTICE'");
con.createStatement()
.execute("CREATE OR REPLACE FUNCTION notify_then_sleep() RETURNS VOID AS "
+ "$BODY$ "
+ "BEGIN "
+ "RAISE NOTICE 'start';"
+ "LOCK TABLE test_lock IN ACCESS EXCLUSIVE MODE;"
+ "END "
+ "$BODY$ "
+ "LANGUAGE plpgsql;");
int cancels = 0;
for (int i = 0; i < 100; i++) {
final Statement st = con.createStatement();
executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
long start = System.currentTimeMillis();
while (st.getWarnings() == null) {
long dt = System.currentTimeMillis() - start;
if (dt > 10000) {
throw new IllegalStateException("Expected to receive a notice within 10 seconds");
}
}
st.close();
return null;
}
});
st.setQueryTimeout(120);
try {
st.execute("select notify_then_sleep()");
} catch (SQLException e) {
Assert.assertEquals(
"Query is expected to be cancelled via st.close(), got " + e.getMessage(),
PSQLState.QUERY_CANCELED.getState(),
e.getSQLState()
);
cancels++;
} finally {
TestUtil.closeQuietly(st);
}
}
Assert.assertNotEquals("At least one QUERY_CANCELED state is expected", 0, cancels);
} finally {
executor.shutdown();
TestUtil.closeQuietly(outerLockCon);
}
}
@Test(timeout = 20000)
public void testFastCloses() throws SQLException {
ExecutorService executor = Executors.newSingleThreadExecutor();
con.createStatement().execute("SET SESSION client_min_messages = 'NOTICE'");
con.createStatement()
.execute("CREATE OR REPLACE FUNCTION notify_then_sleep() RETURNS VOID AS "
+ "$BODY$ "
+ "BEGIN "
+ "RAISE NOTICE 'start';"
+ "EXECUTE pg_sleep(1);" // Note: timeout value does not matter here, we just test if test crashes or locks somehow
+ "END "
+ "$BODY$ "
+ "LANGUAGE plpgsql;");
Map<String, Integer> cnt = new HashMap<String, Integer>();
final Random rnd = new Random();
for (int i = 0; i < 1000; i++) {
final Statement st = con.createStatement();
executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
int s = rnd.nextInt(10);
if (s > 8) {
Thread.sleep(s - 9);
}
st.close();
return null;
}
});
ResultSet rs = null;
String sqlState = "0";
try {
rs = st.executeQuery("select 1");
// Acceptable
} catch (SQLException e) {
sqlState = e.getSQLState();
if (!PSQLState.OBJECT_NOT_IN_STATE.getState().equals(sqlState)
&& !PSQLState.QUERY_CANCELED.getState().equals(sqlState)) {
Assert.assertEquals(
"Query is expected to be cancelled via st.close(), got " + e.getMessage(),
PSQLState.QUERY_CANCELED.getState(),
e.getSQLState()
);
}
} finally {
TestUtil.closeQuietly(rs);
TestUtil.closeQuietly(st);
}
Integer val = cnt.get(sqlState);
val = (val == null ? 0 : val) + 1;
cnt.put(sqlState, val);
}
System.out.println("[testFastCloses] total counts for each sql state: " + cnt);
executor.shutdown();
}
/**
* Tests that calling {@code java.sql.Statement#close()} from a concurrent thread does not result
* in {@link java.util.ConcurrentModificationException}

0 comments on commit 04c5dbb

Please sign in to comment.