Skip to content

Commit

Permalink
Issue real-logic#503: SessionProxy.sendSequenceReset() not called
Browse files Browse the repository at this point in the history
The test also tries to reproduce an inversion of SendSequenceReset and ResendRequest
messages, but it is not reproduced (it happens only when a SessionProxy causes the
ResendRequest to be sent asynchronously).
  • Loading branch information
pcdv committed Mar 15, 2024
1 parent e1961a0 commit ceb4702
Show file tree
Hide file tree
Showing 4 changed files with 423 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package uk.co.real_logic.artio.system_tests;

import org.agrona.ErrorHandler;
import org.agrona.concurrent.EpochNanoClock;
import org.junit.Test;
import uk.co.real_logic.artio.dictionary.generation.Exceptions;
import uk.co.real_logic.artio.engine.EngineConfiguration;
import uk.co.real_logic.artio.engine.FixEngine;
import uk.co.real_logic.artio.fields.EpochFractionFormat;
import uk.co.real_logic.artio.library.LibraryConfiguration;
import uk.co.real_logic.artio.protocol.GatewayPublication;
import uk.co.real_logic.artio.session.DirectSessionProxy;
import uk.co.real_logic.artio.session.SessionCustomisationStrategy;
import uk.co.real_logic.artio.session.SessionIdStrategy;
import uk.co.real_logic.artio.session.SessionProxy;
import uk.co.real_logic.artio.util.DebugFIXClient;
import uk.co.real_logic.artio.util.DebugServer;

import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver;
import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.ACCEPTOR_ID;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.INITIATOR_ID;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingConfig;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingLibraryConfig;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.connect;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingConfig;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingLibraryConfig;

/**
* Try reproducing race between sent ResendRequest and ResetSequence message when both
* parties request a resend. Also checks that SessionProxy is invoked when a ResetSequence
* message must be sent.
*/
public class RaceResendResetTest extends AbstractGatewayToGatewaySystemTest {

private boolean sendResendRequestCalled;
private boolean sendSequenceResetCalled;
private boolean useProxy;

private void launch() {
mediaDriver = launchMediaDriver();
launchAccepting();
launchInitiating();
testSystem = new TestSystem(acceptingLibrary, initiatingLibrary);
}

private void launchInitiating() {
final EngineConfiguration initiatingConfig =
initiatingConfig(libraryAeronPort, nanoClock)
.deleteLogFileDirOnStart(true)
.initialAcceptedSessionOwner(SOLE_LIBRARY);
initiatingEngine = FixEngine.launch(initiatingConfig);
LibraryConfiguration lib = initiatingLibraryConfig(libraryAeronPort, initiatingHandler, nanoClock);
if (useProxy)
lib.sessionProxyFactory(this::sessionProxyFactory);
initiatingLibrary = connect(lib
);
}

private void launchAccepting() {
final EngineConfiguration acceptingConfig = acceptingConfig(port, ACCEPTOR_ID, INITIATOR_ID, nanoClock)
.deleteLogFileDirOnStart(true)
.initialAcceptedSessionOwner(SOLE_LIBRARY);
acceptingEngine = FixEngine.launch(acceptingConfig);

final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock);
acceptingLibrary = connect(acceptingLibraryConfig);
}

/**
* Sanity check that we can connect Artio to a debug server with canned messages.
*/
@Test
public void testDebugServer() throws IOException {
DebugServer srv = new DebugServer(port);
srv.setWaitForData(true);
srv.addFIXResponse("8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=1|52=20240315-10:52:24.098|98=0|108=10|141=N|35002=0|35003=0|10=024|");
srv.start();

mediaDriver = launchMediaDriver();
launchInitiating();
testSystem = new TestSystem(initiatingLibrary);
connectAndAcquire();
}

private SessionProxy sessionProxyFactory(
final int sessionBufferSize,
final GatewayPublication gatewayPublication,
final SessionIdStrategy sessionIdStrategy,
final SessionCustomisationStrategy customisationStrategy,
final EpochNanoClock clock,
final long connectionId,
final int libraryId,
final ErrorHandler errorHandler,
final EpochFractionFormat epochFractionPrecision) {
return new DirectSessionProxy(sessionBufferSize, gatewayPublication, sessionIdStrategy, customisationStrategy,
clock, connectionId, libraryId, errorHandler, epochFractionPrecision) {
@Override
public long sendResendRequest(int msgSeqNo, int beginSeqNo, int endSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed) {
sendResendRequestCalled = true;
// try {
// Thread.sleep(10);
// } catch (InterruptedException ignored) {
// }
return super.sendResendRequest(msgSeqNo, beginSeqNo, endSeqNo, sequenceIndex, lastMsgSeqNumProcessed);
}

@Override
public long sendSequenceReset(int msgSeqNo, int newSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed) {
sendSequenceResetCalled = true;
return super.sendSequenceReset(msgSeqNo, newSeqNo, sequenceIndex, lastMsgSeqNumProcessed);
}
};
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldNotInvertResendAndReset() throws Exception {
useProxy = false;
reconnectTest();
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldCallProxySendSequenceReset() throws Exception {
useProxy = true;
reconnectTest();
}

private void reconnectTest() throws Exception {
launch();

connectAndAcquire();

messagesCanBeExchanged();

disconnectSessions();
Exceptions.closeAll(this::closeAcceptingEngine);

assertEquals(3, acceptingSession.lastReceivedMsgSeqNum());
assertEquals(3, initiatingSession.lastReceivedMsgSeqNum());

DebugServer srv = new DebugServer(port);
srv.setWaitForData(true);
srv.addFIXResponse("8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=5|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|");
srv.addFIXResponse("8=FIX.4.4|9=94|35=2|49=acceptor|56=initiator|34=6|52=***|7=4|16=0|10=024|");
srv.start();

connectPersistentSessions(4, 4, false);

DebugFIXClient exchange = new DebugFIXClient(srv.popClient(5000));
exchange.popAndAssert("35=A 34=4");
exchange.popAndAssert("35=2 34=5 7=4 16=0");
exchange.popAndAssert("35=4 34=4 36=6");

exchange.close();
srv.stop();
Exceptions.closeAll(this::closeInitiatingEngine, mediaDriver);

if (useProxy) {
assertTrue("SessionProxy.sendResendRequest() not called", sendResendRequestCalled);
assertTrue("SessionProxy.sendSequenceReset() not called", sendSequenceResetCalled);
}
}

private void connectAndAcquire() {
connectSessions();
acceptingSession = acceptingHandler.lastSession();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package uk.co.real_logic.artio.util;

import org.junit.Assert;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* Helper to pop FIX messages received on a socket.
*
* @see DebugServer
*/
public class DebugFIXClient {
private final DebugServer.HasIOStream io;
private final Thread thread;

private final BlockingQueue<Map<String, String>> messages = new LinkedBlockingQueue<>();
private volatile boolean disposed;
private String prefix = " <<< ";

public DebugFIXClient(DebugServer.HasIOStream io) {
this.io = Objects.requireNonNull(io);
thread = new Thread(this::run, "DebugFIXClient");
thread.start();
}

public void close() throws Exception {
disposed = true;
io.in.close();
io.in.close();
thread.interrupt();
thread.join();
}

private void run() {
StringBuilder s = new StringBuilder(128);
while (!disposed) {
Scanner scanner = new Scanner(io.in).useDelimiter("\u0001");
Map<String, String> msg = new HashMap<>();
while (scanner.hasNext()) {
String fld = scanner.next();
s.append(fld).append('|');
int eq = fld.indexOf('=');
String tag = fld.substring(0, eq);
msg.put(tag, fld.substring(eq + 1));
if (tag.equals("10")) {
messages.add(msg);
msg = new HashMap<>();
System.out.println(prefix + s);
s.setLength(0);
}
}
}
}

public Map<String, String> popMessage() throws InterruptedException {
return messages.poll(5, TimeUnit.SECONDS);
}

public void popAndAssert(String tagValues) throws InterruptedException {
Map<String, String> map = popMessage();
for (String rule : tagValues.split(" ")) {
String tag = rule.substring(0, rule.indexOf('='));
if (map == null)
throw new AssertionError("No message received");
String value = map.get(tag);
Assert.assertEquals(rule, tag + "=" + value);
}
}

public void setPrefix(String prefix) {
this.prefix = prefix;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package uk.co.real_logic.artio.util;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
* A server that accepts TCP connections and is able to reply automatically with canned
* data. It can be used to simulate a FIX server in order to quickly sent specific messages.
*/
public class DebugServer {

private final int port;
private final Queue<byte[]> connectResponses;
private final BlockingQueue<HasIOStream> clients;
private final ServerSocket serverSocket;

/**
* If true, wait until some data is received before sending prepared messages.
*/
private boolean waitForData;

/**
* Creates a debug server listening on specified port.
*/
public DebugServer(int port) throws IOException {
this.port = port;
this.connectResponses = new ConcurrentLinkedQueue<>();
this.clients = new LinkedBlockingQueue<>();
this.serverSocket = new ServerSocket(port);
}

/**
* Adds a message that must be directly sent to connecting clients. Messages
* are sent in the same order they were added.
*/
public void addConnectResponse(byte[] message) {
connectResponses.add(message);
}

/**
* Warning: causes problems because SendingTime and checksum needs to be regenerated
* and they are not.
*/
public void addFIXResponse(String msg) {
addConnectResponse(FixMessageTweak.recycle(msg));
}

/**
* Starts the debug server, accepting incoming connections and sending
* prepared data.
*/
public void start() throws IOException {
new Thread("DebugServer-" + port) {
@Override
public void run() {
try {
while (!serverSocket.isClosed()) {
Socket s = serverSocket.accept();
System.out.println("Connection accepted from " + s.getInetAddress());
try {
BufferedInputStream in = new BufferedInputStream(s.getInputStream());
BufferedOutputStream out = new BufferedOutputStream(s.getOutputStream());

if (!connectResponses.isEmpty() && waitForData) {
in.mark(0);
in.read();
in.reset();
}

HasIOStream client = new HasIOStream(in, out);
sendResponses(client.out);
clients.add(client);
} catch (IOException e) {
e.printStackTrace();
}
}
} catch (IOException e) {
if (!serverSocket.isClosed())
e.printStackTrace();
}
}
}.start();
}

public void stop() throws IOException {
serverSocket.close();
}

/**
* Sends prepared data to the client.
*/
private void sendResponses(OutputStream outputStream) throws IOException {
for (byte[] response : connectResponses) {
outputStream.write(response);
outputStream.flush();
}
}

public HasIOStream popClient(long timeoutMs) throws InterruptedException {
return clients.poll(timeoutMs, TimeUnit.MILLISECONDS);
}

public int getPort() {
return port;
}

public void setWaitForData(boolean waitForData) {
this.waitForData = waitForData;
}

public static class HasIOStream {

public final InputStream in;
public final OutputStream out;

public HasIOStream(InputStream in, OutputStream out) {
this.in = in;
this.out = out;
}
}
}
Loading

0 comments on commit ceb4702

Please sign in to comment.