
Question: Publication Unblock #377

Closed
goglusid opened this issue Jul 19, 2017 · 53 comments

@goglusid commented Jul 19, 2017

Can you please confirm that the following is indeed an issue?

In IpcPublication::checkForBlockedPublisher,

because initially we set

timeOfLastConsumerPositionChange = 0;

and

consumerPosition = producerPosition();
lastConsumerPosition = consumerPosition;

When we transition from having no subscription to having one, it triggers a call to LogBufferUnblocker.unblock even if the publication is not blocked.

In my use case, the first claimed message is forcefully published by unblock when in fact it should be done via commit.

Maybe setting timeOfLastConsumerPositionChange to the current time when a new subscription is added would prevent this.

I could also increase the unblock timeout. However, it would need to be a very high value, given that the difference between the current time and timeOfLastConsumerPositionChange = 0 could be very large.

@goglusid (Author) commented Jul 19, 2017

Following is a scenario leading to this situation:

  1. Create a new publication (consumerPosition=0, lastConsumerPosition=0, producerPosition=0)
  2. Create a new subscription (joinPosition=0)
  3. Publication::tryClaim(X bytes): producerPosition = X (+32 bytes for the header, but that's a detail)

We now enter the following function with:

consumerPosition = lastConsumerPosition = 0
and
producerPosition() > consumerPosition
and
timeNs > (timeOfLastConsumerPositionChange + unblockTimeoutNs)

with timeOfLastConsumerPositionChange = 0

So as soon as timeNs > unblockTimeoutNs, we call LogBufferUnblocker.unblock.

private void checkForBlockedPublisher(final long timeNs)
{
    if (consumerPosition == lastConsumerPosition)
    {
        if (producerPosition() > consumerPosition &&
            timeNs > (timeOfLastConsumerPositionChange + unblockTimeoutNs))
        {
            if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, consumerPosition))
            {
                unblockedPublications.orderedIncrement();
            }
        }
    }
    else
    {
        timeOfLastConsumerPositionChange = timeNs;
        lastConsumerPosition = consumerPosition;
    }
}

@goglusid (Author)

I changed the following code and I no longer have the issue. However, I don't know if there's a better fix for this.

public void onTimeEvent(final long timeNs, final long timeMs, final DriverConductor conductor)
{
    if (subscriberPositions.length == 0)
    {
        timeOfLastConsumerPositionChange = timeNs;
    }

    checkForBlockedPublisher(timeNs);
    // ...

@tmontgomery (Contributor) commented Jul 19, 2017

I noticed this possibility as well while adding the unblock logic to the C media driver, but haven't had time to test it out.

The same possibility might also exist for network publications.

Also, replay publications are currently being worked on. I didn't want to touch this until @mjpt777 and @nitsanw are happy with the changes involved there.

The fix you mentioned is not quite correct, I don't think. One reason is that it won't work for network publications. Another is that it is only the initial check call that has the issue, as it has not yet had a chance to be blocked for the timeout period.

A possible solution is to initialise timeOfLastConsumerPositionChange to the current time at publication creation.
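
A minimal sketch of that idea, assuming the constructor is handed the conductor's cached nanosecond clock value; the class and parameter names here are hypothetical, only the fields come from the snippets above:

// Sketch only, not the actual IpcPublication constructor: start the timestamp at
// "now" instead of 0 so the very first check cannot immediately satisfy
// timeNs > (timeOfLastConsumerPositionChange + unblockTimeoutNs).
class UnblockTimestampSketch
{
    long timeOfLastConsumerPositionChange;
    long consumerPosition;
    long lastConsumerPosition;

    UnblockTimestampSketch(final long producerPosition, final long nowNs)
    {
        consumerPosition = producerPosition;
        lastConsumerPosition = consumerPosition;
        timeOfLastConsumerPositionChange = nowNs; // was 0 in the description above
    }
}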

@goglusid (Author) commented Jul 20, 2017

My understanding is that this problem is still present for the first message published after a transition from having no subscribers to having at least one subscriber.

Ex:

  1. A publication had a subscriber but now has none.
     A possible state is: consumerPosition = lastConsumerPosition = producerPosition
     timeOfLastConsumerPositionChange = T0

  2. Wait, for instance, 2 * unblockTimeoutNs

  3. Add a new subscription

  4. Claim a message but don't commit

  5. At this point, we have

consumerPosition = lastConsumerPosition < producerPosition
timeOfLastConsumerPositionChange = T0
and timeNs >= T0 + 2 * unblockTimeoutNs

So we call LogBufferUnblocker.unblock and the publication's claimed message is forcefully committed by the media driver.

mjpt777 added a commit that referenced this issue Jul 20, 2017
…on of the publication and on transition from 0 to 1 subscribers for IPC. This should avoid the publisher getting unblocked unexpectedly. Issue #377.
@mjpt777 (Contributor) commented Jul 20, 2017

Interesting issue this. It is different for IPC and network publications. The network publication is easily fixed using the timestamp on construction. For IPC it is more complicated as the consumers can come and go.

The timestamp can be reset when transitioning from 0 to 1 IPC subscribers. This will, however, reset the clock on the timeout if a publication had really become blocked before the subscriber was added.
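
A minimal sketch of that transition reset, assuming the publication is notified when a subscriber is added; the class, method, and counter names are hypothetical:

// Sketch only, not the actual IpcPublication code.
class SubscriberTransitionSketch
{
    long timeOfLastConsumerPositionChange;
    int subscriberCount;

    // Reset the timestamp only on the 0 -> 1 subscriber transition. As noted above,
    // this restarts the clock even if the publication had already become blocked
    // before the subscriber was added.
    void onSubscriberAdded(final long nowNs)
    {
        if (subscriberCount == 0)
        {
            timeOfLastConsumerPositionChange = nowNs;
        }

        subscriberCount++;
    }
}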

tmontgomery added a commit that referenced this issue Jul 20, 2017
…tion creation. Reorder checks for efficiency. Issue #377.
@tmontgomery (Contributor)

Upon reflection (perhaps without enough coffee...) I'm not entirely sure that this is limited to a subscriber change on IPC.

timeOfLastConsumerPositionChange is only updated when the last consumer position and the current position differ (or, now, when a subscriber is added).

So, it would seem to be the case that this sequence could occur.

  1. consumerPosition = lastConsumerPosition = producerPosition (idle publication)
    timeOfLastConsumerPositionChange = T0

  2. Wait, for instance, 2 * unblockTimeoutNs

  3. Claim a message but don't commit for a brief (< unblockTimeoutNs) period

  4. checkForBlockedPublisher occurs before commit of message

At this point, consumerPosition = lastConsumerPosition < producerPosition
timeOfLastConsumerPositionChange = T0 and timeNs >= T0 + 2 * unblockTimeoutNs

This looks more like a race of the checkForBlockedPublisher and the commit on an idle IPC publication.

Am I missing something or just not enough time for coffee to fully kick in?

I have a big sense of déjà vu with this... I remember it from when I put in the original unblock check, but can't see how it gets avoided now.

@goglusid (Author) commented Jul 20, 2017

I concur this will solve this issue.

If I take into account the amount of money I paid to get Aeron in the first place ($0 ;p), I shouldn't complain, but I will, respectfully.

From what I understand, if an IPC subscriber stops consuming messages for more than the unblock period then the MediaDriver will start to unblock the IPC Publisher.

The same thing happens if a publisher takes more than the unblock period between claim and commit.

Also, for the Subscriber's Image: if a Subscriber takes more than the linger period to read messages, then it will crash because the Image and log buffers it reads from have been deleted.

This is the type of thing that should be documented (and if possible avoided), because it is like juggling with razor blades: it pushes the system into undefined behavior.

It would be better if we only freed resources or unblocked messages when we know it is safe to do so.
Obviously, if I find a better way to do it, I'll share it with you.

mjpt777 added a commit that referenced this issue Jul 20, 2017
…t producer is not in a potentially blocked position. Issue #377.
@mjpt777 (Contributor) commented Jul 20, 2017

If we reset the timeout on each check where the consumer position has not changed but the producer has also not advanced, then it seems like we would be OK. Does this now work? It also means we don't need to care about the number of subscribers.
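
A sketch of how the check from the earlier snippet might look with that reset; this illustrates the idea described above and is not the code that was actually committed:

private void checkForBlockedPublisher(final long timeNs)
{
    if (consumerPosition == lastConsumerPosition && producerPosition() > consumerPosition)
    {
        // The consumer is stalled behind the producer: unblock once the timeout expires.
        if (timeNs > (timeOfLastConsumerPositionChange + unblockTimeoutNs))
        {
            if (LogBufferUnblocker.unblock(termBuffers, metaDataBuffer, consumerPosition))
            {
                unblockedPublications.orderedIncrement();
            }
        }
    }
    else
    {
        // Either the consumer has moved on or the producer has not advanced past it,
        // so the publisher cannot be in a blocked position: reset the clock.
        timeOfLastConsumerPositionChange = timeNs;
        lastConsumerPosition = consumerPosition;
    }
}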

@mjpt777 (Contributor) commented Jul 20, 2017

@goglusid The Subscriber will not crash in the case you mentioned because it still has its own independent mapping that cannot be removed.

@goglusid (Author)

@mjpt777 I will test/review this later today or tomorrow.

@goglusid (Author)

@mjpt777 From what I understand, the image mapping is removed after the linger period. @tmontgomery?

@tmontgomery (Contributor) commented Jul 20, 2017

On the client, the Images are reclaimed after being lingered, yes.

@goglusid (Author) commented Jul 20, 2017

@tmontgomery The erase call marked below destroys the mapping for a given image after the linger timeout. So if, during this period, an IPC subscription is still scanning the log buffers, it will crash. That's what happens when I debug, for instance.

void ClientConductor::onCheckManagedResources(long long now)
{
    std::lock_guard<std::recursive_mutex> lock(m_adminLock);

    // erase-remove idiom

    // check LogBuffers
    auto logIt = std::remove_if(m_lingeringLogBuffers.begin(), m_lingeringLogBuffers.end(),
        [now, this](const LogBuffersLingerDefn& entry)
        {
            return (now > (entry.m_timeOfLastStatusChange + m_resourceLingerTimeoutMs));
        });

    m_lingeringLogBuffers.erase(logIt, m_lingeringLogBuffers.end()); // <-- destroys the mapping

@mjpt777 (Contributor) commented Jul 20, 2017

Yes. It is marked closed and removed from the subscription, so further usage is safe while in linger. Even after that, if the object is referenced directly and polled, it will return 0. This is in the Java client.

@tmontgomery (Contributor)

Yes, as I said, after the linger. The assumption is that the app will not block for an inordinate amount of time (i.e. longer than the linger timeout) in its poll.

@goglusid (Author)

@mjpt777 Indeed, if you call poll after the close/removeImage you are safe. However, if you call poll prior to these calls and don't return from poll during the linger period, then it will crash.

Obviously, you can set a huge timeout to cover most cases, but relying on timeouts for this is still risky.

@tmontgomery (Contributor)

@goglusid agreed. It is less than ideal. If you have a solution that does not rely on an atomic increment (and subsequent optimizations being avoided) on each subscription poll, I am very interested.

@goglusid (Author)

@tmontgomery I know; ideally we need to figure out an approach that doesn't need a shared_ptr atomic increment/decrement for each poll. I'm thinking about it.

@tmontgomery (Contributor)

Our best solution currently is the linger one. Sadly. It's a tradeoff of performance for more app responsibility (not blocking for linger timeout on the poll).

Another tradeoff is to single-thread the client, as is done in the Java version with the AgentInvoker. I do plan to port something close to that into the C++ API, though. That avoids the issue, but it also means the app takes on more responsibility (calling the client conductor duty cycle often enough).

@goglusid (Author)

@tmontgomery OK, what about this: given that a subscription is only accessed by one thread, we could periodically call a function, e.g. Subscription::safePoint, to notify the ClientConductor that we are no longer inside the poll function. safePoint could increment an atomic, though not frequently. When removeImage is called, we would take a copy of this safe-point counter, and later the ClientConductor would check whether the current safe point is greater than its copy; if so, it is safe to delete the managed resources.

Basically, the responsibility for reaching the safe point is on the Subscriber side. If you accept using more memory, then you can call safePoint as rarely as every hour.
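
A minimal sketch of that safe-point idea, shown in Java for consistency with the other snippets in this issue even though the discussion concerns the C++ client; all names here are hypothetical:

import java.util.concurrent.atomic.AtomicLong;

// Sketch of the proposal above, not an existing Aeron API.
class SafePointSketch
{
    // Bumped by the subscriber thread, outside of its poll loop, as rarely as it likes.
    final AtomicLong safePoint = new AtomicLong();

    void onSafePoint()
    {
        safePoint.lazySet(safePoint.get() + 1);
    }

    // Recorded by the conductor when an image is removed.
    static final class LingeringImage
    {
        final long safePointAtRemoval;

        LingeringImage(final long safePointAtRemoval)
        {
            this.safePointAtRemoval = safePointAtRemoval;
        }
    }

    // The conductor reclaims the image's resources only once the subscriber has passed
    // a safe point recorded after the removal, i.e. it can no longer be inside a poll
    // that references the old image.
    boolean canReclaim(final LingeringImage image)
    {
        return safePoint.get() > image.safePointAtRemoval;
    }
}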

@tmontgomery (Contributor)

@goglusid We'd have to try it. I tried an RCU technique early on that was similar. Anything that stops migration within the Image loop cancels a host of optimization opportunities. Once it is outside that loop (like the loads for the array and the number of images), it is much better. However, that's the tricky bit: it needs to be within the loop to be useful.

@goglusid (Author)

@mjpt777 Right now I can't see why your latest changes wouldn't work (double negation ;p). That being said, lock-free programming increases the number of possible states your program can reach, so who knows... In my tests, it only unblocks if the timeout is reached. Thank you very much.

@goglusid (Author)

@tmontgomery Using a C++ AgentInvoker for the ClientConductor would be very useful to make the lingering of resources safe.

I'll think about a way to notify the MediaDriver when it is safe to delete lingering resources. Stay tuned!

@tmontgomery (Contributor)

@goglusid FYI. See 33c1033 for AgentInvoker support in C++.

@goglusid (Author)

@tmontgomery That was fast. Thanks!

@mjpt777 (Contributor) commented Sep 25, 2017

@goglusid I think you are correct. I'll create a test case and follow this up.

@goglusid (Author) commented Sep 25, 2017

In order to test that, I simply commented out the line

activePartitionIndexOrdered(logMetaDataBuffer, nextIndex);

That way it emulates a crash, and the subsequent calls to offer all fail given that the term rotation is lost.

mjpt777 added a commit that referenced this issue Sep 26, 2017
…her dies, or takes a long pause, mid rotation then other publishers can complete the operation and make progress. Issue #377.
@mjpt777 (Contributor) commented Sep 26, 2017

I've taken a different approach and allowed any publishing thread to perform the rotation. This will also be better if the thread doing the rotation gets interrupted, never mind dying. I've found one other issue, which I'll address separately.

@goglusid (Author)

I am curious to know more about this other issue.

@mjpt777 (Contributor) commented Sep 26, 2017

Imagine the case of a large number of publishers all failing to complete the rotation. The driver needs to detect this and then unblock after the flow control window is exhausted. Quite a complicated one but ultimately possible.

@goglusid (Author) commented Sep 27, 2017

@mjpt777

There's another problem:

Let's assume a lot of threads simultaneously calling Publication::offer, all checking and passing the publication-limit condition and all entering TermAppender::appendUnfragmentedMessage. Given that each thread will increment the rawTail, and that the rawTail only has 32 bits for the offset, it is possible that the offset grows larger than 2^32 and starts to corrupt the upper 32 bits of the rawTail (the termId). Do you agree this is an issue?

Another issue with using only 32 bits for the termId is that you are limited to termLength * 2^31 bytes per publication. This also means that you now need to deal with a new max-position error in the client.

In order to deal with the previous issues, I changed the way the rawTail is managed by bit-shifting the termId according to the number of bits needed for the offset, based on the term length. This allows 2^63 bytes per publication. Also, because the rawTail can no longer hold the termId, I added another 64 bits to store the begin position of a term (termId << bitshift). I would be willing to Skype you to discuss this in more detail.
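
To make the first concern concrete, here is a simplified model of the rawTail packing described above (term id in the upper 32 bits, tail offset in the lower 32 bits); the numbers are illustrative and this is not Aeron's actual LogBufferDescriptor code:

public class RawTailOverflowSketch
{
    public static void main(final String[] args)
    {
        final int termId = 7;
        long rawTail = (long)termId << 32;           // tail offset starts at 0

        final long reservation = 128L * 1024 * 1024; // max message for a 1GB term (1/8 of term length)

        // Model 33 threads that have all passed the publication-limit check and then
        // perform their add on the tail before any of them commits.
        for (int i = 0; i < 33; i++)
        {
            rawTail += reservation;
        }

        // The low 32 bits have wrapped and the carry has corrupted the term id.
        System.out.println("termId = " + (rawTail >>> 32));        // prints 8, not 7
        System.out.println("offset = " + (rawTail & 0xFFFFFFFFL)); // 134217728
    }
}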

@mjpt777 (Contributor) commented Sep 27, 2017

@goglusid Yes, these are issues we are aware of, and practically they are not an issue. To overflow the first 32 bits we would need a number of threads, all offering the maximum message length, to have passed the publication limit at the same time and be able to wrap. Have you done the calculation on how many threads that is for the different term lengths and message lengths?

Reaching max position is similar, in that it is a LOT of data with anything but the smallest term lengths. Then it is a case of closing the stream and opening a new session to continue. We could look at an automated close in this case. However, I'd be very interested in discussing alternative solutions to this.

@goglusid (Author)

@mjpt777

For a term length of 1GB that is only 4 threads offering a 1GB message each. It is practically possible, no?

@mjpt777 (Contributor) commented Sep 27, 2017

No :-) For a network publication the max message length is 1/8 of the term length. So that is 4 * 8 = 32 threads.

@goglusid (Author)

I meant an IPC publication. Still, 32 threads isn't that many.

@mjpt777 (Contributor) commented Sep 27, 2017

It is the same for IPC.

@mjpt777 (Contributor) commented Sep 27, 2017

I also think 1/8 GB for a single message is a bad design. Does not stream well. We should pragmatically put a cap on that. @tmontgomery What do you think of a 16MB message cap?

@goglusid (Author)

So if a publication is 32 bytes from term tripping and 32 threads try to offer the max message at the same time, then we end up in a wrong state? You don't think this is practically an issue?

@tmontgomery (Contributor)

I think 16MB is more than generous.

@mjpt777 (Contributor) commented Sep 27, 2017

At a 16MB cap it is very pragmatic.

@tmontgomery (Contributor)

Everything in computing has practical limits. Is 32 threads reasonable? Maybe. But is sending 128MB messages reasonable... no, it's not. If you NEED 128MB messages, you are making the wrong tradeoffs.

@tmontgomery (Contributor)

At some point, any messaging system has to push back the responsibility of chunking to the application. 16MB is a practical limit that most applications will fit well within. And besides, once messages become too big, the application MUST assess the tradeoffs and decide for itself.

@mjpt777 (Contributor) commented Sep 27, 2017

Any message that does not fit in less than half the cache size is a questionable design. Anything bigger than that should be chunked and streamed.

@tmontgomery (Contributor)

< 16MB fits in with the wait-free tradeoff. If you want wait-free, you use smaller messages. If you want the convenience of obscenely large messages, then I think giving up wait-free and doing chunking at the application level is a good tradeoff.

@goglusid (Author)

@tmontgomery
:) I like your "obscenely large messages" lol

With a max of 16MB messages it becomes practically impossible to encounter the issue that I saw. Maybe it could be documented, so that anyone changing this limit knows it may cause overflow in the rawTail.

@tmontgomery (Contributor)

For the C++ side, we can definitely do a compile-time check on the value and break the build if the limit is changed.

@tmontgomery (Contributor)

We could also have the driver enforce a limit on the number of addPublication calls to a given log buffer.

@goglusid (Author)

Note that if we were to store the offset as a multiple of 32 bytes instead of a byte count, then we would push the overflow even further away... even with bigger messages ;p Indeed, the offset is always a multiple of 32 bytes, so there are 5 bits available.
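
A small illustration of that observation, assuming the 32-byte frame alignment: storing the tail offset in units of 32 bytes rather than in bytes gives the 32-bit field another 5 bits of headroom before it can wrap.

// Illustration only: capacity of a 32-bit tail field when counting bytes
// versus counting 32-byte aligned units.
public class AlignedOffsetSketch
{
    public static void main(final String[] args)
    {
        final long byteAddressedLimit = 1L << 32;        // 4 GiB before the offset wraps
        final long unitAddressedLimit = (1L << 32) * 32; // 128 GiB when stored in 32-byte units

        System.out.println("byte-addressed limit: " + byteAddressedLimit);
        System.out.println("32-byte-unit limit:   " + unitAddressedLimit);
    }
}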

@mjpt777 (Contributor) commented Sep 27, 2017

@goglusid That is a nice observation on storing the multiple of 32. Maybe we should combine that with the message limit which would encourage better design anyway.

mjpt777 added a commit that referenced this issue Oct 3, 2017
…pending final message in a term, and all subsequent publishers up to the publication limit failing too so that the driver unblocks as a back stop. Issue #377.
mjpt777 added a commit that referenced this issue Oct 5, 2017
… in case of rotation failure even if producer and consumer positions are the same. Issue #377.