From d76c723161dfe8a661c8ef751f5250afca431fc9 Mon Sep 17 00:00:00 2001 From: Filip Petkovski Date: Thu, 15 Dec 2022 07:59:07 +0100 Subject: [PATCH] Fix ketama quorum (#5910) * Fix quorum calculation for Ketama hashring The quorum calculation is currently broken when using the Ketama hashring. The reasons are explained in detail in issue https://github.com/thanos-io/thanos/issues/5784. This commit fixes the quorum calculation by tracking successful writes for each individual time series inside a remote-write request. The commit also removes the replicate() method inside the Handler and moves the entire logic of fanning out and calculating success into the fanoutForward() method. Signed-off-by: Filip Petkovski * Fix error propagation Signed-off-by: fpetkovski * Fix writer errors Signed-off-by: fpetkovski * Separate write from replication errors Signed-off-by: fpetkovski * Add back replication metric Signed-off-by: Filip Petkovski * Address PR comments Signed-off-by: Filip Petkovski * Address code review comments Signed-off-by: Filip Petkovski Signed-off-by: Filip Petkovski Signed-off-by: fpetkovski --- pkg/receive/handler.go | 422 ++++++++++++------- pkg/receive/handler_test.go | 789 +++++++++--------------------------- pkg/receive/writer.go | 5 +- 3 files changed, 456 insertions(+), 760 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 12afb752b8..815df089fa 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -31,14 +31,14 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/thanos/pkg/api" - statusapi "github.com/thanos-io/thanos/pkg/api/status" - "github.com/thanos-io/thanos/pkg/logging" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/thanos-io/thanos/pkg/errutil" + "github.com/thanos-io/thanos/pkg/api" + statusapi "github.com/thanos-io/thanos/pkg/api/status" + "github.com/thanos-io/thanos/pkg/logging" + extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/server/http/middleware" @@ -78,6 +78,7 @@ var ( errBadReplica = errors.New("request replica exceeds receiver replication factor") errNotReady = errors.New("target not ready") errUnavailable = errors.New("target not available") + errInternal = errors.New("internal error") ) // Options for the web Handler. @@ -350,7 +351,24 @@ type replica struct { // endpointReplica is a pair of a receive endpoint and a write request replica.
type endpointReplica struct { endpoint string - replica replica + replica uint64 +} + +type trackedSeries struct { + seriesIDs []int + timeSeries []prompb.TimeSeries +} + +type writeResponse struct { + seriesIDs []int + err error +} + +func newWriteResponse(seriesIDs []int, err error) writeResponse { + return writeResponse{ + seriesIDs: seriesIDs, + err: err, + } } func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error { @@ -513,7 +531,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { responseStatusCode := http.StatusOK if err = h.handleRequest(ctx, rep, tenant, &wreq); err != nil { level.Debug(tLogger).Log("msg", "failed to handle request", "err", err) - switch determineWriteErrorCause(err, 1) { + switch errors.Cause(err) { case errNotReady: responseStatusCode = http.StatusServiceUnavailable case errUnavailable: @@ -552,30 +570,39 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p return errors.New("hashring is not ready") } - // Batch all of the time series in the write request - // into several smaller write requests that are - // grouped by target endpoint. This ensures that - // for any incoming write request to a node, - // at most one outgoing write request will be made - // to every other node in the hashring, rather than - // one request per time series. - wreqs := make(map[endpointReplica]*prompb.WriteRequest) - for i := range wreq.Timeseries { - endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n) - if err != nil { - h.mtx.RUnlock() - return err + var replicas []uint64 + if r.replicated { + replicas = []uint64{r.n} + } else { + for rn := uint64(0); rn < h.options.ReplicationFactor; rn++ { + replicas = append(replicas, rn) } - key := endpointReplica{endpoint: endpoint, replica: r} - if _, ok := wreqs[key]; !ok { - wreqs[key] = &prompb.WriteRequest{} + } + + wreqs := make(map[endpointReplica]trackedSeries) + for tsID, ts := range wreq.Timeseries { + for _, rn := range replicas { + endpoint, err := h.hashring.GetN(tenant, &ts, rn) + if err != nil { + h.mtx.RUnlock() + return err + } + key := endpointReplica{endpoint: endpoint, replica: rn} + writeTarget, ok := wreqs[key] + if !ok { + writeTarget = trackedSeries{ + seriesIDs: make([]int, 0), + timeSeries: make([]prompb.TimeSeries, 0), + } + } + writeTarget.timeSeries = append(wreqs[key].timeSeries, ts) + writeTarget.seriesIDs = append(wreqs[key].seriesIDs, tsID) + wreqs[key] = writeTarget } - wr := wreqs[key] - wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i]) } h.mtx.RUnlock() - return h.fanoutForward(ctx, tenant, wreqs, len(wreqs)) + return h.fanoutForward(ctx, tenant, wreqs, len(wreq.Timeseries), r.replicated) } // writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success. @@ -583,16 +610,26 @@ func (h *Handler) writeQuorum() int { return int((h.options.ReplicationFactor / 2) + 1) } +func quorumReached(successes []int, successThreshold int) bool { + for _, success := range successes { + if success < successThreshold { + return false + } + } + + return true +} + // fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of // requests succeeds or fails or if context is canceled. 
-func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*prompb.WriteRequest, successThreshold int) error { - var errs errutil.MultiError +func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]trackedSeries, numSeries int, seriesReplicated bool) error { + var errs writeErrors fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout) defer func() { - if errs.Err() != nil { + if errs.ErrOrNil() != nil { // NOTICE: The cancel function is not used on all paths intentionally, - // if there is no error when quorum successThreshold is reached, + // if there is no error when quorum is reached, // let forward requests to optimistically run until timeout. cancel() } @@ -607,38 +644,11 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e tLogger = log.With(h.logger, logTags) } - ec := make(chan error) + responses := make(chan writeResponse) var wg sync.WaitGroup - for er := range wreqs { - er := er - r := er.replica - endpoint := er.endpoint - + for writeTarget := range wreqs { wg.Add(1) - // If the request is not yet replicated, let's replicate it. - // If the replication factor isn't greater than 1, let's - // just forward the requests. - if !r.replicated && h.options.ReplicationFactor > 1 { - go func(endpoint string) { - defer wg.Done() - - var err error - tracing.DoInSpan(fctx, "receive_replicate", func(ctx context.Context) { - err = h.replicate(ctx, tenant, wreqs[er]) - }) - if err != nil { - h.replications.WithLabelValues(labelError).Inc() - ec <- errors.Wrapf(err, "replicate write request for endpoint %v", endpoint) - return - } - - h.replications.WithLabelValues(labelSuccess).Inc() - ec <- nil - }(endpoint) - - continue - } // If the endpoint for the write request is the // local node, then don't make a request but store locally. @@ -646,29 +656,29 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // function as replication to other nodes, we can treat // a failure to write locally as just another error that // can be ignored if the replication factor is met. - if endpoint == h.options.Endpoint { - go func(endpoint string) { + if writeTarget.endpoint == h.options.Endpoint { + go func(writeTarget endpointReplica) { defer wg.Done() var err error tracing.DoInSpan(fctx, "receive_tsdb_write", func(_ context.Context) { - err = h.writer.Write(fctx, tenant, wreqs[er]) + err = h.writer.Write(fctx, tenant, &prompb.WriteRequest{ + Timeseries: wreqs[writeTarget].timeSeries, + }) }) if err != nil { - // When a MultiError is added to another MultiError, the error slices are concatenated, not nested. - // To avoid breaking the counting logic, we need to flatten the error. level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error()) - ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint) + responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "store locally for endpoint %v", writeTarget.endpoint)) return } - ec <- nil - }(endpoint) + responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil) + }(writeTarget) continue } // Make a request to the specified endpoint. - go func(endpoint string) { + go func(writeTarget endpointReplica) { defer wg.Done() var ( @@ -679,23 +689,29 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // This is an actual remote forward request so report metric here. 
if err != nil { h.forwardRequests.WithLabelValues(labelError).Inc() + if !seriesReplicated { + h.replications.WithLabelValues(labelError).Inc() + } return } h.forwardRequests.WithLabelValues(labelSuccess).Inc() + if !seriesReplicated { + h.replications.WithLabelValues(labelSuccess).Inc() + } }() - cl, err = h.peers.get(fctx, endpoint) + cl, err = h.peers.get(fctx, writeTarget.endpoint) if err != nil { - ec <- errors.Wrapf(err, "get peer connection for endpoint %v", endpoint) + responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", writeTarget.endpoint)) return } h.mtx.RLock() - b, ok := h.peerStates[endpoint] + b, ok := h.peerStates[writeTarget.endpoint] if ok { if time.Now().Before(b.nextAllowed) { h.mtx.RUnlock() - ec <- errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint) + responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", writeTarget.endpoint)) return } } @@ -705,10 +721,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) { // Actually make the request against the endpoint we determined should handle these time series. _, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{ - Timeseries: wreqs[er].Timeseries, + Timeseries: wreqs[writeTarget].timeSeries, Tenant: tenant, // Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated. - Replica: int64(r.n + 1), + Replica: int64(writeTarget.replica + 1), }) }) if err != nil { @@ -716,113 +732,78 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e if st, ok := status.FromError(err); ok { if st.Code() == codes.Unavailable { h.mtx.Lock() - if b, ok := h.peerStates[endpoint]; ok { + if b, ok := h.peerStates[writeTarget.endpoint]; ok { b.attempt++ dur := h.expBackoff.ForAttempt(b.attempt) b.nextAllowed = time.Now().Add(dur) level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur) } else { - h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))} + h.peerStates[writeTarget.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))} } h.mtx.Unlock() } } - ec <- errors.Wrapf(err, "forwarding request to endpoint %v", endpoint) + werr := errors.Wrapf(err, "forwarding request to endpoint %v", writeTarget.endpoint) + responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, werr) return } h.mtx.Lock() - delete(h.peerStates, endpoint) + delete(h.peerStates, writeTarget.endpoint) h.mtx.Unlock() - ec <- nil - }(endpoint) + responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil) + }(writeTarget) } go func() { wg.Wait() - close(ec) + close(responses) }() // At the end, make sure to exhaust the channel, letting remaining unnecessary requests finish asynchronously. // This is needed if context is canceled or if we reached success of fail quorum faster. 
defer func() { go func() { - for err := range ec { - if err != nil { - level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err) + for wresp := range responses { + if wresp.err != nil { + level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", wresp.err) } } }() }() - var success int + quorum := h.writeQuorum() + if seriesReplicated { + quorum = 1 + } + successes := make([]int, numSeries) + seriesErrs := newReplicationErrors(quorum, numSeries) for { select { case <-fctx.Done(): return fctx.Err() - case err, more := <-ec: + case wresp, more := <-responses: if !more { - return errs.Err() + for _, rerr := range seriesErrs { + errs.Add(rerr) + } + return errs.ErrOrNil() } - if err == nil { - success++ - if success >= successThreshold { - // In case the success threshold is lower than the total - // number of requests, then we can finish early here. This - // is the case for quorum writes for example. - return nil + + if wresp.err != nil { + for _, tsID := range wresp.seriesIDs { + seriesErrs[tsID].Add(wresp.err) } continue } - errs.Add(err) - } - } -} - -// replicate replicates a write request to (replication-factor) nodes -// selected by the tenant and time series. -// The function only returns when all replication requests have finished -// or the context is canceled. -func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.WriteRequest) error { - // It is possible that hashring is ready in testReady() but unready now, - // so need to lock here. - h.mtx.RLock() - if h.hashring == nil { - h.mtx.RUnlock() - return errors.New("hashring is not ready") - } - - replicatedRequests := make(map[endpointReplica]*prompb.WriteRequest) - for i := uint64(0); i < h.options.ReplicationFactor; i++ { - for _, ts := range wreq.Timeseries { - endpoint, err := h.hashring.GetN(tenant, &ts, i) - if err != nil { - h.mtx.RUnlock() - return err + for _, tsID := range wresp.seriesIDs { + successes[tsID]++ } - - er := endpointReplica{ - endpoint: endpoint, - replica: replica{n: i, replicated: true}, + if quorumReached(successes, quorum) { + return nil } - replicatedRequest, ok := replicatedRequests[er] - if !ok { - replicatedRequest = &prompb.WriteRequest{ - Timeseries: make([]prompb.TimeSeries, 0), - } - replicatedRequests[er] = replicatedRequest - } - replicatedRequest.Timeseries = append(replicatedRequest.Timeseries, ts) } } - h.mtx.RUnlock() - - quorum := h.writeQuorum() - // fanoutForward only returns an error if successThreshold (quorum) is not reached. - if err := h.fanoutForward(ctx, tenant, replicatedRequests, quorum); err != nil { - return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached") - } - return nil } // RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore. 
@@ -834,7 +815,7 @@ func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*st if err != nil { level.Debug(h.logger).Log("msg", "failed to handle request", "err", err) } - switch determineWriteErrorCause(err, 1) { + switch errors.Cause(err) { case nil: return &storepb.WriteResponse{}, nil case errNotReady: @@ -923,7 +904,9 @@ type retryState struct { nextAllowed time.Time } -type expectedErrors []*struct { +type expectedErrors []*expectedError + +type expectedError struct { err error cause func(error) bool count int @@ -933,25 +916,141 @@ func (a expectedErrors) Len() int { return len(a) } func (a expectedErrors) Swap(i, j int) { a[i], a[j] = a[j], a[i] } func (a expectedErrors) Less(i, j int) bool { return a[i].count < a[j].count } -// determineWriteErrorCause extracts a sentinel error that has occurred more than the given threshold from a given fan-out error. -// It will inspect the error's cause if the error is a MultiError, -// It will return cause of each contained error but will not traverse any deeper. -func determineWriteErrorCause(err error, threshold int) error { +// errorSet is a set of errors. +type errorSet struct { + reasonSet map[string]struct{} + errs []error +} + +// Error returns a string containing a deduplicated set of reasons. +func (es errorSet) Error() string { + if len(es.reasonSet) == 0 { + return "" + } + reasons := make([]string, 0, len(es.reasonSet)) + for reason := range es.reasonSet { + reasons = append(reasons, reason) + } + sort.Strings(reasons) + + var buf bytes.Buffer + if len(reasons) > 1 { + fmt.Fprintf(&buf, "%d errors: ", len(es.reasonSet)) + } + + var more bool + for _, reason := range reasons { + if more { + buf.WriteString("; ") + } + buf.WriteString(reason) + more = true + } + + return buf.String() +} + +// Add adds an error to the errorSet. +func (es *errorSet) Add(err error) { if err == nil { - return nil + return } - unwrappedErr := errors.Cause(err) - errs, ok := unwrappedErr.(errutil.NonNilMultiError) - if !ok { - errs = []error{unwrappedErr} + if len(es.errs) == 0 { + es.errs = []error{err} + } else { + es.errs = append(es.errs, err) } - if len(errs) == 0 { + if es.reasonSet == nil { + es.reasonSet = make(map[string]struct{}) + } + + switch addedErr := err.(type) { + case *replicationErrors: + for reason := range addedErr.reasonSet { + es.reasonSet[reason] = struct{}{} + } + case *writeErrors: + for reason := range addedErr.reasonSet { + es.reasonSet[reason] = struct{}{} + } + default: + es.reasonSet[err.Error()] = struct{}{} + } +} + +// writeErrors contains all errors that have +// occurred during a local write of a remote-write request. +type writeErrors struct { + errorSet +} + +// ErrOrNil returns the writeErrors instance if any +// errors are contained in it. +// Otherwise, it returns nil. +func (es *writeErrors) ErrOrNil() error { + if len(es.errs) == 0 { return nil } + return es +} - if threshold < 1 { - return err +// Cause returns the primary cause for a write failure. +// If multiple errors have occurred, Cause will prefer +// recoverable over non-recoverable errors. 
+func (es *writeErrors) Cause() error { + if len(es.errs) == 0 { + return nil + } + + expErrs := expectedErrors{ + {err: errUnavailable, cause: isUnavailable}, + {err: errNotReady, cause: isNotReady}, + {err: errConflict, cause: isConflict}, + } + + var ( + unknownErr error + knownCause bool + ) + for _, werr := range es.errs { + knownCause = false + cause := errors.Cause(werr) + for _, exp := range expErrs { + if exp.cause(cause) { + knownCause = true + exp.count++ + } + } + if !knownCause { + unknownErr = cause + } + } + + for _, exp := range expErrs { + if exp.count > 0 { + return exp.err + } + } + + return unknownErr +} + +// replicationErrors contains errors that have happened while +// replicating a time series within a remote-write request. +type replicationErrors struct { + errorSet + threshold int +} + +// Cause extracts a sentinel error with the highest occurrence that +// has happened more than the given threshold. +// If no single error has occurred more than the threshold, but the +// total number of errors meets the threshold, +// replicationErr will return errInternal. +func (es *replicationErrors) Cause() error { + if len(es.errs) == 0 { + return errorSet{} } expErrs := expectedErrors{ @@ -961,19 +1060,32 @@ func determineWriteErrorCause(err error, threshold int) error { } for _, exp := range expErrs { exp.count = 0 - for _, err := range errs { + for _, err := range es.errs { if exp.cause(errors.Cause(err)) { exp.count++ } } } + // Determine which error occurred most. sort.Sort(sort.Reverse(expErrs)) - if exp := expErrs[0]; exp.count >= threshold { + if exp := expErrs[0]; exp.count >= es.threshold { return exp.err } - return err + if len(es.errs) >= es.threshold { + return errInternal + } + + return nil +} + +func newReplicationErrors(threshold, numErrors int) []*replicationErrors { + errs := make([]*replicationErrors, numErrors) + for i := range errs { + errs[i] = &replicationErrors{threshold: threshold} + } + return errs } func newPeerGroup(dialOpts ...grpc.DialOption) *peerGroup { diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 4a2a536038..9d178612b3 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -9,7 +9,6 @@ import ( "fmt" "io" "math" - "math/rand" "net/http" "net/http/httptest" "os" @@ -22,6 +21,7 @@ import ( "testing" "time" + "google.golang.org/grpc" "gopkg.in/yaml.v3" "github.com/alecthomas/units" @@ -37,12 +37,8 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/extkingpin" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -51,196 +47,6 @@ import ( "github.com/thanos-io/thanos/pkg/testutil" ) -func TestDetermineWriteErrorCause(t *testing.T) { - for _, tc := range []struct { - name string - err error - threshold int - exp error - }{ - { - name: "nil", - }, - { - name: "nil multierror", - err: errutil.NonNilMultiError([]error{}), - }, - { - name: "matching simple", - err: errConflict, - threshold: 1, - exp: errConflict, - }, - { - name: "non-matching multierror", - err: errutil.NonNilMultiError([]error{ - errors.New("foo"), - errors.New("bar"), - }), - exp: errors.New("2 errors: foo; bar"), - }, - { - name: "nested non-matching multierror", - err: 
errors.Wrap(errutil.NonNilMultiError([]error{ - errors.New("foo"), - errors.New("bar"), - }), "baz"), - threshold: 1, - exp: errors.New("baz: 2 errors: foo; bar"), - }, - { - name: "deep nested non-matching multierror", - err: errors.Wrap(errutil.NonNilMultiError([]error{ - errors.New("foo"), - errutil.NonNilMultiError([]error{ - errors.New("bar"), - errors.New("qux"), - }), - }), "baz"), - threshold: 1, - exp: errors.New("baz: 2 errors: foo; 2 errors: bar; qux"), - }, - { - name: "matching multierror", - err: errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errors.New("foo"), - errors.New("bar"), - }), - threshold: 1, - exp: errConflict, - }, - { - name: "matching multierror (exemplar error)", - err: errutil.NonNilMultiError([]error{ - storage.ErrExemplarLabelLength, - errors.New("foo"), - errors.New("bar"), - }), - threshold: 1, - exp: errConflict, - }, - { - name: "matching multierror (labels error)", - err: errutil.NonNilMultiError([]error{ - labelpb.ErrEmptyLabels, - errors.New("foo"), - errors.New("bar"), - }), - threshold: 1, - exp: errConflict, - }, - { - name: "matching but below threshold multierror", - err: errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errors.New("foo"), - errors.New("bar"), - }), - threshold: 2, - exp: errors.New("3 errors: out of order sample; foo; bar"), - }, - { - name: "matching multierror many", - err: errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errConflict, - status.Error(codes.AlreadyExists, "conflict"), - errors.New("foo"), - errors.New("bar"), - }), - threshold: 1, - exp: errConflict, - }, - { - name: "matching multierror many, one above threshold", - err: errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errConflict, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - errors.New("foo"), - }), - threshold: 2, - exp: errNotReady, - }, - { - name: "matching multierror many, one above threshold (exemplar error)", - err: errutil.NonNilMultiError([]error{ - tsdb.ErrNotReady, - tsdb.ErrNotReady, - storage.ErrDuplicateExemplar, - storage.ErrDuplicateSampleForTimestamp, - storage.ErrExemplarLabelLength, - errors.New("foo"), - }), - threshold: 2, - exp: errConflict, - }, - { - name: "matching multierror many, both above threshold, conflict has precedence", - err: errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errConflict, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - status.Error(codes.AlreadyExists, "conflict"), - errors.New("foo"), - }), - threshold: 2, - exp: errConflict, - }, - { - name: "matching multierror many, both above threshold, conflict has precedence (labels error)", - err: errutil.NonNilMultiError([]error{ - labelpb.ErrDuplicateLabels, - labelpb.ErrDuplicateLabels, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - tsdb.ErrNotReady, - labelpb.ErrDuplicateLabels, - errors.New("foo"), - }), - threshold: 2, - exp: errConflict, - }, - { - name: "nested matching multierror", - err: errors.Wrap(errors.Wrap(errutil.NonNilMultiError([]error{ - storage.ErrOutOfOrderSample, - errors.New("foo"), - errors.New("bar"), - }), "baz"), "qux"), - threshold: 1, - exp: errConflict, - }, - { - name: "deep nested matching multierror", - err: errors.Wrap(errutil.NonNilMultiError([]error{ - errutil.NonNilMultiError([]error{ - errors.New("qux"), - status.Error(codes.AlreadyExists, "conflict"), - status.Error(codes.AlreadyExists, "conflict"), - }), - errors.New("foo"), - errors.New("bar"), - }), "baz"), - threshold: 1, - exp: errors.New("baz: 3 errors: 3 
errors: qux; rpc error: code = AlreadyExists desc = conflict; rpc error: code = AlreadyExists desc = conflict; foo; bar"), - }, - } { - err := determineWriteErrorCause(tc.err, tc.threshold) - if tc.exp != nil { - testutil.NotOk(t, err) - testutil.Equals(t, tc.exp.Error(), err.Error()) - continue - } - testutil.Ok(t, err) - } -} - type fakeTenantAppendable struct { f *fakeAppendable } @@ -347,7 +153,7 @@ func (f *fakeAppender) Rollback() error { return f.rollbackErr() } -func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64) ([]*Handler, Hashring) { +func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uint64, hashringAlgo HashringAlgorithm) ([]*Handler, Hashring) { var ( cfg = []HashringConfig{{Hashring: "test"}} handlers []*Handler @@ -366,60 +172,44 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin }, } + ag := addrGen{} limiter, _ := NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger()) for i := range appendables { h := NewHandler(nil, &Options{ TenantHeader: DefaultTenantHeader, ReplicaHeader: DefaultReplicaHeader, ReplicationFactor: replicationFactor, - ForwardTimeout: 5 * time.Second, + ForwardTimeout: 5 * time.Minute, Writer: NewWriter(log.NewNopLogger(), newFakeTenantAppendable(appendables[i])), Limiter: limiter, }) handlers = append(handlers, h) h.peers = peers - addr := randomAddr() + addr := ag.newAddr() h.options.Endpoint = addr cfg[0].Endpoints = append(cfg[0].Endpoints, h.options.Endpoint) peers.cache[addr] = &fakeRemoteWriteGRPCServer{h: h} } - hashring := newMultiHashring(AlgorithmHashmod, replicationFactor, cfg) + // Use hashmod as default. + if hashringAlgo == "" { + hashringAlgo = AlgorithmHashmod + } + + hashring := newMultiHashring(hashringAlgo, replicationFactor, cfg) for _, h := range handlers { h.Hashring(hashring) } return handlers, hashring } -func TestReceiveQuorum(t *testing.T) { +func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsistencyDelay bool) { appenderErrFn := func() error { return errors.New("failed to get appender") } conflictErrFn := func() error { return storage.ErrOutOfBounds } commitErrFn := func() error { return errors.New("failed to commit") } - wreq1 := &prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ - { - Labels: []labelpb.ZLabel{ - { - Name: "foo", - Value: "bar", - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1, - }, - { - Value: 2, - Timestamp: 2, - }, - { - Value: 3, - Timestamp: 3, - }, - }, - }, - }, + wreq := &prompb.WriteRequest{ + Timeseries: makeSeriesWithValues(50), } + for _, tc := range []struct { name string status int @@ -431,7 +221,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 success", status: http.StatusOK, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -442,7 +232,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 commit error", status: http.StatusInternalServerError, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -453,7 +243,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 1 conflict", status: http.StatusConflict, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, nil, nil), @@ -464,7 +254,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 2 success", status: http.StatusOK, replicationFactor: 1, 
- wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -478,7 +268,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 success", status: http.StatusOK, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -495,7 +285,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 success with replication", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -512,7 +302,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 commit error", status: http.StatusInternalServerError, replicationFactor: 1, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -529,7 +319,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 commit error with replication", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -546,7 +336,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 appender error with replication", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, nil, nil), @@ -566,7 +356,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 conflict with replication", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, nil, nil), @@ -583,7 +373,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 conflict and commit error with replication", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(conflictErrFn, commitErrFn, nil), @@ -600,7 +390,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and one faulty", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), @@ -617,7 +407,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and one commit error", status: http.StatusOK, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -634,7 +424,7 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication and two conflicts", status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), @@ -651,11 +441,28 @@ func TestReceiveQuorum(t *testing.T) { name: "size 3 with replication one conflict and one commit error", status: http.StatusInternalServerError, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, { appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), }, + }, + }, + { + name: "size 3 with replication two 
commit errors", + status: http.StatusInternalServerError, + replicationFactor: 3, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, { appender: newFakeAppender(nil, commitErrFn, nil), }, @@ -665,10 +472,62 @@ func TestReceiveQuorum(t *testing.T) { }, }, { - name: "size 3 with replication two commit errors", - status: http.StatusInternalServerError, + name: "size 6 with replication 3", + status: http.StatusOK, + replicationFactor: 3, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + }, + }, + { + name: "size 6 with replication 3 one commit and two conflict error", + status: http.StatusConflict, replicationFactor: 3, - wreq: wreq1, + wreq: wreq, + appendables: []*fakeAppendable{ + { + appender: newFakeAppender(nil, commitErrFn, nil), + }, + { + appender: newFakeAppender(nil, conflictErrFn, nil), + }, + { + appender: newFakeAppender(nil, conflictErrFn, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + }, + }, + { + name: "size 6 with replication 5 two commit errors", + status: http.StatusOK, + replicationFactor: 5, + wreq: wreq, appendables: []*fakeAppendable{ { appender: newFakeAppender(nil, commitErrFn, nil), @@ -679,11 +538,20 @@ func TestReceiveQuorum(t *testing.T) { { appender: newFakeAppender(nil, nil, nil), }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, + { + appender: newFakeAppender(nil, nil, nil), + }, }, }, } { t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) + handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor, hashringAlgo) tenant := "test" // Test from the point of view of every node // so that we know status code does not depend @@ -692,12 +560,17 @@ func TestReceiveQuorum(t *testing.T) { // Test that the correct status is returned. rec, err := makeRequest(handler, tenant, tc.wreq) if err != nil { - t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err) + t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", i+1, err) } if rec.Code != tc.status { - t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) + t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i+1, tc.status, rec.Code, rec.Body.String()) } } + + if withConsistencyDelay { + time.Sleep(50 * time.Millisecond) + } + // Test that each time series is stored // the correct amount of times in each fake DB. for _, ts := range tc.wreq.Timeseries { @@ -709,16 +582,30 @@ func TestReceiveQuorum(t *testing.T) { } } for j, a := range tc.appendables { - var expectedMin int - n := a.appender.(*fakeAppender).Get(lset) - got := uint64(len(n)) - if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { - // We have len(handlers) copies of each sample because the test case - // is run once for each handler and they all use the same appender. 
- expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples) - } - if uint64(expectedMin) > got { - t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got) + if withConsistencyDelay { + var expected int + n := a.appender.(*fakeAppender).Get(lset) + got := uint64(len(n)) + if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { + // We have len(handlers) copies of each sample because the test case + // is run once for each handler and they all use the same appender. + expected = len(handlers) * len(ts.Samples) + } + if uint64(expected) != got { + t.Errorf("handler: %d, labels %q: expected %d samples, got %d", j, lset.String(), expected, got) + } + } else { + var expectedMin int + n := a.appender.(*fakeAppender).Get(lset) + got := uint64(len(n)) + if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { + // We have len(handlers) copies of each sample because the test case + // is run once for each handler and they all use the same appender. + expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples) + } + if uint64(expectedMin) > got { + t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got) + } } } } @@ -726,6 +613,22 @@ func TestReceiveQuorum(t *testing.T) { } } +func TestReceiveQuorumHashmod(t *testing.T) { + testReceiveQuorum(t, AlgorithmHashmod, false) +} + +func TestReceiveQuorumKetama(t *testing.T) { + testReceiveQuorum(t, AlgorithmKetama, false) +} + +func TestReceiveWithConsistencyDelayHashmod(t *testing.T) { + testReceiveQuorum(t, AlgorithmHashmod, true) +} + +func TestReceiveWithConsistencyDelayKetama(t *testing.T) { + testReceiveQuorum(t, AlgorithmKetama, true) +} + func TestReceiveWriteRequestLimits(t *testing.T) { for _, tc := range []struct { name string @@ -778,7 +681,7 @@ func TestReceiveWriteRequestLimits(t *testing.T) { appender: newFakeAppender(nil, nil, nil), }, } - handlers, _ := newTestHandlerHashring(appendables, 3) + handlers, _ := newTestHandlerHashring(appendables, 3, AlgorithmHashmod) handler := handlers[0] tenant := "test" @@ -832,348 +735,6 @@ func TestReceiveWriteRequestLimits(t *testing.T) { } } -func TestReceiveWithConsistencyDelay(t *testing.T) { - appenderErrFn := func() error { return errors.New("failed to get appender") } - conflictErrFn := func() error { return storage.ErrOutOfBounds } - commitErrFn := func() error { return errors.New("failed to commit") } - wreq1 := &prompb.WriteRequest{ - Timeseries: []prompb.TimeSeries{ - { - Labels: []labelpb.ZLabel{ - { - Name: "foo", - Value: "bar", - }, - }, - Samples: []prompb.Sample{ - { - Value: 1, - Timestamp: 1, - }, - { - Value: 2, - Timestamp: 2, - }, - { - Value: 3, - Timestamp: 3, - }, - }, - }, - }, - } - for _, tc := range []struct { - name string - status int - replicationFactor uint64 - wreq *prompb.WriteRequest - appendables []*fakeAppendable - }{ - { - name: "size 1 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 1 commit error", - status: http.StatusInternalServerError, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 1 conflict", - status: http.StatusConflict, - replicationFactor: 1, - wreq: wreq1, - 
appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - }, - }, - { - name: "size 2 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 success", - status: http.StatusOK, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 success with replication", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 commit error", - status: http.StatusInternalServerError, - replicationFactor: 1, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 commit error with replication", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 appender error with replication", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - { - appender: newFakeAppender(nil, nil, nil), - appenderErr: appenderErrFn, - }, - }, - }, - { - name: "size 3 conflict with replication", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - }, - }, - { - name: "size 3 conflict and commit error with replication", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - { - appender: newFakeAppender(conflictErrFn, commitErrFn, nil), - }, - }, - }, - { - name: "size 3 with replication and one faulty", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication and one commit error", - status: http.StatusOK, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - { - appender: 
newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication and two conflicts", - status: http.StatusConflict, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(conflictErrFn, nil, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication one conflict and one commit error", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(cycleErrors([]error{storage.ErrOutOfBounds, storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp}), nil, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - { - name: "size 3 with replication two commit errors", - status: http.StatusInternalServerError, - replicationFactor: 3, - wreq: wreq1, - appendables: []*fakeAppendable{ - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, commitErrFn, nil), - }, - { - appender: newFakeAppender(nil, nil, nil), - }, - }, - }, - } { - // Run the quorum tests with consistency delay, which should allow us - // to see all requests completing all the time, since we're using local - // network we are not expecting anything to go wrong with these. - t.Run(tc.name, func(t *testing.T) { - handlers, hashring := newTestHandlerHashring(tc.appendables, tc.replicationFactor) - tenant := "test" - // Test from the point of view of every node - // so that we know status code does not depend - // on which node is erroring and which node is receiving. - for i, handler := range handlers { - // Test that the correct status is returned. - rec, err := makeRequest(handler, tenant, tc.wreq) - if err != nil { - t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", tc.status, err) - } - if rec.Code != tc.status { - t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i, tc.status, rec.Code, rec.Body.String()) - } - } - - time.Sleep(50 * time.Millisecond) - - // Test that each time series is stored - // the correct amount of times in each fake DB. - for _, ts := range tc.wreq.Timeseries { - lset := make(labels.Labels, len(ts.Labels)) - for j := range ts.Labels { - lset[j] = labels.Label{ - Name: ts.Labels[j].Name, - Value: ts.Labels[j].Value, - } - } - for j, a := range tc.appendables { - var expected int - n := a.appender.(*fakeAppender).Get(lset) - got := uint64(len(n)) - if a.appenderErr == nil && endpointHit(t, hashring, tc.replicationFactor, handlers[j].options.Endpoint, tenant, &ts) { - // We have len(handlers) copies of each sample because the test case - // is run once for each handler and they all use the same appender. - expected = len(handlers) * len(ts.Samples) - } - if uint64(expected) != got { - t.Errorf("handler: %d, labels %q: expected %d samples, got %d", j, lset.String(), expected, got) - } - } - } - }) - } -} - // endpointHit is a helper to determine if a given endpoint in a hashring would be selected // for a given time series, tenant, and replication factor. 
func endpointHit(t *testing.T, h Hashring, rf uint64, endpoint, tenant string, timeSeries *prompb.TimeSeries) bool { @@ -1224,8 +785,11 @@ func makeRequest(h *Handler, tenant string, wreq *prompb.WriteRequest) (*httptes return rec, nil } -func randomAddr() string { - return fmt.Sprintf("http://%d.%d.%d.%d:%d", rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(256), rand.Intn(35000)+30000) +type addrGen struct{ n int } + +func (a *addrGen) newAddr() string { + a.n++ + return fmt.Sprintf("http://node-%d:%d", a.n, 12345+a.n) } type fakeRemoteWriteGRPCServer struct { @@ -1302,10 +866,31 @@ func serializeSeriesWithOneSample(t testing.TB, series [][]labelpb.ZLabel) []byt return snappy.Encode(nil, body) } +func makeSeriesWithValues(numSeries int) []prompb.TimeSeries { + series := make([]prompb.TimeSeries, numSeries) + for i := 0; i < numSeries; i++ { + series[i] = prompb.TimeSeries{ + Labels: []labelpb.ZLabel{ + { + Name: fmt.Sprintf("pod-%d", i), + Value: fmt.Sprintf("nginx-%d", i), + }, + }, + Samples: []prompb.Sample{ + { + Value: float64(i), + Timestamp: 10, + }, + }, + } + } + return series +} + func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { dir := b.TempDir() - handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1) + handlers, _ := newTestHandlerHashring([]*fakeAppendable{nil}, 1, AlgorithmHashmod) handler := handlers[0] reg := prometheus.NewRegistry() diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index dbd3751536..3187782631 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -13,7 +13,6 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "github.com/thanos-io/thanos/pkg/errutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" ) @@ -73,7 +72,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR var ( ref storage.SeriesRef - errs errutil.MultiError + errs writeErrors ) for _, t := range wreq.Timeseries { // Check if time series labels are valid. If not, skip the time series @@ -210,5 +209,5 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *prompb.WriteR if err := app.Commit(); err != nil { errs.Add(errors.Wrap(err, "commit samples")) } - return errs.Err() + return errs.ErrOrNil() }
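The core of the fix is visible in fanoutForward: success is now counted per time series rather than per forwarded request, and the write as a whole only succeeds once every series has been acknowledged by a write quorum of replicas. The standalone sketch below is not part of the patch; the response values are made up for illustration and only show how the per-series counters interact with the quorumReached helper added in handler.go.

package main

import (
	"errors"
	"fmt"
)

// writeResponse mirrors the shape introduced by the patch: the IDs of the
// series covered by one forwarded sub-request, plus that sub-request's outcome.
type writeResponse struct {
	seriesIDs []int
	err       error
}

// quorumReached reports whether every series has collected at least
// successThreshold successful writes, matching the helper added in handler.go.
func quorumReached(successes []int, successThreshold int) bool {
	for _, s := range successes {
		if s < successThreshold {
			return false
		}
	}
	return true
}

func main() {
	const (
		numSeries = 3
		quorum    = 2 // replication factor 3 -> (3/2)+1 = 2
	)

	// Hypothetical outcomes of fanning a 3-series request out to 3 replicas:
	// two replicas succeed for all series, one replica fails entirely.
	responses := []writeResponse{
		{seriesIDs: []int{0, 1, 2}},
		{seriesIDs: []int{0, 1, 2}},
		{seriesIDs: []int{0, 1, 2}, err: errors.New("target not available")},
	}

	successes := make([]int, numSeries)
	for _, r := range responses {
		if r.err != nil {
			continue // failures are tracked separately, also per series
		}
		for _, id := range r.seriesIDs {
			successes[id]++
		}
	}

	// Every series reached the quorum of 2, so the write succeeds even though
	// one replica was unavailable.
	fmt.Println(successes, quorumReached(successes, quorum)) // [2 2 2] true
}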
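When a series does fail on enough replicas, replicationErrors.Cause decides which sentinel error, and therefore which HTTP status, the client sees: the most frequent recognised cause wins if it meets the per-series threshold, and errInternal is returned when quorum is lost without a dominant cause. The sketch below is a rough approximation of that rule, not the patch's code; it assumes errors.Is matching instead of the handler's own isConflict/isNotReady/isUnavailable predicates.

package main

import (
	"errors"
	"fmt"
)

// Stand-ins for the sentinel errors used by the receive handler.
var (
	errUnavailable = errors.New("target not available")
	errNotReady    = errors.New("target not ready")
	errConflict    = errors.New("conflict")
	errInternal    = errors.New("internal error")
)

// replicationCause approximates replicationErrors.Cause: return the sentinel
// that occurred most often if it reached the threshold; if quorum is lost
// without a dominant recognised cause, report an internal error.
func replicationCause(errs []error, threshold int) error {
	sentinels := []error{errUnavailable, errNotReady, errConflict}
	counts := make([]int, len(sentinels))
	for _, err := range errs {
		for i, s := range sentinels {
			if errors.Is(err, s) {
				counts[i]++
			}
		}
	}

	best := 0
	for i := range counts {
		if counts[i] > counts[best] {
			best = i
		}
	}
	if counts[best] >= threshold {
		return sentinels[best]
	}
	if len(errs) >= threshold {
		return errInternal
	}
	return nil
}

func main() {
	// One replica hit a conflict and another was unavailable: two distinct
	// causes, neither reaching the threshold of 2, so the series failure is
	// surfaced as an internal error (HTTP 500) rather than 409 or 503.
	errs := []error{
		fmt.Errorf("store locally for endpoint a: %w", errConflict),
		fmt.Errorf("forwarding request to endpoint b: %w", errUnavailable),
	}
	fmt.Println(replicationCause(errs, 2)) // internal error
}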