Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid RMQConnection leaks by adopting weak refs, take 3 #197

Closed
wants to merge 14 commits into from

Conversation

michaelklishin
Copy link
Member

for some important dependencies of allocated channels and
RMQConnection itself.

For example, in this client, RMQConnection auto-allocates a channel for
the purpose of special "channel zero" (system communication
in the protocol) purposes, and that leads to a loop
of strong references that prevent RMQConnection instances
from being released.

Based on object graph analysis by @BarryDuggan in #194.

@michaelklishin
Copy link
Member Author

Some trial and error results:

  • Making RMQConnection#channelAllocator weak results in test failures in ChannelLifecycleIntegrationTest
  • Making RMQAllocatedChannel#nameGenerator weak results in test failures in ConsumersIntegrationTest

@BarryDuggan
Copy link
Collaborator

BarryDuggan commented Mar 31, 2022

I ran this branch as is and we have retain cycle.
What I would like to try next is the following.

@interface RMQSuspendResumeDispatcher ()
@property (nonatomic,weak, readwrite) id<RMQChannel> channel;
@property (nonatomic,weak, readwrite) id<RMQSender> sender;

What I've done is checked out this branch into my profiling test project.
I create a connection and then destroy it.
The memory graph shows me what object is holding onto an other.
If an object is a member of one class and that object gets passed to another class , then thats a candidate for investigation.

If these changes cause no test failures then I can pull the branch down again and verify against profiler.

@BarryDuggan
Copy link
Collaborator

@interface RMQConnectionRecover ()
@property (nonatomic,weak, readwrite) id<RMQHeartbeatSender> heartbeatSender;
@end

@michaelklishin
Copy link
Member Author

The RMQSuspendResumeDispatcher changes make unit tests for that class fail. Perhaps they can be reviewed but right now, that's the way it is.

The RMQConnectionRecover one doesn't break anything, so I've added it.

@michaelklishin
Copy link
Member Author

Another strategy would be to review how connections are closed. We may depend on ARC too much and can be more proactive about releasing the resources we obviously won't need any more after a connection.close-ok frame is received.

@michaelklishin
Copy link
Member Author

For example, the reader doesn't seem to be released here:

- (NSArray *)closeOperations {
    return @[^{[self closeAllUserChannels];},
              ^{[self sendFrameset:[[RMQFrameset alloc] initWithChannelNumber:@0 method:self.amqClose]];},
              ^{[self.channelZero blockingWaitOn:[RMQConnectionCloseOk class]];},
              ^{[self.heartbeatSender stop];},
              ^{
                  self.transport.delegate = nil;
                  [self.transport close];
              }];
}

@BarryDuggan
Copy link
Collaborator

Do I have permission to push to this branch?
rabbitmq-objc-client-194-take-3

The reason I asked is I wanted to add a New iPhone app Target to the project that links again RMQClient.
If I have that it will be a lot easier for me to:

  1. Make A change
  2. Run Unit Tests
  3. Check Memory Graph in iPhone app target

If we dont want to give me permission just yet thats fine.
Maybe you could update this branch with the new app target and I can pull down again.

@BarryDuggan
Copy link
Collaborator

This is the current state of the memory graph after

  • Create A Connection
  • Connect to Server
  • Close Connection
  • Nil Connection

I 'think' the run method in Reader may be creating a retain cycle.
Will look into that today.
Screenshot 2022-04-01 at 10 03 57

@BarryDuggan
Copy link
Collaborator

BarryDuggan commented Apr 1, 2022

More Progress here related to capturing self in callbacks.
All Tests Passing Locally.

(I can't seem to push myself)

RMQConnection

- (void)start:(void (^)(void))completionHandler {
    NSError *connectError = NULL;

    [self.transport connectAndReturnError:&connectError];
    if (connectError) {
        [self.delegate connection:self failedToConnectWithError:connectError];
    } else {
        [self.transport write:[RMQProtocolHeader new].amqEncoded];
        __weak id this = self;

        [self.commandQueue enqueue:^{
            __strong typeof(self) strongThis = this;
            id<RMQWaiter> handshakeCompletion = [strongThis.waiterFactory makeWithTimeout:strongThis.handshakeTimeout];

            RMQHandshaker *handshaker = [[RMQHandshaker alloc] initWithSender:strongThis
                                                                       config:strongThis.config
                                                            completionHandler:^(NSNumber *heartbeatTimeout,
                                                                                RMQTable *serverProperties) {
                                                                [strongThis.heartbeatSender startWithInterval:@(heartbeatTimeout.integerValue / 2)];
                strongThis.handshakeComplete = YES;
                                                                [handshakeCompletion done];
                                                                [strongThis.reader run];
                strongThis.serverProperties = serverProperties;
                                                                completionHandler();
                                                            }];
            RMQReader *handshakeReader = [[RMQReader alloc] initWithTransport:strongThis.transport
                                                                 frameHandler:handshaker];
            handshaker.reader = handshakeReader;
            [handshakeReader run];

            if (handshakeCompletion.timesOut) {
                NSError *error = [NSError errorWithDomain:RMQErrorDomain
                                                     code:RMQErrorConnectionHandshakeTimedOut
                                                 userInfo:@{NSLocalizedDescriptionKey: @"Handshake timed out."}];
                [strongThis.delegate connection:strongThis failedToConnectWithError:error];
            }
        }];
    }
}

RMQReader

- (void)handleMethodFrame:(RMQFrame *)frame {
    id<RMQMethod> method = (id<RMQMethod>)frame.payload;
    __weak id this = self;
    if (method.hasContent) {
        [self.transport readFrame:^(NSData * _Nonnull headerData) {
            __strong typeof(self) strongThis = this;

            RMQFrame *headerFrame = [strongThis frameWithData:headerData];
            RMQContentHeader *header = (RMQContentHeader *)headerFrame.payload;

            RMQFrameset *frameset = [[RMQFrameset alloc] initWithChannelNumber:frame.channelNumber
                                                                        method:method
                                                                 contentHeader:header
                                                                 contentBodies:@[]];
            if ([header.bodySize isEqualToNumber:@0]) {
                [strongThis.frameHandler handleFrameset:frameset];
            } else {
                [strongThis readBodiesForIncompleteFrameset:frameset];
            }
        }];
    } else {
        RMQFrameset *frameset = [[RMQFrameset alloc] initWithChannelNumber:frame.channelNumber
                                                                    method:method];
        [self.frameHandler handleFrameset:frameset];
    }
}

RMQTCPSocketTransport

- (void)readFrame:(void (^)(NSData * _Nonnull))complete {
    __weak id this = self;
    [self read:AMQP091_HEADER_SIZE complete:^(NSData * _Nonnull data) {
        const struct AMQPHeader *header;
        header = (const struct AMQPHeader *)data.bytes;
        __strong typeof(self) strongThis = this;
        UInt32 hostSize = CFSwapInt32BigToHost(header->size);
        
        [strongThis read:hostSize complete:^(NSData * _Nonnull payload) {
            [strongThis read:AMQP091_FINAL_OCTET_SIZE complete:^(NSData * _Nonnull frameEnd) {
                NSMutableData *allData = [data mutableCopy];
                [allData appendData:payload];
                complete(allData);
            }];
        }];
    }];
}

These changes leave us with a slightly healthier looking memory graph (just 1 issue left)
Screenshot 2022-04-01 at 12 53 23

@michaelklishin
Copy link
Member Author

I will add you as a contributor so that you can push.

Committer: Michael Klishin <michael@clojurewerkz.org>
Author: Barry Duggan <barry.duggan@jci.com>
@michaelklishin
Copy link
Member Author

@BarryDuggan you should be able to push to this repository now.

@BarryDuggan BarryDuggan closed this Apr 1, 2022
@BarryDuggan BarryDuggan deleted the rabbitmq-objc-client-194-take-3 branch April 1, 2022 16:20
@michaelklishin michaelklishin restored the rabbitmq-objc-client-194-take-3 branch April 1, 2022 18:12
@michaelklishin
Copy link
Member Author

Why was this branch deleted without a merge?

@michaelklishin
Copy link
Member Author

@BarryDuggan BarryDuggan changed the title Avoid RMQConnection leaks by adopting weak refs, take 3 Avoid RMQConnection leaks by adopting weak refs, take 4 Apr 1, 2022
@BarryDuggan BarryDuggan changed the title Avoid RMQConnection leaks by adopting weak refs, take 4 Avoid RMQConnection leaks by adopting weak refs, take 3 Apr 1, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants