Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion quickfixj-core/src/main/java/quickfix/SocketAcceptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void stop(boolean forceDisconnect) {
stopSessionTimer();
} finally {
try {
eventHandlingStrategy.stopHandlingMessages();
eventHandlingStrategy.stopHandlingMessages(true);
} finally {
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
Expand Down
2 changes: 1 addition & 1 deletion quickfixj-core/src/main/java/quickfix/SocketInitiator.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void stop(boolean forceDisconnect) {
stopInitiators();
} finally {
try {
eventHandlingStrategy.stopHandlingMessages();
eventHandlingStrategy.stopHandlingMessages(true);
} finally {
Session.unregisterSessions(getSessions(), true);
clearConnectorSessions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,14 @@ protected void stopSessionTimer() {
}
}

// visible for testing
boolean checkSessionTimerRunning() {
if ( sessionTimerFuture != null ) {
return !sessionTimerFuture.isDone();
}
return false;
}

protected ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ private void block() {
sessionConnector.stopSessionTimer();
// reset the stoptime
stopTime = 0;
return;
}
return;
}
}
try {
Expand Down Expand Up @@ -167,16 +167,27 @@ private synchronized void startHandlingMessages() {
isStopped = false;
}

/**
* Stops processing of messages without waiting for message processing
* thread to finish.
*
* It is advised to call stopHandlingMessages(true) instead of this method.
*/
public synchronized void stopHandlingMessages() {
for (Session session : sessionConnector.getSessionMap().values()) {
onMessage(session, END_OF_STREAM);
}
isStopped = true;
}

/**
* Stops processing of messages and optionally waits for message processing
* thread to finish.
*
* @param join true to wait for thread to finish
*/
public void stopHandlingMessages(boolean join) {
stopHandlingMessages();
messageProcessingThread.interrupt();
if (join) {
try {
messageProcessingThread.join();
Expand Down Expand Up @@ -227,12 +238,6 @@ public void start() {
executor.execute(wrapper);
}

public void interrupt() {
if (executor instanceof DedicatedThreadExecutor) {
((DedicatedThreadExecutor)executor).interrupt();
}
}

/**
* Provides the Thread::join and Thread::isAlive semantics on the nested Runnable.
*/
Expand Down Expand Up @@ -290,12 +295,6 @@ public void execute(Runnable command) {
thread.setDaemon(true);
thread.start();
}

public void interrupt() {
if (thread != null) {
thread.interrupt();
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,59 +113,58 @@ public void stopDispatcherThreads() {
}
}

/**
* A stand-in for the Thread class that delegates to an Executor.
* Implements all the API required by pre-existing QFJ code.
*/
protected static abstract class ThreadAdapter implements Runnable {

private final Executor executor;
private final String name;

public ThreadAdapter(String name, Executor executor) {
this.name = name;
this.executor = executor != null ? executor : new DedicatedThreadExecutor(name);
}

public void start() {
executor.execute(this);
}

@Override
public final void run() {
Thread currentThread = Thread.currentThread();
String threadName = currentThread.getName();
try {
if (!name.equals(threadName)) {
currentThread.setName(name + " (" + threadName + ")");
}
doRun();
} finally {
currentThread.setName(threadName);
}
}

abstract void doRun();

/**
* An Executor that uses it's own dedicated Thread.
* Provides equivalent behavior to the prior non-Executor approach.
*/
static final class DedicatedThreadExecutor implements Executor {

private final String name;

DedicatedThreadExecutor(String name) {
this.name = name;
}

@Override
public void execute(Runnable command) {
new Thread(command, name).start();
}

}
/**
* A stand-in for the Thread class that delegates to an Executor.
* Implements all the API required by pre-existing QFJ code.
*/
protected static abstract class ThreadAdapter implements Runnable {

private final Executor executor;
private final String name;

public ThreadAdapter(String name, Executor executor) {
this.name = name;
this.executor = executor != null ? executor : new DedicatedThreadExecutor(name);
}

public void start() {
executor.execute(this);
}

@Override
public final void run() {
Thread currentThread = Thread.currentThread();
String threadName = currentThread.getName();
try {
if (!name.equals(threadName)) {
currentThread.setName(name + " (" + threadName + ")");
}
doRun();
} finally {
currentThread.setName(threadName);
}
}

abstract void doRun();

/**
* An Executor that uses its own dedicated Thread. Provides equivalent
* behavior to the prior non-Executor approach.
*/
static final class DedicatedThreadExecutor implements Executor {

private final String name;

DedicatedThreadExecutor(String name) {
this.name = name;
}

@Override
public void execute(Runnable command) {
new Thread(command, name).start();
}

}
}

protected class MessageDispatchingThread extends ThreadAdapter {
Expand Down Expand Up @@ -226,7 +225,7 @@ void doRun() {
}
}
if (!messages.isEmpty()) {
final List<Message> tempList = new ArrayList<>();
final List<Message> tempList = new ArrayList<>(messages.size());
queueTracker.drainTo(tempList);
for (Message message : tempList) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.junit.AfterClass;
import static org.junit.Assert.assertTrue;

/**
*
Expand Down Expand Up @@ -149,6 +150,23 @@ public void testMultipleStartStop() throws Exception {
}
}

/**
* During quick restarts: make sure that session timer is started and not stopped via
* block() method called from Message Processor thread.
*/
@Test
public void testMultipleStartSessionTimer() throws Exception {
SessionSettings settings = new SessionSettings();
SessionConnector connector = new SessionConnectorUnderTest(settings, sessionFactory);
ehs = new SingleThreadedEventHandlingStrategy(connector, 1000);
for (int i = 0; i < 1000; i++) {
connector.startSessionTimer();
ehs.blockInThread();
assertTrue(connector.checkSessionTimerRunning());
ehs.stopHandlingMessages(true);
}
}

@Test
public void shouldCleanUpAcceptorQFJMessageProcessorThreadAfterInterrupt() throws Exception {
assertQFJMessageProcessorThreads(0);
Expand Down