Skip to content

Commit

Permalink
Patched classes provided with ticket that fixes #895
Browse files Browse the repository at this point in the history
  • Loading branch information
Guido Medina committed Jun 23, 2016
1 parent 3aa1a46 commit a72d3ef
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 62 deletions.
Expand Up @@ -81,8 +81,8 @@ public void stop() {
}

public void stop(boolean forceDisconnect) {
stopInitiators();
logoutAllSessions(forceDisconnect);
stopSessionTimer();
if (!forceDisconnect) {

This comment has been minimized.

Copy link
@hemantsingh011

hemantsingh011 Apr 4, 2017

As we are are calling stopInitiators() which in results invoke stopSessionTimer, so calling logoutAllSessions() after that will not work as it results that logout message was not sent.
Can you please check?

This comment has been minimized.

Copy link
@chrjohn

chrjohn Apr 4, 2017

Member
waitForLogout();
}
Expand Down
40 changes: 24 additions & 16 deletions quickfixj-core/src/main/java/quickfix/mina/AbstractIoHandler.java
Expand Up @@ -19,24 +19,19 @@

package quickfix.mina;

import static quickfix.MessageUtils.parse;

import java.io.IOException;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.mina.filter.codec.ProtocolDecoderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import quickfix.InvalidMessage;
import quickfix.Message;
import quickfix.MessageUtils;
import quickfix.Session;
import quickfix.SessionID;
import quickfix.*;
import quickfix.field.MsgType;

import java.io.IOException;

import static quickfix.MessageUtils.parse;

/**
* Abstract class used for acceptor and initiator IO handlers.
*/
Expand All @@ -57,6 +52,15 @@ public void exceptionCaught(IoSession ioSession, Throwable cause) throws Excepti
Throwable realCause = cause;
if (cause instanceof ProtocolDecoderException && cause.getCause() != null) {
realCause = cause.getCause();
} else {
Throwable chain = cause;
while (chain != null && chain.getCause() != null) {
chain = chain.getCause();
if (chain instanceof IOException) {
realCause = chain;
break;
}
}
}
String reason;
if (realCause instanceof IOException) {
Expand All @@ -75,11 +79,15 @@ public void exceptionCaught(IoSession ioSession, Throwable cause) throws Excepti
reason = cause.toString();
}
if (disconnectNeeded) {
if (quickFixSession != null) {
quickFixSession.disconnect(reason, true);
} else {
log.error(reason, cause);
ioSession.closeNow();
try {
if (quickFixSession != null) {
quickFixSession.disconnect(reason, true);
} else {
log.error(reason, cause);
ioSession.closeNow();
}
} finally {
ioSession.setAttribute("QFJ_RESET_IO_CONNECTOR", Boolean.TRUE);
}
} else {
log.error(reason, cause);
Expand All @@ -97,7 +105,7 @@ public void sessionClosed(IoSession ioSession) throws Exception {
try {
Session quickFixSession = findQFSession(ioSession);
if (quickFixSession != null) {
eventHandlingStrategy.onMessage(quickFixSession, EventHandlingStrategy.END_OF_STREAM );
eventHandlingStrategy.onMessage(quickFixSession, EventHandlingStrategy.END_OF_STREAM);
ioSession.removeAttribute(SessionConnector.QF_SESSION);
}
ioSession.closeNow();
Expand Down
Expand Up @@ -19,16 +19,15 @@

package quickfix.mina;

import java.io.IOException;
import java.net.SocketAddress;
import quickfix.Session;

import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import quickfix.Responder;
import quickfix.Session;

import java.io.IOException;
import java.net.SocketAddress;

/**
* The class that partially integrates the QuickFIX/J Session to
Expand Down Expand Up @@ -85,6 +84,7 @@ public void disconnect() {
// close event from being processed by this thread (if
// this thread is the MINA IO processor thread.
ioSession.closeNow();
ioSession.setAttribute("QFJ_RESET_IO_CONNECTOR", Boolean.TRUE);
}

private void waitForScheduleMessagesToBeWritten() {
Expand Down
Expand Up @@ -19,25 +19,13 @@

package quickfix.mina.initiator;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;

import org.apache.mina.core.filterchain.IoFilterChainBuilder;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import quickfix.ConfigError;
import quickfix.LogUtil;
import quickfix.Session;
Expand All @@ -52,6 +40,16 @@
import quickfix.mina.ssl.SSLFilter;
import quickfix.mina.ssl.SSLSupport;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class IoSessionInitiator {
private final static long CONNECT_POLL_TIMEOUT = 2000L;
private final ScheduledExecutorService executor;
Expand Down Expand Up @@ -80,13 +78,16 @@ public IoSessionInitiator(Session fixSession, SocketAddress[] socketAddresses, S
}

private static class ConnectTask implements Runnable {
private final boolean sslEnabled;
private final SocketAddress[] socketAddresses;
private final SocketAddress localAddress;
private final IoConnector ioConnector;
private final IoFilterChainBuilder userIoFilterChainBuilder;
private IoConnector ioConnector;
private final Session fixSession;
private final long[] reconnectIntervalInMillis;
private final NetworkingOptions networkingOptions;
private final EventHandlingStrategy eventHandlingStrategy;
private final SSLConfig sslConfig;
private final InitiatorIoHandler ioHandler;

private IoSession ioSession;
private long lastReconnectAttemptTime;
Expand All @@ -99,31 +100,40 @@ public ConnectTask(boolean sslEnabled, SocketAddress[] socketAddresses,
SocketAddress localAddress, IoFilterChainBuilder userIoFilterChainBuilder, Session fixSession,
long[] reconnectIntervalInMillis, NetworkingOptions networkingOptions,
EventHandlingStrategy eventHandlingStrategy, SSLConfig sslConfig) throws ConfigError, GeneralSecurityException {
this.sslEnabled = sslEnabled;
this.socketAddresses = socketAddresses;
this.localAddress = localAddress;
this.userIoFilterChainBuilder = userIoFilterChainBuilder;
this.fixSession = fixSession;
this.reconnectIntervalInMillis = reconnectIntervalInMillis;
this.networkingOptions = networkingOptions;
this.eventHandlingStrategy = eventHandlingStrategy;
this.sslConfig = sslConfig;
ioConnector = ProtocolFactory.createIoConnector(socketAddresses[0]);
CompositeIoFilterChainBuilder ioFilterChainBuilder = new CompositeIoFilterChainBuilder(
userIoFilterChainBuilder);
setupIoConnector();
}

private void setupIoConnector() throws ConfigError, GeneralSecurityException {
final IoConnector newConnector = ProtocolFactory.createIoConnector(socketAddresses[0]);
final CompositeIoFilterChainBuilder ioFilterChainBuilder = new CompositeIoFilterChainBuilder(userIoFilterChainBuilder);

if (sslEnabled) {
installSslFilter(ioFilterChainBuilder);
}

ioFilterChainBuilder.addLast(FIXProtocolCodecFactory.FILTER_NAME,
new ProtocolCodecFilter(new FIXProtocolCodecFactory()));
ioFilterChainBuilder.addLast(FIXProtocolCodecFactory.FILTER_NAME, new ProtocolCodecFilter(new FIXProtocolCodecFactory()));

ioConnector.setFilterChainBuilder(ioFilterChainBuilder);
ioHandler = new InitiatorIoHandler(fixSession, networkingOptions,
eventHandlingStrategy);
newConnector.setFilterChainBuilder(ioFilterChainBuilder);
newConnector.setHandler(new InitiatorIoHandler(fixSession, networkingOptions, eventHandlingStrategy));
if (ioConnector != null) {
ioConnector.dispose();
}
ioConnector = newConnector;
}

private void installSslFilter(CompositeIoFilterChainBuilder ioFilterChainBuilder)
throws GeneralSecurityException {
SSLContext sslContext = SSLContextFactory.getInstance(sslConfig);
SSLFilter sslFilter = new SSLFilter(sslContext);
final SSLContext sslContext = SSLContextFactory.getInstance(sslConfig);
final SSLFilter sslFilter = new SSLFilter(sslContext);
sslFilter.setUseClientMode(true);
sslFilter.setCipherSuites(sslConfig.getEnabledCipherSuites() != null ? sslConfig.getEnabledCipherSuites()
: SSLSupport.getDefaultCipherSuites(sslContext));
Expand All @@ -133,20 +143,24 @@ private void installSslFilter(CompositeIoFilterChainBuilder ioFilterChainBuilder
}

public synchronized void run() {
if (connectFuture == null) {
if (shouldReconnect()) {
connect();
resetIoConnector();
try {
if (connectFuture == null) {
if (shouldReconnect()) {
connect();
}
} else {
pollConnectFuture();
}
} else {
pollConnectFuture();
} catch (Throwable e) {
LogUtil.logThrowable(fixSession.getLog(), "Exception during ConnectTask run", e);
}
}

private void connect() {
lastReconnectAttemptTime = SystemTime.currentTimeMillis();
SocketAddress nextSocketAddress = getNextSocketAddress();
ioConnector.setHandler(ioHandler);
try {
lastReconnectAttemptTime = SystemTime.currentTimeMillis();
SocketAddress nextSocketAddress = getNextSocketAddress();
if (localAddress == null) {
connectFuture = ioConnector.connect(nextSocketAddress);
} else {
Expand Down Expand Up @@ -220,8 +234,7 @@ private void unresolveCurrentSocketAddress(SocketAddress socketAddress) {
}

private int getCurrentSocketAddressIndex() {
int currentSocketAddressIndex = (nextSocketAddressIndex + socketAddresses.length - 1) % socketAddresses.length;
return currentSocketAddressIndex;
return (nextSocketAddressIndex + socketAddresses.length - 1) % socketAddresses.length;
}

private boolean shouldReconnect() {
Expand Down Expand Up @@ -266,6 +279,22 @@ public synchronized long getLastConnectTime() {
public Session getFixSession() {
return fixSession;
}

private void resetIoConnector() {
if (ioSession != null && Boolean.TRUE.equals(ioSession.getAttribute("QFJ_RESET_IO_CONNECTOR"))) {
try {
setupIoConnector();
log.info("[" + fixSession.getSessionID() + "] - reset IoConnector");
if (connectFuture != null) {
connectFuture.cancel();
}
connectFuture = null;
ioSession = null;
} catch (Throwable e) {
log.error("[" + fixSession.getSessionID() + "] - Exception during resetIoConnector call", e);
}
}
}
}

synchronized void start() {
Expand Down
Expand Up @@ -19,14 +19,13 @@

package quickfix.mina;

import static org.mockito.Mockito.*;
import junit.framework.TestCase;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;

import java.net.InetSocketAddress;

import junit.framework.TestCase;

import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.future.WriteFuture;
import static org.mockito.Mockito.*;

public class IoSessionResponderTest extends TestCase {
public void testAsynchronousSend() throws Exception {
Expand Down Expand Up @@ -107,9 +106,10 @@ public void testDisconnect() throws Exception {

verify(mockProtocolSession).getScheduledWriteMessages();
verify(mockProtocolSession).closeNow();
verify(mockProtocolSession).setAttribute("QFJ_RESET_IO_CONNECTOR", Boolean.TRUE);

verifyNoMoreInteractions(mockProtocolSession);
}
}

public void testGetRemoteSocketAddress() throws Exception {
IoSession mockProtocolSession = mock(IoSession.class);
Expand Down

0 comments on commit a72d3ef

Please sign in to comment.