Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
MariaDbStatement: Removed "TimerTask"
Instead of using timers which create threads on demand, this leverages the existing "DynamicSizedScheduler" concept to allow a pool to reuse possible threads.
Thus avoiding the need to create possibly lots of threads if there is a server side issue requiring lots of queries to timeout.  Using this dynamic pool we can potentially reusue threads.
  • Loading branch information
jentfoo authored and Mike Jensen committed Mar 22, 2016
1 parent 46c4166 commit efc6405
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 34 deletions.
Expand Up @@ -288,11 +288,7 @@ private void executeQueryProlog(PrepareResult prepareResult) throws SQLException
Reset timeout after query, re-throw SQL exception
*/
private void executeQueryEpilog(QueryException exception, String sql) throws SQLException {

if (timerTask != null) {
timerTask.cancel();
timerTask = null;
}
stopTimeoutTask();

if (isTimedout) {
isTimedout = false;
Expand Down
64 changes: 36 additions & 28 deletions src/main/java/org/mariadb/jdbc/MariaDbStatement.java
Expand Up @@ -51,8 +51,9 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS

import org.mariadb.jdbc.internal.queryresults.UpdateResult;
import org.mariadb.jdbc.internal.util.ExceptionMapper;
import org.mariadb.jdbc.internal.util.dao.PrepareResult;
import org.mariadb.jdbc.internal.util.dao.QueryException;
import org.mariadb.jdbc.internal.util.scheduler.DynamicSizedSchedulerInterface;
import org.mariadb.jdbc.internal.util.scheduler.SchedulerServiceProviderHolder;
import org.mariadb.jdbc.internal.queryresults.AbstractQueryResult;
import org.mariadb.jdbc.internal.queryresults.ResultSetType;
import org.mariadb.jdbc.internal.protocol.Protocol;
Expand All @@ -61,11 +62,15 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS
import java.io.InputStream;
import java.sql.*;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;


public class MariaDbStatement implements Statement {
private static volatile Timer timer;
private static final DynamicSizedSchedulerInterface timeoutScheduler =
SchedulerServiceProviderHolder.getScheduler(0);
/**
* the protocol used to talk to the server.
*/
Expand All @@ -82,7 +87,7 @@ public class MariaDbStatement implements Statement {
* The actual query result.
*/
protected AbstractQueryResult queryResult;
protected TimerTask timerTask;
protected Future<?> timerTaskFuture;
protected boolean isRewriteable = true;
protected int rewriteOffset = -1;
protected ResultSet batchResultSet = null;
Expand Down Expand Up @@ -113,26 +118,11 @@ public MariaDbStatement(MariaDbConnection connection, int autoGeneratedKeys) {
cachedResultSets = new ArrayDeque<>();
}

private static Timer getTimer() {
Timer result = timer;
if (result == null) {
synchronized (MariaDbStatement.class) {
result = timer;
if (result == null) {
timer = result = new Timer("MariaDB-JDBC-Timer", true);
}
}
}
return result;
}

/**
* Provide a "cleanup" method that can be called after unloading driver, to fix Tomcat's obscure classpath handling.
*/
public static void unloadDriver() {
if (timer != null) {
timer.cancel();
}
// nothing to do here, as scheduler is already daemon thread
}

public boolean isStreaming() {
Expand All @@ -141,8 +131,11 @@ public boolean isStreaming() {

// Part of query prolog - setup timeout timer
protected void setTimerTask() {
assert (timerTask == null);
timerTask = new TimerTask() {
assert (timerTaskFuture == null);

// because canceling needs to establish a new connection, we want to ensure that this
// possible blocking call can be run in parallel if multiple queries need to timeout
timerTaskFuture = timeoutScheduler.addThreadAndSchedule(new Runnable() {
@Override
public void run() {
try {
Expand All @@ -151,8 +144,7 @@ public void run() {
} catch (Throwable e) {
}
}
};
getTimer().schedule(timerTask, queryTimeout * 1000);
}, queryTimeout, TimeUnit.SECONDS);
}

protected void executeQueryProlog() throws SQLException {
Expand Down Expand Up @@ -185,16 +177,32 @@ protected void cacheMoreResults() throws SQLException {
}
queryResult = saveResult;
}

protected void stopTimeoutTask() {
if (timerTaskFuture != null) {
if (! timerTaskFuture.cancel(true)) {
// could not cancel, task either started or already finished
// we must now wait for task to finish to ensure state modifications are done
try {
timerTaskFuture.get();
} catch (InterruptedException e) {
// reset interrupt status
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
// ignore error, likely due to interrupting during cancel
}
// we don't catch the exception if already canceled, that would indicate we tried
// to cancel in parallel (which this code currently is not designed for)
}
timerTaskFuture = null;
}
}

/*
Reset timeout after query, re-throw SQL exception
*/
protected void executeQueryEpilog(QueryException queryException) throws SQLException {

if (timerTask != null) {
timerTask.cancel();
timerTask = null;
}
stopTimeoutTask();

if (isTimedout) {
isTimedout = false;
Expand Down
Expand Up @@ -49,7 +49,10 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS

package org.mariadb.jdbc.internal.util.scheduler;

import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DynamicSizedSchedulerImpl extends ScheduledThreadPoolExecutor implements DynamicSizedSchedulerInterface {
/**
Expand All @@ -62,6 +65,49 @@ public DynamicSizedSchedulerImpl(int corePoolSize) {

@Override
public void setPoolSize(int newSize) {
super.setCorePoolSize(newSize);
synchronized (this) {
super.setCorePoolSize(newSize);
}
}

@Override
public void adjustPoolSize(int delta) {
// locked to avoid check then act race condition
synchronized (this) {
super.setCorePoolSize(Math.max(0, super.getCorePoolSize() + delta));
}
}

@Override
public Future<?> addThreadAndExecute(final Runnable task) {
return addThreadAndSchedule(task, 0, TimeUnit.MILLISECONDS);
}

@Override
public Future<?> addThreadAndSchedule(final Runnable task, long delay, TimeUnit unit) {
adjustPoolSize(1);

FutureTask<?> result = new PoolSizeDecreaseFuture(task);
if (delay == 0) {
// execute is slightly better if we can, as it avoids wrapping the future in another future
super.execute(result);
} else {
super.schedule(result, delay, unit);
// can not return future from schedule above
// we must return our decreasing future to handle Future.cancel case
}
return result;
}

private class PoolSizeDecreaseFuture extends FutureTask<Object> {
public PoolSizeDecreaseFuture(Runnable runnable) {
super(runnable, null);
}

@Override
protected void done() {
// invoked when task is complete or canceled
adjustPoolSize(-1);
}
}
}
Expand Up @@ -49,7 +49,9 @@ WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWIS

package org.mariadb.jdbc.internal.util.scheduler;

import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public interface DynamicSizedSchedulerInterface extends ScheduledExecutorService {
/**
Expand All @@ -58,4 +60,32 @@ public interface DynamicSizedSchedulerInterface extends ScheduledExecutorService
* @param newSize New pool size that is superior to 0
*/
public void setPoolSize(int newSize);

/**
* Adjusts the pool size by a given delta. This should atomically change the pool size in a
* thread safe way.
*
* @param delta A positive or negative number to adjust the pool size by.
*/
public void adjustPoolSize(int delta);

/**
* Used when you want to ensure an extra thread is available to execute task. This will adjust
* the pool size so that a thread can be created (if necessary), and then execute the task.
* Once the task completes the pool size will be adjusted back down.
*
* @param task Task to be executed
* @return Future which represents task execution
*/
public Future<?> addThreadAndExecute(Runnable task);

/**
* Used when you want to ensure an extra thread is available for scheduled task. This will
* adjust the pool size so that a thread can be created (if necessary), and then execute the
* task. Once the task completes the pool size will be adjusted back down.
*
* @param task Task to be executed
* @return Future which represents task execution
*/
public Future<?> addThreadAndSchedule(Runnable task, long delay, TimeUnit unit);
}

0 comments on commit efc6405

Please sign in to comment.