Skip to content

Commit

Permalink
ongoing work
Browse files Browse the repository at this point in the history
  • Loading branch information
ceki committed May 7, 2013
1 parent 82960a7 commit 703e62c
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 153 deletions.
Expand Up @@ -19,6 +19,7 @@
import ch.qos.logback.core.net.ssl.ConfigurableSSLSocketFactory;
import ch.qos.logback.core.net.ssl.SSLComponent;
import ch.qos.logback.core.net.ssl.SSLConfiguration;
import ch.qos.logback.core.net.ssl.SSLConnectionRunner;
import ch.qos.logback.core.net.ssl.SSLParametersConfiguration;

/**
Expand All @@ -30,7 +31,9 @@
public abstract class AbstractSSLSocketAppender<E> extends AbstractSocketAppender<E>
implements SSLComponent {

private SSLConfiguration ssl;

private SSLConfiguration sslConfiguration;

private SocketFactory socketFactory;

/**
Expand All @@ -39,44 +42,17 @@ public abstract class AbstractSSLSocketAppender<E> extends AbstractSocketAppende
protected AbstractSSLSocketAppender() {
}

/**
* Constructs a new appender that will connect to the given remote host
* and port.
* <p>
* This constructor was introduced primarily to allow the encapsulation
* of the base {@link AbstractSocketAppender} to be improved in a manner that
* is least disruptive to <em>existing</em> subclasses. <strong>This
* constructor will be removed in future release</strong>.
* @param remoteHost target remote host
* @param port target port on remote host
*/
@Deprecated
protected AbstractSSLSocketAppender(String remoteHost, int port) {
super(remoteHost, port);
}

/**
* Gets an {@link SocketFactory} that produces SSL sockets using an
* {@link SSLContext} that is derived from the appender's configuration.
* @return socket factory
*/
@Override
protected SocketFactory getSocketFactory() {
return socketFactory;
}

/**
* {@inheritDoc}
*/
@Override
public void start() {
try {
SSLContext sslContext = getSsl().createContext(this);
SSLParametersConfiguration parameters = getSsl().getParameters();
parameters.setContext(getContext());
socketFactory = new ConfigurableSSLSocketFactory(parameters,
sslContext.getSocketFactory());
if(isStarted()) {
new SSLConnectionRunner(context, getRemoteHost(), getPort(), getSsl());
}
super.start();

}
catch (Exception ex) {
addError(ex.getMessage(), ex);
Expand All @@ -89,18 +65,18 @@ public void start() {
* default configuration is returned
*/
public SSLConfiguration getSsl() {
if (ssl == null) {
ssl = new SSLConfiguration();
if (sslConfiguration == null) {
sslConfiguration = new SSLConfiguration();
}
return ssl;
return sslConfiguration;
}

/**
* Sets the SSL configuration.
* @param ssl the SSL configuration to set
*/
public void setSsl(SSLConfiguration ssl) {
this.ssl = ssl;
this.sslConfiguration = ssl;
}

}
Expand Up @@ -53,11 +53,6 @@ public abstract class AbstractSocketAppender<E> extends AppenderBase<E>
*/
public static final int DEFAULT_PORT = 4560;

/**
* The default reconnection delay (30000 milliseconds or 30 seconds).
*/
public static final int DEFAULT_RECONNECTION_DELAY = 30000;

/**
* Default size of the queue used to hold logging events that are destined
* for the remote peer.
Expand All @@ -72,18 +67,18 @@ public abstract class AbstractSocketAppender<E> extends AppenderBase<E>

private String remoteHost;
private int port = DEFAULT_PORT;
private InetAddress address;
private int reconnectionDelay = DEFAULT_RECONNECTION_DELAY;

private int queueSize = DEFAULT_QUEUE_SIZE;
private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY;

private BlockingQueue<E> queue;
private String peerId;
private Future<?> task;
private Future<Socket> connectorTask;


private volatile Socket socket;

private ConnectionRunner connectionRunner;

/**
* Constructs a new appender.
*/
Expand Down Expand Up @@ -133,18 +128,10 @@ public void start() {
addError("Queue size must be non-negative");
}

if (errorCount == 0) {
try {
address = InetAddress.getByName(remoteHost);
} catch (UnknownHostException ex) {
addError("unknown host: " + remoteHost);
errorCount++;
}
}

if (errorCount == 0) {
queue = newBlockingQueue(queueSize);
peerId = "remote peer " + remoteHost + ":" + port + ": ";
connectionRunner = new ConnectionRunner(context, remoteHost, port);
task = getContext().getExecutorService().submit(this);
super.start();
}
Expand All @@ -158,8 +145,7 @@ public void stop() {
if (!isStarted()) return;
CloseUtil.closeQuietly(socket);
task.cancel(true);
if(connectorTask != null)
connectorTask.cancel(true);
connectionRunner.stop();
super.stop();
}

Expand All @@ -178,14 +164,7 @@ protected void append(E event) {
public final void run() {
try {
while (!Thread.currentThread().isInterrupted()) {
SocketConnector connector = createConnector(address, port, 0,
reconnectionDelay);

connectorTask = activateConnector(connector);
if(connectorTask == null)
break;

socket = waitForConnectorToReturnASocket();
socket = connectionRunner.connect();
if(socket == null)
break;
dispatchEvents();
Expand All @@ -196,39 +175,15 @@ public final void run() {
addInfo("shutting down");
}

private SocketConnector createConnector(InetAddress address, int port,
int initialDelay, int retryDelay) {
SocketConnector connector = newConnector(address, port, initialDelay,
retryDelay);
connector.setExceptionHandler(this);
connector.setSocketFactory(getSocketFactory());
return connector;
}

private Future<Socket> activateConnector(SocketConnector connector) {
try {
return getContext().getExecutorService().submit(connector);
} catch (RejectedExecutionException ex) {
return null;
}
}

private Socket waitForConnectorToReturnASocket() throws InterruptedException {
try {
Socket s = connectorTask.get();
connectorTask = null;
return s;
} catch (ExecutionException e) {
return null;
}
}

private void dispatchEvents() throws InterruptedException {

try {
socket.setSoTimeout(acceptConnectionTimeout);
ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
socket.setSoTimeout(0);
addInfo(peerId + "connection established");
addInfo(connectionRunner.getPeerId() + "connection established");
int counter = 0;
while (true) {
E event = queue.take();
Expand All @@ -244,57 +199,15 @@ private void dispatchEvents() throws InterruptedException {
}
}
} catch (IOException ex) {
addInfo(peerId + "connection failed: " + ex);
addInfo(connectionRunner.getPeerId() + " connection failed: " + ex);
} finally {
CloseUtil.closeQuietly(socket);
socket = null;
addInfo(peerId + "connection closed");
addInfo(connectionRunner.getPeerId() + "connection closed");
}
}

/**
* {@inheritDoc}
*/
public void connectionFailed(SocketConnector connector, Exception ex) {
if (ex instanceof InterruptedException) {
addInfo("connector interrupted");
} else if (ex instanceof ConnectException) {
addInfo(peerId + "connection refused");
} else {
addInfo(peerId + ex);
}
}



/**
* Creates a new {@link SocketConnector}.
* <p>
* The default implementation creates an instance of {@link DefaultSocketConnector}.
* A subclass may override to provide a different {@link SocketConnector}
* implementation.
*
* @param address target remote address
* @param port target remote port
* @param initialDelay delay before the first connection attempt
* @param retryDelay delay before a reconnection attempt
* @return socket connector
*/
protected SocketConnector newConnector(InetAddress address,
int port, int initialDelay, int retryDelay) {
return new DefaultSocketConnector(address, port, initialDelay, retryDelay);
}

/**
* Gets the default {@link SocketFactory} for the platform.
* <p>
* Subclasses may override to provide a custom socket factory.
*/
protected SocketFactory getSocketFactory() {
return SocketFactory.getDefault();
}

/**
/**
* Creates a blocking queue that will be used to hold logging events until
* they can be delivered to the remote receiver.
* <p>
Expand Down Expand Up @@ -370,25 +283,7 @@ public int getPort() {
return port;
}

/**
* The <b>reconnectionDelay</b> property takes a positive integer representing
* the number of milliseconds to wait between each failed connection attempt
* to the server. The default value of this option is 30000 which corresponds
* to 30 seconds.
*
* <p>
* Setting this option to zero turns off reconnection capability.
*/
public void setReconnectionDelay(int delay) {
this.reconnectionDelay = delay;
}

/**
* Returns value of the <b>reconnectionDelay</b> property.
*/
public int getReconnectionDelay() {
return reconnectionDelay;
}

/**
* The <b>queueSize</b> property takes a non-negative integer representing
Expand Down

0 comments on commit 703e62c

Please sign in to comment.