Skip to content

Commit

Permalink
Persistence context part 9: elasticsearch visibility store (#2713)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Apr 13, 2022
1 parent 1686461 commit ca4fa9e
Showing 1 changed file with 34 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"

"go.temporal.io/server/common"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -119,10 +120,6 @@ func NewVisibilityStore(
}
}

func newReadContext() (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), readTimeout)
}

func (s *visibilityStore) Close() {
// TODO (alex): visibilityStore shouldn't Stop processor. Processor should be stopped where it is created.
if s.processor != nil {
Expand All @@ -135,7 +132,7 @@ func (s *visibilityStore) GetName() string {
}

func (s *visibilityStore) RecordWorkflowExecutionStarted(
_ context.Context,
ctx context.Context,
request *store.InternalRecordWorkflowExecutionStartedRequest,
) error {
visibilityTaskKey := getVisibilityTaskKey(request.ShardID, request.TaskID)
Expand All @@ -144,11 +141,11 @@ func (s *visibilityStore) RecordWorkflowExecutionStarted(
return err
}

return s.addBulkIndexRequestAndWait(request.InternalVisibilityRequestBase, doc, visibilityTaskKey)
return s.addBulkIndexRequestAndWait(ctx, request.InternalVisibilityRequestBase, doc, visibilityTaskKey)
}

func (s *visibilityStore) RecordWorkflowExecutionClosed(
_ context.Context,
ctx context.Context,
request *store.InternalRecordWorkflowExecutionClosedRequest,
) error {
visibilityTaskKey := getVisibilityTaskKey(request.ShardID, request.TaskID)
Expand All @@ -162,11 +159,11 @@ func (s *visibilityStore) RecordWorkflowExecutionClosed(
doc[searchattribute.HistoryLength] = request.HistoryLength
doc[searchattribute.StateTransitionCount] = request.StateTransitionCount

return s.addBulkIndexRequestAndWait(request.InternalVisibilityRequestBase, doc, visibilityTaskKey)
return s.addBulkIndexRequestAndWait(ctx, request.InternalVisibilityRequestBase, doc, visibilityTaskKey)
}

func (s *visibilityStore) UpsertWorkflowExecution(
_ context.Context,
ctx context.Context,
request *store.InternalUpsertWorkflowExecutionRequest,
) error {
visibilityTaskKey := getVisibilityTaskKey(request.ShardID, request.TaskID)
Expand All @@ -175,11 +172,11 @@ func (s *visibilityStore) UpsertWorkflowExecution(
return err
}

return s.addBulkIndexRequestAndWait(request.InternalVisibilityRequestBase, doc, visibilityTaskKey)
return s.addBulkIndexRequestAndWait(ctx, request.InternalVisibilityRequestBase, doc, visibilityTaskKey)
}

func (s *visibilityStore) DeleteWorkflowExecution(
_ context.Context,
ctx context.Context,
request *manager.VisibilityDeleteWorkflowExecutionRequest,
) error {
docID := getDocID(request.WorkflowID, request.RunID)
Expand All @@ -191,7 +188,7 @@ func (s *visibilityStore) DeleteWorkflowExecution(
RequestType: client.BulkableRequestTypeDelete,
}

return s.addBulkRequestAndWait(bulkDeleteRequest, docID)
return s.addBulkRequestAndWait(ctx, bulkDeleteRequest, docID)
}

func getDocID(workflowID string, runID string) string {
Expand All @@ -217,6 +214,7 @@ func getVisibilityTaskKey(shardID int32, taskID int64) string {
}

func (s *visibilityStore) addBulkIndexRequestAndWait(
ctx context.Context,
request *store.InternalVisibilityRequestBase,
esDoc map[string]interface{},
visibilityTaskKey string,
Expand All @@ -229,16 +227,27 @@ func (s *visibilityStore) addBulkIndexRequestAndWait(
Doc: esDoc,
}

return s.addBulkRequestAndWait(bulkIndexRequest, visibilityTaskKey)
return s.addBulkRequestAndWait(ctx, bulkIndexRequest, visibilityTaskKey)
}

func (s *visibilityStore) addBulkRequestAndWait(bulkRequest *client.BulkableRequest, visibilityTaskKey string) error {
func (s *visibilityStore) addBulkRequestAndWait(
ctx context.Context,
bulkRequest *client.BulkableRequest,
visibilityTaskKey string,
) error {
s.checkProcessor()

// Add method is blocking. If bulk processor is busy flushing previous bulk, request will wait here.
// Therefore, ackTimeoutTimer in fact wait for request to be committed after it was added to bulk processor.
// TODO: this also means ctx is not respected if bulk processor is busy. Shall we make Add non-blocking or
// respecting the context?
ackCh := s.processor.Add(bulkRequest, visibilityTaskKey)
ackTimeoutTimer := time.NewTimer(s.processorAckTimeout())

ackTimeout := s.processorAckTimeout()
if deadline, ok := ctx.Deadline(); ok {
ackTimeout = common.MinDuration(ackTimeout, time.Until(deadline))
}
ackTimeoutTimer := time.NewTimer(ackTimeout)
defer ackTimeoutTimer.Stop()

select {
Expand Down Expand Up @@ -266,7 +275,7 @@ func (s *visibilityStore) checkProcessor() {
}

func (s *visibilityStore) ListOpenWorkflowExecutions(
_ context.Context,
ctx context.Context,
request *manager.ListWorkflowExecutionsRequest,
) (*store.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -278,8 +287,6 @@ func (s *visibilityStore) ListOpenWorkflowExecutions(
return nil, err
}

ctx, cancel := newReadContext()
defer cancel()
searchResult, err := s.esClient.Search(ctx, p)
if err != nil {
return nil, convertElasticsearchClientError("ListOpenWorkflowExecutions failed", err)
Expand All @@ -293,7 +300,7 @@ func (s *visibilityStore) ListOpenWorkflowExecutions(
}

func (s *visibilityStore) ListClosedWorkflowExecutions(
_ context.Context,
ctx context.Context,
request *manager.ListWorkflowExecutionsRequest,
) (*store.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -305,8 +312,6 @@ func (s *visibilityStore) ListClosedWorkflowExecutions(
return nil, err
}

ctx, cancel := newReadContext()
defer cancel()
searchResult, err := s.esClient.Search(ctx, p)
if err != nil {
return nil, convertElasticsearchClientError("ListClosedWorkflowExecutions failed", err)
Expand All @@ -320,7 +325,7 @@ func (s *visibilityStore) ListClosedWorkflowExecutions(
}

func (s *visibilityStore) ListOpenWorkflowExecutionsByType(
_ context.Context,
ctx context.Context,
request *manager.ListWorkflowExecutionsByTypeRequest,
) (*store.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -334,8 +339,6 @@ func (s *visibilityStore) ListOpenWorkflowExecutionsByType(
return nil, err
}

ctx, cancel := newReadContext()
defer cancel()
searchResult, err := s.esClient.Search(ctx, p)
if err != nil {
return nil, convertElasticsearchClientError("ListOpenWorkflowExecutionsByType failed", err)
Expand All @@ -349,7 +352,7 @@ func (s *visibilityStore) ListOpenWorkflowExecutionsByType(
}

func (s *visibilityStore) ListClosedWorkflowExecutionsByType(
_ context.Context,
ctx context.Context,
request *manager.ListWorkflowExecutionsByTypeRequest,
) (*store.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -362,8 +365,6 @@ func (s *visibilityStore) ListClosedWorkflowExecutionsByType(
return nil, err
}

ctx, cancel := newReadContext()
defer cancel()
searchResult, err := s.esClient.Search(ctx, p)
if err != nil {
return nil, convertElasticsearchClientError("ListClosedWorkflowExecutionsByType failed", err)
Expand All @@ -377,7 +378,7 @@ func (s *visibilityStore) ListClosedWorkflowExecutionsByType(
}

func (s *visibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
_ context.Context,
ctx context.Context,
request *manager.ListWorkflowExecutionsByWorkflowIDRequest,
) (*store.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -391,8 +392,6 @@ func (s *visibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
return nil, err
}

ctx, cancel := newReadContext()
defer cancel()
searchResult, err := s.esClient.Search(ctx, p)
if err != nil {
return nil, convertElasticsearchClientError("ListOpenWorkflowExecutionsByWorkflowID failed", err)
Expand All @@ -406,7 +405,7 @@ func (s *visibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
}

func (s *visibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
_ context.Context,
ctx context.Context,
request *manager.ListWorkflowExecutionsByWorkflowIDRequest,
) (*store.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -419,8 +418,6 @@ func (s *visibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
return nil, err
}

ctx, cancel := newReadContext()
defer cancel()
searchResult, err := s.esClient.Search(ctx, p)
if err != nil {
return nil, convertElasticsearchClientError("ListClosedWorkflowExecutionsByWorkflowID failed", err)
Expand All @@ -434,7 +431,7 @@ func (s *visibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
}

func (s *visibilityStore) ListClosedWorkflowExecutionsByStatus(
_ context.Context,
ctx context.Context,
request *manager.ListClosedWorkflowExecutionsByStatusRequest,
) (*store.InternalListWorkflowExecutionsResponse, error) {

Expand All @@ -446,8 +443,6 @@ func (s *visibilityStore) ListClosedWorkflowExecutionsByStatus(
return nil, err
}

ctx, cancel := newReadContext()
defer cancel()
searchResult, err := s.esClient.Search(ctx, p)
if err != nil {
return nil, convertElasticsearchClientError("ListClosedWorkflowExecutionsByStatus failed", err)
Expand All @@ -461,7 +456,7 @@ func (s *visibilityStore) ListClosedWorkflowExecutionsByStatus(
}

func (s *visibilityStore) ListWorkflowExecutions(
_ context.Context,
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*store.InternalListWorkflowExecutionsResponse, error) {
p, err := s.buildSearchParametersV2(request)
Expand All @@ -478,8 +473,6 @@ func (s *visibilityStore) ListWorkflowExecutions(
p.SearchAfter = token.SearchAfter
}

ctx, cancel := newReadContext()
defer cancel()
searchResult, err := s.esClient.Search(ctx, p)
if err != nil {
return nil, convertElasticsearchClientError("ListWorkflowExecutions failed", err)
Expand All @@ -489,12 +482,9 @@ func (s *visibilityStore) ListWorkflowExecutions(
}

func (s *visibilityStore) ScanWorkflowExecutions(
_ context.Context,
ctx context.Context,
request *manager.ListWorkflowExecutionsRequestV2,
) (*store.InternalListWorkflowExecutionsResponse, error) {
ctx, cancel := newReadContext()
defer cancel()

if esClientV7, isV7 := s.esClient.(client.ClientV7); isV7 {
// Elasticsearch 7.10+ can use "point in time" (PIT) instead of scroll to scan over all workflows without skipping or duplicating them.
// https://www.elastic.co/guide/en/elasticsearch/reference/7.10/point-in-time-api.html
Expand Down Expand Up @@ -589,16 +579,14 @@ func (s *visibilityStore) scanWorkflowExecutionsWithScroll(ctx context.Context,
}

func (s *visibilityStore) CountWorkflowExecutions(
_ context.Context,
ctx context.Context,
request *manager.CountWorkflowExecutionsRequest,
) (*manager.CountWorkflowExecutionsResponse, error) {
boolQuery, _, err := s.convertQuery(request.Namespace, request.NamespaceID, request.Query)
if err != nil {
return nil, err
}

ctx, cancel := newReadContext()
defer cancel()
count, err := s.esClient.Count(ctx, s.index, boolQuery)
if err != nil {
return nil, convertElasticsearchClientError("CountWorkflowExecutions failed", err)
Expand Down

0 comments on commit ca4fa9e

Please sign in to comment.