Skip to content

Commit

Permalink
Merge pull request #121 from qos-ch/callableConnector
Browse files Browse the repository at this point in the history
SocketConnector is now a Callable, all tests pass
  • Loading branch information
ceki committed May 3, 2013
2 parents 2db3b0f + 15adfd3 commit 530e965
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 165 deletions.
Expand Up @@ -20,6 +20,8 @@
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;

import javax.net.SocketFactory;
Expand Down Expand Up @@ -51,6 +53,7 @@ public class SocketReceiver extends ReceiverBase

private String receiverId;
private volatile Socket socket;
private Future<Socket> connectorTask;

/**
* {@inheritDoc}
Expand Down Expand Up @@ -109,24 +112,51 @@ protected Runnable getRunnableTask() {
public void run() {
try {
LoggerContext lc = (LoggerContext) getContext();
SocketConnector connector = createConnector(address, port, 0,
reconnectionDelay);
while (!Thread.currentThread().isInterrupted()) {
try {
getContext().getExecutorService().execute(connector);
} catch (RejectedExecutionException ex) {
break; // executor is shutting down...
}
socket = connector.awaitConnection();
SocketConnector connector = createConnector(address, port, 0,
reconnectionDelay);
connectorTask = activateConnector(connector);
if (connectorTask == null)
break;
socket = waitForConnectorToReturnASocket();
if (socket == null)
break;
dispatchEvents(lc);
connector = createConnector(address, port, reconnectionDelay);
}
} catch (InterruptedException ex) {
assert true; // ok... we'll exit now
}
addInfo("shutting down");
}

private SocketConnector createConnector(InetAddress address, int port,
int initialDelay, int retryDelay) {
SocketConnector connector = newConnector(address, port, initialDelay,
retryDelay);
connector.setExceptionHandler(this);
connector.setSocketFactory(getSocketFactory());
return connector;
}


private Future<Socket> activateConnector(SocketConnector connector) {
try {
return getContext().getExecutorService().submit(connector);
} catch (RejectedExecutionException ex) {
return null;
}
}

private Socket waitForConnectorToReturnASocket() throws InterruptedException {
try {
Socket s = connectorTask.get();
connectorTask = null;
return s;
} catch (ExecutionException e) {
return null;
}
}

private void dispatchEvents(LoggerContext lc) {
try {
socket.setSoTimeout(acceptConnectionTimeout);
Expand Down Expand Up @@ -166,19 +196,6 @@ public void connectionFailed(SocketConnector connector, Exception ex) {
}
}

private SocketConnector createConnector(InetAddress address, int port,
int delay) {
return createConnector(address, port, delay, delay);
}

private SocketConnector createConnector(InetAddress address, int port,
int initialDelay, int retryDelay) {
SocketConnector connector = newConnector(address, port, initialDelay,
retryDelay);
connector.setExceptionHandler(this);
connector.setSocketFactory(getSocketFactory());
return connector;
}

protected SocketConnector newConnector(InetAddress address,
int port, int initialDelay, int retryDelay) {
Expand Down
Expand Up @@ -244,14 +244,7 @@ public MockSocketConnector(Socket socket) {
this.socket = socket;
}

public void run() {
}

public Socket awaitConnection() throws InterruptedException {
return awaitConnection(Long.MAX_VALUE);
}

public Socket awaitConnection(long delay) throws InterruptedException {
public Socket call() throws InterruptedException {
return socket;
}

Expand Down
Expand Up @@ -23,10 +23,12 @@
import java.net.UnknownHostException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;


import javax.net.SocketFactory;

import ch.qos.logback.core.AppenderBase;
Expand Down Expand Up @@ -78,7 +80,7 @@ public abstract class AbstractSocketAppender<E> extends AppenderBase<E>
private BlockingQueue<E> queue;
private String peerId;
private Future<?> task;
private Future<?> connectorTask;
private Future<Socket> connectorTask;

private volatile Socket socket;

Expand Down Expand Up @@ -183,16 +185,44 @@ public final void run() {
if(connectorTask == null)
break;

socket = connector.awaitConnection();
connectorTask = null;
socket = waitForConnectorToReturnASocket();
if(socket == null)
break;
dispatchEvents();
}
} catch (InterruptedException ex) {
assert true; // ok... we'll exit now
}
addInfo("shutting down");
}


private SocketConnector createConnector(InetAddress address, int port,
int initialDelay, int retryDelay) {
SocketConnector connector = newConnector(address, port, initialDelay,
retryDelay);
connector.setExceptionHandler(this);
connector.setSocketFactory(getSocketFactory());
return connector;
}

private Future<Socket> activateConnector(SocketConnector connector) {
try {
return getContext().getExecutorService().submit(connector);
} catch (RejectedExecutionException ex) {
return null;
}
}

private Socket waitForConnectorToReturnASocket() throws InterruptedException {
try {
Socket s = connectorTask.get();
connectorTask = null;
return s;
} catch (ExecutionException e) {
return null;
}
}

private void dispatchEvents() throws InterruptedException {
try {
socket.setSoTimeout(acceptConnectionTimeout);
Expand Down Expand Up @@ -235,22 +265,6 @@ public void connectionFailed(SocketConnector connector, Exception ex) {
}
}

private SocketConnector createConnector(InetAddress address, int port,
int initialDelay, int retryDelay) {
SocketConnector connector = newConnector(address, port, initialDelay,
retryDelay);
connector.setExceptionHandler(this);
connector.setSocketFactory(getSocketFactory());
return connector;
}

private Future<?> activateConnector(SocketConnector connector) {
try {
return getContext().getExecutorService().submit(connector);
} catch (RejectedExecutionException ex) {
return null;
}
}


/**
Expand Down
Expand Up @@ -13,19 +13,15 @@
*/
package ch.qos.logback.core.net;

import ch.qos.logback.core.util.DelayStrategy;
import ch.qos.logback.core.util.FixedDelay;

import java.io.IOException;
import java.net.InetAddress;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.net.SocketFactory;

import ch.qos.logback.core.util.DelayStrategy;
import ch.qos.logback.core.util.FixedDelay;

/**
* Default implementation of {@link SocketConnector}.
*
Expand All @@ -34,17 +30,12 @@
*/
public class DefaultSocketConnector implements SocketConnector {


private final Lock lock = new ReentrantLock();
private final Condition connectCondition = lock.newCondition();

private final InetAddress address;
private final int port;
private final DelayStrategy delayStrategy;

private ExceptionHandler exceptionHandler;
private SocketFactory socketFactory;
private DelayStrategy delayStrategy;
private Socket socket;

/**
* Constructs a new connector.
Expand Down Expand Up @@ -75,26 +66,16 @@ public DefaultSocketConnector(InetAddress address, int port,
}

/**
* Loops until the desired connection is established.
* Loops until the desired connection is established and returns the resulting connector.
*/
public void run() {
preventReuse();
inCaseOfMissingFieldsFallbackToDefaults();
try {
while (!Thread.currentThread().isInterrupted()) {
Thread.sleep(delayStrategy.nextDelay());
Socket newSocket = createSocket();
if(newSocket != null) {
socket = newSocket;
signalConnected();
// connection established, we are done
break;
}
}
} catch (InterruptedException ex) {
// we have been interrupted
public Socket call() throws InterruptedException {
useDefaultsForMissingFields();
Socket socket = createSocket();
while (socket == null && !Thread.currentThread().isInterrupted()) {
Thread.sleep(delayStrategy.nextDelay());
socket = createSocket();
}
System.out.println("Exiting connector");
return socket;
}

private Socket createSocket() {
Expand All @@ -107,13 +88,7 @@ private Socket createSocket() {
return newSocket;
}

private void preventReuse() {
if (socket != null) {
throw new IllegalStateException("connector cannot be reused");
}
}

private void inCaseOfMissingFieldsFallbackToDefaults() {
private void useDefaultsForMissingFields() {
if (exceptionHandler == null) {
exceptionHandler = new ConsoleExceptionHandler();
}
Expand All @@ -122,42 +97,6 @@ private void inCaseOfMissingFieldsFallbackToDefaults() {
}
}

/**
* Signals any threads waiting on {@code connectCondition} that the
* connection has been established.
*/
private void signalConnected() {
lock.lock();
try {
connectCondition.signalAll();
} finally {
lock.unlock();
}
}

/**
* {@inheritDoc}
*/
public Socket awaitConnection() throws InterruptedException {
return awaitConnection(Long.MAX_VALUE);
}

/**
* {@inheritDoc}
*/
public Socket awaitConnection(long delay) throws InterruptedException {
lock.lock();
try {
boolean timeout = false;
while (socket == null && !timeout) {
timeout = !connectCondition.await(delay, TimeUnit.MILLISECONDS);
}
return socket;
} finally {
lock.unlock();
}
}

/**
* {@inheritDoc}
*/
Expand Down
Expand Up @@ -14,6 +14,7 @@
package ch.qos.logback.core.net;

import java.net.Socket;
import java.util.concurrent.Callable;

import javax.net.SocketFactory;

Expand All @@ -25,7 +26,7 @@
*
* @author Carl Harris
*/
public interface SocketConnector extends Runnable {
public interface SocketConnector extends Callable<Socket> {

/**
* An exception handler that is notified of all exceptions that occur
Expand All @@ -41,21 +42,12 @@ public interface ExceptionHandler {
* @return the connected socket
* @throws InterruptedException
*/
Socket awaitConnection() throws InterruptedException;
Socket call() throws InterruptedException;

/**
* Blocks the calling thread until a connection is successfully
* established or timeout occurs.
* @param delay the maximum time to wait (in milliseconds)
* @return the connected socket or {@code null} if timeout occurs
* @throws InterruptedException
*/
Socket awaitConnection(long delay) throws InterruptedException;

/**
* Sets the connector's exception handler.
* <p>
* The handler must be set before the {@link #run()} method is invoked.
* The handler must be set before the {@link #call()} method is invoked.
* @param exceptionHandler the handler to set
*/
void setExceptionHandler(ExceptionHandler exceptionHandler);
Expand Down

0 comments on commit 530e965

Please sign in to comment.