
Fix ES bulk processor commit timeout (#3696)
alexshtin committed Dec 20, 2022
1 parent 23a2c4e commit a7240b4
Showing 6 changed files with 54 additions and 40 deletions.
14 changes: 8 additions & 6 deletions common/persistence/visibility/store/elasticsearch/processor.go
@@ -199,20 +199,18 @@ func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableReq
p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorRequests.GetMetricName()).Record(int64(len(requests)))
p.metricsHandler.Histogram(metrics.ElasticsearchBulkProcessorBulkSize.GetMetricName(), metrics.ElasticsearchBulkProcessorBulkSize.GetMetricUnit()).
Record(int64(len(requests)))
p.metricsHandler.Histogram(metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(), metrics.ElasticsearchBulkProcessorBulkSize.GetMetricUnit()).
Record(int64(p.mapToAckFuture.Len() - len(requests)))

for _, request := range requests {
visibilityTaskKey := p.extractVisibilityTaskKey(request)
if visibilityTaskKey == "" {
continue
}
_, _, _ = p.mapToAckFuture.GetAndDo(visibilityTaskKey, func(key interface{}, value interface{}) error {
future, ok := value.(*ackFuture)
ackF, ok := value.(*ackFuture)
if !ok {
p.logger.Fatal(fmt.Sprintf("mapToAckFuture has item of a wrong type %T (%T expected).", value, &ackFuture{}), tag.Value(key))
}
future.recordStart(p.metricsHandler)
ackF.recordStart(p.metricsHandler)
return nil
})
}
@@ -286,6 +284,10 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ
p.metricsHandler.Counter(metrics.ElasticsearchBulkProcessorRetries.GetMetricName()).Record(1, metrics.HttpStatusTag(responseItem.Status))
}
}

// Record how many documents are waiting to be flushed to Elasticsearch after this bulk is committed.
p.metricsHandler.Histogram(metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(), metrics.ElasticsearchBulkProcessorBulkSize.GetMetricUnit()).
Record(int64(p.mapToAckFuture.Len()))
}
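
For context, bulkBeforeAction and bulkAfterAction are the hooks the olivere/elastic bulk processor calls around each flush; this commit moves the queued-requests histogram from the before hook to the after hook so it reflects what is still waiting once the bulk has been committed. A minimal sketch of how such hooks are typically registered on a bulk processor (the client URL, option values, and log messages are illustrative assumptions, not Temporal's actual wiring):

package main

import (
	"context"
	"log"
	"time"

	elastic "github.com/olivere/elastic/v7"
)

func main() {
	// Illustrative client; the URL and sniffing option are assumptions for the sketch.
	esClient, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"), elastic.SetSniff(false))
	if err != nil {
		log.Fatal(err)
	}

	before := func(executionID int64, requests []elastic.BulkableRequest) {
		// Invoked right before a bulk is sent; per-bulk counters go here.
		log.Printf("sending bulk %d with %d requests", executionID, len(requests))
	}
	after := func(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
		// Invoked after the bulk round trip; measuring queue depth here (as the
		// commit does) captures what is still waiting to be flushed.
		log.Printf("bulk %d committed, err=%v", executionID, err)
	}

	processor, err := esClient.BulkProcessor().
		Name("example-visibility-processor").
		BulkActions(500).               // flush after this many queued actions
		FlushInterval(1 * time.Second). // or at this interval, whichever comes first
		Before(before).
		After(after).
		Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer processor.Close()
}
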

func (p *processorImpl) buildResponseIndex(response *elastic.BulkResponse) map[string]*elastic.BulkResponseItem {
@@ -307,12 +309,12 @@ func (p *processorImpl) buildResponseIndex(response *elastic.BulkResponse) map[s
func (p *processorImpl) notifyResult(visibilityTaskKey string, ack bool) {
// Use RemoveIf here to prevent race condition with de-dup logic in Add method.
_ = p.mapToAckFuture.RemoveIf(visibilityTaskKey, func(key interface{}, value interface{}) bool {
future, ok := value.(*ackFuture)
ackF, ok := value.(*ackFuture)
if !ok {
p.logger.Fatal(fmt.Sprintf("mapToAckFuture has item of a wrong type %T (%T expected).", value, &ackFuture{}), tag.ESKey(visibilityTaskKey))
}

future.done(ack, p.metricsHandler)
ackF.done(ack, p.metricsHandler)
return true
})
}
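
GetAndDo and RemoveIf run their callbacks while the entry is held under the map's lock, which is what prevents the race with the de-dup logic in Add mentioned in the comment above. A rough sketch of that shape, assuming a simplified mutex-guarded map (syncMap and its methods are illustrative names, not Temporal's concurrent map implementation):

package esbulk

import "sync"

// syncMap is an illustrative, simplified stand-in for the concurrent map used
// as mapToAckFuture; callbacks run while the lock is held, so a concurrent Add
// cannot observe a half-removed entry.
type syncMap struct {
	mu    sync.Mutex
	items map[string]interface{}
}

func newSyncMap() *syncMap {
	return &syncMap{items: make(map[string]interface{})}
}

// GetAndDo invokes fn on the value (if present) while the lock is held.
func (m *syncMap) GetAndDo(key string, fn func(key string, value interface{}) error) (interface{}, bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	value, ok := m.items[key]
	if !ok {
		return nil, false, nil
	}
	return value, true, fn(key, value)
}

// RemoveIf deletes the entry only if pred returns true, atomically with the check.
func (m *syncMap) RemoveIf(key string, pred func(key string, value interface{}) bool) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	value, ok := m.items[key]
	if !ok || !pred(key, value) {
		return false
	}
	delete(m.items, key)
	return true
}
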
@@ -244,6 +244,12 @@ func (s *processorSuite) TestBulkAfterAction_Ack() {
Items: []map[string]*elastic.BulkResponseItem{mSuccess},
}

queuedRequestHistogram := metrics.NewMockHistogramIface(s.controller)
s.mockMetricHandler.EXPECT().Histogram(
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(),
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
).Return(queuedRequestHistogram)
queuedRequestHistogram.EXPECT().Record(int64(0))
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorRequestLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
mapVal := newAckFuture()
s.esProcessor.mapToAckFuture.Put(testKey, mapVal)
@@ -287,6 +293,12 @@ func (s *processorSuite) TestBulkAfterAction_Nack() {
Items: []map[string]*elastic.BulkResponseItem{mFailed},
}

queuedRequestHistogram := metrics.NewMockHistogramIface(s.controller)
s.mockMetricHandler.EXPECT().Histogram(
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(),
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
).Return(queuedRequestHistogram)
queuedRequestHistogram.EXPECT().Record(int64(0))
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorRequestLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
mapVal := newAckFuture()
s.esProcessor.mapToAckFuture.Put(testKey, mapVal)
@@ -352,12 +364,6 @@ func (s *processorSuite) TestBulkBeforeAction() {
metrics.ElasticsearchBulkProcessorBulkSize.GetMetricUnit(),
).Return(bulkSizeHistogram)
bulkSizeHistogram.EXPECT().Record(int64(1))
queuedRequestHistorgram := metrics.NewMockHistogramIface(s.controller)
s.mockMetricHandler.EXPECT().Histogram(
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricName(),
metrics.ElasticsearchBulkProcessorQueuedRequests.GetMetricUnit(),
).Return(queuedRequestHistorgram)
queuedRequestHistorgram.EXPECT().Record(int64(0))
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitAddLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
s.mockMetricHandler.EXPECT().Timer(metrics.ElasticsearchBulkProcessorWaitStartLatency.GetMetricName()).Return(metrics.NoopTimerMetricFunc)
mapVal := newAckFuture()
@@ -484,7 +490,7 @@ func (s *processorSuite) TestErrorReasonFromResponse() {
func (s *processorSuite) Test_End2End() {
docsCount := 1000
parallelFactor := 10
version := int64(2208) //random
version := int64(2208) // random

request := &client.BulkableRequest{}
bulkIndexRequests := make([]elastic.BulkableRequest, docsCount)
@@ -49,11 +49,10 @@ import (
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch/client"
"go.temporal.io/server/common/persistence/visibility/store/query"
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/common/util"
)

const (
persistenceName = "elasticsearch"
PersistenceName = "elasticsearch"

delimiter = "~"
pointInTimeKeepAliveInterval = "1m"
@@ -128,7 +127,7 @@ func (s *visibilityStore) Close() {
}

func (s *visibilityStore) GetName() string {
return persistenceName
return PersistenceName
}

func (s *visibilityStore) RecordWorkflowExecutionStarted(
@@ -232,32 +231,30 @@ func (s *visibilityStore) addBulkIndexRequestAndWait(
}

func (s *visibilityStore) addBulkRequestAndWait(
ctx context.Context,
_ context.Context,
bulkRequest *client.BulkableRequest,
visibilityTaskKey string,
) error {
s.checkProcessor()

// Add method is blocking. If bulk processor is busy flushing previous bulk, request will wait here.
// Therefore, ackTimeoutTimer in fact wait for request to be committed after it was added to bulk processor.
// TODO: this also means ctx is not respected if bulk processor is busy. Shall we make Add non-blocking or
// respecting the context?
future := s.processor.Add(bulkRequest, visibilityTaskKey)

ackTimeout := s.processorAckTimeout()
if deadline, ok := ctx.Deadline(); ok {
ackTimeout = util.Min(ackTimeout, time.Until(deadline))
}
subCtx, subCtxCancelFn := context.WithTimeout(context.Background(), ackTimeout)
defer subCtxCancelFn()

ack, err := future.Get(subCtx)
ackF := s.processor.Add(bulkRequest, visibilityTaskKey)

if errors.Is(err, context.DeadlineExceeded) {
return &persistence.TimeoutError{Msg: fmt.Sprintf("visibility task %s timedout waiting for ACK after %v", visibilityTaskKey, s.processorAckTimeout())}
}
// processorAckTimeout is the maximum duration for the bulk processor to commit the bulk and unblock `ackF`.
// The default value is 30s, and this timeout should never be reached,
// because Elasticsearch must process a bulk within 30s.
// The parent context is not respected here because it has a shorter timeout (3s),
// which might have already expired due to the wait in the Add method above.
ctx, cancel := context.WithTimeout(context.Background(), s.processorAckTimeout())
defer cancel()
ack, err := ackF.Get(ctx)

if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
return &persistence.TimeoutError{Msg: fmt.Sprintf("visibility task %s timed out waiting for ACK after %v", visibilityTaskKey, s.processorAckTimeout())}
}
// Returns non-retryable Internal error here because these errors are unexpected.
// Visibility task processor retries all errors though, so a new request will be generated for the same task.
return serviceerror.NewInternal(fmt.Sprintf("visibility task %s received error %v", visibilityTaskKey, err))
}
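
addBulkRequestAndWait blocks on the future returned by processor.Add until bulkAfterAction acks or nacks the document, or until processorAckTimeout expires. The real ackFuture type is defined in processor.go and is not shown in this diff; a rough sketch of the idea, assuming a simplified channel-based shape (ackWaiter, newAckWaiter, done, and Get are hypothetical names):

package esbulk

import "context"

// ackWaiter is a simplified stand-in for the ack future used above:
// the bulk processor calls done exactly once when the document is acked
// or nacked, and Get blocks until then or until ctx expires (the caller
// maps ctx expiry to a persistence TimeoutError).
type ackWaiter struct {
	result chan bool
}

func newAckWaiter() *ackWaiter {
	return &ackWaiter{result: make(chan bool, 1)}
}

// done completes the future with the ack/nack outcome.
func (w *ackWaiter) done(ack bool) {
	w.result <- ack
}

// Get waits for the outcome or for the context to expire.
func (w *ackWaiter) Get(ctx context.Context) (bool, error) {
	select {
	case ack := <-w.result:
		return ack, nil
	case <-ctx.Done():
		return false, ctx.Err()
	}
}
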

@@ -900,7 +897,7 @@ func (s *visibilityStore) parseESDoc(docID string, docSource json.RawMessage, sa
// Very important line. See finishParseJSONValue below.
d.UseNumber()
if err := d.Decode(&sourceMap); err != nil {
s.metricsHandler.Counter(metrics.ElasticsearchDocumentParseFailuresCount.GetMetricName()) //.Record(1)
s.metricsHandler.Counter(metrics.ElasticsearchDocumentParseFailuresCount.GetMetricName()).Record(1)
return nil, serviceerror.NewInternal(fmt.Sprintf("Unable to unmarshal JSON from Elasticsearch document(%s): %v", docID, err))
}

4 changes: 2 additions & 2 deletions service/history/configs/config.go
@@ -498,14 +498,14 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByNamespace(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024),
IndexerConcurrency: dc.GetIntProperty(dynamicconfig.WorkerIndexerConcurrency, 100),
ESProcessorNumOfWorkers: dc.GetIntProperty(dynamicconfig.WorkerESProcessorNumOfWorkers, 1),
// Should be not greater than NumberOfShards(512)/NumberOfHistoryNodes(4) * VisibilityTaskWorkerCount(10)/ESProcessorNumOfWorkers(1) divided by workflow distribution factor (2 at least).
// Should not be greater than the number of visibility task queue workers, VisibilityProcessorSchedulerWorkerCount (default 512).
// Otherwise, visibility queue processors won't be able to fill up the bulk with documents (even under heavy load), and the bulk will flush due to the interval, not the number of actions.
ESProcessorBulkActions: dc.GetIntProperty(dynamicconfig.WorkerESProcessorBulkActions, 500),
// 16MB - just a sanity check. With ES document size ~1Kb it should never be reached.
ESProcessorBulkSize: dc.GetIntProperty(dynamicconfig.WorkerESProcessorBulkSize, 16*1024*1024),
// Bulk processor will flush on this interval, regardless of whether the last flush was triggered by bulk actions.
ESProcessorFlushInterval: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorFlushInterval, 1*time.Second),
ESProcessorAckTimeout: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorAckTimeout, 1*time.Minute),
ESProcessorAckTimeout: dc.GetDurationProperty(dynamicconfig.WorkerESProcessorAckTimeout, 30*time.Second),

EnableCrossNamespaceCommands: dc.GetBoolProperty(dynamicconfig.EnableCrossNamespaceCommands, true),
EnableActivityEagerExecution: dc.GetBoolPropertyFnWithNamespaceFilter(dynamicconfig.EnableActivityEagerExecution, false),
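
The comment on ESProcessorBulkActions ties the bulk size to the number of concurrent visibility task workers: with fewer workers than bulk actions, a bulk can only fill via the flush interval. A small sketch of that relationship as a sanity check (validateESProcessorConfig is a hypothetical helper, not part of this codebase):

package configs

import (
	"fmt"
	"time"
)

// validateESProcessorConfig is an illustrative helper (not part of Temporal)
// that encodes the constraints described in the comments above.
func validateESProcessorConfig(bulkActions, visibilitySchedulerWorkers int, flushInterval, ackTimeout time.Duration) error {
	if bulkActions > visibilitySchedulerWorkers {
		// The bulk can never fill up by action count alone, so every flush
		// would be driven by the flush interval instead.
		return fmt.Errorf("ESProcessorBulkActions (%d) should not exceed VisibilityProcessorSchedulerWorkerCount (%d)",
			bulkActions, visibilitySchedulerWorkers)
	}
	if ackTimeout <= flushInterval {
		// Assumption: the ack timeout should leave room for at least one
		// interval-driven flush plus the Elasticsearch round trip.
		return fmt.Errorf("ESProcessorAckTimeout (%v) should be well above ESProcessorFlushInterval (%v)",
			ackTimeout, flushInterval)
	}
	return nil
}

With the defaults in this commit (500 bulk actions, 512 scheduler workers, 1s flush interval, 30s ack timeout), such a check passes.
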
14 changes: 11 additions & 3 deletions service/history/visibilityQueueTaskExecutor.go
@@ -319,10 +319,10 @@ func (t *visibilityQueueTaskExecutor) upsertExecution(
}

func (t *visibilityQueueTaskExecutor) processCloseExecution(
ctx context.Context,
parentCtx context.Context,
task *tasks.CloseExecutionVisibilityTask,
) (retError error) {
ctx, cancel := context.WithTimeout(ctx, taskTimeout)
ctx, cancel := context.WithTimeout(parentCtx, taskTimeout)
defer cancel()

namespaceEntry, err := t.shard.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(task.GetNamespaceID()))
Expand Down Expand Up @@ -396,8 +396,13 @@ func (t *visibilityQueueTaskExecutor) processCloseExecution(
return err
}

// The Elasticsearch bulk processor doesn't respect the context timeout,
// because under heavy load a bulk flush might take longer than taskTimeout.
// Therefore, ctx might already be expired at this point,
// and parentCtx (which has no timeout) must be used everywhere below.

if t.enableCloseWorkflowCleanup(namespaceEntry.Name().String()) {
return t.cleanupExecutionInfo(ctx, task)
return t.cleanupExecutionInfo(parentCtx, task)
}
return nil
}
@@ -500,6 +505,9 @@ func (t *visibilityQueueTaskExecutor) cleanupExecutionInfo(
ctx context.Context,
task *tasks.CloseExecutionVisibilityTask,
) (retError error) {
ctx, cancel := context.WithTimeout(ctx, taskTimeout)
defer cancel()

weContext, release, err := getWorkflowExecutionContextForTask(ctx, t.cache, task)
if err != nil {
return err
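
The executor change illustrates a general pattern: derive each step's timeout from an untimed parent context, so a slow first step (here, the blocking bulk add) cannot leave the follow-up cleanup with an already-expired context. A minimal sketch of the pattern with hypothetical names (processClose, recordVisibility, cleanupMutableState):

package main

import (
	"context"
	"fmt"
	"time"
)

// recordVisibility stands in for the visibility write; in the real code this
// may block past ctx's deadline because the bulk processor ignores it.
func recordVisibility(_ context.Context) error { return nil }

// cleanupMutableState stands in for cleanupExecutionInfo.
func cleanupMutableState(ctx context.Context) error { return ctx.Err() }

// processClose sketches the pattern from this commit: each step derives its
// own timeout from parentCtx, so the first step overrunning its budget does
// not leave the second step with an already-expired context.
func processClose(parentCtx context.Context, stepTimeout time.Duration) error {
	stepCtx, cancel := context.WithTimeout(parentCtx, stepTimeout)
	defer cancel()
	if err := recordVisibility(stepCtx); err != nil {
		return err
	}

	// Fresh timeout for cleanup, even if stepCtx has already expired by now.
	cleanupCtx, cleanupCancel := context.WithTimeout(parentCtx, stepTimeout)
	defer cleanupCancel()
	return cleanupMutableState(cleanupCtx)
}

func main() {
	if err := processClose(context.Background(), 10*time.Second); err != nil {
		fmt.Println("close processing failed:", err)
	}
}
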
@@ -37,6 +37,7 @@ import (
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/visibility/manager"
"go.temporal.io/server/common/persistence/visibility/store/elasticsearch"
"go.temporal.io/server/service/worker/deletenamespace/errors"
)

@@ -63,7 +64,7 @@ func NewActivities(
}
}
func (a *Activities) IsAdvancedVisibilityActivity(_ context.Context) (bool, error) {
return strings.Contains(a.visibilityManager.GetName(), "elasticsearch"), nil
return strings.Contains(a.visibilityManager.GetName(), elasticsearch.PersistenceName), nil
}

func (a *Activities) CountExecutionsAdvVisibilityActivity(ctx context.Context, nsID namespace.ID, nsName namespace.Name) (int64, error) {
