AeronArchive does not survive disconnects #427

Closed
zyulyaev opened this issue Oct 31, 2017 · 5 comments

@zyulyaev

AeronArchive does not survive long disconnects (> 5 s).
But more importantly, there is no way to know whether the Archive's ControlSession is dead.
The only symptom is that every AeronArchive method throws TimeoutException, which is ambiguous: it may indicate either that Aeron still considers the connection connected although it is actually broken, or that the Archive's ControlSession is dead.
One could simply recreate AeronArchive on each TimeoutException, but that can cause a memory leak: currently, when a CONNECT message arrives, a new ControlSession is unconditionally created (line), while the previous one is still considered alive because its Publication is in the connected state (line).
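To make the leak scenario concrete, here is a toy model. It is not Aeron code: `LeakModel` and `ModelSession` are hypothetical names, and the model encodes only the two rules described above (CONNECT always creates a session; a session is reclaimed only once its Publication is no longer connected).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the archive's control-session bookkeeping
// as described in this issue. None of these names are Aeron's real classes.
final class LeakModel {
    // A control session stays registered while its response publication looks connected.
    static final class ModelSession {
        final int id;
        boolean publicationConnected = true;

        ModelSession(final int id) {
            this.id = id;
        }
    }

    final List<ModelSession> sessions = new ArrayList<>();
    private int nextId = 0;

    // CONNECT always creates a new session, even if an older one for the same client exists.
    ModelSession onConnect() {
        final ModelSession session = new ModelSession(nextId++);
        sessions.add(session);
        return session;
    }

    // Periodic sweep: only sessions whose publication has dropped are reclaimed.
    void sweep() {
        sessions.removeIf(s -> !s.publicationConnected);
    }

    public static void main(final String[] args) {
        final LeakModel archive = new LeakModel();
        // A client that reconnects on every timeout while its old publications stay "connected".
        for (int i = 0; i < 3; i++) {
            archive.onConnect();
            archive.sweep();
        }
        System.out.println("live sessions: " + archive.sessions.size()); // live sessions: 3
    }
}
```

Every reconnect adds a session and none is ever swept, which is the accumulation the report is worried about.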

The following test reproduces the described case:

```java
package io.aeron.archive;

import io.aeron.Aeron;
import io.aeron.CommonContext;
import io.aeron.archive.client.AeronArchive;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ext.DebugReceiveChannelEndpoint;
import io.aeron.driver.ext.DebugSendChannelEndpoint;
import io.aeron.driver.ext.LossGenerator;
import io.aeron.exceptions.TimeoutException;
import org.agrona.concurrent.UnsafeBuffer;
import org.junit.Assert;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class AeronArchiveTest {
    @Test
    public void testDisconnect() throws Exception {
        ControlledLossGenerator lossGenerator = new ControlledLossGenerator();
        String aeronDirectoryName = CommonContext.generateRandomDirName();
        try (ArchivingMediaDriver mediaDriver = ArchivingMediaDriver.launch(
                 new MediaDriver.Context()
                     .timerIntervalNs(TimeUnit.MILLISECONDS.toNanos(100))
                     .publicationConnectionTimeoutNs(TimeUnit.MILLISECONDS.toNanos(500))
                     .imageLivenessTimeoutNs(TimeUnit.MILLISECONDS.toNanos(500))
                     .aeronDirectoryName(aeronDirectoryName)
                     // route all UDP traffic through the switchable loss generator
                     .receiveChannelEndpointSupplier((udpChannel, dispatcher, statusIndicator, context) ->
                         new DebugReceiveChannelEndpoint(udpChannel, dispatcher, statusIndicator, context,
                             lossGenerator, lossGenerator))
                     .sendChannelEndpointSupplier((udpChannel, statusIndicator, context) ->
                         new DebugSendChannelEndpoint(udpChannel, statusIndicator, context,
                             lossGenerator, lossGenerator)),
                 new Archive.Context()
                     .aeronContext(new Aeron.Context()
                         .aeronDirectoryName(aeronDirectoryName)));
             Aeron archiveAeron = Aeron.connect(new Aeron.Context()
                 .aeronDirectoryName(aeronDirectoryName));
             AeronArchive archive = AeronArchive.connect(new AeronArchive.Context()
                 .aeron(archiveAeron)))
        {
            // drop every frame for 6 s, longer than the ~5 s disconnect the session survives
            lossGenerator.shouldDrop = true;
            Thread.sleep(6000);
            // now the Archive's ControlSession should be dead

            lossGenerator.shouldDrop = false;

            // wait until the client's control request publication is connected again
            while (!archive.archiveProxy().publication().isConnected()) {
                Thread.yield();
            }

            try {
                archive.listRecordings(0, 10, new FailControlResponseListener());
                Assert.fail();
            } catch (TimeoutException ex) {
                // this exception is ambiguous:
                // there is no way to know whether it is time to CONNECT again
            }
        }
    }

    // Drops all frames while shouldDrop is set; toggled from the test thread.
    private static class ControlledLossGenerator implements LossGenerator {
        private volatile boolean shouldDrop = false;

        @Override
        public boolean shouldDropFrame(InetSocketAddress address, UnsafeBuffer buffer, int length) {
            return shouldDrop;
        }
    }
}
```
@mjpt777
Contributor

mjpt777 commented Oct 31, 2017

Yes, you are correct that the error reporting should be better in such cases. I'm going to revisit this code.

mjpt777 added a commit that referenced this issue Oct 31, 2017
@mjpt777
Contributor

mjpt777 commented Oct 31, 2017

No memory leak is occurring. The Publication was becoming disconnected and then connecting again with a new session. I've tightened up the logic to ensure the session is aborted if either the inbound image or the outbound publication drops.
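A minimal sketch of that tightened rule, assuming a control session simply tracks one flag per direction. `DuplexLiveness` and its fields are hypothetical names, not the actual ControlSession implementation; the choice that an aborted session never becomes active again (a reconnecting client gets a new session instead) follows the description above.

```java
// Hypothetical model: a control session is aborted as soon as either direction
// of its connection drops. Names are illustrative, not Aeron's ControlSession code.
final class DuplexLiveness {
    enum State { ACTIVE, ABORTED }

    boolean inboundImageConnected = true;        // requests arriving from the client
    boolean outboundPublicationConnected = true; // responses going back to the client
    State state = State.ACTIVE;

    // Called on each duty cycle; once aborted, the session stays aborted.
    State poll() {
        if (state == State.ACTIVE && !(inboundImageConnected && outboundPublicationConnected)) {
            state = State.ABORTED;
        }
        return state;
    }

    public static void main(final String[] args) {
        final DuplexLiveness session = new DuplexLiveness();
        session.inboundImageConnected = false;   // request stream lost
        System.out.println(session.poll());      // ABORTED
        session.inboundImageConnected = true;    // the link coming back does not revive it
        System.out.println(session.poll());      // ABORTED
    }
}
```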

When an inbound request arrives and no return path can be determined, sending back an error is tricky. @tmontgomery and I are looking at a number of ways to address this. One simple way would be to have a commonly known error stream. An alternative is a full-duplex, Aeron-style socket, which is in development for other uses and from which this could also benefit.

For now, if you get an error, reconnect.
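On the client side, "reconnect on error" could look roughly like this sketch. `ArchiveClient`, `ArchiveTimeoutException` and `ReconnectingCaller` are hypothetical stand-ins for `AeronArchive` and `io.aeron.exceptions.TimeoutException`, so it runs without Aeron on the classpath; with the real client, the connector would wrap `AeronArchive.connect(...)`.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical stand-in for AeronArchive: only what the retry loop needs.
interface ArchiveClient extends AutoCloseable {
    @Override
    void close(); // no checked exception, unlike AutoCloseable.close()
}

// Hypothetical stand-in for io.aeron.exceptions.TimeoutException.
final class ArchiveTimeoutException extends RuntimeException {
    ArchiveTimeoutException(final String message) {
        super(message);
    }
}

final class ReconnectingCaller {
    private final Supplier<ArchiveClient> connector;
    private ArchiveClient client;

    ReconnectingCaller(final Supplier<ArchiveClient> connector) {
        this.connector = connector;
        this.client = connector.get();
    }

    // Runs the request; on a timeout, closes the old client and connects a fresh
    // one (i.e. sends a new CONNECT), retrying up to maxAttempts times in total.
    <T> T call(final Function<ArchiveClient, T> request, final int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ArchiveTimeoutException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return request.apply(client);
            } catch (final ArchiveTimeoutException ex) {
                last = ex;
                client.close();
                client = connector.get(); // next attempt (or next call) uses a fresh session
            }
        }
        throw last;
    }

    public static void main(final String[] args) {
        final int[] connects = {0};
        final int[] failuresLeft = {1};
        final ReconnectingCaller caller = new ReconnectingCaller(() -> {
            connects[0]++;
            return () -> { };
        });
        final String result = caller.call(c -> {
            if (failuresLeft[0]-- > 0) {
                throw new ArchiveTimeoutException("no response");
            }
            return "ok";
        }, 3);
        System.out.println(result + " after " + connects[0] + " connects"); // ok after 2 connects
    }
}
```

Bounding the attempts keeps a permanently unreachable archive from looping forever, and closing the old client before reconnecting avoids holding on to its resources.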

mjpt777 added a commit that referenced this issue Oct 31, 2017
@mjpt777
Contributor

mjpt777 commented Oct 31, 2017

I've pushed a change that detects the connection is broken and gives a more informative message.
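One way to picture that kind of check, as a sketch only: while waiting for a response, look at whether the control request publication is still connected, and word the error differently for each case. `ResponseAwaiter`, `ControlTimeoutException` and the injected clock are assumptions for illustration, not what the commit actually changed.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

final class ResponseAwaiter {
    // Hypothetical exception type; Aeron's own exception classes are not modelled here.
    static final class ControlTimeoutException extends RuntimeException {
        ControlTimeoutException(final String message) {
            super(message);
        }
    }

    // Polls until a response arrives. Instead of one generic timeout, it reports a
    // broken connection separately from "connected, but nobody answered".
    static void awaitResponse(
        final BooleanSupplier responseArrived,
        final BooleanSupplier publicationConnected,
        final LongSupplier nanoClock,
        final long timeoutNs)
    {
        final long deadlineNs = nanoClock.getAsLong() + timeoutNs;
        while (!responseArrived.getAsBoolean()) {
            if (!publicationConnected.getAsBoolean()) {
                throw new ControlTimeoutException("control request publication is not connected");
            }
            if (nanoClock.getAsLong() - deadlineNs > 0) {
                throw new ControlTimeoutException(
                    "connected, but no response within " + timeoutNs + " ns: the control session may be gone");
            }
            Thread.yield();
        }
    }

    public static void main(final String[] args) {
        try {
            awaitResponse(() -> false, () -> true, System::nanoTime, 1_000_000L);
        } catch (final ControlTimeoutException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```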

@zyulyaev
Author

zyulyaev commented Nov 1, 2017

I think a memory leak can actually occur when you drop your AeronArchive for some reason (the app crashed, or you consider it disconnected) and create a new one with the same response channel and stream while the old ControlSession in the Archive is still alive.
This case may also lead to another unwanted behavior: the new AeronArchive fails to connect. ControlSession#waitForConnection succeeds immediately because Aeron still considers the original Publication connected, so the new client may not have connected in time and misses the CONNECT response.
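A toy model of that race, assuming the archive checks only the driver's (possibly stale) view of the response path before sending. `StaleResponsePath` and its flags are hypothetical names used for illustration.

```java
// Hypothetical single-channel model: the driver's view of the response path
// (consideredConnected) can lag behind reality (subscriberAttached).
final class StaleResponsePath {
    boolean consideredConnected; // driver still believes a client is listening
    boolean subscriberAttached;  // is any client actually receiving right now?
    int responsesReceived;

    // Server side: "wait for connection", then send. Returns true if it sent.
    boolean trySendConnectResponse() {
        if (!consideredConnected) {
            return false; // keep waiting
        }
        if (subscriberAttached) {
            responsesReceived++;
        }
        return true; // sent, even if nobody was listening
    }

    public static void main(final String[] args) {
        final StaleResponsePath path = new StaleResponsePath();
        // The old client was connected, then crashed before the driver noticed.
        path.consideredConnected = true;
        path.subscriberAttached = false;
        // The new client reuses the same channel/stream; its CONNECT is answered at once.
        final boolean sent = path.trySendConnectResponse();
        path.subscriberAttached = true; // the new subscription attaches too late
        System.out.println("sent=" + sent + " received=" + path.responsesReceived); // sent=true received=0
    }
}
```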

mjpt777 added a commit that referenced this issue Nov 1, 2017
mjpt777 added a commit that referenced this issue Nov 1, 2017
…e so that it cannot be reclaimed by a restarting client on the same channel and stream id on the same driver. Issue #427.
@mjpt777
Contributor

mjpt777 commented Nov 1, 2017

I've addressed those two potential situations.

@mjpt777 mjpt777 closed this as completed Nov 1, 2017