diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java index 9dacd3358d..22953bce0d 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/Framer.java @@ -19,17 +19,6 @@ import io.aeron.logbuffer.ControlledFragmentHandler; import io.aeron.logbuffer.ControlledFragmentHandler.Action; import io.aeron.logbuffer.Header; -import org.agrona.DeadlineTimerWheel; -import org.agrona.DirectBuffer; -import org.agrona.ErrorHandler; -import org.agrona.collections.Int2ObjectHashMap; -import org.agrona.collections.Long2LongHashMap; -import org.agrona.collections.Long2LongHashMap.KeyIterator; -import org.agrona.collections.LongHashSet; -import org.agrona.concurrent.*; -import org.agrona.concurrent.status.AtomicCounter; -import org.agrona.concurrent.status.CountersReader; -import org.agrona.concurrent.status.UnsafeBufferPosition; import uk.co.real_logic.artio.*; import uk.co.real_logic.artio.decoder.SessionHeaderDecoder; import uk.co.real_logic.artio.dictionary.FixDictionary; @@ -47,6 +36,17 @@ import uk.co.real_logic.artio.util.AsciiBuffer; import uk.co.real_logic.artio.util.CharFormatter; import uk.co.real_logic.artio.util.MutableAsciiBuffer; +import org.agrona.DeadlineTimerWheel; +import org.agrona.DirectBuffer; +import org.agrona.ErrorHandler; +import org.agrona.collections.Int2ObjectHashMap; +import org.agrona.collections.Long2LongHashMap; +import org.agrona.collections.Long2LongHashMap.KeyIterator; +import org.agrona.collections.LongHashSet; +import org.agrona.concurrent.*; +import org.agrona.concurrent.status.AtomicCounter; +import org.agrona.concurrent.status.CountersReader; +import org.agrona.concurrent.status.UnsafeBufferPosition; import java.io.File; import java.io.IOException; @@ -66,7 +66,9 @@ import static org.agrona.concurrent.status.CountersReader.NULL_COUNTER_ID; import static uk.co.real_logic.artio.GatewayProcess.NO_CONNECTION_ID; import static uk.co.real_logic.artio.GatewayProcess.NO_CORRELATION_ID; -import static uk.co.real_logic.artio.LogTag.*; +import static uk.co.real_logic.artio.LogTag.APPLICATION_HEARTBEAT; +import static uk.co.real_logic.artio.LogTag.FIX_CONNECTION; +import static uk.co.real_logic.artio.LogTag.LIBRARY_MANAGEMENT; import static uk.co.real_logic.artio.Pressure.isBackPressured; import static uk.co.real_logic.artio.dictionary.SessionConstants.LOGON_MESSAGE_TYPE; import static uk.co.real_logic.artio.dictionary.SessionConstants.SEQUENCE_RESET_MESSAGE_TYPE; @@ -89,7 +91,9 @@ import static uk.co.real_logic.artio.messages.SequenceNumberType.PERSISTENT; import static uk.co.real_logic.artio.messages.SequenceNumberType.TRANSIENT; import static uk.co.real_logic.artio.messages.SessionReplyStatus.*; -import static uk.co.real_logic.artio.messages.SessionState.*; +import static uk.co.real_logic.artio.messages.SessionState.ACTIVE; +import static uk.co.real_logic.artio.messages.SessionState.CONNECTED; +import static uk.co.real_logic.artio.messages.SessionState.DISCONNECTED; import static uk.co.real_logic.artio.messages.SessionStatus.SESSION_HANDOVER; /** @@ -1881,6 +1885,7 @@ public Action onLibraryConnect( clock); final LiveLibraryInfo library = new LiveLibraryInfo( + errorHandler, libraryId, libraryName, livenessDetector, aeronSessionId, gatewaySessions instanceof FixPGatewaySessions); idToLibrary.put(libraryId, library); diff --git a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/LiveLibraryInfo.java b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/LiveLibraryInfo.java index 96241d57d5..a0972416b3 100644 --- a/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/LiveLibraryInfo.java +++ b/artio-core/src/main/java/uk/co/real_logic/artio/engine/framer/LiveLibraryInfo.java @@ -15,10 +15,11 @@ */ package uk.co.real_logic.artio.engine.framer; -import org.agrona.collections.Long2ObjectHashMap; import uk.co.real_logic.artio.LivenessDetector; import uk.co.real_logic.artio.engine.ConnectedSessionInfo; import uk.co.real_logic.artio.engine.FixPConnectedSessionInfo; +import org.agrona.ErrorHandler; +import org.agrona.collections.Long2ObjectHashMap; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -28,6 +29,7 @@ final class LiveLibraryInfo implements LibraryInfo { + private final ErrorHandler errorHandler; private final int libraryId; private final String libraryName; private final LivenessDetector livenessDetector; @@ -41,12 +43,14 @@ final class LiveLibraryInfo implements LibraryInfo @SuppressWarnings("unchecked") LiveLibraryInfo( + final ErrorHandler errorHandler, final int libraryId, final String libraryName, final LivenessDetector livenessDetector, final int aeronSessionId, final boolean isFixP) { + this.errorHandler = errorHandler; this.libraryId = libraryId; this.libraryName = libraryName; this.livenessDetector = livenessDetector; @@ -120,6 +124,14 @@ boolean isConnected() void addSession(final GatewaySession session) { + final GatewaySession existingSession = removeSessionBySessionId(session.sessionId()); + if (existingSession != null && !existingSession.isOffline()) + { + final IllegalStateException exception = new IllegalStateException( + "Session already exists for session id: " + session.sessionId() + " and it is not offline." + ); + errorHandler.onError(exception); + } allSessions.add(session); } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java index 6580c7e4b4..5401a2a380 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/SoleLibrarySystemTest.java @@ -15,40 +15,53 @@ */ package uk.co.real_logic.artio.system_tests; -import org.junit.Test; -import uk.co.real_logic.artio.engine.ConnectedSessionInfo; -import uk.co.real_logic.artio.engine.EngineConfiguration; -import uk.co.real_logic.artio.engine.FixEngine; -import uk.co.real_logic.artio.engine.SessionInfo; +import uk.co.real_logic.artio.engine.*; import uk.co.real_logic.artio.engine.framer.LibraryInfo; +import uk.co.real_logic.artio.library.FixLibrary; import uk.co.real_logic.artio.library.LibraryConfiguration; +import org.junit.Test; import java.util.List; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.hasItem; import static org.junit.Assert.*; import static uk.co.real_logic.artio.FixMatchers.hasConnectionId; import static uk.co.real_logic.artio.FixMatchers.hasSessionId; import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver; +import static uk.co.real_logic.artio.Timing.DEFAULT_TIMEOUT_IN_MS; +import static uk.co.real_logic.artio.Timing.assertEventuallyTrue; import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; import static uk.co.real_logic.artio.messages.SessionState.ACTIVE; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.*; public class SoleLibrarySystemTest extends AbstractGatewayToGatewaySystemTest { + private LockStepFramerEngineScheduler scheduler; + private void launch() { - launch(true); + launch(true, false); } - private void launch(final boolean logMessages) + private void launch(final boolean logMessages, final boolean useScheduler) { mediaDriver = launchMediaDriver(); + final EngineConfiguration acceptingConfig = acceptingConfig(port, ACCEPTOR_ID, INITIATOR_ID, nanoClock) .deleteLogFileDirOnStart(true) + .replyTimeoutInMs(120_000) .initialAcceptedSessionOwner(SOLE_LIBRARY); + + if (useScheduler) + { + scheduler = new LockStepFramerEngineScheduler(); + acceptingConfig.scheduler(scheduler); + } + acceptingEngine = FixEngine.launch(acceptingConfig); final EngineConfiguration initiatingConfig = @@ -59,9 +72,23 @@ private void launch(final boolean logMessages) initiatingEngine = FixEngine.launch(initiatingConfig); final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock); - acceptingLibrary = connect(acceptingLibraryConfig); + acceptingLibrary = FixLibrary.connect(acceptingLibraryConfig); + assertEventuallyTrue( + () -> "Unable to connect to engine", + () -> + { + if (useScheduler) + { + scheduler.invokeFramer(); + } + acceptingLibrary.poll(LIBRARY_LIMIT); + + return acceptingLibrary.isConnected(); + }, + DEFAULT_TIMEOUT_IN_MS, + acceptingLibrary::close); initiatingLibrary = connect(initiatingLibraryConfig(libraryAeronPort, initiatingHandler, nanoClock)); - testSystem = new TestSystem(acceptingLibrary, initiatingLibrary); + testSystem = new TestSystem(scheduler, acceptingLibrary, initiatingLibrary); } @Test(timeout = TEST_TIMEOUT_IN_MS) @@ -105,6 +132,56 @@ public void shouldSupportUnreleasedOfflineSessionsInSoleLibraryMode() assertEquals(1, acceptingSession.lastSentMsgSeqNum()); } + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldSupportManySessionReconnections() + { + launch(false, true); + + // 300 > number in groupSizeEncoding + for (int i = 0; i < 300; i++) + { + connectAndAcquire(); + messagesCanBeExchanged(); + disconnectSessions(); + } + + assertThat(acceptingLibrary.sessions(), hasItem(acceptingSession)); + final long sessionId = acceptingSession.id(); + assertCountersClosed(false, acceptingSession); + + assertOfflineSession(sessionId, acceptingSession); + assertCountersClosed(false, acceptingSession); + + final long testDeadlineMs = System.currentTimeMillis() + 10_000; + + // trigger library timeout + while (acceptingLibrary.isConnected()) + { + acceptingLibrary.poll(10); + initiatingLibrary.poll(10); + + if (System.currentTimeMillis() > testDeadlineMs) + { + fail("Failed to disconnect library"); + } + } + + // library reconnect + while (!acceptingLibrary.isConnected()) + { + testSystem.poll(); + + if (System.currentTimeMillis() > testDeadlineMs) + { + fail("Failed to reconnect library"); + } + } + + connectAndAcquire(); + assertEquals(ACTIVE, acceptingSession.state()); + messagesCanBeExchanged(); + } + private void connectAndAcquire() { connectSessions(); @@ -130,7 +207,7 @@ public void shouldAllowReonnectingInitiatorsToReconnect() public void shouldAcquireSessionsWithLoggingSwitchedOff() { // Equivalent invariant tested in Engine mode in NoLoggingGatewayToGatewaySystemTest - launch(false); + launch(false, false); connectAndAcquire(); acceptingMessagesCanBeExchanged();