diff --git a/logback-classic/src/main/java/ch/qos/logback/classic/net/server/ServerSocketReceiver.java b/logback-classic/src/main/java/ch/qos/logback/classic/net/server/ServerSocketReceiver.java index be0b8c11ff..b7b48675b6 100644 --- a/logback-classic/src/main/java/ch/qos/logback/classic/net/server/ServerSocketReceiver.java +++ b/logback-classic/src/main/java/ch/qos/logback/classic/net/server/ServerSocketReceiver.java @@ -18,19 +18,24 @@ import java.net.ServerSocket; import java.net.UnknownHostException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.net.ServerSocketFactory; import ch.qos.logback.classic.net.ReceiverBase; +import ch.qos.logback.core.CoreConstants; import ch.qos.logback.core.net.AbstractSocketAppender; import ch.qos.logback.core.net.server.ServerListener; import ch.qos.logback.core.net.server.ServerRunner; -import ch.qos.logback.core.util.CloseUtil; /** * A logging socket server that is configurable using Joran. * * @author Carl Harris + * @author Sebastian Gröbler */ public class ServerSocketReceiver extends ReceiverBase { @@ -44,8 +49,11 @@ public class ServerSocketReceiver extends ReceiverBase { private String address; - private ServerSocket serverSocket; private ServerRunner runner; + + private int corePoolSize = CoreConstants.CORE_POOL_SIZE; + private int maxPoolSize = CoreConstants.MAX_POOL_SIZE; + protected ExecutorService connectionPoolExecutorService; /** * Starts the server. @@ -58,13 +66,12 @@ protected boolean shouldStart() { ServerListener listener = createServerListener(serverSocket); - runner = createServerRunner(listener, getContext().getExecutorService()); + runner = createServerRunner(listener, getConnectionPoolExecutorService()); runner.setContext(getContext()); return true; } catch (Exception ex) { addError("server startup error: " + ex, ex); - CloseUtil.closeQuietly(serverSocket); return false; } } @@ -95,7 +102,29 @@ protected void onStop() { } catch (IOException ex) { addError("server shutdown error: " + ex, ex); + } finally { + shutDownExecutorService(); + } + } + + private synchronized void shutDownExecutorService() { + connectionPoolExecutorService.shutdownNow(); + connectionPoolExecutorService = null; + } + + private ExecutorService getConnectionPoolExecutorService() { + if (connectionPoolExecutorService == null) { + synchronized (this) { + if (connectionPoolExecutorService == null) { + connectionPoolExecutorService = new ThreadPoolExecutor( + getCorePoolSize(), + getMaxPoolSize(), + 0L, TimeUnit.MILLISECONDS, + new SynchronousQueue()); + } + } } + return connectionPoolExecutorService; } /** @@ -175,4 +204,37 @@ public void setAddress(String address) { this.address = address; } + /** + * Gets the core pool size for the socket client connection pool. + * The default value is {@link CoreConstants#CORE_POOL_SIZE}. + * @return the core pool size + */ + public int getCorePoolSize() { + return corePoolSize; + } + + /** + * Sets the core number of threads for the socket client connection pool. + * @param corePoolSize the core pool size + */ + public void setCorePoolSize(int corePoolSize) { + this.corePoolSize = corePoolSize; + } + + /** + * Gets the maximum pool size for the socket client connection pool. + * The default value is {@link CoreConstants#MAX_POOL_SIZE}. + * @return the maximum pool size + */ + public int getMaxPoolSize() { + return maxPoolSize; + } + + /** + * Sets the maximum allowed number of threads for the socket client connection pool. + * @param maxPoolSize the maximum pool size + */ + public void setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + } } diff --git a/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ConnectionDropCountingServerSocketAppender.java b/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ConnectionDropCountingServerSocketAppender.java new file mode 100644 index 0000000000..43b9aa8dc0 --- /dev/null +++ b/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ConnectionDropCountingServerSocketAppender.java @@ -0,0 +1,139 @@ +/** + * Logback: the reliable, generic, fast and flexible logging framework. + * Copyright (C) 1999-2014, QOS.ch. All rights reserved. + * + * This program and the accompanying materials are dual-licensed under + * either the terms of the Eclipse Public License v1.0 as published by + * the Eclipse Foundation + * + * or (per the licensee's choosing) + * + * under the terms of the GNU Lesser General Public License version 2.1 + * as published by the Free Software Foundation. + */ + +package ch.qos.logback.classic.net.server; + +import java.io.IOException; +import java.io.Serializable; +import java.net.ServerSocket; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; + +import ch.qos.logback.classic.net.LoggingEventPreSerializationTransformer; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.Context; +import ch.qos.logback.core.net.server.AbstractServerSocketAppender; +import ch.qos.logback.core.net.server.RemoteReceiverClient; +import ch.qos.logback.core.net.server.ServerListener; +import ch.qos.logback.core.spi.PreSerializationTransformer; +import ch.qos.logback.core.status.Status; + +/** + * Implementation of {@link AbstractServerSocketAppender} which allows to observe how many client connections have been closed. + * + * @author Sebastian Gröbler + */ +public class ConnectionDropCountingServerSocketAppender extends AbstractServerSocketAppender { + + private final CountDownLatch closeCounter; + private PreSerializationTransformer pst = new LoggingEventPreSerializationTransformer(); + + public ConnectionDropCountingServerSocketAppender(final CountDownLatch closeCounter) { + this.closeCounter = closeCounter; + } + + @Override + protected void postProcessEvent(final ILoggingEvent event) { + event.prepareForDeferredProcessing(); + } + + @Override + protected PreSerializationTransformer getPST() { + return pst; + } + + @Override + protected ServerListener createServerListener(final ServerSocket socket) { + return new DropCountingListener(super.createServerListener(socket)); + } + + private class DropCountingListener implements ServerListener { + + private final ServerListener decoratedListener; + + private DropCountingListener(final ServerListener decoratedListener) { + this.decoratedListener = decoratedListener; + } + + public RemoteReceiverClient acceptClient() throws IOException, InterruptedException { + return new DropCountingClient(decoratedListener.acceptClient()); + } + + public void close() { + decoratedListener.close(); + } + } + + private class DropCountingClient implements RemoteReceiverClient { + + private final RemoteReceiverClient decoratedClient; + + private DropCountingClient(final RemoteReceiverClient decoratedClient) { + this.decoratedClient = decoratedClient; + } + + public void close() { + closeCounter.countDown(); + decoratedClient.close(); + } + + public void run() { + decoratedClient.run(); + } + + public void setQueue(final BlockingQueue queue) { + decoratedClient.setQueue(queue); + } + + public boolean offer(final Serializable event) { + return decoratedClient.offer(event); + } + + public void setContext(final Context context) { + decoratedClient.setContext(context); + } + + public Context getContext() { + return decoratedClient.getContext(); + } + + public void addStatus(final Status status) { + decoratedClient.addStatus(status); + } + + public void addInfo(final String msg) { + decoratedClient.addInfo(msg); + } + + public void addInfo(final String msg, final Throwable ex) { + decoratedClient.addInfo(msg, ex); + } + + public void addWarn(final String msg) { + decoratedClient.addWarn(msg); + } + + public void addWarn(final String msg, final Throwable ex) { + decoratedClient.addWarn(msg, ex); + } + + public void addError(final String msg) { + decoratedClient.addError(msg); + } + + public void addError(final String msg, final Throwable ex) { + decoratedClient.addError(msg, ex); + } + } +} diff --git a/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ConnectionDropCountingServerSocketReceiver.java b/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ConnectionDropCountingServerSocketReceiver.java new file mode 100644 index 0000000000..3753b3df33 --- /dev/null +++ b/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ConnectionDropCountingServerSocketReceiver.java @@ -0,0 +1,79 @@ +/** + * Logback: the reliable, generic, fast and flexible logging framework. + * Copyright (C) 1999-2014, QOS.ch. All rights reserved. + * + * This program and the accompanying materials are dual-licensed under + * either the terms of the Eclipse Public License v1.0 as published by + * the Eclipse Foundation + * + * or (per the licensee's choosing) + * + * under the terms of the GNU Lesser General Public License version 2.1 + * as published by the Free Software Foundation. + */ +package ch.qos.logback.classic.net.server; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.concurrent.CountDownLatch; + +import ch.qos.logback.classic.LoggerContext; +import ch.qos.logback.core.net.server.ServerListener; + +/** + * Extension of {@link ServerSocketReceiver} which allows to observe how many client connections have been closed. + * + * @author Sebastian Gröbler + */ +public class ConnectionDropCountingServerSocketReceiver extends ServerSocketReceiver { + + protected final CountDownLatch closeCounter; + + public ConnectionDropCountingServerSocketReceiver(final CountDownLatch closeCounter) { + this.closeCounter = closeCounter; + } + + @Override + protected ServerListener createServerListener(final ServerSocket socket) { + return new DropCountingListener(super.createServerListener(socket)); + } + + private class DropCountingListener implements ServerListener { + + private final ServerListener decoratedListener; + + private DropCountingListener(final ServerListener decoratedListener) { + this.decoratedListener = decoratedListener; + } + + public RemoteAppenderClient acceptClient() throws IOException, InterruptedException { + return new DropCountingClient(decoratedListener.acceptClient()); + } + + public void close() { + decoratedListener.close(); + } + } + + private class DropCountingClient implements RemoteAppenderClient { + + private final RemoteAppenderClient decoratedClient; + + private DropCountingClient(final RemoteAppenderClient decoratedClient) { + this.decoratedClient = decoratedClient; + } + + public void setLoggerContext(final LoggerContext lc) { + decoratedClient.setLoggerContext(lc); + } + + public void close() { + closeCounter.countDown(); + decoratedClient.close(); + } + + public void run() { + decoratedClient.run(); + } + } +} diff --git a/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ServerSocketAppenderExecutorServiceIntegrationTest.java b/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ServerSocketAppenderExecutorServiceIntegrationTest.java new file mode 100644 index 0000000000..ed7f503397 --- /dev/null +++ b/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ServerSocketAppenderExecutorServiceIntegrationTest.java @@ -0,0 +1,116 @@ +/** + * Logback: the reliable, generic, fast and flexible logging framework. + * Copyright (C) 1999-2014, QOS.ch. All rights reserved. + * + * This program and the accompanying materials are dual-licensed under + * either the terms of the Eclipse Public License v1.0 as published by + * the Eclipse Foundation + * + * or (per the licensee's choosing) + * + * under the terms of the GNU Lesser General Public License version 2.1 + * as published by the Free Software Foundation. + */ + +package ch.qos.logback.classic.net.server; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import ch.qos.logback.classic.net.SocketReceiver; +import ch.qos.logback.core.Context; +import ch.qos.logback.core.net.AbstractSocketAppender; +import org.junit.After; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +/** + * Implicitly tests configurable max pool size of {@link ch.qos.logback.core.net.server.AbstractServerSocketAppender} and + * that each additional client connection is being dropped. + * + * @author Sebastian Gröbler + */ +public class ServerSocketAppenderExecutorServiceIntegrationTest { + + /** + * Default host for this integration test. + */ + private static final String DEFAULT_HOST = "localhost"; + + /** + * Default port for this integration test. + */ + private static final String DEFAULT_PORT = String.valueOf(AbstractSocketAppender.DEFAULT_PORT); + + /** + * Configurable host for this integration test. + */ + private static final String HOST = System.getProperty("integration.test.host", DEFAULT_HOST); + + /** + * Configurable port for this integration test. + */ + private static final int PORT = Integer.parseInt(System.getProperty("integration.test.port", DEFAULT_PORT)); + + /** + * The timeout for this test in case the expectations are not met. + */ + private static final int TEST_TIMEOUT = 2; + + /** + * The unit of the timeout for this test + */ + private static final TimeUnit TEST_TIMEOUT_UNIT = TimeUnit.SECONDS; + + /** + * The maximum pool size to test. + */ + private final static int MAX_POOL_SIZE = 2; + + /** + * The number of clients which can not connect because of pool size limitation. + */ + private final static int TOO_MANY_CLIENTS = 2; + + /** + * The total of all clients trying to establish a connection. + */ + private final static int CLIENT_COUNT = MAX_POOL_SIZE + TOO_MANY_CLIENTS; + + private final Context context = (Context) LoggerFactory.getILoggerFactory(); + private final CountDownLatch dropCounter = new CountDownLatch(TOO_MANY_CLIENTS); + private final ConnectionDropCountingServerSocketAppender socketAppender = new ConnectionDropCountingServerSocketAppender(dropCounter); + + @Before + public void beforeTest() { + socketAppender.setContext(context); + socketAppender.setAddress(HOST); + socketAppender.setPort(PORT); + socketAppender.setMaxPoolSize(MAX_POOL_SIZE); + socketAppender.start(); + } + + @After + public void afterTest() { + socketAppender.stop(); + } + + @Test + public void dropsClientsWhenMaxPoolSizeIsReached() throws InterruptedException { + + // when + for (int i = 0; i < CLIENT_COUNT; i++) { + final SocketReceiver socketReceiver = new SocketReceiver(); + socketReceiver.setContext(context); + socketReceiver.setRemoteHost(HOST); + socketReceiver.setPort(PORT); + socketReceiver.start(); + } + + // then + final boolean allTooManyClientsGotDroppedBeforeTimeout = dropCounter.await(TEST_TIMEOUT, TEST_TIMEOUT_UNIT); + assertTrue(allTooManyClientsGotDroppedBeforeTimeout); + } +} diff --git a/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ServerSocketReceiverExecutorServiceIntegrationTest.java b/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ServerSocketReceiverExecutorServiceIntegrationTest.java new file mode 100644 index 0000000000..0a75c5a547 --- /dev/null +++ b/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ServerSocketReceiverExecutorServiceIntegrationTest.java @@ -0,0 +1,115 @@ +/** + * Logback: the reliable, generic, fast and flexible logging framework. + * Copyright (C) 1999-2014, QOS.ch. All rights reserved. + * + * This program and the accompanying materials are dual-licensed under + * either the terms of the Eclipse Public License v1.0 as published by + * the Eclipse Foundation + * + * or (per the licensee's choosing) + * + * under the terms of the GNU Lesser General Public License version 2.1 + * as published by the Free Software Foundation. + */ + +package ch.qos.logback.classic.net.server; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import ch.qos.logback.classic.net.SocketAppender; +import ch.qos.logback.core.Context; +import ch.qos.logback.core.net.AbstractSocketAppender; +import org.junit.After; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.LoggerFactory; + +/** + * Implicitly tests configurable max pool size of {@link ServerSocketReceiver} and + * that each additional client connection is being dropped. + * + * @author Sebastian Gröbler + */ +public class ServerSocketReceiverExecutorServiceIntegrationTest { + + /** + * Default host for this integration test. + */ + private static final String DEFAULT_HOST = "localhost"; + + /** + * Default port for this integration test. + */ + private static final String DEFAULT_PORT = String.valueOf(AbstractSocketAppender.DEFAULT_PORT); + + /** + * Configurable host for this integration test. + */ + private static final String HOST = System.getProperty("integration.test.host", DEFAULT_HOST); + + /** + * Configurable port for this integration test. + */ + private static final int PORT = Integer.parseInt(System.getProperty("integration.test.port", DEFAULT_PORT)); + + /** + * The timeout for this test in case the expectations are not met. + */ + private static final int TEST_TIMEOUT = 2; + + /** + * The unit of the timeout for this test + */ + private static final TimeUnit TEST_TIMEOUT_UNIT = TimeUnit.SECONDS; + + /** + * The maximum pool size to test. + */ + private final static int MAX_POOL_SIZE = 2; + + /** + * The number of clients which can not connect because of pool size limitation. + */ + private final static int TOO_MANY_CLIENTS = 2; + + /** + * The total of all clients trying to establish a connection. + */ + private final static int CLIENT_COUNT = MAX_POOL_SIZE + TOO_MANY_CLIENTS; + + private final Context context = (Context) LoggerFactory.getILoggerFactory(); + private final CountDownLatch dropCounter = new CountDownLatch(TOO_MANY_CLIENTS); + private final ServerSocketReceiver serverSocketReceiver = new ConnectionDropCountingServerSocketReceiver(dropCounter); + + @Before + public void beforeTest() { + serverSocketReceiver.setContext(context); + serverSocketReceiver.setAddress(HOST); + serverSocketReceiver.setPort(PORT); + serverSocketReceiver.setMaxPoolSize(MAX_POOL_SIZE); + serverSocketReceiver.start(); + } + + @After + public void afterTest() { + serverSocketReceiver.stop(); + } + + @Test + public void dropsClientsWhenMaxPoolSizeIsReached() throws InterruptedException { + // when + for (int i = 0; i < CLIENT_COUNT; i++) { + final SocketAppender socketAppender = new SocketAppender(); + socketAppender.setContext(context); + socketAppender.setRemoteHost(HOST); + socketAppender.setPort(PORT); + socketAppender.start(); + } + + // then + final boolean allTooManyClientsGotDroppedBeforeTimeout = dropCounter.await(TEST_TIMEOUT, TEST_TIMEOUT_UNIT); + assertTrue(allTooManyClientsGotDroppedBeforeTimeout); + } +} diff --git a/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ServerSocketReceiverTest.java b/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ServerSocketReceiverTest.java index 4fc7bc4413..a5f15953b3 100644 --- a/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ServerSocketReceiverTest.java +++ b/logback-classic/src/test/java/ch/qos/logback/classic/net/server/ServerSocketReceiverTest.java @@ -13,30 +13,43 @@ */ package ch.qos.logback.classic.net.server; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.assertTrue; - import java.io.IOException; import java.net.ServerSocket; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - +import ch.qos.logback.core.CoreConstants; import ch.qos.logback.core.net.mock.MockContext; import ch.qos.logback.core.net.server.MockServerListener; import ch.qos.logback.core.net.server.MockServerRunner; +import ch.qos.logback.core.net.server.ServerListener; import ch.qos.logback.core.net.server.ServerSocketUtil; import ch.qos.logback.core.status.ErrorStatus; import ch.qos.logback.core.status.Status; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.hamcrest.core.IsNull.nullValue; +import org.junit.After; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; /** * Unit tests for {@link ServerSocketReceiver}. * * @author Carl Harris + * @author Sebastian Gröbler */ public class ServerSocketReceiverTest { @@ -102,4 +115,69 @@ public void testStopWhenNotStarted() throws Exception { assertEquals(0, runner.getStartCount()); } + @Test + public void providesDefaultValueForMaxPoolSize() throws Exception { + assertThat(receiver.getMaxPoolSize(), is(CoreConstants.MAX_POOL_SIZE)); + } + + @Test + public void providesDefaultValueForCorePoolSize() throws Exception { + assertThat(receiver.getCorePoolSize(), is(CoreConstants.CORE_POOL_SIZE)); + } + + @Test + public void allowsCustomValueForMaxPoolSize() throws Exception { + final int customValue = 128; + receiver.setMaxPoolSize(customValue); + + assertThat(receiver.getMaxPoolSize(), is(customValue)); + } + + @Test + public void allowsCustomValueForCorePoolSize() throws Exception { + final int customValue = 128; + receiver.setCorePoolSize(128); + + assertThat(receiver.getCorePoolSize(), is(customValue)); + } + + @Test + public void stopShutsDownConnectionPoolExecutorServiceWhenPresent() { + final ExecutorService executorService = mock(ExecutorService.class); + + receiver.start(); + receiver.connectionPoolExecutorService = executorService; + receiver.stop(); + + verify(executorService).shutdownNow(); + assertThat(receiver.connectionPoolExecutorService, is(nullValue())); + } + + @Test + public void testShouldStartUsesConnectionPoolExecutorService() { + + // given + final ServerSocketReceiver serverSocketReceiver = spy(new ServerSocketReceiver()); + final int corePoolSize = 21; + final int maxPoolSize = 42; + serverSocketReceiver.setCorePoolSize(corePoolSize); + serverSocketReceiver.setMaxPoolSize(maxPoolSize); + + // when + serverSocketReceiver.shouldStart(); + + // then + final ArgumentCaptor captor = ArgumentCaptor.forClass(Executor.class); + + verify(serverSocketReceiver).createServerRunner(any(ServerListener.class), captor.capture()); + + final Executor executor = captor.getValue(); + + assertThat(executor, instanceOf(ExecutorService.class)); + + final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; + + assertThat(threadPoolExecutor.getCorePoolSize(), is(corePoolSize)); + assertThat(threadPoolExecutor.getMaximumPoolSize(), is(maxPoolSize)); + } } diff --git a/logback-core/src/main/java/ch/qos/logback/core/CoreConstants.java b/logback-core/src/main/java/ch/qos/logback/core/CoreConstants.java index dc345574ac..dad868258d 100644 --- a/logback-core/src/main/java/ch/qos/logback/core/CoreConstants.java +++ b/logback-core/src/main/java/ch/qos/logback/core/CoreConstants.java @@ -13,16 +13,12 @@ */ package ch.qos.logback.core; -import ch.qos.logback.core.util.EnvUtil; - public class CoreConstants { /** * Number of idle threads to retain in a context's executor service. */ - // CORE_POOL_SIZE must be 1 for JDK 1.5. For JDK 1.6 or higher it's set to 0 - // so that there are no idle threads - public static final int CORE_POOL_SIZE = EnvUtil.isJDK5() ? 1 : 0; + public static final int CORE_POOL_SIZE = 0; /** * Maximum number of threads to allow in a context's executor service. diff --git a/logback-core/src/main/java/ch/qos/logback/core/net/server/AbstractServerSocketAppender.java b/logback-core/src/main/java/ch/qos/logback/core/net/server/AbstractServerSocketAppender.java index d6e1bc1777..ae667b5cf9 100644 --- a/logback-core/src/main/java/ch/qos/logback/core/net/server/AbstractServerSocketAppender.java +++ b/logback-core/src/main/java/ch/qos/logback/core/net/server/AbstractServerSocketAppender.java @@ -19,10 +19,15 @@ import java.net.ServerSocket; import java.net.UnknownHostException; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import javax.net.ServerSocketFactory; import ch.qos.logback.core.AppenderBase; +import ch.qos.logback.core.CoreConstants; import ch.qos.logback.core.net.AbstractSocketAppender; import ch.qos.logback.core.spi.PreSerializationTransformer; @@ -32,6 +37,7 @@ * implementations can derive from. * * @author Carl Harris + * @author Sebastian Gröbler */ public abstract class AbstractServerSocketAppender extends AppenderBase { @@ -53,6 +59,11 @@ public abstract class AbstractServerSocketAppender extends AppenderBase { private ServerRunner runner; + private int corePoolSize = CoreConstants.CORE_POOL_SIZE; + private int maxPoolSize = CoreConstants.MAX_POOL_SIZE; + protected ExecutorService connectionPoolExecutorService; + + @Override public void start() { if (isStarted()) return; @@ -61,7 +72,7 @@ public void start() { getPort(), getBacklog(), getInetAddress()); ServerListener listener = createServerListener(socket); - runner = createServerRunner(listener, getContext().getExecutorService()); + runner = createServerRunner(listener, getConnectionPoolExecutorService()); runner.setContext(getContext()); getContext().getExecutorService().execute(runner); super.start(); @@ -91,6 +102,8 @@ public void stop() { } catch (IOException ex) { addError("server shutdown error: " + ex, ex); + } finally { + shutDownExecutorService(); } } @@ -106,6 +119,26 @@ public void visit(RemoteReceiverClient client) { }); } + private synchronized void shutDownExecutorService() { + connectionPoolExecutorService.shutdownNow(); + connectionPoolExecutorService = null; + } + + private ExecutorService getConnectionPoolExecutorService() { + if (connectionPoolExecutorService == null) { + synchronized (this) { + if (connectionPoolExecutorService == null) { + connectionPoolExecutorService = new ThreadPoolExecutor( + getCorePoolSize(), + getMaxPoolSize(), + 0L, TimeUnit.MILLISECONDS, + new SynchronousQueue()); + } + } + } + return connectionPoolExecutorService; + } + /** * Post process an event received via {@link #append(E)}. * @param event @@ -214,5 +247,38 @@ public void setClientQueueSize(int clientQueueSize) { this.clientQueueSize = clientQueueSize; } + /** + * Gets the core pool size for the socket client connection pool. + * The default value is {@link CoreConstants#CORE_POOL_SIZE}. + * @return the core pool size + */ + public int getCorePoolSize() { + return corePoolSize; + } + + /** + * Sets the core number of threads for the socket client connection pool. + * @param corePoolSize the core pool size + */ + public void setCorePoolSize(int corePoolSize) { + this.corePoolSize = corePoolSize; + } + + /** + * Gets the maximum pool size for the socket client connection pool. + * The default value is {@link CoreConstants#MAX_POOL_SIZE}. + * @return the maximum pool size + */ + public int getMaxPoolSize() { + return maxPoolSize; + } + + /** + * * Sets the maximum allowed number of threads for the socket client connection pool. + * @param maxPoolSize the maximum pool size + */ + public void setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + } } diff --git a/logback-core/src/main/java/ch/qos/logback/core/net/server/RemoteReceiverClient.java b/logback-core/src/main/java/ch/qos/logback/core/net/server/RemoteReceiverClient.java index aba637a23b..d57b8ef618 100644 --- a/logback-core/src/main/java/ch/qos/logback/core/net/server/RemoteReceiverClient.java +++ b/logback-core/src/main/java/ch/qos/logback/core/net/server/RemoteReceiverClient.java @@ -25,7 +25,7 @@ * * @author Carl Harris */ -interface RemoteReceiverClient extends Client, ContextAware { +public interface RemoteReceiverClient extends Client, ContextAware { /** * Sets the client's event queue. diff --git a/logback-core/src/test/java/ch/qos/logback/core/net/server/AbstractServerSocketAppenderTest.java b/logback-core/src/test/java/ch/qos/logback/core/net/server/AbstractServerSocketAppenderTest.java index 8878075bad..3c27827eb1 100644 --- a/logback-core/src/test/java/ch/qos/logback/core/net/server/AbstractServerSocketAppenderTest.java +++ b/logback-core/src/test/java/ch/qos/logback/core/net/server/AbstractServerSocketAppenderTest.java @@ -13,27 +13,40 @@ */ package ch.qos.logback.core.net.server; +import java.io.IOException; +import java.net.ServerSocket; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +import ch.qos.logback.core.CoreConstants; +import ch.qos.logback.core.net.mock.MockContext; +import ch.qos.logback.core.spi.PreSerializationTransformer; +import ch.qos.logback.core.status.ErrorStatus; +import ch.qos.logback.core.status.Status; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.hamcrest.core.IsNull.nullValue; +import org.junit.After; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.net.ServerSocket; - -import org.junit.After; import org.junit.Before; import org.junit.Test; - -import ch.qos.logback.core.net.mock.MockContext; -import ch.qos.logback.core.status.ErrorStatus; -import ch.qos.logback.core.status.Status; +import org.mockito.ArgumentCaptor; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; /** * Unit tests for {@link AbstractServerSocketAppender}. * * @author Carl Harris + * @author Sebastian Gröbler */ public class AbstractServerSocketAppenderTest { @@ -100,4 +113,86 @@ public void testStopWhenNotStarted() throws Exception { assertEquals(0, runner.getStartCount()); } + @Test + public void providesDefaultValueForMaxPoolSize() throws Exception { + assertThat(appender.getMaxPoolSize(), is(CoreConstants.MAX_POOL_SIZE)); + } + + @Test + public void providesDefaultValueForCorePoolSize() throws Exception { + assertThat(appender.getCorePoolSize(), is(CoreConstants.CORE_POOL_SIZE)); + } + + @Test + public void allowsCustomValueForMaxPoolSize() throws Exception { + final int customValue = 128; + appender.setMaxPoolSize(customValue); + + assertThat(appender.getMaxPoolSize(), is(customValue)); + } + + @Test + public void allowsCustomValueForCorePoolSize() throws Exception { + final int customValue = 128; + appender.setCorePoolSize(128); + + assertThat(appender.getCorePoolSize(), is(customValue)); + } + + @Test + public void stopShutsDownConnectionPoolExecutorServiceWhenPresent() { + final ExecutorService executorService = mock(ExecutorService.class); + + appender.start(); + appender.connectionPoolExecutorService = executorService; + appender.stop(); + + verify(executorService).shutdownNow(); + assertThat(appender.connectionPoolExecutorService, is(nullValue())); + } + + @Test + public void testStartUsesConnectionPoolExecutorService() { + + // given + final AbstractServerSocketAppender serverSocketAppender = spy(new MockServerSocketAppender()); + final int corePoolSize = 21; + final int maxPoolSize = 42; + serverSocketAppender.setCorePoolSize(corePoolSize); + serverSocketAppender.setMaxPoolSize(maxPoolSize); + + // when + serverSocketAppender.start(); + + // then + final ArgumentCaptor captor = ArgumentCaptor.forClass(Executor.class); + + verify(serverSocketAppender).createServerRunner(any(ServerListener.class), captor.capture()); + + final Executor executor = captor.getValue(); + + assertThat(executor, instanceOf(ExecutorService.class)); + + final ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor; + + assertThat(threadPoolExecutor.getCorePoolSize(), is(corePoolSize)); + assertThat(threadPoolExecutor.getMaximumPoolSize(), is(maxPoolSize)); + } + + /** + * Simple NOP implementation of abstract methods of {@link AbstractServerSocketAppender}. + */ + private static class MockServerSocketAppender extends AbstractServerSocketAppender { + + @Override + protected void postProcessEvent(Object event) { + // NOP + } + + @Override + protected PreSerializationTransformer getPST() { + final PreSerializationTransformer pst = mock(PreSerializationTransformer.class); + return pst; + } + } } diff --git a/logback-site/src/site/pages/manual/appenders.html b/logback-site/src/site/pages/manual/appenders.html index b55380ffda..234b8a6169 100755 --- a/logback-site/src/site/pages/manual/appenders.html +++ b/logback-site/src/site/pages/manual/appenders.html @@ -2042,6 +2042,18 @@

the appender, as described in Using SSL. + + corePoolSize + int + The minimum number of workers to keep alive for handling connections. The + default value is 0. + + + + maxPoolSize + int + The maximum number of workers for handling connections. The default is 32. +

The following example illustrates a configuration that uses diff --git a/logback-site/src/site/pages/manual/receivers.html b/logback-site/src/site/pages/manual/receivers.html index a37518213c..44344c64b5 100755 --- a/logback-site/src/site/pages/manual/receivers.html +++ b/logback-site/src/site/pages/manual/receivers.html @@ -171,6 +171,18 @@

Receivers the receiver, as described in Using SSL. + + corePoolSize + int + The minimum number of workers to keep alive for handling connections. The + default is 1 for JDK 1.5 and 0 for JDK 1.6 or higher. + + + + maxPoolSize + int + The maximum number of workers for handling connections. The default is 32. +

Using diff --git a/pom.xml b/pom.xml index 949ec850a5..260db4a79f 100755 --- a/pom.xml +++ b/pom.xml @@ -52,6 +52,7 @@ logback-classic/pom.xml and in setClasspath.cmd --> 1.7.7 4.10 + 1.9.5 1.4 2.6.1 2.0.7 @@ -85,6 +86,12 @@ ${junit.version} test + + org.mockito + mockito-core + ${mockito.version} + test + org.easytesting fest-assert