Fix ketama quorum
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
fpetkovski committed Nov 21, 2022
1 parent baac7aa commit a37d49f
Showing 1 changed file with 69 additions and 39 deletions.
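Judging from the diff, fanoutForward previously kept a single success counter across all outgoing write requests, so with a hashring that spreads one incoming request across many nodes (as ketama does) quorum could be declared even though some series never reached a majority of their replicas. The change introduces per-batch accounting: in forward, each group of series destined for the same endpoint/replica pair becomes its own batch, every response carries its batch id, and quorum now requires every batch to individually hit the write-quorum threshold.

A minimal sketch of that threshold arithmetic, not part of the commit: writeQuorum and quorumReached mirror the helpers in the diff below, everything else is made up for illustration.

package main

import "fmt"

// writeQuorum mirrors the handler's calculation: a majority of the
// configured replication factor (2 of 3, 3 of 5, ...).
func writeQuorum(replicationFactor uint64) int {
	return int((replicationFactor / 2) + 1)
}

// quorumReached mirrors the helper added by this commit: every batch
// must individually collect at least successThreshold successes.
func quorumReached(successCounts []int, successThreshold int) bool {
	for _, c := range successCounts {
		if c < successThreshold {
			return false
		}
	}
	return true
}

func main() {
	threshold := writeQuorum(3) // 2

	// Both batches have a majority of successful replicas.
	fmt.Println(quorumReached([]int{2, 3}, threshold)) // true

	// The second batch has only one success. A single counter over all
	// requests would have been satisfied after any two successes,
	// regardless of which batch they belonged to; the per-batch check is not.
	fmt.Println(quorumReached([]int{3, 1}, threshold)) // false
}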
108 changes: 69 additions & 39 deletions pkg/receive/handler.go
@@ -31,13 +31,14 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/api"
statusapi "github.com/thanos-io/thanos/pkg/api/status"
"github.com/thanos-io/thanos/pkg/logging"

"github.com/thanos-io/thanos/pkg/errutil"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/runutil"
@@ -343,6 +344,7 @@ func (h *Handler) Run() error {
// replica encapsulates the replica number of a request and if the request is
// already replicated.
type replica struct {
	batch      uint64
	n          uint64
	replicated bool
}
@@ -353,6 +355,16 @@ type endpointReplica struct {
	replica  replica
}

type writeRequest struct {
	batch   uint64
	request *prompb.WriteRequest
}

type writeResponse struct {
	batch uint64
	err   error
}

func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error {
tLogger := log.With(h.logger, "tenant", tenant)

@@ -372,7 +384,7 @@ func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string,
return errBadReplica
}

r := replica{n: rep, replicated: rep != 0}
r := replica{n: rep, replicated: rep != 0, batch: 0}

// On the wire, format is 1-indexed and in-code is 0-indexed, so we decrement the value if it was already replicated.
if r.replicated {
@@ -559,7 +571,8 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
// at most one outgoing write request will be made
// to every other node in the hashring, rather than
// one request per time series.
wreqs := make(map[endpointReplica]*prompb.WriteRequest)
var batchId uint64
wreqs := make(map[endpointReplica]*writeRequest)
for i := range wreq.Timeseries {
endpoint, err := h.hashring.GetN(tenant, &wreq.Timeseries[i], r.n)
if err != nil {
@@ -568,24 +581,38 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p
}
key := endpointReplica{endpoint: endpoint, replica: r}
if _, ok := wreqs[key]; !ok {
wreqs[key] = &prompb.WriteRequest{}
wreqs[key] = &writeRequest{
batch: batchId,
request: &prompb.WriteRequest{},
}
batchId++
}
wr := wreqs[key]
wr.Timeseries = append(wr.Timeseries, wreq.Timeseries[i])
wr.request.Timeseries = append(wr.request.Timeseries, wreq.Timeseries[i])
}
h.mtx.RUnlock()

return h.fanoutForward(ctx, tenant, wreqs, len(wreqs))
return h.fanoutForward(ctx, tenant, wreqs, uint64(len(wreqs)), len(wreqs))
}

// writeQuorum returns the minimum number of replicas that have to confirm write success before claiming replication success.
func (h *Handler) writeQuorum() int {
	return int((h.options.ReplicationFactor / 2) + 1)
}

func quorumReached(replicationSuccess []int, successThreshold int) bool {
	for _, successCount := range replicationSuccess {
		if successCount < successThreshold {
			return false
		}
	}

	return true
}

// fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of
// requests succeeds or fails or if context is canceled.
func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*prompb.WriteRequest, successThreshold int) error {
func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]*writeRequest, numBatches uint64, successThreshold int) error {
var errs errutil.MultiError

fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout)
@@ -607,7 +634,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
tLogger = log.With(h.logger, logTags)
}

ec := make(chan error)
ec := make(chan writeResponse)

var wg sync.WaitGroup
for er := range wreqs {
@@ -629,12 +656,12 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
})
if err != nil {
h.replications.WithLabelValues(labelError).Inc()
ec <- errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)
ec <- writeResponse{batch: wreqs[er].batch, err: errors.Wrapf(err, "replicate write request for endpoint %v", endpoint)}
return
}

h.replications.WithLabelValues(labelSuccess).Inc()
ec <- nil
ec <- writeResponse{batch: wreqs[er].batch, err: nil}
}(endpoint)

continue
@@ -652,16 +679,16 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e

var err error
tracing.DoInSpan(fctx, "receive_tsdb_write", func(_ context.Context) {
err = h.writer.Write(fctx, tenant, wreqs[er])
err = h.writer.Write(fctx, tenant, wreqs[er].request)
})
if err != nil {
// When a MultiError is added to another MultiError, the error slices are concatenated, not nested.
// To avoid breaking the counting logic, we need to flatten the error.
level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error())
ec <- errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)
ec <- writeResponse{batch: r.batch, err: errors.Wrapf(determineWriteErrorCause(err, 1), "store locally for endpoint %v", endpoint)}
return
}
ec <- nil
ec <- writeResponse{batch: r.batch, err: nil}
}(endpoint)

continue
@@ -686,7 +713,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e

cl, err = h.peers.get(fctx, endpoint)
if err != nil {
ec <- errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)
ec <- writeResponse{batch: r.batch, err: errors.Wrapf(err, "get peer connection for endpoint %v", endpoint)}
return
}

@@ -695,7 +722,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
if ok {
if time.Now().Before(b.nextAllowed) {
h.mtx.RUnlock()
ec <- errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)
ec <- writeResponse{batch: r.batch, err: errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpoint)}
return
}
}
@@ -705,7 +732,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) {
// Actually make the request against the endpoint we determined should handle these time series.
_, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{
Timeseries: wreqs[er].Timeseries,
Timeseries: wreqs[er].request.Timeseries,
Tenant: tenant,
// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
Replica: int64(r.n + 1),
@@ -727,14 +754,14 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
h.mtx.Unlock()
}
}
ec <- errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)
ec <- writeResponse{batch: r.batch, err: errors.Wrapf(err, "forwarding request to endpoint %v", endpoint)}
return
}
h.mtx.Lock()
delete(h.peerStates, endpoint)
h.mtx.Unlock()

ec <- nil
ec <- writeResponse{batch: r.batch, err: nil}
}(endpoint)
}

@@ -747,34 +774,34 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
// This is needed if context is canceled or if we reached success or fail quorum faster.
defer func() {
go func() {
for err := range ec {
if err != nil {
level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
for wresp := range ec {
if wresp.err != nil {
level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", wresp.err)
}
}
}()
}()

var success int
successCounts := make([]int, numBatches)
for {
select {
case <-fctx.Done():
return fctx.Err()
case err, more := <-ec:
case wr, more := <-ec:
if !more {
return errs.Err()
}
if err == nil {
success++
if success >= successThreshold {
// In case the success threshold is lower than the total
// number of requests, then we can finish early here. This
// is the case for quorum writes for example.
if wr.err == nil {
successCounts[wr.batch]++
// In case the success threshold is lower than the total
// number of requests, then we can finish early here. This
// is the case for quorum writes for example.
if quorumReached(successCounts, successThreshold) {
return nil
}
continue
}
errs.Add(err)
errs.Add(wr.err)
}
}
}
@@ -783,7 +810,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e
// selected by the tenant and time series.
// The function only returns when all replication requests have finished
// or the context is canceled.
func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.WriteRequest) error {
func (h *Handler) replicate(ctx context.Context, tenant string, wreq *writeRequest) error {
// It is possible that hashring is ready in testReady() but unready now,
// so need to lock here.
h.mtx.RLock()
@@ -792,9 +819,9 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
return errors.New("hashring is not ready")
}

replicatedRequests := make(map[endpointReplica]*prompb.WriteRequest)
replicatedRequests := make(map[endpointReplica]*writeRequest)
for i := uint64(0); i < h.options.ReplicationFactor; i++ {
for _, ts := range wreq.Timeseries {
for _, ts := range wreq.request.Timeseries {
endpoint, err := h.hashring.GetN(tenant, &ts, i)
if err != nil {
h.mtx.RUnlock()
@@ -807,19 +834,22 @@ func (h *Handler) replicate(ctx context.Context, tenant string, wreq *prompb.Wri
}
replicatedRequest, ok := replicatedRequests[er]
if !ok {
replicatedRequest = &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, 0),
replicatedRequest = &writeRequest{
batch: wreq.batch,
request: &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, 0),
},
}
replicatedRequests[er] = replicatedRequest
}
replicatedRequest.Timeseries = append(replicatedRequest.Timeseries, ts)
replicatedRequest.request.Timeseries = append(replicatedRequest.request.Timeseries, ts)
}
}
h.mtx.RUnlock()

quorum := h.writeQuorum()
// fanoutForward only returns an error if successThreshold (quorum) is not reached.
if err := h.fanoutForward(ctx, tenant, replicatedRequests, quorum); err != nil {
if err := h.fanoutForward(ctx, tenant, replicatedRequests, 1, quorum); err != nil {
return errors.Wrap(determineWriteErrorCause(err, quorum), "quorum not reached")
}
return nil
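For reference, the receive loop at the end of fanoutForward now tallies responses per batch instead of into one global counter: every writeResponse carries its batch id, successCounts keeps one slot per batch, and the function returns early only once quorumReached reports that every batch has hit the threshold. The sketch below replays that collection logic in isolation; writeResponse and quorumReached follow the shapes in the diff above, while collect and the sample responses are hypothetical stand-ins for the handler's channel plumbing, so this is an illustration of the accounting rather than the actual handler code.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// writeResponse mirrors the type added by the commit: every response
// carries the batch it belongs to, so successes can be tallied per batch.
type writeResponse struct {
	batch uint64
	err   error
}

// quorumReached follows the helper in the diff: every batch must reach the threshold.
func quorumReached(successCounts []int, successThreshold int) bool {
	for _, c := range successCounts {
		if c < successThreshold {
			return false
		}
	}
	return true
}

// collect is a simplified, hypothetical stand-in for the receive loop in
// fanoutForward: drain the response channel, count successes per batch,
// and return early once every batch has reached the threshold.
func collect(ctx context.Context, ec <-chan writeResponse, numBatches uint64, threshold int) error {
	successCounts := make([]int, numBatches)
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case wr, more := <-ec:
			if !more {
				// Channel exhausted without every batch reaching quorum.
				return errors.New("quorum not reached")
			}
			if wr.err != nil {
				continue // the real handler also records the error
			}
			successCounts[wr.batch]++
			if quorumReached(successCounts, threshold) {
				return nil
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Two batches with replication factor 3: batch 0 succeeds on all three
	// replicas, batch 1 on only one, so a write quorum of 2 must not be met.
	ec := make(chan writeResponse, 6)
	for _, wr := range []writeResponse{
		{batch: 0}, {batch: 0}, {batch: 0},
		{batch: 1}, {batch: 1, err: errors.New("replica down")}, {batch: 1, err: errors.New("replica down")},
	} {
		ec <- wr
	}
	close(ec)

	fmt.Println(collect(ctx, ec, 2, 2)) // quorum not reached
}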
