
Reduce resharding impact by redirecting data to new shards #8075

Conversation

@Harkishen-Singh (Contributor) commented Oct 16, 2020

Signed-off-by: Harkishen-Singh harkishensingh@hotmail.com

Fixes: #7230

This PR reduces the impact of resharding by spinning up new shards at the time of resharding when there are existing shards. The new shards are fed with data from the existing shards while incoming samples are blocked; once this is done, the incoming samples are consumed. This keeps the samples in order. After this, the old shards are replaced with the new shards. This reduces the wait time of samples caused by slow shards, as mentioned in the related issue.
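
At a high level, the flow described above would look roughly like this (a sketch only; newShards, drainInto, and mtx are illustrative names, not taken from the actual diff):

func (t *QueueManager) reshard(n int) {
	newShards := t.newShards()
	newShards.start(n)

	t.mtx.Lock() // block incoming samples while the old shards are drained
	// Move any pending samples from the old shards into the new queues,
	// preserving their order, before new samples are accepted again.
	t.shards.drainInto(newShards)
	t.shards = newShards
	t.mtx.Unlock() // resume consuming incoming samples
}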

@Harkishen-Singh (Contributor, Author):

cc @cstyan @csmarchbanks

@csmarchbanks (Member) commented Oct 18, 2020 via email

@Harkishen-Singh (Contributor, Author):

Ahh, sorry. I didn't mean to re-ping. I felt that mentioning it in the PR description was not useful, so I removed it from there and mentioned it in a comment.

@csmarchbanks (Member) left a comment:

No worries, we will get notified if it is in the description or a comment.

I left a few comments; I am not sure this is quite correct yet. It looks like runShard() would currently continue to pick up samples from its queue and try to send them rather than put them into a queue buffer, or a new queue. Then, only the very last request would be moved to the new queues in the current implementation?

@Harkishen-Singh (Contributor, Author):

It looks like runShard() would currently continue to pick up samples from its queue and try to send them rather than put them into a queue buffer, or a new queue. Then, only the very last request would be moved to the new queues in the current implementation?

As soon as the queue channel is closed, the pending samples in the shard will be sent to the buffer. It doesn't matter whether it's the last request or not; the pending samples are sent to the buffers rather than waiting for them to be flushed (which is what makes a shard slow).

Do we also want to stop an existing sendSamples() call (one that is in the process of sending) that may be slow? (I am assuming the answer is no.)

@csmarchbanks (Member) commented Oct 20, 2020

The issue is that using the default queue config, the queue channel might have 2,500 samples in it. All of those would be processed, and 4-5 requests would be sent, before the close is processed, at which point up to 500 samples would be moved over to the new shards. Using those numbers, resharding would speed up by about 20%, but I think we can do better :).

I have seen queue configs where 100,000 samples might be buffered in the queue channel, which you can imagine may take quite a long time to send successfully to the remote before seeing that the channel was closed.

Does that make sense?

@Harkishen-Singh (Contributor, Author) commented Oct 23, 2020

@csmarchbanks As far as I understand, as soon as a queue channel is closed, the next iteration in runShard will set ok to false and hence shift the pending samples to the buffer and then to the new shards. The only way it can be slow is if, at the moment the queue channel is closed, the sendSamples() call inside if nPending >= max is blocking, meaning we have to wait for that send to finish. That is why I asked whether we want to stop the currently running sendSamples().

It would be very helpful if you could explain a bit more how "All of those would be processed, and 4-5 requests would be sent" is the case.

@csmarchbanks (Member):

Ahh, I think I might see where the misunderstanding is. When a channel is closed, ok will only be false once all of the samples in that channel have also been drained. Since that channel can have thousands to hundreds of thousands of samples in it, all of those would be processed by the normal batch-and-send logic before the new logic is reached when ok comes back as false.

Here is a go playground example that shows the behavior in a simpler manner: https://play.golang.org/p/lmZ2po7P3OH. As can be seen, if we have 2300 samples in the queue when it is closed, we will send 4 requests of 500 samples before moving the final 300 samples to the new queue.
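
For reference, here is a self-contained sketch in the same spirit as that playground link (the numbers mirror the discussion, not any real queue config):

package main

import "fmt"

func main() {
	// 2300 samples are buffered in the queue when it is closed.
	queue := make(chan int, 2500)
	for i := 0; i < 2300; i++ {
		queue <- i
	}
	close(queue) // closing does NOT discard the buffered values

	pending := 0
	for {
		_, ok := <-queue
		if !ok {
			// Only reached after all 2300 buffered samples were drained:
			// just the final partial batch would move to the new shards.
			fmt.Println("moved to new queue:", pending) // prints 300
			return
		}
		pending++
		if pending == 500 {
			fmt.Println("sent full request of", pending) // happens 4 times
			pending = 0
		}
	}
}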

Did this explanation help at all?

@Harkishen-Singh (Contributor, Author):

Did this explanation help at all?

Yes, very much. Thank you for the awesome explanation. I will update the code soon.

@cstyan (Member) left a comment:

A few comments. I'm having a bit of a hard time following the changes, having come into this late, but it seems like you and Chris are on top of things :)

}

// stop the old shards; subsequent calls to enqueue will return false.
func (s *shards) stopOldShards() {
Member:

Having both stop and stopOldShards is again confusing naming IMO. Can we name this one something like stopForResharding, or something to that effect?

Contributor Author:

After the recent push, I feel that stopOldShards() is not required anymore.

@csmarchbanks (Member) left a comment:

Thanks for the work! Certainly getting better :)

I left some comments, but generally I agree with Callum: I am having a hard time understanding everything that is happening. I think the overall workflow is mostly correct, but debugging issues will be painful and I will need more time to review fully. The following is a suggestion that might make it easier to understand :)

Rather than having the logic spread around among several methods in a shards struct, would it be clearer to represent this by creating a brand new shards struct that is then populated with values from the old shards? Resharding would look something like:

newShards := t.newShards()
newShards.start(numShards)
// transferTo would stop the shard, and enqueue pending samples to `newShards`.
t.shards.transferTo(newShards)
t.shards = newShards

But with some additional locking to make sure Append is still safe. Thoughts?
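
For example, Append could take a read lock around the shards pointer while the reshard path holds the write lock before swapping t.shards (again only a sketch; shardsMtx is an illustrative field name, and enqueue stands in for the existing per-sample enqueue loop):

func (t *QueueManager) Append(samples []record.RefSample) bool {
	t.shardsMtx.RLock()
	defer t.shardsMtx.RUnlock()
	// Append always sees a consistent shards value: either the old set
	// (which will transfer its pending data) or the already-started new set.
	return t.shards.enqueue(samples)
}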

If the above sounds good, I would recommend two pull requests, one to refactor shards to be idempotent and replaced during reshard, and a second to implement the transferTo logic.

}(i)
}
wg.Wait()
sort.SliceStable(buffer, func(i, j int) bool {
Member:

Is it necessary to sort all of the samples, or could we just stream them to the new shards in whatever order the go routines happen to pick up?

Contributor Author:

I used sorting since we want to ensure that the samples are always sorted in the queues. But if you feel that they will eventually be sorted during streaming, then sorting can be avoided.

Contributor Author:

We need to sort anyway; otherwise I am not sure the ordering is guaranteed.

case <-s.reshard:
select {
case sample := <-queue:
// Capture any samples still remaining in the queue.
Member:

When would this happen, and is it guaranteed to only happen once?

Contributor Author:

I have updated the code. This is to accept the samples that may have entered the queue between when the resharding signal is sent to t.shards.reshard and when incoming samples are blocked at the time the newQueues are created. And this should not happen only once, but until the old shards' queues are blocked.
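
Roughly, the intent is a drain loop like the following (an illustrative sketch; drainOldQueue, the sample type, and enqueueNew stand in for the actual code):

// drainOldQueue keeps forwarding anything that still arrives on the old
// shard's queue to the new shards until that queue is closed, so late
// samples are not lost; it can run many iterations, not just one.
func drainOldQueue(oldQueue <-chan sample, enqueueNew func(sample) bool) {
	for s := range oldQueue { // exits only once oldQueue is closed and empty
		enqueueNew(s)
	}
}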

@Harkishen-Singh (Contributor, Author) commented Nov 5, 2020

With the recent code pushed, I feel that stopOldShards() is not required any more. This is because stopOldShards() is called only during resharding, but when a reshard signal is sent, execution goes into the t.reshard case, which then accepts incoming samples and redirects them to the new queues, ending the old shards/goroutines.

What do others think about this?

I will try to make the code cleaner (along with the suggestions for renaming the sharding functions). Sorry for the inconvenience 😅

@Harkishen-Singh (Contributor, Author) commented Nov 6, 2020

If the above sounds good, I would recommend two pull requests, one to refactor shards to be idempotent and replaced during reshard, and a second to implement the transferTo logic.

Hmm, they are both part of a single feature, so why two PRs? Also, as far as I understand, both are required for the implementation.

@Harkishen-Singh (Contributor, Author):

@csmarchbanks I have updated with the suggestions. The renaming suggestions were helpful and I think the code looks more understandable now. There are some changes, like enqueue() no longer needing to return false for blocking (that is now handled by the QueueManager), and related changes that I will make after hearing your thoughts on the updated implementation.
Thank you.

@csmarchbanks (Member) left a comment:

Thanks for all your work, this is certainly becoming more understandable!

What would happen in the current implementation if a reshard is attempted but the remote endpoint is down? You would have shards stuck in sendSamples forever without a context cancellation I think?

Also, there are some lint failures. I would love to see the results of the tests; I think there are some bugs, and if they all pass it might mean we need better test coverage.

@Harkishen-Singh (Contributor, Author):

What would happen in the current implementation if a reshard is attempted but the remote endpoint is down? You would have shards stuck in sendSamples forever without a context cancellation I think?

The current implementation (with the recent push) is such that, after each shard is formed, if sendSamples() fails to send any samples beyond the flush deadline, then we do a hard shutdown as in the earlier cases.

@roidelapluie (Member):

I am wondering if it is worth complicating this code now, since there is a consensus to make remote_write transactional, so all this code would change anyway.

https://docs.google.com/document/d/1vhXKpCNY0k2cbm0g10uM2msXoMoH8CTwrg_dyqsFUKo/edit

@csmarchbanks (Member) left a comment:

I left a few more comments.

I am wondering if it is worth complicating this code now, since there is a consensus to make remote_write transactional, so all this code would change anyway.

Is anyone actually working on making remote write transactional? I couldn't attend the last dev summit so may have missed it. If not I think this bit of extra complexity is worthwhile until transactional rw is out.

@@ -846,6 +917,9 @@ func (s *shards) sendSamples(ctx context.Context, samples []prompb.TimeSeries, b
if err != nil {
level.Error(s.qm.logger).Log("msg", "non-recoverable error", "count", len(samples), "err", err)
s.qm.metrics.failedSamplesTotal.Add(float64(len(samples)))
if time.Since(begin) > s.qm.flushDeadline {
Member:

This means that any retries that take longer than the flushDeadline will cause a hard shutdown? Can we just use the context cancel to stop sending?

Contributor Author:

This means that any retries that take longer than the flushDeadline will cause a hard shutdown?

Yes

Can we just use the context cancel to stop sending?

Sorry, do you mean to just skip the current send or to stop the shard? The current implementation here stops the reshard by using the context's cancel (hardShutdown is the cancelFunc). If not, then I think I did not get your point. Could you please explain a bit more?

Member:

I will rephrase: right now (*shards).stop() contains all of the logic around soft and hard shutdowns. It would be ideal if that logic were still contained only in stop(). It is not obvious to call hardShutdown from inside a method called sendSamples, and I am sure that will lead to unexpected behavior in the future.

I think that ends up meaning that

	if resharding {
		return
	}

inside of stop() should be removed. Since the shard would be sending to the new queue instead of to the remote, shutdown should still happen pretty quickly. You may have to set up some background processing in reshardLoop to avoid blocking on stop, but that is preferable IMO.
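
For reference, a rough sketch of that shape, with both shutdown paths staying inside stop() (field and method names are illustrative):

func (s *shards) stop() {
	close(s.softShutdown) // let each runShard flush what it already has

	select {
	case <-s.done: // all shard goroutines exited cleanly
	case <-time.After(s.qm.flushDeadline):
		s.hardShutdown() // the context cancel aborts any in-flight sendSamples
	}
}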

Contributor Author:

This has been addressed. I think this can be resolved now.

@csmarchbanks (Member):

Hi @Harkishen-Singh, I am still on vacation this week, I plan to take another look at this sometime next week!

@csmarchbanks (Member) left a comment:

Thank you for your continued work! I tried to answer your question and left a few more comments. Looks like the remote tests are timing out, which is likely related to a block somewhere in the reshard code.

@Harkishen-Singh (Contributor, Author):

Hey @csmarchbanks. I have updated the code with a cleaner approach, in line with the suggestions. Thank you.

@Harkishen-Singh (Contributor, Author):

PS: This isn't ready for review yet. I think the new GitHub feature automatically requests reviews from the maintainers.

@roidelapluie (Member):

Yes, unless you mark the PR as a draft.

@Harkishen-Singh (Contributor, Author):

@csmarchbanks the tests are passing now. I went back through the comments/suggestions and I believe they are implemented. It has been a while since I started working on this and I might have lost track of a comment or two, sorry for that. I think we can give this another look.

Since transactional remote write may take a while to be ready, I think we can continue with this.

@stale stale bot added the stale label Jun 6, 2021
@codesome (Member):

@csmarchbanks @cstyan can you take another look at this, please?

@codesome codesome closed this Jul 12, 2021
@codesome codesome reopened this Jul 12, 2021
@codesome (Member):

(closed by mistake)

@stale stale bot removed the stale label Jul 12, 2021
@cstyan (Member) left a comment:

You'll need to rebase as well.

@Harkishen-Singh (Contributor, Author):

Rebased and updated.

@cstyan (Member) commented Jul 28, 2021

Looks like there's now a data race, and possibly a goroutine that never exits somewhere in the resharding procedure; take a look at the failing tests.

@Harkishen-Singh (Contributor, Author) commented Jul 30, 2021

Ah sorry, I missed a lock. It should be fine now.

PS: It seems the failing Windows tests are not related to this PR.

Signed-off-by: Harkishen-Singh <harkishensingh@hotmail.com>
@stale stale bot added the stale label Nov 15, 2021
@bboreham (Member):

Is this likely to be resurrected? I see the original issue #7230 is still open.

@cstyan (Member) commented May 29, 2023

I personally don't have time to take this over. @Harkishen-Singh do you plan/have any interest in picking this up again or should we mark it as open for contributors?

@roidelapluie (Member):

We have looked at this pull request during our bug scrub.

Given the lack of response, I have marked the issue as help wanted, and we have decided to close this PR.

Thank you for your contribution.
