diff --git a/quickfixj-core/src/main/java/quickfix/Connector.java b/quickfixj-core/src/main/java/quickfix/Connector.java index 186425153b..e40ee983b7 100644 --- a/quickfixj-core/src/main/java/quickfix/Connector.java +++ b/quickfixj-core/src/main/java/quickfix/Connector.java @@ -41,7 +41,9 @@ public interface Connector { * connections. * This method must not be called by several threads concurrently. */ - void stop(); + default void stop() { + stop(false); + } /** * Stops all sessions, optionally waiting for logout completion. diff --git a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java index ca27ebcdfe..8fdc4e88ed 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/SocketAcceptor.java @@ -106,34 +106,28 @@ private void initialize() throws ConfigError { if (isStarted.equals(Boolean.FALSE)) { eventHandlingStrategy.setExecutor(longLivedExecutor); startAcceptingConnections(); - isStarted = Boolean.TRUE; eventHandlingStrategy.blockInThread(); + isStarted = Boolean.TRUE; } else { log.warn("Ignored attempt to start already running SocketAcceptor."); } } - @Override - public void stop() { - stop(false); - } - @Override public void stop(boolean forceDisconnect) { if (isStarted.equals(Boolean.TRUE)) { try { - try { - logoutAllSessions(forceDisconnect); - stopAcceptingConnections(); - } catch (ConfigError e) { - log.error("Error when stopping acceptor.", e); - } + logoutAllSessions(forceDisconnect); + stopAcceptingConnections(); stopSessionTimer(); } finally { - eventHandlingStrategy.stopHandlingMessages(); - Session.unregisterSessions(getSessions(), true); - clearConnectorSessions(); - isStarted = Boolean.FALSE; + try { + eventHandlingStrategy.stopHandlingMessages(); + } finally { + Session.unregisterSessions(getSessions(), true); + clearConnectorSessions(); + isStarted = Boolean.FALSE; + } } } } diff --git a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java index 5ca2d628e0..bf24294f42 100644 --- a/quickfixj-core/src/main/java/quickfix/SocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/SocketInitiator.java @@ -110,27 +110,7 @@ public SocketInitiator(SessionFactory sessionFactory, SessionSettings settings, public void start() throws ConfigError, RuntimeError { initialize(); } - - @Override - public void stop() { - stop(false); - } - - @Override - public void stop(boolean forceDisconnect) { - if (isStarted.equals(Boolean.TRUE)) { - try { - logoutAllSessions(forceDisconnect); - stopInitiators(); - } finally { - eventHandlingStrategy.stopHandlingMessages(); - Session.unregisterSessions(getSessions(), true); - clearConnectorSessions(); - isStarted = Boolean.FALSE; - } - } - } - + private void initialize() throws ConfigError { if (isStarted.equals(Boolean.FALSE)) { eventHandlingStrategy.setExecutor(longLivedExecutor); @@ -139,13 +119,31 @@ private void initialize() throws ConfigError { Session.registerSession(session); } startInitiators(); - isStarted = Boolean.TRUE; eventHandlingStrategy.blockInThread(); + isStarted = Boolean.TRUE; } else { log.warn("Ignored attempt to start already running SocketInitiator."); } } + @Override + public void stop(boolean forceDisconnect) { + if (isStarted.equals(Boolean.TRUE)) { + try { + logoutAllSessions(forceDisconnect); + stopInitiators(); + } finally { + try { + eventHandlingStrategy.stopHandlingMessages(); + } finally { + Session.unregisterSessions(getSessions(), true); + clearConnectorSessions(); + isStarted = Boolean.FALSE; + } + } + } + } + @Override protected EventHandlingStrategy getEventHandlingStrategy() { return eventHandlingStrategy; diff --git a/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java index 8f35bb8010..a1190bb585 100644 --- a/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java @@ -97,32 +97,22 @@ public ThreadedSocketAcceptor(SessionFactory sessionFactory, SessionSettings set eventHandlingStrategy = new ThreadPerSessionEventHandlingStrategy(this, DEFAULT_QUEUE_CAPACITY); } + @Override public void start() throws ConfigError, RuntimeError { eventHandlingStrategy.setExecutor(longLivedExecutor); startAcceptingConnections(); } - public void stop() { - stop(false); - } - + @Override public void stop(boolean forceDisconnect) { - try { - logoutAllSessions(forceDisconnect); - stopAcceptingConnections(); - } catch (ConfigError e) { - log.error("Error when stopping acceptor.", e); - } + logoutAllSessions(forceDisconnect); + stopAcceptingConnections(); stopSessionTimer(); eventHandlingStrategy.stopDispatcherThreads(); Session.unregisterSessions(getSessions(), true); clearConnectorSessions(); } - public void block() throws ConfigError, RuntimeError { - throw new UnsupportedOperationException("Blocking not supported: " + getClass()); - } - @Override protected EventHandlingStrategy getEventHandlingStrategy() { return eventHandlingStrategy; diff --git a/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java b/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java index bf42ce20a2..a97d55c2c8 100644 --- a/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java +++ b/quickfixj-core/src/main/java/quickfix/ThreadedSocketInitiator.java @@ -99,16 +99,14 @@ public ThreadedSocketInitiator(SessionFactory sessionFactory, SessionSettings se eventHandlingStrategy = new ThreadPerSessionEventHandlingStrategy(this, DEFAULT_QUEUE_CAPACITY); } + @Override public void start() throws ConfigError, RuntimeError { eventHandlingStrategy.setExecutor(longLivedExecutor); createSessionInitiators(); startInitiators(); } - public void stop() { - stop(false); - } - + @Override public void stop(boolean forceDisconnect) { logoutAllSessions(forceDisconnect); stopInitiators(); @@ -117,10 +115,6 @@ public void stop(boolean forceDisconnect) { clearConnectorSessions(); } - public void block() throws ConfigError, RuntimeError { - throw new UnsupportedOperationException("Blocking not supported: " + getClass()); - } - @Override protected EventHandlingStrategy getEventHandlingStrategy() { return eventHandlingStrategy; diff --git a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java index eeeae51b0e..e53ee09f3e 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java +++ b/quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java @@ -86,27 +86,27 @@ public SessionConnector(SessionSettings settings, SessionFactory sessionFactory) } } - /** - *
- * Supplies the Executors to be used for all message processing and timer activities. This will override the default - * behavior which uses internally created Threads. This enables scenarios such as a ResourceAdapter to supply the - * WorkManager (when adapted to the Executor API) so that all Application call-backs occur on container managed - * threads. - *
- *- * If using external Executors, this method should be called immediately after the constructor. Once set, the - * Executors cannot be changed. - *
- * - * @param executorFactory See {@link ExecutorFactory} for detailed requirements. - */ - public void setExecutorFactory(ExecutorFactory executorFactory) { - if (longLivedExecutor != null || shortLivedExecutor!=null) { - throw new IllegalStateException("Optional ExecutorFactory has already been set. It cannot be changed once set."); - } - longLivedExecutor = executorFactory.getLongLivedExecutor(); - shortLivedExecutor = executorFactory.getShortLivedExecutor(); - } + /** + *+ * Supplies the Executors to be used for all message processing and timer activities. This will override the default + * behavior which uses internally created Threads. This enables scenarios such as a ResourceAdapter to supply the + * WorkManager (when adapted to the Executor API) so that all Application call-backs occur on container managed + * threads. + *
+ *+ * If using external Executors, this method should be called immediately after the constructor. Once set, the + * Executors cannot be changed. + *
+ * + * @param executorFactory See {@link ExecutorFactory} for detailed requirements. + */ + public void setExecutorFactory(ExecutorFactory executorFactory) { + if (longLivedExecutor != null || shortLivedExecutor != null) { + throw new IllegalStateException("Optional ExecutorFactory has already been set. It cannot be changed once set."); + } + longLivedExecutor = executorFactory.getLongLivedExecutor(); + shortLivedExecutor = executorFactory.getShortLivedExecutor(); + } public void addPropertyChangeListener(PropertyChangeListener listener) { propertyChangeSupport.addPropertyChangeListener(listener); @@ -312,11 +312,11 @@ private String getLogSuffix(SessionID sessionID, IoSession protocolSession) { } protected void startSessionTimer() { - Runnable timerTask = new SessionTimerTask(); - if (shortLivedExecutor != null) { - timerTask = new DelegatingTask(timerTask, shortLivedExecutor); - } - sessionTimerFuture = scheduledExecutorService.scheduleAtFixedRate(timerTask, 0, 1000L, + Runnable timerTask = new SessionTimerTask(); + if (shortLivedExecutor != null) { + timerTask = new DelegatingTask(timerTask, shortLivedExecutor); + } + sessionTimerFuture = scheduledExecutorService.scheduleAtFixedRate(timerTask, 0, 1000L, TimeUnit.MILLISECONDS); log.info("SessionTimer started"); } @@ -352,54 +352,52 @@ public void run() { * Delegates QFJ Timer Task to an Executor and blocks the QFJ Timer Thread until * the Task execution completes. */ - static final class DelegatingTask implements Runnable { - - private final BlockingSupportTask delegate; - private final Executor executor; - - DelegatingTask(Runnable delegate, Executor executor) { - this.delegate = new BlockingSupportTask(delegate); - this.executor = executor; - } - - @Override - public void run() { - executor.execute(delegate); - try { - delegate.await(); - } catch (InterruptedException e) { - } - } - - static final class BlockingSupportTask implements Runnable { - - private final CountDownLatch latch = new CountDownLatch(1); - private final Runnable delegate; - - BlockingSupportTask(Runnable delegate) { - this.delegate = delegate; - } - - @Override - public void run() { - Thread currentThread = Thread.currentThread(); - String threadName = currentThread.getName(); - try { - currentThread.setName("QFJ Timer (" + threadName + ")"); - delegate.run(); - } finally { - latch.countDown(); - currentThread.setName(threadName); - } - } - - void await() throws InterruptedException { - latch.await(); - } - - } - - } + static final class DelegatingTask implements Runnable { + + private final BlockingSupportTask delegate; + private final Executor executor; + + DelegatingTask(Runnable delegate, Executor executor) { + this.delegate = new BlockingSupportTask(delegate); + this.executor = executor; + } + + @Override + public void run() { + executor.execute(delegate); + try { + delegate.await(); + } catch (InterruptedException e) { + } + } + + static final class BlockingSupportTask implements Runnable { + + private final CountDownLatch latch = new CountDownLatch(1); + private final Runnable delegate; + + BlockingSupportTask(Runnable delegate) { + this.delegate = delegate; + } + + @Override + public void run() { + Thread currentThread = Thread.currentThread(); + String threadName = currentThread.getName(); + try { + currentThread.setName("QFJ Timer (" + threadName + ")"); + delegate.run(); + } finally { + latch.countDown(); + currentThread.setName(threadName); + } + } + + void await() throws InterruptedException { + latch.await(); + } + } + } private static class QFTimerThreadFactory implements ThreadFactory { diff --git a/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java b/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java index c78e19fe3e..a949bffe37 100644 --- a/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java +++ b/quickfixj-core/src/main/java/quickfix/mina/acceptor/AbstractSocketAcceptor.java @@ -242,7 +242,7 @@ private void createSessions(SessionSettings settings) throws ConfigError, FieldC } } - protected void stopAcceptingConnections() throws ConfigError { + protected void stopAcceptingConnections() { Iterator