Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion quickfixj-core/src/main/java/quickfix/Connector.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public interface Connector {
* connections.
* This method must not be called by several threads concurrently.
*/
void stop();
default void stop() {
stop(false);
}

/**
* Stops all sessions, optionally waiting for logout completion.
Expand Down
26 changes: 10 additions & 16 deletions quickfixj-core/src/main/java/quickfix/SocketAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,34 +106,28 @@ private void initialize() throws ConfigError {
if (isStarted.equals(Boolean.FALSE)) {
eventHandlingStrategy.setExecutor(longLivedExecutor);
startAcceptingConnections();
isStarted = Boolean.TRUE;
eventHandlingStrategy.blockInThread();
isStarted = Boolean.TRUE;
} else {
log.warn("Ignored attempt to start already running SocketAcceptor.");
}
}

@Override
public void stop() {
stop(false);
}

@Override
public void stop(boolean forceDisconnect) {
if (isStarted.equals(Boolean.TRUE)) {
try {
try {
logoutAllSessions(forceDisconnect);
stopAcceptingConnections();
} catch (ConfigError e) {
log.error("Error when stopping acceptor.", e);
}
logoutAllSessions(forceDisconnect);
stopAcceptingConnections();
stopSessionTimer();
} finally {
eventHandlingStrategy.stopHandlingMessages();
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
isStarted = Boolean.FALSE;
try {
eventHandlingStrategy.stopHandlingMessages();
} finally {
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
isStarted = Boolean.FALSE;
}
}
}
}
Expand Down
42 changes: 20 additions & 22 deletions quickfixj-core/src/main/java/quickfix/SocketInitiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,27 +110,7 @@ public SocketInitiator(SessionFactory sessionFactory, SessionSettings settings,
public void start() throws ConfigError, RuntimeError {
initialize();
}

@Override
public void stop() {
stop(false);
}

@Override
public void stop(boolean forceDisconnect) {
if (isStarted.equals(Boolean.TRUE)) {
try {
logoutAllSessions(forceDisconnect);
stopInitiators();
} finally {
eventHandlingStrategy.stopHandlingMessages();
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
isStarted = Boolean.FALSE;
}
}
}


private void initialize() throws ConfigError {
if (isStarted.equals(Boolean.FALSE)) {
eventHandlingStrategy.setExecutor(longLivedExecutor);
Expand All @@ -139,13 +119,31 @@ private void initialize() throws ConfigError {
Session.registerSession(session);
}
startInitiators();
isStarted = Boolean.TRUE;
eventHandlingStrategy.blockInThread();
isStarted = Boolean.TRUE;
} else {
log.warn("Ignored attempt to start already running SocketInitiator.");
}
}

@Override
public void stop(boolean forceDisconnect) {
if (isStarted.equals(Boolean.TRUE)) {
try {
logoutAllSessions(forceDisconnect);
stopInitiators();
} finally {
try {
eventHandlingStrategy.stopHandlingMessages();
} finally {
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
isStarted = Boolean.FALSE;
}
}
}
}

@Override
protected EventHandlingStrategy getEventHandlingStrategy() {
return eventHandlingStrategy;
Expand Down
18 changes: 4 additions & 14 deletions quickfixj-core/src/main/java/quickfix/ThreadedSocketAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,32 +97,22 @@ public ThreadedSocketAcceptor(SessionFactory sessionFactory, SessionSettings set
eventHandlingStrategy = new ThreadPerSessionEventHandlingStrategy(this, DEFAULT_QUEUE_CAPACITY);
}

@Override
public void start() throws ConfigError, RuntimeError {
eventHandlingStrategy.setExecutor(longLivedExecutor);
startAcceptingConnections();
}

public void stop() {
stop(false);
}

@Override
public void stop(boolean forceDisconnect) {
try {
logoutAllSessions(forceDisconnect);
stopAcceptingConnections();
} catch (ConfigError e) {
log.error("Error when stopping acceptor.", e);
}
logoutAllSessions(forceDisconnect);
stopAcceptingConnections();
stopSessionTimer();
eventHandlingStrategy.stopDispatcherThreads();
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
}

public void block() throws ConfigError, RuntimeError {
throw new UnsupportedOperationException("Blocking not supported: " + getClass());
}

@Override
protected EventHandlingStrategy getEventHandlingStrategy() {
return eventHandlingStrategy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,14 @@ public ThreadedSocketInitiator(SessionFactory sessionFactory, SessionSettings se
eventHandlingStrategy = new ThreadPerSessionEventHandlingStrategy(this, DEFAULT_QUEUE_CAPACITY);
}

@Override
public void start() throws ConfigError, RuntimeError {
eventHandlingStrategy.setExecutor(longLivedExecutor);
createSessionInitiators();
startInitiators();
}

public void stop() {
stop(false);
}

@Override
public void stop(boolean forceDisconnect) {
logoutAllSessions(forceDisconnect);
stopInitiators();
Expand All @@ -117,10 +115,6 @@ public void stop(boolean forceDisconnect) {
clearConnectorSessions();
}

public void block() throws ConfigError, RuntimeError {
throw new UnsupportedOperationException("Blocking not supported: " + getClass());
}

@Override
protected EventHandlingStrategy getEventHandlingStrategy() {
return eventHandlingStrategy;
Expand Down
146 changes: 72 additions & 74 deletions quickfixj-core/src/main/java/quickfix/mina/SessionConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,27 +86,27 @@ public SessionConnector(SessionSettings settings, SessionFactory sessionFactory)
}
}

/**
* <p>
* Supplies the Executors to be used for all message processing and timer activities. This will override the default
* behavior which uses internally created Threads. This enables scenarios such as a ResourceAdapter to supply the
* WorkManager (when adapted to the Executor API) so that all Application call-backs occur on container managed
* threads.
* </p>
* <p>
* If using external Executors, this method should be called immediately after the constructor. Once set, the
* Executors cannot be changed.
* </p>
*
* @param executorFactory See {@link ExecutorFactory} for detailed requirements.
*/
public void setExecutorFactory(ExecutorFactory executorFactory) {
if (longLivedExecutor != null || shortLivedExecutor!=null) {
throw new IllegalStateException("Optional ExecutorFactory has already been set. It cannot be changed once set.");
}
longLivedExecutor = executorFactory.getLongLivedExecutor();
shortLivedExecutor = executorFactory.getShortLivedExecutor();
}
/**
* <p>
* Supplies the Executors to be used for all message processing and timer activities. This will override the default
* behavior which uses internally created Threads. This enables scenarios such as a ResourceAdapter to supply the
* WorkManager (when adapted to the Executor API) so that all Application call-backs occur on container managed
* threads.
* </p>
* <p>
* If using external Executors, this method should be called immediately after the constructor. Once set, the
* Executors cannot be changed.
* </p>
*
* @param executorFactory See {@link ExecutorFactory} for detailed requirements.
*/
public void setExecutorFactory(ExecutorFactory executorFactory) {
if (longLivedExecutor != null || shortLivedExecutor != null) {
throw new IllegalStateException("Optional ExecutorFactory has already been set. It cannot be changed once set.");
}
longLivedExecutor = executorFactory.getLongLivedExecutor();
shortLivedExecutor = executorFactory.getShortLivedExecutor();
}

public void addPropertyChangeListener(PropertyChangeListener listener) {
propertyChangeSupport.addPropertyChangeListener(listener);
Expand Down Expand Up @@ -312,11 +312,11 @@ private String getLogSuffix(SessionID sessionID, IoSession protocolSession) {
}

protected void startSessionTimer() {
Runnable timerTask = new SessionTimerTask();
if (shortLivedExecutor != null) {
timerTask = new DelegatingTask(timerTask, shortLivedExecutor);
}
sessionTimerFuture = scheduledExecutorService.scheduleAtFixedRate(timerTask, 0, 1000L,
Runnable timerTask = new SessionTimerTask();
if (shortLivedExecutor != null) {
timerTask = new DelegatingTask(timerTask, shortLivedExecutor);
}
sessionTimerFuture = scheduledExecutorService.scheduleAtFixedRate(timerTask, 0, 1000L,
TimeUnit.MILLISECONDS);
log.info("SessionTimer started");
}
Expand Down Expand Up @@ -352,54 +352,52 @@ public void run() {
* Delegates QFJ Timer Task to an Executor and blocks the QFJ Timer Thread until
* the Task execution completes.
*/
static final class DelegatingTask implements Runnable {

private final BlockingSupportTask delegate;
private final Executor executor;

DelegatingTask(Runnable delegate, Executor executor) {
this.delegate = new BlockingSupportTask(delegate);
this.executor = executor;
}

@Override
public void run() {
executor.execute(delegate);
try {
delegate.await();
} catch (InterruptedException e) {
}
}

static final class BlockingSupportTask implements Runnable {

private final CountDownLatch latch = new CountDownLatch(1);
private final Runnable delegate;

BlockingSupportTask(Runnable delegate) {
this.delegate = delegate;
}

@Override
public void run() {
Thread currentThread = Thread.currentThread();
String threadName = currentThread.getName();
try {
currentThread.setName("QFJ Timer (" + threadName + ")");
delegate.run();
} finally {
latch.countDown();
currentThread.setName(threadName);
}
}

void await() throws InterruptedException {
latch.await();
}

}

}
static final class DelegatingTask implements Runnable {

private final BlockingSupportTask delegate;
private final Executor executor;

DelegatingTask(Runnable delegate, Executor executor) {
this.delegate = new BlockingSupportTask(delegate);
this.executor = executor;
}

@Override
public void run() {
executor.execute(delegate);
try {
delegate.await();
} catch (InterruptedException e) {
}
}

static final class BlockingSupportTask implements Runnable {

private final CountDownLatch latch = new CountDownLatch(1);
private final Runnable delegate;

BlockingSupportTask(Runnable delegate) {
this.delegate = delegate;
}

@Override
public void run() {
Thread currentThread = Thread.currentThread();
String threadName = currentThread.getName();
try {
currentThread.setName("QFJ Timer (" + threadName + ")");
delegate.run();
} finally {
latch.countDown();
currentThread.setName(threadName);
}
}

void await() throws InterruptedException {
latch.await();
}
}
}

private static class QFTimerThreadFactory implements ThreadFactory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void createSessions(SessionSettings settings) throws ConfigError, FieldC
}
}

protected void stopAcceptingConnections() throws ConfigError {
protected void stopAcceptingConnections() {
Iterator<IoAcceptor> ioIt = getEndpoints().iterator();
while (ioIt.hasNext()) {
IoAcceptor ioAcceptor = ioIt.next();
Expand Down