receive: Only wait for write quorum #2621
@@ -13,6 +13,7 @@ import (
    "net/http"
    "strconv"
    "sync"
    "time"

    "github.com/go-kit/kit/log"
    "github.com/go-kit/kit/log/level"

@@ -67,6 +68,7 @@ type Options struct {
    Tracer         opentracing.Tracer
    TLSConfig      *tls.Config
    DialOpts       []grpc.DialOption
    ForwardTimeout time.Duration
}

// Handler serves a Prometheus remote write receiving HTTP endpoint.
@@ -324,32 +326,38 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
    }
    h.mtx.RUnlock()

    return h.parallelizeRequests(ctx, tenant, replicas, wreqs)
    return h.fanoutForward(ctx, tenant, replicas, wreqs, len(wreqs))
}

// parallelizeRequests parallelizes a given set of write requests.
// The function only returns when all requests have finished
// or the context is canceled.
func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replicas map[string]replica, wreqs map[string]*prompb.WriteRequest) error {
// writeQuorum returns the minimum number of replicas that have to confirm a successful write before replication success can be claimed.
func (h *Handler) writeQuorum() int {
    return int((h.options.ReplicationFactor / 2) + 1)
}

// fanoutForward concurrently fans out the given set of write requests. It returns a status as soon as a quorum of
// requests succeeds or fails, or when the context is cancelled.
func (h *Handler) fanoutForward(ctx context.Context, tenant string, replicas map[string]replica, wreqs map[string]*prompb.WriteRequest, successThreshold int) error {
    ec := make(chan error)
    defer close(ec)
    // We don't want to use a sync.WaitGroup here because that
    // introduces an unnecessary second synchronization mechanism,
    // the first being the error chan. Plus, it saves us a goroutine
    // as in order to collect errors while doing wg.Wait, we would
    // need a separate error collection goroutine.
    var n int

    var wg sync.WaitGroup
    for endpoint := range wreqs {
        n++
        wg.Add(1)

        // If the request is not yet replicated, let's replicate it.
        // If the replication factor isn't greater than 1, let's
        // just forward the requests.
        if !replicas[endpoint].replicated && h.options.ReplicationFactor > 1 {
            go func(endpoint string) {
                ec <- h.replicate(ctx, tenant, wreqs[endpoint])
                defer wg.Done()
                if err := h.replicate(ctx, tenant, wreqs[endpoint]); err != nil {
                    ec <- errors.Wrap(err, "replicate write request")
                    return
                }
                ec <- nil
            }(endpoint)
            continue
        }

        // If the endpoint for the write request is the
        // local node, then don't make a request but store locally.
        // By handing replication to the local node in the same
@@ -358,82 +366,114 @@ func (h *Handler) parallelizeRequests(ctx context.Context, tenant string, replic
        // can be ignored if the replication factor is met.
        if endpoint == h.options.Endpoint {
            go func(endpoint string) {
                defer wg.Done()
                var err error

                tracing.DoInSpan(ctx, "receive_tsdb_write", func(ctx context.Context) {
                    err = h.writer.Write(tenant, wreqs[endpoint])
                })

                // When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
                // To avoid breaking the counting logic, we need to flatten the error.
                if errs, ok := err.(terrors.MultiError); ok {
                    if countCause(errs, isConflict) > 0 {
                        err = errors.Wrap(conflictErr, errs.Error())
                    } else if countCause(errs, isNotReady) > 0 {
                        err = tsdb.ErrNotReady
                    } else {
                        err = errors.New(errs.Error())
                    }
                }
                if err != nil {
                    level.Error(h.logger).Log("msg", "storing locally", "err", err, "endpoint", endpoint)
                    // When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
                    // To avoid breaking the counting logic, we need to flatten the error.
                    if errs, ok := err.(terrors.MultiError); ok {
                        if countCause(errs, isConflict) > 0 {
                            err = errors.Wrap(conflictErr, errs.Error())
                        } else if countCause(errs, isNotReady) > 0 {
                            err = tsdb.ErrNotReady
                        } else {
                            err = errors.New(errs.Error())
                        }
                    }
                    ec <- errors.Wrapf(err, "storing locally, endpoint %v", endpoint)
                    return
                }
                ec <- err
                ec <- nil

            }(endpoint)
            continue
        }

        // Make a request to the specified endpoint.
        go func(endpoint string) {
            var err error
            defer wg.Done()

            // Increment the counters as necessary now that
            // the requests will go out.
            var (
                err error
                cl  storepb.WriteableStoreClient
            )
            defer func() {
                // This is an actual remote forward request so report metric here.
                if err != nil {
                    h.forwardRequestsTotal.WithLabelValues("error").Inc()
                    return
                }
                h.forwardRequestsTotal.WithLabelValues("success").Inc()
            }()

            cl, err := h.peers.get(ctx, endpoint)
            cl, err = h.peers.get(ctx, endpoint)
            if err != nil {
                level.Error(h.logger).Log("msg", "failed to get peer connection to forward request", "err", err, "endpoint", endpoint)
                ec <- err
                ec <- errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)
                return
            }
            // Create a span to track the request made to another receive node.
            tracing.DoInSpan(ctx, "receive_forward", func(ctx context.Context) {
                // Actually make the request against the endpoint
                // we determined should handle these time series.
                // Actually make the request against the endpoint we determined should handle these time series.
                _, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{
                    Timeseries: wreqs[endpoint].Timeseries,
                    Tenant:     tenant,
                    Replica:    int64(replicas[endpoint].n + 1), // increment replica since on-the-wire format is 1-indexed and 0 indicates unreplicated.
                    // Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
                    Replica: int64(replicas[endpoint].n + 1),
                })
                if err != nil {
                    level.Error(h.logger).Log("msg", "forwarding request", "err", err, "endpoint", endpoint)
                    ec <- err
                    return
                }
                ec <- nil
            })
            if err != nil {
                ec <- errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)
                return
            }
            ec <- nil
        }(endpoint)
    }

    // Collect any errors from forwarding the time series.
    // Rather than doing a wg.Wait here, we decrement a counter
    // for every error received on the chan. This simplifies
    // error collection and avoids data races with a separate
    // error collection goroutine.
    var errs terrors.MultiError
    for ; n > 0; n-- {
        if err := <-ec; err != nil {
    go func() {
        wg.Wait()
        close(ec)
    }()

    // At the end, make sure to exhaust the channel, letting remaining unnecessary requests finish asynchronously.
    // This is needed if the context is cancelled or if we reached the success or failure quorum faster.
    defer func() {
        go func() {
            for err := range ec {
                if err != nil {
                    level.Debug(h.logger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
                }
            }
        }()
    }()

    var (
        success int
        errs    terrors.MultiError
    )
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case err, more := <-ec:
            if !more {
                return errs
            }
            if err == nil {
                success++
                if success >= successThreshold {
                    // In case the success threshold is lower than the total
                    // number of requests, then we can finish early here. This
                    // is the case for quorum writes for example.
                    return nil
                }
                continue
            }
            errs.Add(err)
        }
    }

    return errs.Err()
}

// replicate replicates a write request to (replication-factor) nodes
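The hunk above combines several moving parts: a WaitGroup, an unbuffered error channel, a goroutine that closes the channel once all workers are done, a background drain so late workers are never blocked, and an early return once the success threshold is reached. A minimal, self-contained sketch of the same pattern may make the flow easier to follow. The names here (simpleFanout and its callers) are hypothetical stand-ins, not the handler's API.

package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "time"
)

// simpleFanout runs every request in its own goroutine and returns as soon
// as successThreshold of them have succeeded, the context is cancelled, or
// all results have been collected.
func simpleFanout(ctx context.Context, reqs []func() error, successThreshold int) error {
    ec := make(chan error)

    var wg sync.WaitGroup
    for _, do := range reqs {
        wg.Add(1)
        go func(do func() error) {
            defer wg.Done()
            ec <- do()
        }(do)
    }
    // Close ec only after every worker has sent its result.
    go func() {
        wg.Wait()
        close(ec)
    }()
    // Drain any results that arrive after we have already returned,
    // so the remaining workers are not blocked on the channel send.
    defer func() {
        go func() {
            for range ec {
            }
        }()
    }()

    var (
        success int
        errs    []error
    )
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case err, more := <-ec:
            if !more {
                return errors.Join(errs...)
            }
            if err == nil {
                success++
                if success >= successThreshold {
                    return nil // quorum reached, finish early
                }
                continue
            }
            errs = append(errs, err)
        }
    }
}

func main() {
    reqs := []func() error{
        func() error { return nil },
        func() error { time.Sleep(50 * time.Millisecond); return nil },
        func() error { return errors.New("replica down") },
    }
    // With a threshold of 2, the call returns as soon as two requests succeed.
    fmt.Println(simpleFanout(context.Background(), reqs, 2))
}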
@@ -464,20 +504,22 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
    }
    h.mtx.RUnlock()

    err := h.parallelizeRequests(ctx, tenant, replicas, wreqs)
    if errs, ok := err.(terrors.MultiError); ok {
        if uint64(countCause(errs, isNotReady)) >= (h.options.ReplicationFactor+1)/2 {
            return tsdb.ErrNotReady
        }
        if uint64(countCause(errs, isConflict)) >= (h.options.ReplicationFactor+1)/2 {
            return errors.Wrap(conflictErr, "did not meet replication threshold")
        }
        if uint64(len(errs)) >= (h.options.ReplicationFactor+1)/2 {
            return errors.Wrap(err, "did not meet replication threshold")
        }
        return nil
    ctx, cancel := context.WithTimeout(ctx, h.options.ForwardTimeout)
    defer cancel()

    quorum := h.writeQuorum()
    err := h.fanoutForward(ctx, tenant, replicas, wreqs, quorum)
    if countCause(err, isNotReady) >= quorum {
        return tsdb.ErrNotReady
    }
    return errors.Wrap(err, "could not replicate write request")
    if countCause(err, isConflict) >= quorum {
        return errors.Wrap(conflictErr, "did not meet success threshold due to conflict")
    }
    if err != nil {
        return errors.Wrap(err, "replicate")
    }

    return nil
}

// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.

Review thread on the countCause(err, isConflict) >= quorum check:
> I still think it should be len - quorum.
> Let's take the example of replication factor 3, which has a quorum factor of 2, and we get 1 conflict error.
> Ok, then it has to be […]. I think we are both right if we assume that the quorum is always one more than half. This is, however, very dependent on the quorum value… This algorithm should never assume things like that. Let's say the quorum is 1 for some reason, with replication 3; then this logic will not hold true, vs […].
> What do you mean by this?
> We talked offline. Not a blocker, so merging.
(A sketch of the quorum arithmetic behind this discussion follows below.)
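To make the arithmetic in that thread concrete, here is a small, self-contained sketch. It is not part of the PR; it only re-implements the writeQuorum formula from the diff as a free function. With replication factor 3 the quorum is 2, so a single conflicting replica still leaves enough replicas for the write to reach quorum; only once at least quorum replicas conflict does success become impossible.

package main

import "fmt"

// writeQuorum mirrors the formula added in the diff above:
// the smallest strict majority of the replication factor.
func writeQuorum(replicationFactor int) int {
    return replicationFactor/2 + 1
}

func main() {
    rf := 3
    quorum := writeQuorum(rf) // 2 for a replication factor of 3
    for conflicts := 0; conflicts <= rf; conflicts++ {
        // A replica that conflicted can no longer contribute a success.
        maxSuccesses := rf - conflicts
        fmt.Printf("conflicts=%d -> max possible successes=%d, quorum still reachable=%v\n",
            conflicts, maxSuccesses, maxSuccesses >= quorum)
    }
}

Under the assumption that the quorum is always a strict majority, comparing the conflict count against quorum and comparing it against the total minus the quorum give the same outcome; as the thread notes, that equivalence breaks down if the quorum were ever configured below a majority.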
A further review exchange on this error handling:
> I was thinking about this case, but I thought that it would never happen (: We either have a success or an error quorum, kind of, in my previous version (:
> The attempt here is to return the best possible error at the possible cost of higher latency. For example, with 3x replication, if 2 replicas return tsdb-not-ready/unavailable and 1 returns a conflict, we would end up with a generalized error, when in reality a retry is likely to resolve it. It's a trade-off: either better error reporting or lower latency. Since the request is failing in this case anyway, I prefer better errors over latency.
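A minimal sketch of the cause-based classification described in that comment. This is illustrative only: errNotReady, errConflict, countCause, and classify here are simplified stand-ins, not the handler's actual helpers. With replication factor 3 and quorum 2, two not-ready errors plus one conflict are surfaced as the retriable not-ready case rather than as a generic failure.

package main

import (
    "errors"
    "fmt"
)

var (
    errNotReady = errors.New("TSDB not ready")
    errConflict = errors.New("conflict")
)

func isNotReady(err error) bool { return errors.Is(err, errNotReady) }
func isConflict(err error) bool { return errors.Is(err, errConflict) }

// countCause counts how many of the collected per-replica errors match the predicate.
func countCause(errs []error, f func(error) bool) int {
    var n int
    for _, err := range errs {
        if f(err) {
            n++
        }
    }
    return n
}

// classify prefers the most actionable error once a quorum of replicas agrees on its cause.
func classify(errs []error, quorum int) error {
    if countCause(errs, isNotReady) >= quorum {
        return errNotReady // retriable: enough replicas were simply not ready
    }
    if countCause(errs, isConflict) >= quorum {
        return errConflict
    }
    return fmt.Errorf("replication failed: %d errors", len(errs))
}

func main() {
    quorum := 2 // replication factor 3
    errs := []error{errNotReady, errNotReady, errConflict}
    fmt.Println(classify(errs, quorum)) // "TSDB not ready" -> the caller can retry
}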