Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[CONJ-563] Driver deregistration now close active thread pools.
Customs pool provided interface will have to implements a new
close method.
  • Loading branch information
rusher committed Sep 10, 2019
1 parent 6b473f7 commit cbe7f56
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 30 deletions.
3 changes: 2 additions & 1 deletion src/main/java/org/mariadb/jdbc/Driver.java
Expand Up @@ -62,6 +62,7 @@
import java.util.List;
import java.util.Properties;
import org.mariadb.jdbc.internal.util.DefaultOptions;
import org.mariadb.jdbc.internal.util.DeRegister;
import org.mariadb.jdbc.internal.util.Options;
import org.mariadb.jdbc.internal.util.constant.Version;

Expand All @@ -70,7 +71,7 @@ public final class Driver implements java.sql.Driver {

static {
try {
DriverManager.registerDriver(new Driver());
DriverManager.registerDriver(new Driver(), new DeRegister());
} catch (SQLException e) {
throw new RuntimeException("Could not register driver", e);
}
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/org/mariadb/jdbc/internal/util/DeRegister.java
@@ -0,0 +1,13 @@
package org.mariadb.jdbc.internal.util;

import org.mariadb.jdbc.internal.util.scheduler.*;

import java.sql.*;

public class DeRegister implements DriverAction {

@Override
public void deregister() {
SchedulerServiceProviderHolder.getSchedulerProvider().close();
}
}
Expand Up @@ -59,42 +59,102 @@
import java.util.concurrent.TimeUnit;

/**
* Provider for when ever an internal thread pool is needed. This can allow library users to
* Provider for when ever an internal thread pool is needed. This can allow library users to
* override our default pooling behavior with possibly better and faster options.
*/
public class SchedulerServiceProviderHolder {

/**
* The default provider will construct a new pool on every request.
*/
public static final SchedulerProvider DEFAULT_PROVIDER = new SchedulerProvider() {
@Override
public DynamicSizedSchedulerInterface getScheduler(int minimumThreads, String poolName,
int maximumPoolSize) {
return new DynamicSizedSchedulerImpl(minimumThreads, poolName, maximumPoolSize);
}
/** The default provider will construct a new pool on every request. */
public static final SchedulerProvider DEFAULT_PROVIDER =
new SchedulerProvider() {

@Override
public ScheduledThreadPoolExecutor getFixedSizeScheduler(int minimumThreads, String poolName) {
return new FixedSizedSchedulerImpl(minimumThreads, poolName);
}
private DynamicSizedSchedulerInterface dynamicSizedScheduler;
private FixedSizedSchedulerImpl fixedSizedScheduler;
private ScheduledThreadPoolExecutor timeoutScheduler;
private ThreadPoolExecutor threadPoolExecutor;

@Override
public ScheduledThreadPoolExecutor getTimeoutScheduler() {
ScheduledThreadPoolExecutor timeoutScheduler = new ScheduledThreadPoolExecutor(1,
new MariaDbThreadFactory("MariaDb-timeout"));
timeoutScheduler.setRemoveOnCancelPolicy(true);
return timeoutScheduler;
}
@Override
public DynamicSizedSchedulerInterface getScheduler(
int minimumThreads, String poolName, int maximumPoolSize) {
if (dynamicSizedScheduler == null) {
synchronized (this) {
if (dynamicSizedScheduler == null) {
dynamicSizedScheduler =
new DynamicSizedSchedulerImpl(minimumThreads, poolName, maximumPoolSize);
}
}
}
return dynamicSizedScheduler;
}

@Override
@SuppressWarnings("unchecked")
public ThreadPoolExecutor getBulkScheduler() {
return new ThreadPoolExecutor(5, 100, 1, TimeUnit.MINUTES, new SynchronousQueue(),
new MariaDbThreadFactory("MariaDb-bulk"));
}
@Override
public ScheduledThreadPoolExecutor getFixedSizeScheduler(
int minimumThreads, String poolName) {
if (fixedSizedScheduler == null) {
synchronized (this) {
if (fixedSizedScheduler == null) {
fixedSizedScheduler = new FixedSizedSchedulerImpl(minimumThreads, poolName);
}
}
}
return fixedSizedScheduler;
}

};
@Override
public ScheduledThreadPoolExecutor getTimeoutScheduler() {
if (timeoutScheduler == null) {
synchronized (this) {
if (timeoutScheduler == null) {
timeoutScheduler =
new ScheduledThreadPoolExecutor(1, new MariaDbThreadFactory("MariaDb-timeout"));
timeoutScheduler.setRemoveOnCancelPolicy(true);
}
}
}
return timeoutScheduler;
}

@Override
@SuppressWarnings("unchecked")
public ThreadPoolExecutor getBulkScheduler() {
if (threadPoolExecutor == null) {
synchronized (this) {
if (threadPoolExecutor == null) {
threadPoolExecutor =
new ThreadPoolExecutor(
5,
100,
1,
TimeUnit.MINUTES,
new SynchronousQueue(),
new MariaDbThreadFactory("MariaDb-bulk"));
}
}
}
return threadPoolExecutor;
}

public synchronized void close() {

if (dynamicSizedScheduler != null) {
dynamicSizedScheduler.shutdownNow();
}
if (fixedSizedScheduler != null) {
fixedSizedScheduler.shutdownNow();
}
if (timeoutScheduler != null) {
timeoutScheduler.shutdownNow();
}
if (threadPoolExecutor != null) {
threadPoolExecutor.shutdownNow();
}

dynamicSizedScheduler = null;
fixedSizedScheduler = null;
timeoutScheduler = null;
threadPoolExecutor = null;
}
};

private static volatile SchedulerProvider currentProvider = null;

Expand Down Expand Up @@ -193,6 +253,8 @@ DynamicSizedSchedulerInterface getScheduler(int minimumThreads, String poolName,
ScheduledThreadPoolExecutor getTimeoutScheduler();

ThreadPoolExecutor getBulkScheduler();

void close();
}


Expand Down
56 changes: 56 additions & 0 deletions src/test/java/org/mariadb/jdbc/DriverTest.java
Expand Up @@ -78,8 +78,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
Expand All @@ -90,6 +92,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.mariadb.jdbc.internal.util.DefaultOptions;
import org.mariadb.jdbc.internal.util.DeRegister;
import org.mariadb.jdbc.internal.util.constant.HaMode;


Expand Down Expand Up @@ -1687,4 +1690,57 @@ public void databaseType() throws SQLException {
}
}
}

@Test
public void forcePoolClose() throws SQLException {
try (Connection conn = setConnection()) {
Statement stmt = conn.createStatement();
stmt.execute("CREATE TEMPORARY TABLE forcePoolClose(id int)");
//force use (and launch) of pipeline thread pool
try (PreparedStatement p = conn.prepareStatement("INSERT INTO forcePoolClose(id) VALUES (?)")) {
p.setInt(1, 1);
p.addBatch();
p.setInt(1, 2);
p.addBatch();
p.executeBatch();
}
new Thread(() -> {
try {
stmt.setQueryTimeout(1);
stmt.execute("select sleep(0.5)");
} catch (SQLException sqle) {

}
}).start();

}

//force de-registration
for (java.sql.Driver drv : Collections.list(DriverManager.getDrivers())) {
if (drv.acceptsURL("jdbc:mariadb:")) {
DriverManager.deregisterDriver(drv);

Iterator<Thread> it = Thread.getAllStackTraces().keySet().iterator();
Thread thread;
while (it.hasNext()) {
thread = it.next();
if (thread.getName().contains("MariaDb-bulk-")) {
for (StackTraceElement ste : thread.getStackTrace()) {
System.out.println(ste);
}
assertFalse(thread.isAlive());
}
}
}
}

//force registration
DriverManager.registerDriver(new org.mariadb.jdbc.Driver(), new DeRegister());

//ensure registration is ok
try (Connection conn = setConnection()) {

}
}

}
Expand Up @@ -148,6 +148,11 @@ public ScheduledThreadPoolExecutor getTimeoutScheduler() {
public ScheduledThreadPoolExecutor getBulkScheduler() {
throw new UnsupportedOperationException();
}

@Override
public void close() {
throw new UnsupportedOperationException();
}
};

SchedulerServiceProviderHolder.setSchedulerProvider(emptyProvider);
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/logback-test.xml
Expand Up @@ -60,7 +60,7 @@
</encoder>
</appender>

<logger additivity="false" level="trace" name="org.mariadb.jdbc">
<logger additivity="false" level="debug" name="org.mariadb.jdbc">
<appender-ref ref="STDOUT"/>
</logger>

Expand Down

0 comments on commit cbe7f56

Please sign in to comment.