Handle visibility task timeout in bulk operation (#2895)
* Handle visibility task timeout in bulk operation

Co-authored-by: Alex Shtin <alex@shtin.com>
yux0 and alexshtin committed Jun 2, 2022
1 parent 9ad9e72 commit 4c62f56
Showing 6 changed files with 170 additions and 185 deletions.
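The core of this change is that Processor.Add now returns a *future.FutureImpl[bool] (from go.temporal.io/server/common/future) instead of a buffered <-chan bool, so a caller can bound its wait for the Elasticsearch ack with the configured ack timeout (ESProcessorAckTimeout). The caller side is not part of this excerpt; the sketch below is a hypothetical illustration of that consumption pattern, assuming the returned future exposes a context-aware Get. The ackWaiter interface and waitForAck helper are invented for the example.

// Hypothetical sketch, not part of this commit: bounding the wait for the
// Elasticsearch ack now that Add returns a future instead of a channel.
package esconsumer

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// ackWaiter stands in for the value returned by processorImpl.Add; it assumes
// a context-aware Get, which is not shown in this excerpt.
type ackWaiter interface {
    Get(ctx context.Context) (bool, error)
}

// waitForAck blocks until the request is acked, nacked, or the ack timeout
// (e.g. the dynamic config ESProcessorAckTimeout) expires.
func waitForAck(ctx context.Context, f ackWaiter, ackTimeout time.Duration) error {
    subCtx, cancel := context.WithTimeout(ctx, ackTimeout)
    defer cancel()

    ack, err := f.Get(subCtx)
    if err != nil {
        // With the old channel-based API a timeout was handled by the caller's
        // select; with a future it surfaces as a context error from Get.
        if errors.Is(err, context.DeadlineExceeded) {
            return fmt.Errorf("visibility task ack timed out after %v", ackTimeout)
        }
        return err
    }
    if !ack {
        return errors.New("visibility task request was not acknowledged")
    }
    return nil
}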
3 changes: 0 additions & 3 deletions common/metrics/defs.go
@@ -2201,8 +2201,6 @@ const (
 
     ElasticsearchBulkProcessorBulkSize
 
-    ElasticsearchBulkProcessorDeadlock
-
     NumHistoryMetrics
 )

@@ -2681,7 +2679,6 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
         ElasticsearchBulkProcessorWaitAddLatency:   NewTimerDef("elasticsearch_bulk_processor_wait_add_latency"),
         ElasticsearchBulkProcessorWaitStartLatency: NewTimerDef("elasticsearch_bulk_processor_wait_start_latency"),
         ElasticsearchBulkProcessorBulkSize:         NewDimensionlessHistogramDef("elasticsearch_bulk_processor_bulk_size"),
-        ElasticsearchBulkProcessorDeadlock:         NewCounterDef("elasticsearch_bulk_processor_deadlock"),
     },
     Matching: {
         PollSuccessPerTaskQueueCounter: NewRollupCounterDef("poll_success_per_tl", "poll_success"),
106 changes: 51 additions & 55 deletions common/persistence/visibility/store/elasticsearch/processor.go
@@ -22,7 +22,8 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
-//go:generate mockgen -copyright_file ../../../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination processor_mock.go
+// TODO: enable this after https://github.com/golang/mock/issues/621
+// mockgen -copyright_file ../../../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination processor_mock.go
 
 package elasticsearch

@@ -40,6 +41,7 @@ import (
     "go.temporal.io/server/common"
     "go.temporal.io/server/common/collection"
     "go.temporal.io/server/common/dynamicconfig"
+    "go.temporal.io/server/common/future"
     "go.temporal.io/server/common/log"
     "go.temporal.io/server/common/log/tag"
     "go.temporal.io/server/common/metrics"
@@ -53,7 +55,7 @@ type (
         common.Daemon
 
         // Add request to bulk processor.
-        Add(request *client.BulkableRequest, visibilityTaskKey string) <-chan bool
+        Add(request *client.BulkableRequest, visibilityTaskKey string) *future.FutureImpl[bool]
     }
 
     // processorImpl implements Processor, it's an agent of elastic.BulkProcessor
@@ -62,7 +64,7 @@
         bulkProcessor           client.BulkProcessor
         bulkProcessorParameters *client.BulkProcessorParameters
         client                  client.Client
-        mapToAckChan            collection.ConcurrentTxMap // used to map ES request to ack channel
+        mapToAckFuture          collection.ConcurrentTxMap // used to map ES request to ack channel
         logger                  log.Logger
         metricsClient           metrics.Client
         indexerConcurrency      uint32
@@ -80,11 +82,11 @@
         ESProcessorAckTimeout dynamicconfig.DurationPropertyFn
     }
 
-    ackChan struct { // value of processorImpl.mapToAckChan
-        ackChInternal chan bool
-        createdAt     time.Time    // Time when request was created (used to report metrics).
-        addedAt       atomic.Value // of time.Time // Time when request was added to bulk processor (used to report metrics).
-        startedAt     time.Time    // Time when request was sent to Elasticsearch by bulk processor (used to report metrics).
+    ackFuture struct { // value of processorImpl.mapToAckFuture
+        future    *future.FutureImpl[bool]
+        createdAt time.Time    // Time when request was created (used to report metrics).
+        addedAt   atomic.Value // of time.Time // Time when request was added to bulk processor (used to report metrics).
+        startedAt time.Time    // Time when request was sent to Elasticsearch by bulk processor (used to report metrics).
     }
 )

@@ -135,7 +137,7 @@ func (p *processorImpl) Start() {
     }
 
     var err error
-    p.mapToAckChan = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
+    p.mapToAckFuture = collection.NewShardedConcurrentTxMap(1024, p.hashFn)
     p.bulkProcessor, err = p.client.RunBulkProcessor(context.Background(), p.bulkProcessorParameters)
     if err != nil {
         p.logger.Fatal("Unable to start Elasticsearch processor.", tag.LifeCycleStartFailed, tag.Error(err))
@@ -155,7 +157,7 @@ func (p *processorImpl) Stop() {
     if err != nil {
         p.logger.Fatal("Unable to stop Elasticsearch processor.", tag.LifeCycleStopFailed, tag.Error(err))
     }
-    p.mapToAckChan = nil
+    p.mapToAckFuture = nil
     p.bulkProcessor = nil
 }

@@ -169,46 +171,44 @@ func (p *processorImpl) hashFn(key interface{}) uint32 {
     return hash % p.indexerConcurrency
 }
 
-// Add request to the bulk and return ack channel which will receive ack signal when request is processed.
-func (p *processorImpl) Add(request *client.BulkableRequest, visibilityTaskKey string) <-chan bool {
-    ackCh := newAckChan()
-    _, isDup, _ := p.mapToAckChan.PutOrDo(visibilityTaskKey, ackCh, func(key interface{}, value interface{}) error {
-        ackChExisting, ok := value.(*ackChan)
+// Add request to the bulk and return a future object which will receive ack signal when request is processed.
+func (p *processorImpl) Add(request *client.BulkableRequest, visibilityTaskKey string) *future.FutureImpl[bool] {
+    newFuture := newAckFuture()
+    _, isDup, _ := p.mapToAckFuture.PutOrDo(visibilityTaskKey, newFuture, func(key interface{}, value interface{}) error {
+        existingFuture, ok := value.(*ackFuture)
         if !ok {
-            p.logger.Fatal(fmt.Sprintf("mapToAckChan has item of a wrong type %T (%T expected).", value, &ackChan{}), tag.Value(key))
+            p.logger.Fatal(fmt.Sprintf("mapToAckFuture has item of a wrong type %T (%T expected).", value, &ackFuture{}), tag.Value(key))
         }
 
-        p.logger.Warn("Skipping duplicate ES request for visibility task key.", tag.Key(visibilityTaskKey), tag.ESDocID(request.ID), tag.Value(request.Doc), tag.NewDurationTag("interval-between-duplicates", ackCh.createdAt.Sub(ackChExisting.createdAt)))
+        p.logger.Warn("Skipping duplicate ES request for visibility task key.", tag.Key(visibilityTaskKey), tag.ESDocID(request.ID), tag.Value(request.Doc), tag.NewDurationTag("interval-between-duplicates", newFuture.createdAt.Sub(existingFuture.createdAt)))
         p.metricsClient.IncCounter(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorDuplicateRequest)
-
-        // Ack duplicate visibility task right away as if it is processed successfully.
-        ackCh.done(true, p.metricsClient)
+        newFuture = existingFuture
         return nil
     })
     if !isDup {
         p.bulkProcessor.Add(request)
-        ackCh.recordAdd(p.metricsClient)
+        newFuture.recordAdd(p.metricsClient)
     }
-    return ackCh.ackChInternal
+    return newFuture.future
 }
 
 // bulkBeforeAction is triggered before bulk processor commit
 func (p *processorImpl) bulkBeforeAction(_ int64, requests []elastic.BulkableRequest) {
     p.metricsClient.AddCounter(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorRequests, int64(len(requests)))
     p.metricsClient.RecordDistribution(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorBulkSize, len(requests))
-    p.metricsClient.RecordDistribution(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorQueuedRequests, p.mapToAckChan.Len()-len(requests))
+    p.metricsClient.RecordDistribution(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorQueuedRequests, p.mapToAckFuture.Len()-len(requests))
 
     for _, request := range requests {
         visibilityTaskKey := p.extractVisibilityTaskKey(request)
         if visibilityTaskKey == "" {
             continue
         }
-        _, _, _ = p.mapToAckChan.GetAndDo(visibilityTaskKey, func(key interface{}, value interface{}) error {
-            ackCh, ok := value.(*ackChan)
+        _, _, _ = p.mapToAckFuture.GetAndDo(visibilityTaskKey, func(key interface{}, value interface{}) error {
+            future, ok := value.(*ackFuture)
             if !ok {
-                p.logger.Fatal(fmt.Sprintf("mapToAckChan has item of a wrong type %T (%T expected).", value, &ackChan{}), tag.Value(key))
+                p.logger.Fatal(fmt.Sprintf("mapToAckFuture has item of a wrong type %T (%T expected).", value, &ackFuture{}), tag.Value(key))
             }
-            ackCh.recordStart(p.metricsClient)
+            future.recordStart(p.metricsClient)
             return nil
         })
     }
@@ -233,7 +233,7 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ
             if visibilityTaskKey == "" {
                 continue
             }
-            p.sendToAckChan(visibilityTaskKey, false)
+            p.notifyResult(visibilityTaskKey, false)
         }
     }
     p.logger.Error("Unable to commit bulk ES request.", tag.Error(err), tag.IsRetryable(isRetryable), tag.RequestCount(len(requests)), tag.ESRequest(logRequests.String()))
@@ -256,13 +256,13 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ
                 tag.ESDocID(docID),
                 tag.ESRequest(request.String()))
             p.metricsClient.IncCounter(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorCorruptedData)
-            p.sendToAckChan(visibilityTaskKey, false)
+            p.notifyResult(visibilityTaskKey, false)
             continue
         }
 
         switch {
         case isSuccess(responseItem):
-            p.sendToAckChan(visibilityTaskKey, true)
+            p.notifyResult(visibilityTaskKey, true)
         case !client.IsRetryableStatus(responseItem.Status):
             p.logger.Error("ES request failed.",
                 tag.ESResponseStatus(responseItem.Status),
@@ -271,7 +271,7 @@ func (p *processorImpl) bulkAfterAction(_ int64, requests []elastic.BulkableRequ
                 tag.ESDocID(docID),
                 tag.ESRequest(request.String()))
             p.metricsClient.Scope(metrics.ElasticsearchBulkProcessor, metrics.HttpStatusTag(responseItem.Status)).IncCounter(metrics.ElasticsearchBulkProcessorFailures)
-            p.sendToAckChan(visibilityTaskKey, false)
+            p.notifyResult(visibilityTaskKey, false)
         default: // bulk processor will retry
             p.logger.Warn("ES request retried.",
                 tag.ESResponseStatus(responseItem.Status),
@@ -300,15 +300,15 @@ func (p *processorImpl) buildResponseIndex(response *elastic.BulkResponse) map[s
     return result
 }
 
-func (p *processorImpl) sendToAckChan(visibilityTaskKey string, ack bool) {
+func (p *processorImpl) notifyResult(visibilityTaskKey string, ack bool) {
     // Use RemoveIf here to prevent race condition with de-dup logic in Add method.
-    _ = p.mapToAckChan.RemoveIf(visibilityTaskKey, func(key interface{}, value interface{}) bool {
-        ackCh, ok := value.(*ackChan)
+    _ = p.mapToAckFuture.RemoveIf(visibilityTaskKey, func(key interface{}, value interface{}) bool {
+        future, ok := value.(*ackFuture)
         if !ok {
-            p.logger.Fatal(fmt.Sprintf("mapToAckChan has item of a wrong type %T (%T expected).", value, &ackChan{}), tag.ESKey(visibilityTaskKey))
+            p.logger.Fatal(fmt.Sprintf("mapToAckFuture has item of a wrong type %T (%T expected).", value, &ackFuture{}), tag.ESKey(visibilityTaskKey))
         }
 
-        ackCh.done(ack, p.metricsClient)
+        future.done(ack, p.metricsClient)
         return true
     })
 }
@@ -398,41 +398,37 @@ func extractErrorReason(resp *elastic.BulkResponseItem) string {
     return ""
 }
 
-func newAckChan() *ackChan {
+func newAckFuture() *ackFuture {
     var addedAt atomic.Value
     addedAt.Store(time.Time{})
-    return &ackChan{
-        ackChInternal: make(chan bool, 1),
-        createdAt:     time.Now().UTC(),
-        addedAt:       addedAt,
+    return &ackFuture{
+        future:    future.NewFuture[bool](),
+        createdAt: time.Now().UTC(),
+        addedAt:   addedAt,
     }
 }
 
-func (a *ackChan) recordAdd(metricsClient metrics.Client) {
+func (a *ackFuture) recordAdd(metricsClient metrics.Client) {
     addedAt := time.Now().UTC()
     a.addedAt.Store(addedAt)
     metricsClient.RecordTimer(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorWaitAddLatency, addedAt.Sub(a.createdAt))
 }
 
-func (a *ackChan) recordStart(metricsClient metrics.Client) {
+func (a *ackFuture) recordStart(metricsClient metrics.Client) {
     a.startedAt = time.Now().UTC()
     addedAt := a.addedAt.Load().(time.Time)
     if !addedAt.IsZero() {
         metricsClient.RecordTimer(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorWaitStartLatency, a.startedAt.Sub(addedAt))
     }
 }
 
-func (a *ackChan) done(ack bool, metricsClient metrics.Client) {
-    select {
-    case a.ackChInternal <- ack:
-        doneAt := time.Now().UTC()
-        if !a.createdAt.IsZero() {
-            metricsClient.RecordTimer(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorRequestLatency, doneAt.Sub(a.createdAt))
-        }
-        if !a.startedAt.IsZero() {
-            metricsClient.RecordTimer(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorCommitLatency, doneAt.Sub(a.startedAt))
-        }
-    default:
-        metricsClient.IncCounter(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorDeadlock)
-    }
+func (a *ackFuture) done(ack bool, metricsClient metrics.Client) {
+    a.future.Set(ack, nil)
+    doneAt := time.Now().UTC()
+    if !a.createdAt.IsZero() {
+        metricsClient.RecordTimer(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorRequestLatency, doneAt.Sub(a.createdAt))
+    }
+    if !a.startedAt.IsZero() {
+        metricsClient.RecordTimer(metrics.ElasticsearchBulkProcessor, metrics.ElasticsearchBulkProcessorCommitLatency, doneAt.Sub(a.startedAt))
+    }
 }
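A consequence of replacing the one-element ack channel with a shared future is the new de-dup behavior in Add: a duplicate visibility task key now reuses the existing future instead of being acked immediately, and done() resolves the future without ever blocking, which is why the ElasticsearchBulkProcessorDeadlock counter could be dropped from defs.go. The sketch below is a hypothetical, self-contained illustration of that sharing semantics; boolFuture and the surrounding names are stand-ins, not the server's future package.

// Hypothetical sketch, not part of this commit: two Add calls with the same
// visibility task key share a single future, and resolving it never blocks,
// so both callers observe the same ack.
package main

import (
    "context"
    "fmt"
    "sync"
)

type boolFuture struct {
    done chan struct{}
    val  bool
    once sync.Once
}

func newBoolFuture() *boolFuture { return &boolFuture{done: make(chan struct{})} }

// Set resolves the future exactly once and never blocks, unlike sending on a
// full one-element channel (which is what the removed deadlock counter guarded).
func (f *boolFuture) Set(v bool) {
    f.once.Do(func() {
        f.val = v
        close(f.done)
    })
}

// Get waits for the value or for the caller's context to expire.
func (f *boolFuture) Get(ctx context.Context) (bool, error) {
    select {
    case <-f.done:
        return f.val, nil
    case <-ctx.Done():
        return false, ctx.Err()
    }
}

func main() {
    pending := map[string]*boolFuture{} // plays the role of mapToAckFuture

    // add mimics the de-dup branch of processorImpl.Add: a duplicate key
    // returns the existing future instead of acking the duplicate right away.
    add := func(visibilityTaskKey string) *boolFuture {
        if existing, ok := pending[visibilityTaskKey]; ok {
            return existing
        }
        f := newBoolFuture()
        pending[visibilityTaskKey] = f
        return f
    }

    f1 := add("task-1")
    f2 := add("task-1") // duplicate request shares f1's future
    f1.Set(true)        // bulk response acks the request once

    ack, _ := f2.Get(context.Background())
    fmt.Println(ack) // true: both callers see the same ack
}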

