Remove internal bulk processor retries (#3739)
alexshtin committed Dec 21, 2022
1 parent c6a89c0 commit 2b761b4
Showing 9 changed files with 42 additions and 101 deletions.
1 change: 0 additions & 1 deletion common/metrics/metric_defs.go
@@ -1682,7 +1682,6 @@ var (
BatcherOperationFailures = NewCounterDef("batcher_operation_errors")
ElasticsearchBulkProcessorRequests = NewCounterDef("elasticsearch_bulk_processor_requests")
ElasticsearchBulkProcessorQueuedRequests = NewDimensionlessHistogramDef("elasticsearch_bulk_processor_queued_requests")
ElasticsearchBulkProcessorRetries = NewCounterDef("elasticsearch_bulk_processor_retries")
ElasticsearchBulkProcessorFailures = NewCounterDef("elasticsearch_bulk_processor_errors")
ElasticsearchBulkProcessorCorruptedData = NewCounterDef("elasticsearch_bulk_processor_corrupted_data")
ElasticsearchBulkProcessorDuplicateRequest = NewCounterDef("elasticsearch_bulk_processor_duplicate_request")
@@ -52,7 +52,6 @@ type (
BulkActions int
BulkSize int
FlushInterval time.Duration
Backoff elastic.Backoff
BeforeFunc elastic.BulkBeforeFunc
AfterFunc elastic.BulkAfterFunc
}
@@ -29,7 +29,6 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
enumspb "go.temporal.io/api/enums/v1"
)

@@ -74,10 +73,3 @@ func Test_BuildPutMappingBody(t *testing.T) {
assert.Equal(test.expected, fmt.Sprintf("%v", buildMappingBody(test.input)))
}
}

func TestIsResponseRetryable(t *testing.T) {
status := []int{408, 429, 500, 503, 507}
for _, code := range status {
require.True(t, IsRetryableStatus(code))
}
}
@@ -225,9 +225,10 @@ func (c *clientImpl) RunBulkProcessor(ctx context.Context, p *BulkProcessorParam
BulkActions(p.BulkActions).
BulkSize(p.BulkSize).
FlushInterval(p.FlushInterval).
Backoff(p.Backoff).
Before(p.BeforeFunc).
After(p.AfterFunc).
// Disable built-in retry logic because visibility task processor has its own.
RetryItemStatusCodes().
Do(ctx)

return newBulkProcessor(esBulkProcessor), err
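For context, a minimal sketch of an olivere/elastic bulk processor configured the same way, with internal item retries turned off. The endpoint, processor name, and flush thresholds below are illustrative values, not taken from this repository.

package main

import (
	"context"
	"log"
	"time"

	"github.com/olivere/elastic/v7"
)

func main() {
	// Illustrative endpoint; sniffing disabled to keep the sketch self-contained.
	es, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"), elastic.SetSniff(false))
	if err != nil {
		log.Fatal(err)
	}

	// Calling RetryItemStatusCodes with no arguments clears the library's default
	// set of retryable item statuses, so failed items are reported to the After
	// callback instead of being retried inside the bulk processor.
	bulkProcessor, err := es.BulkProcessor().
		Name("example-visibility-processor").
		BulkActions(500).
		BulkSize(16 * 1024 * 1024).
		FlushInterval(time.Second).
		RetryItemStatusCodes().
		After(func(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
			if err != nil {
				log.Printf("bulk commit of %d requests failed: %v", len(requests), err)
			}
		}).
		Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer bulkProcessor.Close()
}

With retries disabled, every failure reaches the After hook exactly once, which is what lets the visibility task processor own the retry policy.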
57 changes: 0 additions & 57 deletions common/persistence/visibility/store/elasticsearch/client/errors.go

This file was deleted.

44 changes: 17 additions & 27 deletions common/persistence/visibility/store/elasticsearch/processor.go
@@ -30,6 +30,7 @@ package elasticsearch
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync/atomic"
@@ -93,10 +94,7 @@ type (
var _ Processor = (*processorImpl)(nil)

const (
// retry configs for es bulk processor
esProcessorInitialRetryInterval = 200 * time.Millisecond
esProcessorMaxRetryInterval = 20 * time.Second
visibilityProcessorName = "visibility-processor"
visibilityProcessorName = "visibility-processor"
)

// NewProcessor create new processorImpl
@@ -119,7 +117,6 @@ func NewProcessor(
BulkActions: cfg.ESProcessorBulkActions(),
BulkSize: cfg.ESProcessorBulkSize(),
FlushInterval: cfg.ESProcessorFlushInterval(),
Backoff: elastic.NewExponentialBackoff(esProcessorInitialRetryInterval, esProcessorMaxRetryInterval),
},
}
p.bulkProcessorParameters.AfterFunc = p.bulkAfterAction
@@ -220,25 +217,26 @@ func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableReq
func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
if err != nil {
const logFirstNRequests = 5
httpStatus := client.HttpStatus(err)
isRetryable := client.IsRetryableStatus(httpStatus)
var httpStatus int
var esErr *elastic.Error
if errors.As(err, &esErr) {
httpStatus = esErr.Status
}

var logRequests strings.Builder
for i, request := range requests {
if i < logFirstNRequests {
logRequests.WriteString(request.String())
logRequests.WriteRune('\n')
}
p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorFailures.GetMetricName()).Record(1, metrics.HttpStatusTag(httpStatus))

if !isRetryable {
visibilityTaskKey := p.extractVisibilityTaskKey(request)
if visibilityTaskKey == "" {
continue
}
p.notifyResult(visibilityTaskKey, false)
visibilityTaskKey := p.extractVisibilityTaskKey(request)
if visibilityTaskKey == "" {
continue
}
p.notifyResult(visibilityTaskKey, false)
}
p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.IsRetryable(isRetryable), tag.RequestCount(len(requests)), tag.ESRequest(logRequests.String()))
p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.RequestCount(len(requests)), tag.ESRequest(logRequests.String()))
return
}

@@ -262,10 +260,7 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ
continue
}

switch {
case isSuccess(responseItem):
p.notifyResult(visibilityTaskKey, true)
case !client.IsRetryableStatus(responseItem.Status):
if !isSuccess(responseItem) {
p.logger.Error("ES request failed.",
tag.ESResponseStatus(responseItem.Status),
tag.ESResponseError(extractErrorReason(responseItem)),
@@ -274,15 +269,10 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ
tag.ESRequest(request.String()))
p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorFailures.GetMetricName()).Record(1, metrics.HttpStatusTag(responseItem.Status))
p.notifyResult(visibilityTaskKey, false)
default: // bulk processor will retry
p.logger.Warn("ES request retried.",
tag.ESResponseStatus(responseItem.Status),
tag.ESResponseError(extractErrorReason(responseItem)),
tag.Key(visibilityTaskKey),
tag.ESDocID(docID),
tag.ESRequest(request.String()))
p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorRetries.GetMetricName()).Record(1, metrics.HttpStatusTag(responseItem.Status))
continue
}

p.notifyResult(visibilityTaskKey, true)
}

// Record how many documents are waiting to be flushed to Elasticsearch after this bulk is committed.
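The deleted client.HttpStatus/IsRetryableStatus helpers are replaced above by unwrapping the driver's own error type. A minimal sketch of that pattern; the wrapped error values are fabricated for illustration.

package main

import (
	"errors"
	"fmt"
	"net/http"

	"github.com/olivere/elastic/v7"
)

// httpStatus mirrors the unwrapping in bulkAfterAction: elastic.Error carries
// the HTTP status of the failed call, and errors.As still finds it when the
// error has been wrapped with %w.
func httpStatus(err error) int {
	var esErr *elastic.Error
	if errors.As(err, &esErr) {
		return esErr.Status
	}
	return 0
}

func main() {
	wrapped := fmt.Errorf("bulk commit failed: %w", &elastic.Error{Status: http.StatusTooManyRequests})
	fmt.Println(httpStatus(wrapped))                    // 429
	fmt.Println(httpStatus(errors.New("conn refused"))) // 0: not an elastic.Error
}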
@@ -113,7 +113,6 @@ func (s *processorSuite) TestNewESProcessorAndStartStop() {
s.Equal(config.ESProcessorBulkActions(), input.BulkActions)
s.Equal(config.ESProcessorBulkSize(), input.BulkSize)
s.Equal(config.ESProcessorFlushInterval(), input.FlushInterval)
s.NotNil(input.Backoff)
s.NotNil(input.AfterFunc)

bulkProcessor := client.NewMockBulkProcessor(s.controller)
@@ -254,14 +254,15 @@ func (s *visibilityStore) addBulkRequestAndWait(
return &persistence.TimeoutError{Msg: fmt.Sprintf("visibility task %s timed out waiting for ACK after %v", visibilityTaskKey, s.processorAckTimeout())}
}
// Returns non-retryable Internal error here because these errors are unexpected.
// Visibility task processor retries all errors though, therefore new request will be generated for the same task.
// Visibility task processor retries all errors though, therefore new request will be generated for the same visibility task.
return serviceerror.NewInternal(fmt.Sprintf("visibility task %s received error %v", visibilityTaskKey, err))
}

if !ack {
// Returns non-retryable Internal error here because NACK from bulk processor means that this request can't be processed.
// Visibility task processor retries all errors though, therefore new request will be generated for the same task.
return serviceerror.NewInternal(fmt.Sprintf("visibility task %s received NACK", visibilityTaskKey))
// Returns retryable Unavailable error here because NACK from bulk processor
// means that this request wasn't processed successfully and needs to be retried.
// Visibility task processor retries all errors anyway, therefore new request will be generated for the same visibility task.
return serviceerror.NewUnavailable(fmt.Sprintf("visibility task %s received NACK", visibilityTaskKey))
}
return nil
}
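Switching the NACK case from Internal to Unavailable changes how callers can classify the failure. A small sketch using only the standard go.temporal.io/api/serviceerror types; the task key in the messages is made up.

package main

import (
	"errors"
	"fmt"

	"go.temporal.io/api/serviceerror"
)

// classify distinguishes the retryable NACK case from the unexpected-error case
// returned by addBulkRequestAndWait.
func classify(err error) string {
	var unavailable *serviceerror.Unavailable
	if errors.As(err, &unavailable) {
		return "retryable: the bulk processor NACKed the request"
	}
	var internal *serviceerror.Internal
	if errors.As(err, &internal) {
		return "unexpected failure (the visibility task processor retries it anyway)"
	}
	return "other"
}

func main() {
	fmt.Println(classify(serviceerror.NewUnavailable("visibility task 5~13 received NACK")))
	fmt.Println(classify(serviceerror.NewInternal("visibility task 5~13 received error context deadline exceeded")))
}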
19 changes: 18 additions & 1 deletion service/worker/addsearchattributes/workflow.go
@@ -28,8 +28,10 @@ import (
"context"
"errors"
"fmt"
"net/http"
"time"

"github.com/olivere/elastic/v7"
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
@@ -140,7 +142,8 @@ func (a *activities) AddESMappingFieldActivity(ctx context.Context, params Workf
_, err := a.esClient.PutMapping(ctx, params.IndexName, params.CustomAttributesToAdd)
if err != nil {
a.metricsHandler.Counter(metrics.AddSearchAttributesFailuresCount.GetMetricName()).Record(1)
if esclient.IsRetryableError(err) {

if a.isRetryableError(err) {
a.logger.Error("Unable to update Elasticsearch mapping (retryable error).", tag.ESIndex(params.IndexName), tag.Error(err))
return fmt.Errorf("%w: %v", ErrUnableToUpdateESMapping, err)
}
@@ -152,6 +155,20 @@ func (a *activities) AddESMappingFieldActivity(ctx context.Context, params Workf
return nil
}

func (a *activities) isRetryableError(err error) bool {
var esErr *elastic.Error
if !errors.As(err, &esErr) {
return true
}

switch esErr.Status {
case http.StatusBadRequest, http.StatusUnauthorized, http.StatusForbidden, http.StatusNotFound, http.StatusConflict:
return false
default:
return true
}
}

func (a *activities) WaitForYellowStatusActivity(ctx context.Context, indexName string) error {
if a.esClient == nil {
a.logger.Info("Elasticsearch client is not configured. Skipping Elasticsearch status check.")
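A hypothetical test sketch for the classifier above, assuming it sits next to workflow.go in the addsearchattributes package; isRetryableError reads no other field of activities, so a zero-value receiver is enough here.

package addsearchattributes

import (
	"errors"
	"net/http"
	"testing"

	"github.com/olivere/elastic/v7"
	"github.com/stretchr/testify/require"
)

func TestIsRetryableError(t *testing.T) {
	a := &activities{}

	// 400/401/403/404/409 responses are classified as permanent:
	// retrying the same mapping update cannot succeed.
	require.False(t, a.isRetryableError(&elastic.Error{Status: http.StatusBadRequest}))
	require.False(t, a.isRetryableError(&elastic.Error{Status: http.StatusConflict}))

	// Everything else, including non-Elasticsearch errors such as transport
	// failures, is classified as retryable.
	require.True(t, a.isRetryableError(&elastic.Error{Status: http.StatusServiceUnavailable}))
	require.True(t, a.isRetryableError(errors.New("connection reset by peer")))
}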
