From 7b64aa0a07093584fc6ce230f30f68f6af3bb5f7 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 20 Dec 2022 23:44:27 -0800 Subject: [PATCH] Remove internal bulk processor retries (#3739) --- common/metrics/metric_defs.go | 1 - .../elasticsearch/client/bulk_processor.go | 1 - .../store/elasticsearch/client/client_test.go | 8 --- .../store/elasticsearch/client/client_v7.go | 3 +- .../store/elasticsearch/client/errors.go | 57 ------------------- .../store/elasticsearch/processor.go | 44 ++++++-------- .../store/elasticsearch/processor_test.go | 1 - .../store/elasticsearch/visibility_store.go | 9 +-- .../worker/addsearchattributes/workflow.go | 19 ++++++- 9 files changed, 42 insertions(+), 101 deletions(-) delete mode 100644 common/persistence/visibility/store/elasticsearch/client/errors.go diff --git a/common/metrics/metric_defs.go b/common/metrics/metric_defs.go index dbae56d9e55..f58a2a827e2 100644 --- a/common/metrics/metric_defs.go +++ b/common/metrics/metric_defs.go @@ -1670,7 +1670,6 @@ var ( BatcherOperationFailures = NewCounterDef("batcher_operation_errors") ElasticsearchBulkProcessorRequests = NewCounterDef("elasticsearch_bulk_processor_requests") ElasticsearchBulkProcessorQueuedRequests = NewDimensionlessHistogramDef("elasticsearch_bulk_processor_queued_requests") - ElasticsearchBulkProcessorRetries = NewCounterDef("elasticsearch_bulk_processor_retries") ElasticsearchBulkProcessorFailures = NewCounterDef("elasticsearch_bulk_processor_errors") ElasticsearchBulkProcessorCorruptedData = NewCounterDef("elasticsearch_bulk_processor_corrupted_data") ElasticsearchBulkProcessorDuplicateRequest = NewCounterDef("elasticsearch_bulk_processor_duplicate_request") diff --git a/common/persistence/visibility/store/elasticsearch/client/bulk_processor.go b/common/persistence/visibility/store/elasticsearch/client/bulk_processor.go index 9e639191e09..d424104180b 100644 --- a/common/persistence/visibility/store/elasticsearch/client/bulk_processor.go +++ b/common/persistence/visibility/store/elasticsearch/client/bulk_processor.go @@ -52,7 +52,6 @@ type ( BulkActions int BulkSize int FlushInterval time.Duration - Backoff elastic.Backoff BeforeFunc elastic.BulkBeforeFunc AfterFunc elastic.BulkAfterFunc } diff --git a/common/persistence/visibility/store/elasticsearch/client/client_test.go b/common/persistence/visibility/store/elasticsearch/client/client_test.go index b35cd9a29ba..ca1dd6eb40d 100644 --- a/common/persistence/visibility/store/elasticsearch/client/client_test.go +++ b/common/persistence/visibility/store/elasticsearch/client/client_test.go @@ -29,7 +29,6 @@ import ( "testing" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" enumspb "go.temporal.io/api/enums/v1" ) @@ -74,10 +73,3 @@ func Test_BuildPutMappingBody(t *testing.T) { assert.Equal(test.expected, fmt.Sprintf("%v", buildMappingBody(test.input))) } } - -func TestIsResponseRetryable(t *testing.T) { - status := []int{408, 429, 500, 503, 507} - for _, code := range status { - require.True(t, IsRetryableStatus(code)) - } -} diff --git a/common/persistence/visibility/store/elasticsearch/client/client_v7.go b/common/persistence/visibility/store/elasticsearch/client/client_v7.go index 6e6ca2272a1..168619b6240 100644 --- a/common/persistence/visibility/store/elasticsearch/client/client_v7.go +++ b/common/persistence/visibility/store/elasticsearch/client/client_v7.go @@ -225,9 +225,10 @@ func (c *clientImpl) RunBulkProcessor(ctx context.Context, p *BulkProcessorParam BulkActions(p.BulkActions). BulkSize(p.BulkSize). FlushInterval(p.FlushInterval). - Backoff(p.Backoff). Before(p.BeforeFunc). After(p.AfterFunc). + // Disable built-in retry logic because visibility task processor has its own. + RetryItemStatusCodes(). Do(ctx) return newBulkProcessor(esBulkProcessor), err diff --git a/common/persistence/visibility/store/elasticsearch/client/errors.go b/common/persistence/visibility/store/elasticsearch/client/errors.go deleted file mode 100644 index ccb4b0e4df2..00000000000 --- a/common/persistence/visibility/store/elasticsearch/client/errors.go +++ /dev/null @@ -1,57 +0,0 @@ -// The MIT License -// -// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package client - -import ( - "github.com/olivere/elastic/v7" -) - -func HttpStatus(err error) int { - switch e := err.(type) { - case *elastic.Error: - return e.Status - default: - return 0 - } -} - -// IsRetryableStatus is complaint with elastic.BulkProcessorService.RetryItemStatusCodes -// responses with these status will be kept in queue and retried until success -// 408 - Request Timeout -// 429 - Too Many Requests -// 500 - Node not connected -// 503 - Service Unavailable -// 507 - Insufficient Storage -func IsRetryableStatus(httpStatus int) bool { - switch httpStatus { - case 408, 429, 500, 503, 507: - return true - } - return false -} - -func IsRetryableError(err error) bool { - return IsRetryableStatus(HttpStatus(err)) -} diff --git a/common/persistence/visibility/store/elasticsearch/processor.go b/common/persistence/visibility/store/elasticsearch/processor.go index 15e4622acf6..ce10115565f 100644 --- a/common/persistence/visibility/store/elasticsearch/processor.go +++ b/common/persistence/visibility/store/elasticsearch/processor.go @@ -30,6 +30,7 @@ package elasticsearch import ( "context" "encoding/json" + "errors" "fmt" "strings" "sync/atomic" @@ -93,10 +94,7 @@ type ( var _ Processor = (*processorImpl)(nil) const ( - // retry configs for es bulk processor - esProcessorInitialRetryInterval = 200 * time.Millisecond - esProcessorMaxRetryInterval = 20 * time.Second - visibilityProcessorName = "visibility-processor" + visibilityProcessorName = "visibility-processor" ) // NewProcessor create new processorImpl @@ -119,7 +117,6 @@ func NewProcessor( BulkActions: cfg.ESProcessorBulkActions(), BulkSize: cfg.ESProcessorBulkSize(), FlushInterval: cfg.ESProcessorFlushInterval(), - Backoff: elastic.NewExponentialBackoff(esProcessorInitialRetryInterval, esProcessorMaxRetryInterval), }, } p.bulkProcessorParameters.AfterFunc = p.bulkAfterAction @@ -220,8 +217,12 @@ func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableReq func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) { if err != nil { const logFirstNRequests = 5 - httpStatus := client.HttpStatus(err) - isRetryable := client.IsRetryableStatus(httpStatus) + var httpStatus int + var esErr *elastic.Error + if errors.As(err, &esErr) { + httpStatus = esErr.Status + } + var logRequests strings.Builder for i, request := range requests { if i < logFirstNRequests { @@ -229,16 +230,13 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ logRequests.WriteRune('\n') } p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorFailures.GetMetricName()).Record(1, metrics.HttpStatusTag(httpStatus)) - - if !isRetryable { - visibilityTaskKey := p.extractVisibilityTaskKey(request) - if visibilityTaskKey == "" { - continue - } - p.notifyResult(visibilityTaskKey, false) + visibilityTaskKey := p.extractVisibilityTaskKey(request) + if visibilityTaskKey == "" { + continue } + p.notifyResult(visibilityTaskKey, false) } - p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.IsRetryable(isRetryable), tag.RequestCount(len(requests)), tag.ESRequest(logRequests.String())) + p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.RequestCount(len(requests)), tag.ESRequest(logRequests.String())) return } @@ -262,10 +260,7 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ continue } - switch { - case isSuccess(responseItem): - p.notifyResult(visibilityTaskKey, true) - case !client.IsRetryableStatus(responseItem.Status): + if !isSuccess(responseItem) { p.logger.Error("ES request failed.", tag.ESResponseStatus(responseItem.Status), tag.ESResponseError(extractErrorReason(responseItem)), @@ -274,15 +269,10 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ tag.ESRequest(request.String())) p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorFailures.GetMetricName()).Record(1, metrics.HttpStatusTag(responseItem.Status)) p.notifyResult(visibilityTaskKey, false) - default: // bulk processor will retry - p.logger.Warn("ES request retried.", - tag.ESResponseStatus(responseItem.Status), - tag.ESResponseError(extractErrorReason(responseItem)), - tag.Key(visibilityTaskKey), - tag.ESDocID(docID), - tag.ESRequest(request.String())) - p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorRetries.GetMetricName()).Record(1, metrics.HttpStatusTag(responseItem.Status)) + continue } + + p.notifyResult(visibilityTaskKey, true) } // Record how many documents are waiting to be flushed to Elasticsearch after this bulk is committed. diff --git a/common/persistence/visibility/store/elasticsearch/processor_test.go b/common/persistence/visibility/store/elasticsearch/processor_test.go index 18d86e7d594..44a829ca061 100644 --- a/common/persistence/visibility/store/elasticsearch/processor_test.go +++ b/common/persistence/visibility/store/elasticsearch/processor_test.go @@ -113,7 +113,6 @@ func (s *processorSuite) TestNewESProcessorAndStartStop() { s.Equal(config.ESProcessorBulkActions(), input.BulkActions) s.Equal(config.ESProcessorBulkSize(), input.BulkSize) s.Equal(config.ESProcessorFlushInterval(), input.FlushInterval) - s.NotNil(input.Backoff) s.NotNil(input.AfterFunc) bulkProcessor := client.NewMockBulkProcessor(s.controller) diff --git a/common/persistence/visibility/store/elasticsearch/visibility_store.go b/common/persistence/visibility/store/elasticsearch/visibility_store.go index 2864d0225f8..886bf10a66d 100644 --- a/common/persistence/visibility/store/elasticsearch/visibility_store.go +++ b/common/persistence/visibility/store/elasticsearch/visibility_store.go @@ -254,14 +254,15 @@ func (s *visibilityStore) addBulkRequestAndWait( return &persistence.TimeoutError{Msg: fmt.Sprintf("visibility task %s timed out waiting for ACK after %v", visibilityTaskKey, s.processorAckTimeout())} } // Returns non-retryable Internal error here because these errors are unexpected. - // Visibility task processor retries all errors though, therefore new request will be generated for the same task. + // Visibility task processor retries all errors though, therefore new request will be generated for the same visibility task. return serviceerror.NewInternal(fmt.Sprintf("visibility task %s received error %v", visibilityTaskKey, err)) } if !ack { - // Returns non-retryable Internal error here because NACK from bulk processor means that this request can't be processed. - // Visibility task processor retries all errors though, therefore new request will be generated for the same task. - return serviceerror.NewInternal(fmt.Sprintf("visibility task %s received NACK", visibilityTaskKey)) + // Returns retryable Unavailable error here because NACK from bulk processor + // means that this request wasn't processed successfully and needs to be retried. + // Visibility task processor retries all errors anyway, therefore new request will be generated for the same visibility task. + return serviceerror.NewUnavailable(fmt.Sprintf("visibility task %s received NACK", visibilityTaskKey)) } return nil } diff --git a/service/worker/addsearchattributes/workflow.go b/service/worker/addsearchattributes/workflow.go index b9c8570819a..2f375d9656b 100644 --- a/service/worker/addsearchattributes/workflow.go +++ b/service/worker/addsearchattributes/workflow.go @@ -28,8 +28,10 @@ import ( "context" "errors" "fmt" + "net/http" "time" + "github.com/olivere/elastic/v7" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/workflow" @@ -140,7 +142,8 @@ func (a *activities) AddESMappingFieldActivity(ctx context.Context, params Workf _, err := a.esClient.PutMapping(ctx, params.IndexName, params.CustomAttributesToAdd) if err != nil { a.metricsHandler.Counter(metrics.AddSearchAttributesFailuresCount.GetMetricName()).Record(1) - if esclient.IsRetryableError(err) { + + if a.isRetryableError(err) { a.logger.Error("Unable to update Elasticsearch mapping (retryable error).", tag.ESIndex(params.IndexName), tag.Error(err)) return fmt.Errorf("%w: %v", ErrUnableToUpdateESMapping, err) } @@ -152,6 +155,20 @@ func (a *activities) AddESMappingFieldActivity(ctx context.Context, params Workf return nil } +func (a *activities) isRetryableError(err error) bool { + var esErr *elastic.Error + if !errors.As(err, &esErr) { + return true + } + + switch esErr.Status { + case http.StatusBadRequest, http.StatusUnauthorized, http.StatusForbidden, http.StatusNotFound, http.StatusConflict: + return false + default: + return true + } +} + func (a *activities) WaitForYellowStatusActivity(ctx context.Context, indexName string) error { if a.esClient == nil { a.logger.Info("Elasticsearch client is not configured. Skipping Elasticsearch status check.")