Skip to content

Commit

Permalink
Merge pull request #506 from real-logic/bugfix/ticket-18482
Browse files Browse the repository at this point in the history
Demonstrate issue exceeding numInGroup in ControlNotification and avoid leaking offline sessions
  • Loading branch information
vyazelenko committed Apr 18, 2024
2 parents 491f67d + 6a7c216 commit 74a8858
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,6 @@
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.ControlledFragmentHandler.Action;
import io.aeron.logbuffer.Header;
import org.agrona.DeadlineTimerWheel;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2LongHashMap.KeyIterator;
import org.agrona.collections.LongHashSet;
import org.agrona.concurrent.*;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersReader;
import org.agrona.concurrent.status.UnsafeBufferPosition;
import uk.co.real_logic.artio.*;
import uk.co.real_logic.artio.decoder.SessionHeaderDecoder;
import uk.co.real_logic.artio.dictionary.FixDictionary;
Expand All @@ -47,6 +36,17 @@
import uk.co.real_logic.artio.util.AsciiBuffer;
import uk.co.real_logic.artio.util.CharFormatter;
import uk.co.real_logic.artio.util.MutableAsciiBuffer;
import org.agrona.DeadlineTimerWheel;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2LongHashMap.KeyIterator;
import org.agrona.collections.LongHashSet;
import org.agrona.concurrent.*;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersReader;
import org.agrona.concurrent.status.UnsafeBufferPosition;

import java.io.File;
import java.io.IOException;
Expand All @@ -66,7 +66,9 @@
import static org.agrona.concurrent.status.CountersReader.NULL_COUNTER_ID;
import static uk.co.real_logic.artio.GatewayProcess.NO_CONNECTION_ID;
import static uk.co.real_logic.artio.GatewayProcess.NO_CORRELATION_ID;
import static uk.co.real_logic.artio.LogTag.*;
import static uk.co.real_logic.artio.LogTag.APPLICATION_HEARTBEAT;
import static uk.co.real_logic.artio.LogTag.FIX_CONNECTION;
import static uk.co.real_logic.artio.LogTag.LIBRARY_MANAGEMENT;
import static uk.co.real_logic.artio.Pressure.isBackPressured;
import static uk.co.real_logic.artio.dictionary.SessionConstants.LOGON_MESSAGE_TYPE;
import static uk.co.real_logic.artio.dictionary.SessionConstants.SEQUENCE_RESET_MESSAGE_TYPE;
Expand All @@ -89,7 +91,9 @@
import static uk.co.real_logic.artio.messages.SequenceNumberType.PERSISTENT;
import static uk.co.real_logic.artio.messages.SequenceNumberType.TRANSIENT;
import static uk.co.real_logic.artio.messages.SessionReplyStatus.*;
import static uk.co.real_logic.artio.messages.SessionState.*;
import static uk.co.real_logic.artio.messages.SessionState.ACTIVE;
import static uk.co.real_logic.artio.messages.SessionState.CONNECTED;
import static uk.co.real_logic.artio.messages.SessionState.DISCONNECTED;
import static uk.co.real_logic.artio.messages.SessionStatus.SESSION_HANDOVER;

/**
Expand Down Expand Up @@ -1881,6 +1885,7 @@ public Action onLibraryConnect(
clock);

final LiveLibraryInfo library = new LiveLibraryInfo(
errorHandler,
libraryId, libraryName, livenessDetector, aeronSessionId,
gatewaySessions instanceof FixPGatewaySessions);
idToLibrary.put(libraryId, library);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
*/
package uk.co.real_logic.artio.engine.framer;

import org.agrona.collections.Long2ObjectHashMap;
import uk.co.real_logic.artio.LivenessDetector;
import uk.co.real_logic.artio.engine.ConnectedSessionInfo;
import uk.co.real_logic.artio.engine.FixPConnectedSessionInfo;
import org.agrona.ErrorHandler;
import org.agrona.collections.Long2ObjectHashMap;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -28,6 +29,7 @@

final class LiveLibraryInfo implements LibraryInfo
{
private final ErrorHandler errorHandler;
private final int libraryId;
private final String libraryName;
private final LivenessDetector livenessDetector;
Expand All @@ -41,12 +43,14 @@ final class LiveLibraryInfo implements LibraryInfo

@SuppressWarnings("unchecked")
LiveLibraryInfo(
final ErrorHandler errorHandler,
final int libraryId,
final String libraryName,
final LivenessDetector livenessDetector,
final int aeronSessionId,
final boolean isFixP)
{
this.errorHandler = errorHandler;
this.libraryId = libraryId;
this.libraryName = libraryName;
this.livenessDetector = livenessDetector;
Expand Down Expand Up @@ -120,6 +124,14 @@ boolean isConnected()

void addSession(final GatewaySession session)
{
final GatewaySession existingSession = removeSessionBySessionId(session.sessionId());
if (existingSession != null && !existingSession.isOffline())
{
final IllegalStateException exception = new IllegalStateException(
"Session already exists for session id: " + session.sessionId() + " and it is not offline."
);
errorHandler.onError(exception);
}
allSessions.add(session);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,53 @@
*/
package uk.co.real_logic.artio.system_tests;

import org.junit.Test;
import uk.co.real_logic.artio.engine.ConnectedSessionInfo;
import uk.co.real_logic.artio.engine.EngineConfiguration;
import uk.co.real_logic.artio.engine.FixEngine;
import uk.co.real_logic.artio.engine.SessionInfo;
import uk.co.real_logic.artio.engine.*;
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
import uk.co.real_logic.artio.library.FixLibrary;
import uk.co.real_logic.artio.library.LibraryConfiguration;
import org.junit.Test;

import java.util.List;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.*;
import static uk.co.real_logic.artio.FixMatchers.hasConnectionId;
import static uk.co.real_logic.artio.FixMatchers.hasSessionId;
import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver;
import static uk.co.real_logic.artio.Timing.DEFAULT_TIMEOUT_IN_MS;
import static uk.co.real_logic.artio.Timing.assertEventuallyTrue;
import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY;
import static uk.co.real_logic.artio.messages.SessionState.ACTIVE;
import static uk.co.real_logic.artio.system_tests.SystemTestUtil.*;

public class SoleLibrarySystemTest extends AbstractGatewayToGatewaySystemTest
{
private LockStepFramerEngineScheduler scheduler;

private void launch()
{
launch(true);
launch(true, false);
}

private void launch(final boolean logMessages)
private void launch(final boolean logMessages, final boolean useScheduler)
{
mediaDriver = launchMediaDriver();


final EngineConfiguration acceptingConfig = acceptingConfig(port, ACCEPTOR_ID, INITIATOR_ID, nanoClock)
.deleteLogFileDirOnStart(true)
.replyTimeoutInMs(120_000)
.initialAcceptedSessionOwner(SOLE_LIBRARY);

if (useScheduler)
{
scheduler = new LockStepFramerEngineScheduler();
acceptingConfig.scheduler(scheduler);
}

acceptingEngine = FixEngine.launch(acceptingConfig);

final EngineConfiguration initiatingConfig =
Expand All @@ -59,9 +72,23 @@ private void launch(final boolean logMessages)
initiatingEngine = FixEngine.launch(initiatingConfig);

final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock);
acceptingLibrary = connect(acceptingLibraryConfig);
acceptingLibrary = FixLibrary.connect(acceptingLibraryConfig);
assertEventuallyTrue(
() -> "Unable to connect to engine",
() ->
{
if (useScheduler)
{
scheduler.invokeFramer();
}
acceptingLibrary.poll(LIBRARY_LIMIT);

return acceptingLibrary.isConnected();
},
DEFAULT_TIMEOUT_IN_MS,
acceptingLibrary::close);
initiatingLibrary = connect(initiatingLibraryConfig(libraryAeronPort, initiatingHandler, nanoClock));
testSystem = new TestSystem(acceptingLibrary, initiatingLibrary);
testSystem = new TestSystem(scheduler, acceptingLibrary, initiatingLibrary);
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
Expand Down Expand Up @@ -105,6 +132,56 @@ public void shouldSupportUnreleasedOfflineSessionsInSoleLibraryMode()
assertEquals(1, acceptingSession.lastSentMsgSeqNum());
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void shouldSupportManySessionReconnections()
{
launch(false, true);

// 300 > number in groupSizeEncoding
for (int i = 0; i < 300; i++)
{
connectAndAcquire();
messagesCanBeExchanged();
disconnectSessions();
}

assertThat(acceptingLibrary.sessions(), hasItem(acceptingSession));
final long sessionId = acceptingSession.id();
assertCountersClosed(false, acceptingSession);

assertOfflineSession(sessionId, acceptingSession);
assertCountersClosed(false, acceptingSession);

final long testDeadlineMs = System.currentTimeMillis() + 10_000;

// trigger library timeout
while (acceptingLibrary.isConnected())
{
acceptingLibrary.poll(10);
initiatingLibrary.poll(10);

if (System.currentTimeMillis() > testDeadlineMs)
{
fail("Failed to disconnect library");
}
}

// library reconnect
while (!acceptingLibrary.isConnected())
{
testSystem.poll();

if (System.currentTimeMillis() > testDeadlineMs)
{
fail("Failed to reconnect library");
}
}

connectAndAcquire();
assertEquals(ACTIVE, acceptingSession.state());
messagesCanBeExchanged();
}

private void connectAndAcquire()
{
connectSessions();
Expand All @@ -130,7 +207,7 @@ public void shouldAllowReonnectingInitiatorsToReconnect()
public void shouldAcquireSessionsWithLoggingSwitchedOff()
{
// Equivalent invariant tested in Engine mode in NoLoggingGatewayToGatewaySystemTest
launch(false);
launch(false, false);

connectAndAcquire();
acceptingMessagesCanBeExchanged();
Expand Down

0 comments on commit 74a8858

Please sign in to comment.