Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added TimeoutPoller to have one thread for timers #842

Merged
merged 15 commits into from
Nov 23, 2018
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
143 changes: 31 additions & 112 deletions src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,10 @@
import java.util.Set;
import java.util.SimpleTimeZone;
import java.util.TimeZone;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -6192,7 +6187,7 @@ final class TDSReaderMark {
final class TDSReader {
private final static Logger logger = Logger.getLogger("com.microsoft.sqlserver.jdbc.internals.TDS.Reader");
final private String traceID;
private TimeoutTimer tcpKeepAliveTimeoutTimer;
private TimeoutCommand timeoutCommand;
shayaantx marked this conversation as resolved.
Show resolved Hide resolved

final public String toString() {
return traceID;
Expand Down Expand Up @@ -6236,17 +6231,6 @@ private static int nextReaderID() {
this.tdsChannel = tdsChannel;
this.con = con;
this.command = command; // may be null
if (null != command) {
// if cancelQueryTimeout is set, we should wait for the total amount of queryTimeout + cancelQueryTimeout to
// terminate the connection.
this.tcpKeepAliveTimeoutTimer = (command.getCancelQueryTimeoutSeconds() > 0
&& command.getQueryTimeoutSeconds() > 0)
? (new TimeoutTimer(
command.getCancelQueryTimeoutSeconds()
+ command.getQueryTimeoutSeconds(),
null, con))
: null;
}
// if the logging level is not detailed than fine or more we will not have proper reader IDs.
if (logger.isLoggable(Level.FINE))
traceID = "TDSReader@" + nextReaderID() + " (" + con.toString() + ")";
Expand Down Expand Up @@ -6351,11 +6335,16 @@ synchronized final boolean readPacket() throws SQLServerException {
+ " should be less than numMsgsSent:" + tdsChannel.numMsgsSent;

TDSPacket newPacket = new TDSPacket(con.getTDSPacketSize());
if (null != tcpKeepAliveTimeoutTimer) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest(this.toString() + ": starting timer...");
if (null != command) {
// if cancelQueryTimeout is set, we should wait for the total amount of
// queryTimeout + cancelQueryTimeout to
// terminate the connection.
if ((command.getCancelQueryTimeoutSeconds() > 0 && command.getQueryTimeoutSeconds() > 0)) {
// if a timeout is configured with this object, add it to the timeout poller
int timeout = command.getCancelQueryTimeoutSeconds() + command.getQueryTimeoutSeconds();
this.timeoutCommand = new TdsTimeoutCommand(timeout, this.command, this.con);
TimeoutPoller.getTimeoutPoller().addTimeoutCommand(this.timeoutCommand);
shayaantx marked this conversation as resolved.
Show resolved Hide resolved
}
tcpKeepAliveTimeoutTimer.start();
}
// First, read the packet header.
for (int headerBytesRead = 0; headerBytesRead < TDS.PACKET_HEADER_SIZE;) {
Expand All @@ -6375,11 +6364,8 @@ synchronized final boolean readPacket() throws SQLServerException {
}

// if execution was subject to timeout then stop timing
if (null != tcpKeepAliveTimeoutTimer) {
if (logger.isLoggable(Level.FINEST)) {
logger.finest(this.toString() + ":stopping timer...");
}
tcpKeepAliveTimeoutTimer.stop();
if (this.timeoutCommand != null) {
TimeoutPoller.getTimeoutPoller().remove(this.timeoutCommand);
shayaantx marked this conversation as resolved.
Show resolved Hide resolved
}
// Header size is a 2 byte unsigned short integer in big-endian order.
int packetLength = Util.readUnsignedShortBigEndian(newPacket.header, TDS.PACKET_HEADER_MESSAGE_LENGTH);
Expand Down Expand Up @@ -6969,82 +6955,25 @@ final void trySetSensitivityClassification(SensitivityClassification sensitivity


/**
* Timer for use with Commands that support a timeout.
*
* Once started, the timer runs for the prescribed number of seconds unless stopped. If the timer runs out, it
* interrupts its associated Command with a reason like "timed out".
* The tds default implementation of a timeout command
*/
final class TimeoutTimer implements Runnable {
private static final String threadGroupName = "mssql-jdbc-TimeoutTimer";
private final int timeoutSeconds;
private final TDSCommand command;
private volatile Future<?> task;
private final SQLServerConnection con;

private static final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
private final AtomicReference<ThreadGroup> tgr = new AtomicReference<>();
private final AtomicInteger threadNumber = new AtomicInteger(0);

@Override
public Thread newThread(Runnable r) {
ThreadGroup tg = tgr.get();

if (tg == null || tg.isDestroyed()) {
tg = new ThreadGroup(threadGroupName);
tgr.set(tg);
}

Thread t = new Thread(tg, r, tg.getName() + "-" + threadNumber.incrementAndGet());
t.setDaemon(true);
return t;
}
});

private volatile boolean canceled = false;

TimeoutTimer(int timeoutSeconds, TDSCommand command, SQLServerConnection con) {
assert timeoutSeconds > 0;

this.timeoutSeconds = timeoutSeconds;
this.command = command;
this.con = con;
}

final void start() {
task = executor.submit(this);
}

final void stop() {
task.cancel(true);
canceled = true;
class TdsTimeoutCommand extends TimeoutCommand<TDSCommand> {
public TdsTimeoutCommand(int timeout, TDSCommand command, SQLServerConnection sqlServerConnection) {
super(timeout, command, sqlServerConnection);
}

public void run() {
int secondsRemaining = timeoutSeconds;
public void interrupt() {
TDSCommand command = getCommand();
SQLServerConnection sqlServerConnection = getSqlServerConnection();
try {
// Poll every second while time is left on the timer.
// Return if/when the timer is canceled.
do {
if (canceled)
return;

Thread.sleep(1000);
} while (--secondsRemaining > 0);
} catch (InterruptedException e) {
// re-interrupt the current thread, in order to restore the thread's interrupt status.
Thread.currentThread().interrupt();
return;
}

// If the timer wasn't canceled before it ran out of
// time then interrupt the registered command.
try {
// If TCP Connection to server is silently dropped, exceeding the query timeout on the same connection does
// If TCP Connection to server is silently dropped, exceeding the query timeout
// on the same connection does
// not throw SQLTimeoutException
// The application stops responding instead until SocketTimeoutException is thrown. In this case, we must
// The application stops responding instead until SocketTimeoutException is
// thrown. In this case, we must
// manually terminate the connection.
if (null == command && null != con) {
con.terminate(SQLServerException.DRIVER_ERROR_IO_FAILED,
if (null == command && null != sqlServerConnection) {
sqlServerConnection.terminate(SQLServerException.DRIVER_ERROR_IO_FAILED,
SQLServerException.getErrString("R_connectionIsClosed"));
} else {
// If the timer wasn't canceled before it ran out of
Expand All @@ -7061,7 +6990,6 @@ public void run() {
}
}


/**
* TDSCommand encapsulates an interruptable TDS conversation.
*
Expand Down Expand Up @@ -7095,10 +7023,6 @@ final void log(Level level, String message) {
logger.log(level, toString() + ": " + message);
}

// Optional timer that is set if the command was created with a non-zero timeout period.
// When the timer expires, the command is interrupted.
private final TimeoutTimer timeoutTimer;

// TDS channel accessors
// These are set/reset at command execution time.
// Volatile ensures visibility to execution thread and interrupt thread
Expand Down Expand Up @@ -7187,6 +7111,7 @@ protected void setProcessedResponse(boolean processedResponse) {
private volatile boolean readingResponse;
private int queryTimeoutSeconds;
private int cancelQueryTimeoutSeconds;
private TdsTimeoutCommand timeoutCommand;

protected int getQueryTimeoutSeconds() {
return this.queryTimeoutSeconds;
Expand All @@ -7213,7 +7138,6 @@ final boolean readingResponse() {
this.logContext = logContext;
this.queryTimeoutSeconds = queryTimeoutSeconds;
this.cancelQueryTimeoutSeconds = cancelQueryTimeoutSeconds;
this.timeoutTimer = (queryTimeoutSeconds > 0) ? (new TimeoutTimer(queryTimeoutSeconds, this, null)) : null;
}

/**
Expand Down Expand Up @@ -7602,11 +7526,9 @@ final TDSReader startResponse(boolean isAdaptive) throws SQLServerException {

// If command execution is subject to timeout then start timing until
// the server returns the first response packet.
if (null != timeoutTimer) {
if (logger.isLoggable(Level.FINEST))
logger.finest(this.toString() + ": Starting timer...");

timeoutTimer.start();
if (queryTimeoutSeconds > 0) {
this.timeoutCommand = new TdsTimeoutCommand(queryTimeoutSeconds, this, null);
TimeoutPoller.getTimeoutPoller().addTimeoutCommand(this.timeoutCommand);
}

if (logger.isLoggable(Level.FINEST))
Expand All @@ -7629,11 +7551,8 @@ final TDSReader startResponse(boolean isAdaptive) throws SQLServerException {
} finally {
// If command execution was subject to timeout then stop timing as soon
// as the server returns the first response packet or errors out.
if (null != timeoutTimer) {
if (logger.isLoggable(Level.FINEST))
logger.finest(this.toString() + ": Stopping timer...");

timeoutTimer.stop();
if (this.timeoutCommand != null) {
TimeoutPoller.getTimeoutPoller().remove(this.timeoutCommand);
}
}

Expand Down
72 changes: 16 additions & 56 deletions src/main/java/com/microsoft/sqlserver/jdbc/SQLServerBulkCopy.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,56 +247,16 @@ class BulkColumnMetaData {
private int srcColumnCount;

/**
* Timer for the bulk copy operation. The other timeout timers in the TDS layer only measure the response of the
* first packet from SQL Server.
* Timeout for the bulk copy command
*/
private final class BulkTimeoutTimer implements Runnable {
private final int timeoutSeconds;
private int secondsRemaining;
private final TDSCommand command;
private Thread timerThread;
private volatile boolean canceled = false;

BulkTimeoutTimer(int timeoutSeconds, TDSCommand command) {
assert timeoutSeconds > 0;
assert null != command;

this.timeoutSeconds = timeoutSeconds;
this.secondsRemaining = timeoutSeconds;
this.command = command;
}

final void start() {
timerThread = new Thread(this);
timerThread.setDaemon(true);
timerThread.start();
}

final void stop() {
canceled = true;
timerThread.interrupt();
private final class BulkTimeoutCommand extends TimeoutCommand<TDSCommand> {
public BulkTimeoutCommand(int timeout, TDSCommand command, SQLServerConnection sqlServerConnection) {
super(timeout, command, sqlServerConnection);
}

final boolean expired() {
return (secondsRemaining <= 0);
}

public void run() {
try {
// Poll every second while time is left on the timer.
// Return if/when the timer is canceled.
do {
if (canceled)
return;

Thread.sleep(1000);
} while (--secondsRemaining > 0);
} catch (InterruptedException e) {
// re-interrupt the current thread, in order to restore the thread's interrupt status.
Thread.currentThread().interrupt();
return;
}

@Override
public void interrupt() {
TDSCommand command = getCommand();
// If the timer wasn't canceled before it ran out of
// time then interrupt the registered command.
try {
Expand All @@ -310,7 +270,7 @@ public void run() {
}
}

private BulkTimeoutTimer timeoutTimer = null;
private BulkTimeoutCommand timeoutCommand;

/**
* The maximum temporal precision we can send when using varchar(precision) in bulkcommand, to send a
Expand Down Expand Up @@ -687,15 +647,15 @@ final class InsertBulk extends TDSCommand {
InsertBulk() {
super("InsertBulk", 0, 0);
int timeoutSeconds = copyOptions.getBulkCopyTimeout();
timeoutTimer = (timeoutSeconds > 0) ? (new BulkTimeoutTimer(timeoutSeconds, this)) : null;
timeoutCommand = timeoutSeconds > 0 ? new BulkTimeoutCommand(timeoutSeconds, this, null) : null;
}

final boolean doExecute() throws SQLServerException {
if (null != timeoutTimer) {
if (null != timeoutCommand) {
if (logger.isLoggable(Level.FINEST))
logger.finest(this.toString() + ": Starting bulk timer...");

timeoutTimer.start();
TimeoutPoller.getTimeoutPoller().addTimeoutCommand(timeoutCommand);
}

// doInsertBulk inserts the rows in one batch. It returns true if there are more rows in
Expand All @@ -712,18 +672,18 @@ final boolean doExecute() throws SQLServerException {

// Check whether it is a timeout exception.
if (rootCause instanceof SQLException) {
checkForTimeoutException((SQLException) rootCause, timeoutTimer);
checkForTimeoutException((SQLException) rootCause, timeoutCommand);
}

// It is not a timeout exception. Re-throw.
throw topLevelException;
}

if (null != timeoutTimer) {
if (null != timeoutCommand) {
if (logger.isLoggable(Level.FINEST))
logger.finest(this.toString() + ": Stopping bulk timer...");

timeoutTimer.stop();
TimeoutPoller.getTimeoutPoller().remove(timeoutCommand);
}

return true;
Expand Down Expand Up @@ -1188,9 +1148,9 @@ private void writeColumnMetaData(TDSWriter tdsWriter) throws SQLServerException
/**
* Helper method that throws a timeout exception if the cause of the exception was that the query was cancelled
*/
private void checkForTimeoutException(SQLException e, BulkTimeoutTimer timeoutTimer) throws SQLServerException {
private void checkForTimeoutException(SQLException e, TimeoutCommand timeoutCommand) throws SQLServerException {
shayaantx marked this conversation as resolved.
Show resolved Hide resolved
if ((null != e.getSQLState()) && (e.getSQLState().equals(SQLState.STATEMENT_CANCELED.getSQLStateCode()))
&& timeoutTimer.expired()) {
&& timeoutCommand.canTimeout()) {
// If SQLServerBulkCopy is managing the transaction, a rollback is needed.
if (copyOptions.isUseInternalTransaction()) {
connection.rollback();
Expand Down