Skip to content

Commit

Permalink
Address code review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Nov 30, 2022
1 parent fb91d51 commit 38cb138
Showing 1 changed file with 25 additions and 25 deletions.
50 changes: 25 additions & 25 deletions pkg/receive/handler.go
Expand Up @@ -644,10 +644,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
tLogger = log.With(h.logger, logTags)
}

ec := make(chan writeResponse)
responses := make(chan writeResponse)

var wg sync.WaitGroup
for er := range wreqs {
for writeTarget := range wreqs {
wg.Add(1)

// If the endpoint for the write request is the
Expand All @@ -656,29 +656,29 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
// function as replication to other nodes, we can treat
// a failure to write locally as just another error that
// can be ignored if the replication factor is met.
if er.endpoint == h.options.Endpoint {
go func(er endpointReplica) {
if writeTarget.endpoint == h.options.Endpoint {
go func(writeTarget endpointReplica) {
defer wg.Done()

var err error
tracing.DoInSpan(fctx, "receive_tsdb_write", func(_ context.Context) {
err = h.writer.Write(fctx, tenant, &prompb.WriteRequest{
Timeseries: wreqs[er].timeSeries,
Timeseries: wreqs[writeTarget].timeSeries,
})
})
if err != nil {
level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(err, "store locally for endpoint %v", er.endpoint))
responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "store locally for endpoint %v", writeTarget.endpoint))
return
}
ec <- newWriteResponse(wreqs[er].seriesIDs, nil)
}(er)
responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil)
}(writeTarget)

continue
}

// Make a request to the specified endpoint.
go func(er endpointReplica) {
go func(writeTarget endpointReplica) {
defer wg.Done()

var (
Expand All @@ -700,18 +700,18 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
}
}()

cl, err = h.peers.get(fctx, er.endpoint)
cl, err = h.peers.get(fctx, writeTarget.endpoint)
if err != nil {
ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", er.endpoint))
responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", writeTarget.endpoint))
return
}

h.mtx.RLock()
b, ok := h.peerStates[er.endpoint]
b, ok := h.peerStates[writeTarget.endpoint]
if ok {
if time.Now().Before(b.nextAllowed) {
h.mtx.RUnlock()
ec <- newWriteResponse(wreqs[er].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", er.endpoint))
responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", writeTarget.endpoint))
return
}
}
Expand All @@ -721,50 +721,50 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) {
// Actually make the request against the endpoint we determined should handle these time series.
_, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{
Timeseries: wreqs[er].timeSeries,
Timeseries: wreqs[writeTarget].timeSeries,
Tenant: tenant,
// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
Replica: int64(er.replica + 1),
Replica: int64(writeTarget.replica + 1),
})
})
if err != nil {
// Check if peer connection is unavailable, don't attempt to send requests constantly.
if st, ok := status.FromError(err); ok {
if st.Code() == codes.Unavailable {
h.mtx.Lock()
if b, ok := h.peerStates[er.endpoint]; ok {
if b, ok := h.peerStates[writeTarget.endpoint]; ok {
b.attempt++
dur := h.expBackoff.ForAttempt(b.attempt)
b.nextAllowed = time.Now().Add(dur)
level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur)
} else {
h.peerStates[er.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
h.peerStates[writeTarget.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
}
h.mtx.Unlock()
}
}
werr := errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint)
ec <- newWriteResponse(wreqs[er].seriesIDs, werr)
werr := errors.Wrapf(err, "forwarding request to endpoint %v", writeTarget.endpoint)
responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, werr)
return
}
h.mtx.Lock()
delete(h.peerStates, er.endpoint)
delete(h.peerStates, writeTarget.endpoint)
h.mtx.Unlock()

ec <- newWriteResponse(wreqs[er].seriesIDs, nil)
}(er)
responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil)
}(writeTarget)
}

go func() {
wg.Wait()
close(ec)
close(responses)
}()

// At the end, make sure to exhaust the channel, letting remaining unnecessary requests finish asynchronously.
// This is needed if context is canceled or if we reached success of fail quorum faster.
defer func() {
go func() {
for wresp := range ec {
for wresp := range responses {
if wresp.err != nil {
level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", wresp.err)
}
Expand All @@ -782,7 +782,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
select {
case <-fctx.Done():
return fctx.Err()
case wresp, more := <-ec:
case wresp, more := <-responses:
if !more {
for _, rerr := range seriesErrs {
errs.Add(rerr)
Expand Down

0 comments on commit 38cb138

Please sign in to comment.