Skip to content

Commit

Permalink
connection logic migrated to ConnectionRunner, all tests pass
Browse files Browse the repository at this point in the history
  • Loading branch information
ceki committed May 7, 2013
1 parent 703e62c commit 11249f0
Show file tree
Hide file tree
Showing 15 changed files with 145 additions and 200 deletions.
Expand Up @@ -35,22 +35,6 @@ public class SSLSocketAppender extends AbstractSSLSocketAppender<IAccessEvent> {
public SSLSocketAppender() {
}

/**
* Connects to remote server at <code>address</code> and <code>port</code>.
*/
@Deprecated
public SSLSocketAppender(String host, int port) {
super(host, port);
}

/**
* Connects to remote server at <code>address</code> and <code>port</code>.
*/
@Deprecated
public SSLSocketAppender(InetAddress address, int port) {
super(address.getHostAddress(), port);
}

@Override
protected void postProcessEvent(IAccessEvent event) {
event.prepareForDeferredProcessing();
Expand Down
Expand Up @@ -39,22 +39,6 @@ public class SocketAppender extends AbstractSocketAppender<IAccessEvent> {
public SocketAppender() {
}

/**
* Connects to remote server at <code>address</code> and <code>port</code>.
*/
@Deprecated
public SocketAppender(InetAddress address, int port) {
super(address.getHostAddress(), port);
}

/**
* Connects to remote server at <code>host</code> and <code>port</code>.
*/
@Deprecated
public SocketAppender(String host, int port) {
super(host, port);
}

@Override
protected void postProcessEvent(IAccessEvent event) {
event.prepareForDeferredProcessing();
Expand Down
Expand Up @@ -37,21 +37,6 @@ public class SSLSocketAppender extends AbstractSSLSocketAppender<ILoggingEvent>
public SSLSocketAppender() {
}

/**
* Connects to remote server at <code>address</code> and <code>port</code>.
*/
@Deprecated
public SSLSocketAppender(String host, int port) {
super(host, port);
}

/**
* Connects to remote server at <code>address</code> and <code>port</code>.
*/
@Deprecated
public SSLSocketAppender(InetAddress address, int port) {
super(address.getHostAddress(), port);
}

@Override
protected void postProcessEvent(ILoggingEvent event) {
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.net.ServerSocket;

import ch.qos.logback.core.status.StatusChecker;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -49,6 +50,7 @@ public class ServerSocketReceiverTest {

private ServerSocket serverSocket;
private InstrumentedServerSocketReceiver receiver;
StatusChecker statusChecker = new StatusChecker(context);

@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -88,11 +90,7 @@ public void testStopThrowsException() throws Exception {
runner.setStopException(ex);
receiver.stop();

Status status = context.getLastStatus();
assertNotNull(status);
assertTrue(status instanceof ErrorStatus);
assertTrue(status.getMessage().contains(ex.getMessage()));
assertSame(ex, status.getThrowable());
statusChecker.containsException(IOException.class);
}

@Test
Expand Down
Expand Up @@ -49,7 +49,8 @@ protected AbstractSSLSocketAppender() {
public void start() {
try {
if(isStarted()) {
new SSLConnectionRunner(context, getRemoteHost(), getPort(), getSsl());
super.connectionRunner = new SSLConnectionRunner(this, getRemoteHost(), getPort(),
reconnectionDuration, getSsl());
}
super.start();

Expand Down
Expand Up @@ -35,18 +35,23 @@
import ch.qos.logback.core.CoreConstants;
import ch.qos.logback.core.spi.PreSerializationTransformer;
import ch.qos.logback.core.util.CloseUtil;
import ch.qos.logback.core.util.Duration;

/**
* An abstract base for module specific {@code SocketAppender}
* implementations in other logback modules.
*
*
* @author Ceki G&uuml;lc&uuml;
* @author S&eacute;bastien Pennec
* @author Carl Harris
*/

public abstract class AbstractSocketAppender<E> extends AppenderBase<E>
implements Runnable, SocketConnector.ExceptionHandler {
implements Runnable {

static String NO_PORT_ERROR_URL = CoreConstants.CODES_URL + "#socket_no_port";
static String NO_HOST_ERROR_URL = CoreConstants.CODES_URL + "#socket_no_host";


/**
* The default port number of remote logging server (4560).
Expand All @@ -58,51 +63,58 @@ public abstract class AbstractSocketAppender<E> extends AppenderBase<E>
* for the remote peer.
*/
public static final int DEFAULT_QUEUE_SIZE = 0;

/**
* Default timeout when waiting for the remote server to accept our
* connection.
*/
private static final int DEFAULT_ACCEPT_CONNECTION_DELAY = 5000;

/**
* The default reconnection delay (30000 milliseconds or 30 seconds).
*/
public static final int DEFAULT_RECONNECTION_DELAY = 30000;

private String remoteHost;
private int port = DEFAULT_PORT;

private int queueSize = DEFAULT_QUEUE_SIZE;
private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY;


Duration reconnectionDuration = null;

private BlockingQueue<E> queue;
private Future<?> task;


private volatile Socket socket;

private ConnectionRunner connectionRunner;
ConnectionRunner connectionRunner;

/**
* Constructs a new appender.
*/
protected AbstractSocketAppender() {
}

/**
* Constructs a new appender that will connect to the given remote host
* Constructs a new appender that will connect to the given remote host
* and port.
* <p>
* This constructor was introduced primarily to allow the encapsulation
* of the this class to be improved in a manner that is least disruptive
* to <em>existing</em> subclasses. <strong>This constructor will be
* <p/>
* This constructor was introduced primarily to allow the encapsulation
* of the this class to be improved in a manner that is least disruptive
* to <em>existing</em> subclasses. <strong>This constructor will be
* removed in future release</strong>.
*
*
* @param remoteHost target remote host
* @param port target port on remote host
* @param port target port on remote host
*/
@Deprecated
protected AbstractSocketAppender(String remoteHost, int port) {
this.remoteHost = remoteHost;
this.port = port;
}

/**
* {@inheritDoc}
*/
Expand All @@ -111,27 +123,24 @@ public void start() {
int errorCount = 0;
if (port <= 0) {
errorCount++;
addError("No port was configured for appender"
+ name
+ " For more information, please visit http://logback.qos.ch/codes.html#socket_no_port");
addError("No port was configured for appender [" + name + "]");
addError("For more information, please visit " + NO_PORT_ERROR_URL);
}

if (remoteHost == null) {
errorCount++;
addError("No remote host was configured for appender"
+ name
+ " For more information, please visit http://logback.qos.ch/codes.html#socket_no_host");
addError("No remote host was configured for appender" + name);
addError("For more information, please visit " + NO_HOST_ERROR_URL);
}

if (queueSize < 0) {
errorCount++;
addError("Queue size must be non-negative");
}


if (errorCount == 0) {
queue = newBlockingQueue(queueSize);
connectionRunner = new ConnectionRunner(context, remoteHost, port);
connectionRunner = new ConnectionRunner(this, remoteHost, port, reconnectionDuration);
task = getContext().getExecutorService().submit(this);
super.start();
}
Expand All @@ -154,7 +163,7 @@ public void stop() {
*/
@Override
protected void append(E event) {
if (event == null || !isStarted()) return;
if (event == null || !isStarted()) return;
queue.offer(event);
}

Expand All @@ -165,7 +174,7 @@ public final void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
socket = connectionRunner.connect();
if(socket == null)
if (socket == null)
break;
dispatchEvents();
}
Expand All @@ -176,7 +185,6 @@ public final void run() {
}



private void dispatchEvents() throws InterruptedException {

try {
Expand Down Expand Up @@ -205,37 +213,39 @@ private void dispatchEvents() throws InterruptedException {
socket = null;
addInfo(connectionRunner.getPeerId() + "connection closed");
}
}
/**
}

/**
* Creates a blocking queue that will be used to hold logging events until
* they can be delivered to the remote receiver.
* <p>
* The default implementation creates a (bounded) {@link ArrayBlockingQueue}
* <p/>
* The default implementation creates a (bounded) {@link ArrayBlockingQueue}
* for positive queue sizes. Otherwise it creates a {@link SynchronousQueue}.
* <p>
* <p/>
* This method is exposed primarily to support instrumentation for unit
* testing.
*
*
* @param queueSize size of the queue
* @return
*/
BlockingQueue<E> newBlockingQueue(int queueSize) {
return queueSize <= 0 ?
new SynchronousQueue<E>() : new ArrayBlockingQueue<E>(queueSize);
return queueSize <= 0 ?
new SynchronousQueue<E>() : new ArrayBlockingQueue<E>(queueSize);
}

/**
* Post-processes an event before it is serialized for delivery to the
* remote receiver.
*
* @param event the event to post-process
*/
protected abstract void postProcessEvent(E event);

/**
* Get the pre-serialization transformer that will be used to transform
* each event into a Serializable object before delivery to the remote
* receiver.
*
* @return transformer object
*/
protected abstract PreSerializationTransformer<E> getPST();
Expand Down Expand Up @@ -284,43 +294,45 @@ public int getPort() {
}



/**
* The <b>queueSize</b> property takes a non-negative integer representing
* the number of logging events to retain for delivery to the remote receiver.
* When the queue size is zero, event delivery to the remote receiver is
* synchronous. When the queue size is greater than zero, the
* synchronous. When the queue size is greater than zero, the
* {@link #append(Object)} method returns immediately after enqueing the
* event, assuming that there is space available in the queue. Using a
* event, assuming that there is space available in the queue. Using a
* non-zero queue length can improve performance by eliminating delays
* caused by transient network delays. If the queue is full when the
* {@link #append(Object)} method is called, the event is summarily
* {@link #append(Object)} method is called, the event is summarily
* and silently dropped.
*
*
* @param queueSize the queue size to set.
*/
public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}

/**
* Returns the value of the <b>queueSize</b> property.
*/
public int getQueueSize() {
return queueSize;
}

/**
* Sets the timeout that controls how long we'll wait for the remote
* peer to accept our connection attempt.
* <p>
* <p/>
* This property is configurable primarily to support instrumentation
* for unit testing.
*
*
* @param acceptConnectionTimeout timeout value in milliseconds
*/
void setAcceptConnectionTimeout(int acceptConnectionTimeout) {
this.acceptConnectionTimeout = acceptConnectionTimeout;
}

void setReconnectionDelay(Duration duration) {
this.reconnectionDuration = duration;
}
}

0 comments on commit 11249f0

Please sign in to comment.