Skip to content

Commit

Permalink
Merge branch 'useContextExecutorService2'
Browse files Browse the repository at this point in the history
  • Loading branch information
ceharris committed Apr 23, 2013
2 parents 7da92b5 + 902d4bb commit 3193a37
Show file tree
Hide file tree
Showing 18 changed files with 114 additions and 599 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,6 @@
*/
package ch.qos.logback.classic.net;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import ch.qos.logback.core.spi.ContextAwareBase;
import ch.qos.logback.core.spi.LifeCycle;

Expand All @@ -29,7 +25,6 @@
public abstract class ReceiverBase extends ContextAwareBase
implements LifeCycle {

private ExecutorService executor;
private boolean started;

/**
Expand All @@ -40,15 +35,10 @@ public final void start() {
if (getContext() == null) {
throw new IllegalStateException("context not set");
}

executor = createExecutorService();
if (shouldStart()) {
executor.execute(getRunnableTask());
getContext().getExecutorService().execute(getRunnableTask());
started = true;
}
else {
executor.shutdownNow();
}
}

/**
Expand All @@ -62,7 +52,6 @@ public final void stop() {
catch (RuntimeException ex) {
addError("on stop: " + ex, ex);
}
executor.shutdownNow();
started = false;
}

Expand Down Expand Up @@ -95,28 +84,4 @@ public final boolean isStarted() {
*/
protected abstract Runnable getRunnableTask();

/**
* Creates an executor for concurrent execution of tasks associated with
* the receiver (including the receiver's {@link #run()} task itself.
* <p>
* Subclasses may override to provide a custom executor.
*
* @return executor service
*/
protected ExecutorService createExecutorService() {
return Executors.newCachedThreadPool();
}

/**
* Provides access to the receiver's executor.
* <p>
* A subclass may use the executor returned by this method to run
* concurrent tasks as needed.
*
* @return executor
*/
protected Executor getExecutor() {
return executor;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public void run() {
reconnectionDelay);
while (!Thread.currentThread().isInterrupted()) {
try {
getExecutor().execute(connector);
getContext().getExecutorService().execute(connector);
} catch (RejectedExecutionException ex) {
// executor is shutting down...
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import javax.net.ServerSocketFactory;

import ch.qos.logback.classic.net.ReceiverBase;
import ch.qos.logback.core.net.SocketAppenderBase;
import ch.qos.logback.core.net.server.ServerListener;
import ch.qos.logback.core.net.server.ServerRunner;
import ch.qos.logback.core.net.server.ThreadPoolFactoryBean;
import ch.qos.logback.core.util.CloseUtil;

/**
Expand All @@ -45,7 +43,6 @@ public class ServerSocketReceiver extends ReceiverBase {
private int backlog = DEFAULT_BACKLOG;

private String address;
private ThreadPoolFactoryBean threadPool;

private ServerSocket serverSocket;
private ServerRunner runner;
Expand All @@ -61,7 +58,7 @@ protected boolean shouldStart() {
ServerListener<RemoteAppenderClient> listener =
createServerListener(serverSocket);

runner = createServerRunner(listener, getExecutor());
runner = createServerRunner(listener, getContext().getExecutorService());
runner.setContext(getContext());
return true;
}
Expand All @@ -83,11 +80,6 @@ protected ServerRunner createServerRunner(
return new RemoteAppenderServerRunner(listener, executor);
}

@Override
protected ExecutorService createExecutorService() {
return getThreadPool().createExecutor();
}

@Override
protected Runnable getRunnableTask() {
return runner;
Expand Down Expand Up @@ -183,24 +175,4 @@ public void setAddress(String address) {
this.address = address;
}

/**
* Gets the server's thread pool configuration.
* @return thread pool configuration; if no thread pool configuration was
* provided, a default configuration is returned
*/
public ThreadPoolFactoryBean getThreadPool() {
if (threadPool == null) {
return new ThreadPoolFactoryBean();
}
return threadPool;
}

/**
* Sets the server's thread pool configuration.
* @param threadPool the configuration to set
*/
public void setThreadPool(ThreadPoolFactoryBean threadPool) {
this.threadPool = threadPool;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package ch.qos.logback.classic.net.server;

import ch.qos.logback.core.joran.spi.DefaultNestedComponentRegistry;
import ch.qos.logback.core.net.server.ThreadPoolFactoryBean;
import ch.qos.logback.core.net.ssl.SSLConfiguration;

/**
Expand All @@ -27,8 +26,6 @@ public class SocketServerNestedComponentRegistryRules {
public static void addDefaultNestedComponentRegistryRules(
DefaultNestedComponentRegistry registry) {

registry.add(ServerSocketReceiver.class, "threadPool",
ThreadPoolFactoryBean.class);
registry.add(SSLServerSocketReceiver.class, "ssl",
SSLConfiguration.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.net.SocketFactory;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.LoggerFactory;

import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
Expand All @@ -54,20 +52,19 @@
*/
public class SocketReceiverTest {

private static final int DELAY = 200;
private static final int DELAY = 1000;
private static final String TEST_HOST_NAME = "NOT.A.VALID.HOST.NAME";


private ServerSocket serverSocket;
private Socket socket;
private ExecutorService executor = Executors.newCachedThreadPool();
private MockSocketFactory socketFactory = new MockSocketFactory();
private MockSocketConnector connector;
private MockAppender appender;
private LoggerContext lc;
private Logger logger;

private InstrumentedSocketReceiver remote =
private InstrumentedSocketReceiver receiver =
new InstrumentedSocketReceiver();

@Before
Expand All @@ -77,8 +74,9 @@ public void setUp() throws Exception {
serverSocket.getLocalPort());
connector = new MockSocketConnector(socket);

lc = (LoggerContext) LoggerFactory.getILoggerFactory();
remote.setContext(lc);
lc = new LoggerContext();
lc.reset();
receiver.setContext(lc);
appender = new MockAppender();
appender.start();
logger = lc.getLogger(getClass());
Expand All @@ -87,10 +85,9 @@ public void setUp() throws Exception {

@After
public void tearDown() throws Exception {
remote.stop();
if (!remote.isExecutorCreated()) {
executor.shutdownNow();
}
receiver.stop();
ExecutorService executor = lc.getExecutorService();
executor.shutdownNow();
assertTrue(executor.awaitTermination(DELAY, TimeUnit.MILLISECONDS));
socket.close();
serverSocket.close();
Expand All @@ -99,73 +96,73 @@ public void tearDown() throws Exception {

@Test
public void testStartNoRemoteAddress() throws Exception {
remote.start();
assertFalse(remote.isStarted());
receiver.start();
assertFalse(receiver.isStarted());
int count = lc.getStatusManager().getCount();
Status status = lc.getStatusManager().getCopyOfStatusList().get(count - 1);
assertTrue(status.getMessage().contains("host"));
}

@Test
public void testStartNoPort() throws Exception {
remote.setRemoteHost(TEST_HOST_NAME);
remote.start();
assertFalse(remote.isStarted());
receiver.setRemoteHost(TEST_HOST_NAME);
receiver.start();
assertFalse(receiver.isStarted());
int count = lc.getStatusManager().getCount();
Status status = lc.getStatusManager().getCopyOfStatusList().get(count - 1);
assertTrue(status.getMessage().contains("port"));
}

@Test
public void testStartUnknownHost() throws Exception {
remote.setPort(6000);
remote.setRemoteHost(TEST_HOST_NAME);
remote.start();
assertFalse(remote.isStarted());
receiver.setPort(6000);
receiver.setRemoteHost(TEST_HOST_NAME);
receiver.start();
assertFalse(receiver.isStarted());
int count = lc.getStatusManager().getCount();
Status status = lc.getStatusManager().getCopyOfStatusList().get(count - 1);
assertTrue(status.getMessage().contains("unknown host"));
}

@Test
public void testStartStop() throws Exception {
remote.setRemoteHost(InetAddress.getLocalHost().getHostName());
remote.setPort(6000);
remote.setAcceptConnectionTimeout(DELAY / 2);
remote.start();
assertTrue(remote.isStarted());
remote.awaitConnectorCreated(DELAY);
remote.stop();
assertFalse(remote.isStarted());
receiver.setRemoteHost(InetAddress.getLocalHost().getHostName());
receiver.setPort(6000);
receiver.setAcceptConnectionTimeout(DELAY / 2);
receiver.start();
assertTrue(receiver.isStarted());
receiver.awaitConnectorCreated(DELAY);
receiver.stop();
assertFalse(receiver.isStarted());
}

@Test
public void testServerSlowToAcceptConnection() throws Exception {
remote.setRemoteHost(InetAddress.getLocalHost().getHostName());
remote.setPort(6000);
remote.setAcceptConnectionTimeout(DELAY / 4);
remote.start();
assertTrue(remote.awaitConnectorCreated(DELAY / 2));
receiver.setRemoteHost(InetAddress.getLocalHost().getHostName());
receiver.setPort(6000);
receiver.setAcceptConnectionTimeout(DELAY / 4);
receiver.start();
assertTrue(receiver.awaitConnectorCreated(DELAY / 2));
// note that we don't call serverSocket.accept() here
// but stop (in tearDown) should still clean up everything
}

@Test
public void testServerDropsConnection() throws Exception {
remote.setRemoteHost(InetAddress.getLocalHost().getHostName());
remote.setPort(6000);
remote.start();
assertTrue(remote.awaitConnectorCreated(DELAY));
receiver.setRemoteHost(InetAddress.getLocalHost().getHostName());
receiver.setPort(6000);
receiver.start();
assertTrue(receiver.awaitConnectorCreated(DELAY));
Socket socket = serverSocket.accept();
socket.close();
}

@Test
public void testDispatchEventForEnabledLevel() throws Exception {
remote.setRemoteHost(InetAddress.getLocalHost().getHostName());
remote.setPort(6000);
remote.start();
assertTrue(remote.awaitConnectorCreated(DELAY));
receiver.setRemoteHost(InetAddress.getLocalHost().getHostName());
receiver.setPort(6000);
receiver.start();
assertTrue(receiver.awaitConnectorCreated(DELAY));
Socket socket = serverSocket.accept();

ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
Expand All @@ -187,10 +184,10 @@ public void testDispatchEventForEnabledLevel() throws Exception {

@Test
public void testNoDispatchEventForDisabledLevel() throws Exception {
remote.setRemoteHost(InetAddress.getLocalHost().getHostName());
remote.setPort(6000);
remote.start();
assertTrue(remote.awaitConnectorCreated(DELAY));
receiver.setRemoteHost(InetAddress.getLocalHost().getHostName());
receiver.setPort(6000);
receiver.start();
assertTrue(receiver.awaitConnectorCreated(DELAY));
Socket socket = serverSocket.accept();

ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
Expand All @@ -212,7 +209,6 @@ public void testNoDispatchEventForDisabledLevel() throws Exception {
private class InstrumentedSocketReceiver extends SocketReceiver {

private boolean connectorCreated;
private boolean executorCreated;

@Override
protected synchronized SocketConnector newConnector(
Expand All @@ -227,11 +223,6 @@ protected SocketFactory getSocketFactory() {
return socketFactory;
}

@Override
protected ExecutorService createExecutorService() {
return executor;
}

public synchronized boolean awaitConnectorCreated(long delay)
throws InterruptedException {
while (!connectorCreated) {
Expand All @@ -240,10 +231,6 @@ public synchronized boolean awaitConnectorCreated(long delay)
return connectorCreated;
}

public boolean isExecutorCreated() {
return executorCreated;
}

}

/**
Expand Down
Loading

0 comments on commit 3193a37

Please sign in to comment.