Permalink
Browse files

fix: statement.cancel and statement.setQueryTimeout should be thread-…

…safe

Statement.cancel was not thread-safe, thus query timeout from one statement might cancel subsequent statement.
The change introduces state tracking of a statement (idle, in-query, cancelling, cancelled), so out-of-order cancels can be detected and ignored

fixes #412
  • Loading branch information...
vlsi committed Nov 3, 2015
1 parent 892d7af commit bc3d8488a0a4fa16e712be83d8debaf4b9130dfa
Showing with 138 additions and 31 deletions.
  1. +138 −31 org/postgresql/jdbc2/AbstractJdbc2Statement.java
@@ -41,6 +41,8 @@
import java.util.Map;
import java.util.TimeZone;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.postgresql.Driver;
import org.postgresql.core.BaseConnection;
@@ -85,7 +87,34 @@
protected final int resultsettype; // the resultset type to return (ResultSet.TYPE_xxx)
protected final int concurrency; // is it updateable or not? (ResultSet.CONCUR_xxx)
protected int fetchdirection = ResultSet.FETCH_FORWARD; // fetch direction hint (currently ignored)
/**
* Protects current statement from cancelTask starting, waiting for a bit, and waking up exactly on subsequent query execution.
* The idea is to atomically compare and swap the reference to the task, so the task can detect that statement executes different
* query than the one the cancelTask was created.
* Note: the field must be set/get/compareAndSet via {@link #CANCEL_TIMER_UPDATER} as per {@link AtomicReferenceFieldUpdater} javadoc.
*/
private volatile TimerTask cancelTimerTask = null;
private static final AtomicReferenceFieldUpdater<AbstractJdbc2Statement, TimerTask> CANCEL_TIMER_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(AbstractJdbc2Statement.class, TimerTask.class, "cancelTimerTask");
/**
* Protects statement from out-of-order cancels. It protects from both {@link #setQueryTimeout(int)} and {@link #cancel()} induced ones.
*
* .execute() and friends change statementState to STATE_IN_QUERY during execute.
* .cancel() ignores cancel request if state is IDLE.
* In case .execute() observes non-IN_QUERY state as it completes the query, it waits till
* STATE_CANCELLED.
* Note: the field must be set/get/compareAndSet via {@link #STATE_UPDATER} as per {@link AtomicIntegerFieldUpdater} javadoc.
*/
private volatile int statementState = STATE_IDLE;
private final static int STATE_IDLE = 0;
private final static int STATE_IN_QUERY = 1;
private final static int STATE_CANCELLING = 2;
private final static int STATE_CANCELLED = 3;
private static final AtomicIntegerFieldUpdater<AbstractJdbc2Statement> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(AbstractJdbc2Statement.class, "statementState");
/**
* Does the caller of execute/executeUpdate want generated keys for this
@@ -102,7 +131,7 @@
public boolean wantsGeneratedKeysAlways = false;
// The connection who created us
protected BaseConnection connection;
protected final BaseConnection connection;
/** The warnings chain. */
protected SQLWarning warnings = null;
@@ -115,7 +144,7 @@
/** Number of rows to get in a batch. */
protected int fetchSize = 0;
/** Timeout (in milli-seconds) for a query */
/** Timeout (in milliseconds) for a query */
protected int timeout = 0;
protected boolean replaceProcessingEnabled = true;
@@ -773,11 +802,11 @@ public void setQueryTimeout(int seconds) throws SQLException
}
/*
* The queryTimeout limit is the number of seconds the driver
* The queryTimeout limit is the number of milliseconds the driver
* will wait for a Statement to execute. If the limit is
* exceeded, a SQLException is thrown.
*
* @return the current query timeout limit in seconds; 0 = unlimited
* @return the current query timeout limit in milliseconds; 0 = unlimited
* @exception SQLException if a database access error occurs
*/
public int getQueryTimeoutMs() throws SQLException
@@ -789,7 +818,7 @@ public int getQueryTimeoutMs() throws SQLException
/*
* Sets the queryTimeout limit
*
* @param seconds - the new query timeout limit in seconds
* @param seconds - the new query timeout limit in milliseconds
* @exception SQLException if a database access error occurs
*/
public void setQueryTimeoutMs(int millis) throws SQLException
@@ -920,7 +949,7 @@ public void close() throws SQLException
if (isClosed)
return ;
killTimerTask();
cleanupTimer();
closeForNextExecution();
@@ -2953,7 +2982,26 @@ public void handleCompletion() throws SQLException {
*/
public void cancel() throws SQLException
{
connection.cancelQuery();
if (!STATE_UPDATER.compareAndSet(this, STATE_IN_QUERY, STATE_CANCELLING))
{
// Not in query, there's nothing to cancel
return;
}
try
{
// Synchronize on connection to avoid spinning in killTimerTask
synchronized (connection)
{
connection.cancelQuery();
}
} finally
{
STATE_UPDATER.set(this, STATE_CANCELLED);
synchronized (connection)
{
connection.notifyAll(); // wake-up killTimerTask
}
}
}
public Connection getConnection() throws SQLException
@@ -3478,39 +3526,98 @@ public void registerOutParameter(int parameterIndex, int sqlType, String typeNam
throw Driver.notImplemented(this.getClass(), "registerOutParameter(int,int,String)");
}
protected synchronized void startTimer()
private void startTimer()
{
if (timeout == 0)
return;
/*
* there shouldn't be any previous timer active, but better safe than
* sorry.
*/
cleanupTimer();
/*
* there shouldn't be any previous timer active, but better safe than
* sorry.
*/
killTimerTask();
STATE_UPDATER.set(this, STATE_IN_QUERY);
cancelTimerTask = new TimerTask() {
public void run()
{
try {
AbstractJdbc2Statement.this.cancel();
} catch (SQLException e) {
}
}
};
if (timeout == 0)
{
return;
}
connection.addTimerTask(cancelTimerTask, timeout);
TimerTask cancelTask = new TimerTask() {
public void run()
{
try
{
if (!CANCEL_TIMER_UPDATER.compareAndSet(AbstractJdbc2Statement.this, this, null))
{
return; // Nothing to do here, statement has already finished and cleared cancelTimerTask reference
}
AbstractJdbc2Statement.this.cancel();
} catch (SQLException e)
{
}
}
};
CANCEL_TIMER_UPDATER.set(this, cancelTask);
connection.addTimerTask(cancelTask, timeout);
}
private synchronized void killTimerTask()
/*
* Clears {@link #cancelTimerTask} if any.
* Returns true if and only if "cancel" timer task would never invoke {@link #cancel()}.
*/
private boolean cleanupTimer()
{
if (cancelTimerTask != null) {
cancelTimerTask.cancel();
cancelTimerTask = null;
connection.purgeTimerTasks();
TimerTask timerTask = CANCEL_TIMER_UPDATER.get(this);
if (timerTask == null)
{
// If timeout is zero, then timer task did not exist, so we safely report "all clear"
return timeout == 0;
}
if (!CANCEL_TIMER_UPDATER.compareAndSet(this, timerTask, null))
{
// Failed to update reference -> timer has just fired, so we must wait for the query state to become "cancelling".
return false;
}
timerTask.cancel();
connection.purgeTimerTasks();
// All clear
return true;
}
private void killTimerTask()
{
boolean timerTaskIsClear = cleanupTimer();
// The order is important here: in case we need to wait for the cancel task, the state must be
// kept STATE_IN_QUERY, so cancelTask would be able to cancel the query.
// It is believed that this case is very rare, so "additional cancel and wait below" would not harm it.
if (timerTaskIsClear && STATE_UPDATER.compareAndSet(this, STATE_IN_QUERY, STATE_IDLE))
{
return;
}
// Being here means someone managed to call .cancel() and our connection did not receive "timeout error"
// We wait till state becomes "cancelled"
boolean interrupted = false;
while (!STATE_UPDATER.compareAndSet(this, STATE_CANCELLED, STATE_IDLE))
{
synchronized (connection)
{
try
{
// Note: wait timeout here is irrelevant since synchronized(connection) would block until .cancel finishes
connection.wait(10);
} catch (InterruptedException e)
{
interrupted = true;
}
}
}
if (interrupted)
{
Thread.currentThread().interrupt();
}
}
protected boolean getForceBinaryTransfer()
{
return forceBinaryTransfers;

0 comments on commit bc3d848

Please sign in to comment.