diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index f4150d33f83..8d61789f1e3 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -644,10 +644,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 		tLogger = log.With(h.logger, logTags)
 	}
 
-	ec := make(chan writeResponse)
+	responses := make(chan writeResponse)
 
 	var wg sync.WaitGroup
-	for er := range wreqs {
+	for writeTarget := range wreqs {
 		wg.Add(1)
 
 		// If the endpoint for the write request is the
@@ -656,29 +656,29 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 		// function as replication to other nodes, we can treat
 		// a failure to write locally as just another error that
 		// can be ignored if the replication factor is met.
-		if er.endpoint == h.options.Endpoint {
-			go func(er endpointReplica) {
+		if writeTarget.endpoint == h.options.Endpoint {
+			go func(writeTarget endpointReplica) {
 				defer wg.Done()
 
 				var err error
 				tracing.DoInSpan(fctx, "receive_tsdb_write", func(_ context.Context) {
 					err = h.writer.Write(fctx, tenant, &prompb.WriteRequest{
-						Timeseries: wreqs[er].timeSeries,
+						Timeseries: wreqs[writeTarget].timeSeries,
 					})
 				})
 				if err != nil {
 					level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
-					ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(err, "store locally for endpoint %v", er.endpoint))
+					responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "store locally for endpoint %v", writeTarget.endpoint))
 					return
 				}
-				ec <- newWriteResponse(wreqs[er].seriesIDs, nil)
-			}(er)
+				responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil)
+			}(writeTarget)
 			continue
 		}
 
 		// Make a request to the specified endpoint.
-		go func(er endpointReplica) {
+		go func(writeTarget endpointReplica) {
 			defer wg.Done()
 
 			var (
@@ -700,18 +700,18 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 				}
 			}()
 
-			cl, err = h.peers.get(fctx, er.endpoint)
+			cl, err = h.peers.get(fctx, writeTarget.endpoint)
 			if err != nil {
-				ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", er.endpoint))
+				responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", writeTarget.endpoint))
 				return
 			}
 
 			h.mtx.RLock()
-			b, ok := h.peerStates[er.endpoint]
+			b, ok := h.peerStates[writeTarget.endpoint]
 			if ok {
 				if time.Now().Before(b.nextAllowed) {
 					h.mtx.RUnlock()
-					ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", er.endpoint))
+					responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", writeTarget.endpoint))
 					return
 				}
 			}
@@ -721,10 +721,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 			tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) {
 				// Actually make the request against the endpoint we determined should handle these time series.
 				_, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{
-					Timeseries: wreqs[er].timeSeries,
+					Timeseries: wreqs[writeTarget].timeSeries,
 					Tenant:     tenant,
 					// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
-					Replica: int64(er.replica + 1),
+					Replica: int64(writeTarget.replica + 1),
 				})
 			})
 			if err != nil {
@@ -732,39 +732,39 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 				if st, ok := status.FromError(err); ok {
 					if st.Code() == codes.Unavailable {
 						h.mtx.Lock()
-						if b, ok := h.peerStates[er.endpoint]; ok {
+						if b, ok := h.peerStates[writeTarget.endpoint]; ok {
 							b.attempt++
 							dur := h.expBackoff.ForAttempt(b.attempt)
 							b.nextAllowed = time.Now().Add(dur)
 							level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur)
 						} else {
-							h.peerStates[er.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
+							h.peerStates[writeTarget.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
 						}
 						h.mtx.Unlock()
 					}
 				}
-				werr := errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint)
-				ec <- newWriteResponse(wreqs[er].seriesIDs, werr)
+				werr := errors.Wrapf(err, "forwarding request to endpoint %v", writeTarget.endpoint)
+				responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, werr)
 				return
 			}
 			h.mtx.Lock()
-			delete(h.peerStates, er.endpoint)
+			delete(h.peerStates, writeTarget.endpoint)
 			h.mtx.Unlock()
 
-			ec <- newWriteResponse(wreqs[er].seriesIDs, nil)
-		}(er)
+			responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil)
+		}(writeTarget)
 	}
 
 	go func() {
 		wg.Wait()
-		close(ec)
+		close(responses)
 	}()
 
 	// At the end, make sure to exhaust the channel, letting remaining unnecessary requests finish asynchronously.
 	// This is needed if context is canceled or if we reached success of fail quorum faster.
 	defer func() {
 		go func() {
-			for wresp := range ec {
+			for wresp := range responses {
 				if wresp.err != nil {
 					level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", wresp.err)
 				}
@@ -782,7 +782,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
 		select {
 		case <-fctx.Done():
 			return fctx.Err()
-		case wresp, more := <-ec:
+		case wresp, more := <-responses:
 			if !more {
 				for _, rerr := range seriesErrs {
 					errs.Add(rerr)
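
The diff above is a pure rename (`ec` → `responses`, `er` → `writeTarget`); the concurrency pattern it touches is unchanged. For readers unfamiliar with that pattern, here is a minimal, self-contained sketch of it: one goroutine per write target, results collected over a channel that is closed only after `wg.Wait()`, and leftover responses drained asynchronously once quorum is decided so slow workers never block on an unread channel. The names `fanout`, `forward`, and the `writeResponse` shape here are illustrative stand-ins, not the Thanos API.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type writeResponse struct {
	target string
	err    error
}

// forward stands in for the per-endpoint remote write (hypothetical).
func forward(ctx context.Context, target string) error {
	select {
	case <-time.After(10 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func fanout(ctx context.Context, targets []string, quorum int) error {
	responses := make(chan writeResponse)

	// Fan out: one worker per target, each reporting exactly one response.
	var wg sync.WaitGroup
	for _, t := range targets {
		wg.Add(1)
		go func(target string) {
			defer wg.Done()
			responses <- writeResponse{target: target, err: forward(ctx, target)}
		}(t)
	}

	// Close the channel only after every worker has sent its result.
	go func() {
		wg.Wait()
		close(responses)
	}()

	// Drain whatever is left once quorum is decided, so workers that
	// finish late can still send and exit instead of blocking forever.
	defer func() {
		go func() {
			for range responses {
			}
		}()
	}()

	var successes int
	for resp := range responses {
		if resp.err != nil {
			continue
		}
		successes++
		if successes >= quorum {
			return nil // Quorum reached; remaining responses are drained above.
		}
	}
	return fmt.Errorf("quorum not reached: %d/%d writes succeeded", successes, len(targets))
}

func main() {
	err := fanout(context.Background(), []string{"a", "b", "c"}, 2)
	fmt.Println("fanout err:", err)
}
```

The key design point, mirrored in the diff, is the deferred drain goroutine: returning early on quorum without it would leak every worker still blocked on its send to `responses`.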