diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index 12afb752b8e..cc839acb7c1 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -31,13 +31,14 @@ import (
 	"github.com/prometheus/prometheus/model/relabel"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
-	"github.com/thanos-io/thanos/pkg/api"
-	statusapi "github.com/thanos-io/thanos/pkg/api/status"
-	"github.com/thanos-io/thanos/pkg/logging"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
+	"github.com/thanos-io/thanos/pkg/api"
+	statusapi "github.com/thanos-io/thanos/pkg/api/status"
+	"github.com/thanos-io/thanos/pkg/logging"
+
 	"github.com/thanos-io/thanos/pkg/errutil"
 	extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
 	"github.com/thanos-io/thanos/pkg/runutil"
@@ -343,6 +344,7 @@ func (h *Handler) Run() error {
 // replica encapsulates the replica number of a request and if the request is
 // already replicated.
 type replica struct {
+	batch      uint64
 	n          uint64
 	replicated bool
 }
@@ -353,6 +355,16 @@ type endpointReplica struct {
 	replica replica
 }
 
+type writeRequest struct {
+	batch   uint64
+	request *prompb.WriteRequest
+}
+
+type writeResponse struct {
+	batch uint64
+	err   error
+}
+
 func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error {
 	tLogger := log.With(h.logger, "tenant", tenant)
 
@@ -372,7 +384,7 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string,
 		return errBadReplica
 	}
 
-	r := replica{n: rep, replicated: rep != 0}
+	r := replica{n: rep, replicated: rep != 0, batch: 0}
 
 	// On the wire, format is 1-indexed and in-code is 0-indexed, so we decrement the value if it was already replicated.
 	if r.replicated {
@@ -559,7 +571,8 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
 	// at most one outgoing write request will be made
 	// to every other node in the hashring, rather than
 	// one request per time series.
-	wreqs := make(map[endpointReplica]*prompb.WriteRequest)
+	var batchId uint64
+	wreqs := make(map[endpointReplica]*writeRequest)
 	for i := range wreq.Timeseries {
 		endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n)
 		if err != nil {
@@ -568,14 +581,18 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
 		}
 		key := endpointReplica{endpoint: endpoint, replica: r}
 		if _, ok := wreqs[key]; !ok {
-			wreqs[key] = &prompb.WriteRequest{}
+			wreqs[key] = &writeRequest{
+				batch:   batchId,
+				request: &prompb.WriteRequest{},
+			}
+			batchId++
 		}
 		wr := wreqs[key]
-		wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i])
+		wr.request.Timeseries = append(wr.request.Timeseries, wreq.Timeseries[i])
 	}
 	h.mtx.RUnlock()
 
-	return h.fanoutForward(ctx, tenant, wreqs, len(wreqs))
+	return h.fanoutForward(ctx, tenant, wreqs, uint64(len(wreqs)), len(wreqs))
 }
 
 // writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
@@ -583,9 +600,19 @@ func (h *Handler) writeQuorum() int {
 	return int((h.options.ReplicationFactor / 2) + 1)
 }
 
+func quorumReached(replicationSuccess []int, successThreshold int) bool {
+	for _, successCount := range replicationSuccess {
+		if successCount < successThreshold {
+			return false
+		}
+	}
+
+	return true
+}
+
 // fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of
 // requests succeeds or fails or if context is canceled.
-func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*prompb.WriteRequest, successThreshold int) error {
+func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*writeRequest, numBatches uint64, successThreshold int) error {
 	var errs errutil.MultiError
 
 	fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout)
@@ -607,7 +634,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 		tLogger = log.With(h.logger, logTags)
 	}
 
-	ec := make(chan error)
+	ec := make(chan writeResponse)
 
 	var wg sync.WaitGroup
 	for er := range wreqs {
@@ -629,12 +656,12 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 				})
 				if err != nil {
 					h.replications.WithLabelValues(labelError).Inc()
-					ec <- errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)
+					ec <- writeResponse{batch: wreqs[er].batch, err: errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)}
 					return
 				}
 
 				h.replications.WithLabelValues(labelSuccess).Inc()
-				ec <- nil
+				ec <- writeResponse{batch: wreqs[er].batch, err: nil}
 			}(endpoint)
 
 			continue
@@ -652,16 +679,16 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 
 				var err error
 				tracing.DoInSpan(fctx, "receive_tsdb_write", func(_ context.Context) {
-					err = h.writer.Write(fctx, tenant, wreqs[er])
+					err = h.writer.Write(fctx, tenant, wreqs[er].request)
 				})
 				if err != nil {
 					// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
 					// To avoid breaking the counting logic, we need to flatten the error.
 					level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
-					ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
+					ec <- writeResponse{batch: r.batch, err: errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)}
 					return
 				}
-				ec <- nil
+				ec <- writeResponse{batch: r.batch, err: nil}
 			}(endpoint)
 
 			continue
@@ -686,7 +713,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 
 			cl, err = h.peers.get(fctx, endpoint)
 			if err != nil {
-				ec <- errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)
+				ec <- writeResponse{batch: r.batch, err: errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)}
 				return
 			}
 
@@ -695,7 +722,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 			if ok {
 				if time.Now().Before(b.nextAllowed) {
 					h.mtx.RUnlock()
-					ec <- errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)
+					ec <- writeResponse{batch: r.batch, err: errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)}
 					return
 				}
 			}
@@ -705,7 +732,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 			tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) {
 				// Actually make the request against the endpoint we determined should handle these time series.
 				_, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{
-					Timeseries: wreqs[er].Timeseries,
+					Timeseries: wreqs[er].request.Timeseries,
 					Tenant:     tenant,
 					// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
 					Replica: int64(r.n + 1),
@@ -727,14 +754,14 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 						h.mtx.Unlock()
 					}
 				}
-				ec <- errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)
+				ec <- writeResponse{batch: r.batch, err: errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)}
 				return
 			}
 
 			h.mtx.Lock()
 			delete(h.peerStates, endpoint)
 			h.mtx.Unlock()
 
-			ec <- nil
+			ec <- writeResponse{batch: r.batch, err: nil}
 		}(endpoint)
 	}
 
@@ -747,34 +774,34 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 	// This is needed if context is canceled or if we reached success of fail quorum faster.
 	defer func() {
 		go func() {
-			for err := range ec {
-				if err != nil {
-					level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
+			for wresp := range ec {
+				if wresp.err != nil {
+					level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", wresp)
 				}
 			}
 		}()
 	}()
 
-	var success int
+	successCounts := make([]int, numBatches)
 	for {
 		select {
 		case <-fctx.Done():
 			return fctx.Err()
-		case err, more := <-ec:
+		case wr, more := <-ec:
 			if !more {
 				return errs.Err()
 			}
-			if err == nil {
-				success++
-				if success >= successThreshold {
-					// In case the success threshold is lower than the total
-					// number of requests, then we can finish early here. This
-					// is the case for quorum writes for example.
+			if wr.err == nil {
+				successCounts[wr.batch]++
+				// In case the success threshold is lower than the total
+				// number of requests, then we can finish early here. This
+				// is the case for quorum writes for example.
+				if quorumReached(successCounts, successThreshold) {
 					return nil
 				}
 				continue
 			}
-			errs.Add(err)
+			errs.Add(wr.err)
 		}
 	}
 }
@@ -783,7 +810,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 // selected by the tenant and time series.
 // The function only returns when all replication requests have finished
 // or the context is canceled.
-func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.WriteRequest) error {
+func (h *Handler) replicate(ctx context.Context, tenant string, wreq *writeRequest) error {
 	// It is possible that hashring is ready in testReady() but unready now,
 	// so need to lock here.
 	h.mtx.RLock()
@@ -792,9 +819,9 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
 		return errors.New("hashring is not ready")
 	}
 
-	replicatedRequests := make(map[endpointReplica]*prompb.WriteRequest)
+	replicatedRequests := make(map[endpointReplica]*writeRequest)
 	for i := uint64(0); i < h.options.ReplicationFactor; i++ {
-		for _, ts := range wreq.Timeseries {
+		for _, ts := range wreq.request.Timeseries {
 			endpoint, err := h.hashring.GetN(tenant, &ts, i)
 			if err != nil {
 				h.mtx.RUnlock()
@@ -807,19 +834,22 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
 			}
 			replicatedRequest, ok := replicatedRequests[er]
 			if !ok {
-				replicatedRequest = &prompb.WriteRequest{
-					Timeseries: make([]prompb.TimeSeries, 0),
+				replicatedRequest = &writeRequest{
+					batch: wreq.batch,
+					request: &prompb.WriteRequest{
+						Timeseries: make([]prompb.TimeSeries, 0),
+					},
 				}
 				replicatedRequests[er] = replicatedRequest
 			}
-			replicatedRequest.Timeseries = append(replicatedRequest.Timeseries, ts)
+			replicatedRequest.request.Timeseries = append(replicatedRequest.request.Timeseries, ts)
 		}
 	}
 	h.mtx.RUnlock()
 
 	quorum := h.writeQuorum()
 	// fanoutForward only returns an error if successThreshold (quorum) is not reached.
-	if err := h.fanoutForward(ctx, tenant, replicatedRequests, quorum); err != nil {
+	if err := h.fanoutForward(ctx, tenant, replicatedRequests, 1, quorum); err != nil {
 		return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached")
 	}
 	return nil
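
Reviewer note: the core of this patch is that `fanoutForward` no longer keeps a single global success counter; it counts successes per batch and only returns early once every batch has met the success threshold, using the `quorumReached` helper added above. The following standalone sketch (not part of the patch; the response stream and counts are invented purely for illustration) models that accounting:

```go
package main

import "fmt"

// quorumReached mirrors the helper added in this patch: the fan-out may only
// finish early once *every* batch has collected at least successThreshold
// successful responses.
func quorumReached(replicationSuccess []int, successThreshold int) bool {
	for _, successCount := range replicationSuccess {
		if successCount < successThreshold {
			return false
		}
	}
	return true
}

func main() {
	const successThreshold = 2 // e.g. write quorum for a replication factor of 3

	// Three batches (one per target endpoint); responses arrive as (batch, ok) pairs.
	responses := []struct {
		batch uint64
		ok    bool
	}{
		{0, true}, {0, true}, {1, true}, {2, true}, {1, true}, {2, false}, {2, true},
	}

	successCounts := make([]int, 3)
	for _, r := range responses {
		if r.ok {
			successCounts[r.batch]++
		}
		if quorumReached(successCounts, successThreshold) {
			fmt.Println("quorum reached with per-batch counts", successCounts)
			return
		}
	}
	fmt.Println("quorum not reached:", successCounts)
}
```

With the old single counter, the same response stream would have been declared successful after the first two successes, even though batches 1 and 2 had not been confirmed yet; per-batch counting waits for all of them.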
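The other half of the change is in `forward`: while grouping series by destination endpoint it now tags each new per-endpoint request with a batch id and passes the batch count to `fanoutForward`. A rough, hypothetical sketch of that grouping pattern (`endpointFor` and the `payload` field are stand-ins for the real hashring lookup and `prompb` types):

```go
package main

import "fmt"

// writeRequest mirrors the struct introduced in the patch: each outgoing
// request remembers which batch it belongs to.
type writeRequest struct {
	batch   uint64
	payload []string // stand-in for prompb.WriteRequest.Timeseries
}

// endpointFor is a stand-in for hashring.GetN: it just shards by first letter.
func endpointFor(series string) string {
	return fmt.Sprintf("receiver-%c", series[0])
}

func main() {
	series := []string{"alpha", "beta", "apple", "bravo", "cherry"}

	// Group series by destination endpoint, assigning a fresh batch id the
	// first time an endpoint is seen -- the same pattern forward() uses.
	var batchID uint64
	wreqs := make(map[string]*writeRequest)
	for _, s := range series {
		ep := endpointFor(s)
		if _, ok := wreqs[ep]; !ok {
			wreqs[ep] = &writeRequest{batch: batchID}
			batchID++
		}
		wreqs[ep].payload = append(wreqs[ep].payload, s)
	}

	// numBatches is then len(wreqs), which fanoutForward uses to size its
	// per-batch success counters.
	for ep, wr := range wreqs {
		fmt.Printf("endpoint=%s batch=%d series=%v\n", ep, wr.batch, wr.payload)
	}
	fmt.Println("numBatches =", len(wreqs))
}
```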