Skip to content

Commit

Permalink
Avoid leaking offline sessions.
Browse files Browse the repository at this point in the history
Relates to support ticket 18482.

When using `soleLibraryMode` and a FIX session disconnects, the engine
will keep the session instance in `LiveLibraryInfo` but mark it as
disconnected (e.g., removing the endpoint information and
`connectionId`).

When a FIX session reconnects, a new session will be added to
`LiveLibraryInfo` regardless of whether or not there is an existing
session.

In this commit, I've added a removal as part of `addSession` maintaining
an invariant that `LiveLibraryInfo` has only one session instance per
`sessionId`. I believe this is safe, but I'm not hugely familiar with
lots of the code paths. I have added some logging in case I have missed
something. There are some long standing checks around duplicate
sessions, which gives me hope.
  • Loading branch information
ZachBray committed Apr 18, 2024
1 parent f17dad0 commit 6a7c216
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,6 @@
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.ControlledFragmentHandler.Action;
import io.aeron.logbuffer.Header;
import org.agrona.DeadlineTimerWheel;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2LongHashMap.KeyIterator;
import org.agrona.collections.LongHashSet;
import org.agrona.concurrent.*;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersReader;
import org.agrona.concurrent.status.UnsafeBufferPosition;
import uk.co.real_logic.artio.*;
import uk.co.real_logic.artio.decoder.SessionHeaderDecoder;
import uk.co.real_logic.artio.dictionary.FixDictionary;
Expand All @@ -47,6 +36,17 @@
import uk.co.real_logic.artio.util.AsciiBuffer;
import uk.co.real_logic.artio.util.CharFormatter;
import uk.co.real_logic.artio.util.MutableAsciiBuffer;
import org.agrona.DeadlineTimerWheel;
import org.agrona.DirectBuffer;
import org.agrona.ErrorHandler;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2LongHashMap;
import org.agrona.collections.Long2LongHashMap.KeyIterator;
import org.agrona.collections.LongHashSet;
import org.agrona.concurrent.*;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersReader;
import org.agrona.concurrent.status.UnsafeBufferPosition;

import java.io.File;
import java.io.IOException;
Expand All @@ -66,7 +66,9 @@
import static org.agrona.concurrent.status.CountersReader.NULL_COUNTER_ID;
import static uk.co.real_logic.artio.GatewayProcess.NO_CONNECTION_ID;
import static uk.co.real_logic.artio.GatewayProcess.NO_CORRELATION_ID;
import static uk.co.real_logic.artio.LogTag.*;
import static uk.co.real_logic.artio.LogTag.APPLICATION_HEARTBEAT;
import static uk.co.real_logic.artio.LogTag.FIX_CONNECTION;
import static uk.co.real_logic.artio.LogTag.LIBRARY_MANAGEMENT;
import static uk.co.real_logic.artio.Pressure.isBackPressured;
import static uk.co.real_logic.artio.dictionary.SessionConstants.LOGON_MESSAGE_TYPE;
import static uk.co.real_logic.artio.dictionary.SessionConstants.SEQUENCE_RESET_MESSAGE_TYPE;
Expand All @@ -89,7 +91,9 @@
import static uk.co.real_logic.artio.messages.SequenceNumberType.PERSISTENT;
import static uk.co.real_logic.artio.messages.SequenceNumberType.TRANSIENT;
import static uk.co.real_logic.artio.messages.SessionReplyStatus.*;
import static uk.co.real_logic.artio.messages.SessionState.*;
import static uk.co.real_logic.artio.messages.SessionState.ACTIVE;
import static uk.co.real_logic.artio.messages.SessionState.CONNECTED;
import static uk.co.real_logic.artio.messages.SessionState.DISCONNECTED;
import static uk.co.real_logic.artio.messages.SessionStatus.SESSION_HANDOVER;

/**
Expand Down Expand Up @@ -1881,6 +1885,7 @@ public Action onLibraryConnect(
clock);

final LiveLibraryInfo library = new LiveLibraryInfo(
errorHandler,
libraryId, libraryName, livenessDetector, aeronSessionId,
gatewaySessions instanceof FixPGatewaySessions);
idToLibrary.put(libraryId, library);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
*/
package uk.co.real_logic.artio.engine.framer;

import org.agrona.collections.Long2ObjectHashMap;
import uk.co.real_logic.artio.LivenessDetector;
import uk.co.real_logic.artio.engine.ConnectedSessionInfo;
import uk.co.real_logic.artio.engine.FixPConnectedSessionInfo;
import org.agrona.ErrorHandler;
import org.agrona.collections.Long2ObjectHashMap;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -28,6 +29,7 @@

final class LiveLibraryInfo implements LibraryInfo
{
private final ErrorHandler errorHandler;
private final int libraryId;
private final String libraryName;
private final LivenessDetector livenessDetector;
Expand All @@ -41,12 +43,14 @@ final class LiveLibraryInfo implements LibraryInfo

@SuppressWarnings("unchecked")
LiveLibraryInfo(
final ErrorHandler errorHandler,
final int libraryId,
final String libraryName,
final LivenessDetector livenessDetector,
final int aeronSessionId,
final boolean isFixP)
{
this.errorHandler = errorHandler;
this.libraryId = libraryId;
this.libraryName = libraryName;
this.livenessDetector = livenessDetector;
Expand Down Expand Up @@ -120,6 +124,14 @@ boolean isConnected()

void addSession(final GatewaySession session)
{
final GatewaySession existingSession = removeSessionBySessionId(session.sessionId());
if (existingSession != null && !existingSession.isOffline())
{
final IllegalStateException exception = new IllegalStateException(
"Session already exists for session id: " + session.sessionId() + " and it is not offline."
);
errorHandler.onError(exception);
}
allSessions.add(session);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
import uk.co.real_logic.artio.library.FixLibrary;
import uk.co.real_logic.artio.library.LibraryConfiguration;
import org.junit.Ignore;
import org.junit.Test;

import java.util.List;
Expand Down Expand Up @@ -134,7 +133,6 @@ public void shouldSupportUnreleasedOfflineSessionsInSoleLibraryMode()
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
@Ignore
public void shouldSupportManySessionReconnections()
{
launch(false, true);
Expand Down

0 comments on commit 6a7c216

Please sign in to comment.