Skip to content

Commit

Permalink
Rename queryTermination to queryCompletion (#3000)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jun 21, 2022
1 parent fdaa013 commit 231655c
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 212 deletions.
14 changes: 7 additions & 7 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,14 +786,14 @@ func (e *historyEngineImpl) QueryWorkflow(
weCtx.GetReleaseFn()(nil)
select {
case <-termCh:
state, err := queryReg.GetTerminationState(queryID)
completionState, err := queryReg.GetCompletionState(queryID)
if err != nil {
scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
return nil, err
}
switch state.QueryTerminationType {
case workflow.QueryTerminationTypeCompleted:
result := state.QueryResult
switch completionState.Type {
case workflow.QueryCompletionTypeSucceeded:
result := completionState.Result
switch result.GetResultType() {
case enumspb.QUERY_RESULT_TYPE_ANSWERED:
return &historyservice.QueryWorkflowResponse{
Expand All @@ -807,15 +807,15 @@ func (e *historyEngineImpl) QueryWorkflow(
scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
return nil, consts.ErrQueryEnteredInvalidState
}
case workflow.QueryTerminationTypeUnblocked:
case workflow.QueryCompletionTypeUnblocked:
msResp, err := e.getMutableState(ctx, workflowKey)
if err != nil {
return nil, err
}
req.Execution.RunId = msResp.Execution.RunId
return e.queryDirectlyThroughMatching(ctx, msResp, request.GetNamespaceId(), req, scope)
case workflow.QueryTerminationTypeFailed:
return nil, state.Failure
case workflow.QueryCompletionTypeFailed:
return nil, completionState.Err
default:
scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
return nil, consts.ErrQueryEnteredInvalidState
Expand Down
36 changes: 18 additions & 18 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,18 +696,18 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Complete() {
buffered := qr.GetBufferedIDs()
for _, id := range buffered {
resultType := enumspb.QUERY_RESULT_TYPE_ANSWERED
completedTerminationState := &workflow.QueryTerminationState{
QueryTerminationType: workflow.QueryTerminationTypeCompleted,
QueryResult: &querypb.WorkflowQueryResult{
succeededCompletionState := &workflow.QueryCompletionState{
Type: workflow.QueryCompletionTypeSucceeded,
Result: &querypb.WorkflowQueryResult{
ResultType: resultType,
Answer: payloads.EncodeBytes(answer),
},
}
err := qr.SetTerminationState(id, completedTerminationState)
err := qr.SetCompletionState(id, succeededCompletionState)
s.NoError(err)
state, err := qr.GetTerminationState(id)
state, err := qr.GetCompletionState(id)
s.NoError(err)
s.Equal(workflow.QueryTerminationTypeCompleted, state.QueryTerminationType)
s.Equal(workflow.QueryCompletionTypeSucceeded, state.Type)
}
}

Expand Down Expand Up @@ -767,10 +767,10 @@ func (s *engineSuite) TestQueryWorkflow_WorkflowTaskDispatch_Unblocked() {
qr := builder.GetQueryRegistry()
buffered := qr.GetBufferedIDs()
for _, id := range buffered {
s.NoError(qr.SetTerminationState(id, &workflow.QueryTerminationState{QueryTerminationType: workflow.QueryTerminationTypeUnblocked}))
state, err := qr.GetTerminationState(id)
s.NoError(qr.SetCompletionState(id, &workflow.QueryCompletionState{Type: workflow.QueryCompletionTypeUnblocked}))
state, err := qr.GetCompletionState(id)
s.NoError(err)
s.Equal(workflow.QueryTerminationTypeUnblocked, state.QueryTerminationType)
s.Equal(workflow.QueryCompletionTypeUnblocked, state.Type)
}
}

Expand Down Expand Up @@ -4423,21 +4423,21 @@ func (s *engineSuite) TestRequestCancel_RespondWorkflowTaskCompleted_SuccessWith
s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, executionBuilder.GetExecutionState().State)
s.False(executionBuilder.HasPendingWorkflowTask())
s.Len(qr.GetCompletedIDs(), 2)
completed1, err := qr.GetTerminationState(id1)
succeeded1, err := qr.GetCompletionState(id1)
s.NoError(err)
s.EqualValues(completed1.QueryResult, result1)
s.Equal(workflow.QueryTerminationTypeCompleted, completed1.QueryTerminationType)
completed2, err := qr.GetTerminationState(id2)
s.EqualValues(succeeded1.Result, result1)
s.Equal(workflow.QueryCompletionTypeSucceeded, succeeded1.Type)
succeeded2, err := qr.GetCompletionState(id2)
s.NoError(err)
s.EqualValues(completed2.QueryResult, result2)
s.Equal(workflow.QueryTerminationTypeCompleted, completed2.QueryTerminationType)
s.EqualValues(succeeded2.Result, result2)
s.Equal(workflow.QueryCompletionTypeSucceeded, succeeded2.Type)
s.Len(qr.GetBufferedIDs(), 0)
s.Len(qr.GetFailedIDs(), 0)
s.Len(qr.GetUnblockedIDs(), 1)
unblocked1, err := qr.GetTerminationState(id3)
unblocked1, err := qr.GetCompletionState(id3)
s.NoError(err)
s.Nil(unblocked1.QueryResult)
s.Equal(workflow.QueryTerminationTypeUnblocked, unblocked1.QueryTerminationType)
s.Nil(unblocked1.Result)
s.Equal(workflow.QueryCompletionTypeUnblocked, unblocked1.Type)

// Try recording activity heartbeat
s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any(), gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)
Expand Down
106 changes: 53 additions & 53 deletions service/history/workflow/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,117 +34,117 @@ import (
)

const (
QueryTerminationTypeCompleted QueryTerminationType = iota
QueryTerminationTypeUnblocked
QueryTerminationTypeFailed
QueryCompletionTypeSucceeded QueryCompletionType = iota
QueryCompletionTypeUnblocked
QueryCompletionTypeFailed
)

var (
errTerminationStateInvalid = serviceerror.NewInternal("query termination state invalid")
errAlreadyInTerminalState = serviceerror.NewInternal("query already in terminal state")
errQueryNotInTerminalState = serviceerror.NewInternal("query not in terminal state")
errCompletionStateInvalid = serviceerror.NewInternal("query completion state invalid")
errAlreadyInCompletionState = serviceerror.NewInternal("query is already in completion state")
errQueryNotInCompletionState = serviceerror.NewInternal("query is not in completion state")
)

type (
QueryTerminationType int
QueryCompletionType int

query interface {
getQueryID() string
getQueryTermCh() <-chan struct{}
getID() string
getCompletionCh() <-chan struct{}
getQueryInput() *querypb.WorkflowQuery
GetTerminationState() (*QueryTerminationState, error)
setTerminationState(*QueryTerminationState) error
GetCompletionState() (*QueryCompletionState, error)
setCompletionState(*QueryCompletionState) error
}

queryImpl struct {
id string
queryInput *querypb.WorkflowQuery
termCh chan struct{}
id string
queryInput *querypb.WorkflowQuery
completionCh chan struct{}

terminationState atomic.Value
completionState atomic.Value
}

QueryTerminationState struct {
QueryTerminationType QueryTerminationType
QueryResult *querypb.WorkflowQueryResult
Failure error
QueryCompletionState struct {
Type QueryCompletionType
Result *querypb.WorkflowQueryResult
Err error
}
)

func newQuery(queryInput *querypb.WorkflowQuery) query {
return &queryImpl{
id: uuid.New(),
queryInput: queryInput,
termCh: make(chan struct{}),
id: uuid.New(),
queryInput: queryInput,
completionCh: make(chan struct{}),
}
}

func (q *queryImpl) getQueryID() string {
func (q *queryImpl) getID() string {
return q.id
}

func (q *queryImpl) getQueryTermCh() <-chan struct{} {
return q.termCh
func (q *queryImpl) getCompletionCh() <-chan struct{} {
return q.completionCh
}

func (q *queryImpl) getQueryInput() *querypb.WorkflowQuery {
return q.queryInput
}

func (q *queryImpl) GetTerminationState() (*QueryTerminationState, error) {
ts := q.terminationState.Load()
func (q *queryImpl) GetCompletionState() (*QueryCompletionState, error) {
ts := q.completionState.Load()
if ts == nil {
return nil, errQueryNotInTerminalState
return nil, errQueryNotInCompletionState
}
return ts.(*QueryTerminationState), nil
return ts.(*QueryCompletionState), nil
}

func (q *queryImpl) setTerminationState(terminationState *QueryTerminationState) error {
if err := q.validateTerminationState(terminationState); err != nil {
func (q *queryImpl) setCompletionState(completionState *QueryCompletionState) error {
if err := q.validateCompletionState(completionState); err != nil {
return err
}
currTerminationState, _ := q.GetTerminationState()
if currTerminationState != nil {
return errAlreadyInTerminalState
currCompletionState, _ := q.GetCompletionState()
if currCompletionState != nil {
return errAlreadyInCompletionState
}
q.terminationState.Store(terminationState)
close(q.termCh)
q.completionState.Store(completionState)
close(q.completionCh)
return nil
}

func (q *queryImpl) validateTerminationState(
terminationState *QueryTerminationState,
func (q *queryImpl) validateCompletionState(
completionState *QueryCompletionState,
) error {
if terminationState == nil {
return errTerminationStateInvalid
if completionState == nil {
return errCompletionStateInvalid
}
switch terminationState.QueryTerminationType {
case QueryTerminationTypeCompleted:
if terminationState.QueryResult == nil || terminationState.Failure != nil {
return errTerminationStateInvalid
switch completionState.Type {
case QueryCompletionTypeSucceeded:
if completionState.Result == nil || completionState.Err != nil {
return errCompletionStateInvalid
}
queryResult := terminationState.QueryResult
queryResult := completionState.Result
validAnswered := queryResult.GetResultType() == enumspb.QUERY_RESULT_TYPE_ANSWERED &&
queryResult.Answer != nil &&
queryResult.GetErrorMessage() == ""
validFailed := queryResult.GetResultType() == enumspb.QUERY_RESULT_TYPE_FAILED &&
queryResult.Answer == nil &&
queryResult.GetErrorMessage() != ""
if !validAnswered && !validFailed {
return errTerminationStateInvalid
return errCompletionStateInvalid
}
return nil
case QueryTerminationTypeUnblocked:
if terminationState.QueryResult != nil || terminationState.Failure != nil {
return errTerminationStateInvalid
case QueryCompletionTypeUnblocked:
if completionState.Result != nil || completionState.Err != nil {
return errCompletionStateInvalid
}
return nil
case QueryTerminationTypeFailed:
if terminationState.QueryResult != nil || terminationState.Failure == nil {
return errTerminationStateInvalid
case QueryCompletionTypeFailed:
if completionState.Result != nil || completionState.Err == nil {
return errCompletionStateInvalid
}
return nil
default:
return errTerminationStateInvalid
return errCompletionStateInvalid
}
}
32 changes: 16 additions & 16 deletions service/history/workflow/query_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ type (

GetQueryTermCh(string) (<-chan struct{}, error)
GetQueryInput(string) (*querypb.WorkflowQuery, error)
GetTerminationState(string) (*QueryTerminationState, error)
GetCompletionState(string) (*QueryCompletionState, error)

BufferQuery(queryInput *querypb.WorkflowQuery) (string, <-chan struct{})
SetTerminationState(string, *QueryTerminationState) error
SetCompletionState(string, *QueryCompletionState) error
RemoveQuery(id string)
Clear()
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func (r *queryRegistryImpl) GetQueryTermCh(id string) (<-chan struct{}, error) {
if err != nil {
return nil, err
}
return q.getQueryTermCh(), nil
return q.getCompletionCh(), nil
}

func (r *queryRegistryImpl) GetQueryInput(id string) (*querypb.WorkflowQuery, error) {
Expand All @@ -145,42 +145,42 @@ func (r *queryRegistryImpl) GetQueryInput(id string) (*querypb.WorkflowQuery, er
return q.getQueryInput(), nil
}

func (r *queryRegistryImpl) GetTerminationState(id string) (*QueryTerminationState, error) {
func (r *queryRegistryImpl) GetCompletionState(id string) (*QueryCompletionState, error) {
r.RLock()
defer r.RUnlock()
q, err := r.getQueryNoLock(id)
if err != nil {
return nil, err
}
return q.GetTerminationState()
return q.GetCompletionState()
}

func (r *queryRegistryImpl) BufferQuery(queryInput *querypb.WorkflowQuery) (string, <-chan struct{}) {
r.Lock()
defer r.Unlock()
q := newQuery(queryInput)
id := q.getQueryID()
id := q.getID()
r.buffered[id] = q
return id, q.getQueryTermCh()
return id, q.getCompletionCh()
}

func (r *queryRegistryImpl) SetTerminationState(id string, terminationState *QueryTerminationState) error {
func (r *queryRegistryImpl) SetCompletionState(id string, completionState *QueryCompletionState) error {
r.Lock()
defer r.Unlock()
q, ok := r.buffered[id]
if !ok {
return errQueryNotExists
}
if err := q.setTerminationState(terminationState); err != nil {
if err := q.setCompletionState(completionState); err != nil {
return err
}
delete(r.buffered, id)
switch terminationState.QueryTerminationType {
case QueryTerminationTypeCompleted:
switch completionState.Type {
case QueryCompletionTypeSucceeded:
r.completed[id] = q
case QueryTerminationTypeUnblocked:
case QueryCompletionTypeUnblocked:
r.unblocked[id] = q
case QueryTerminationTypeFailed:
case QueryCompletionTypeFailed:
r.failed[id] = q
}
return nil
Expand All @@ -199,9 +199,9 @@ func (r *queryRegistryImpl) Clear() {
r.Lock()
defer r.Unlock()
for id, q := range r.buffered {
q.setTerminationState(&QueryTerminationState{
QueryTerminationType: QueryTerminationTypeFailed,
Failure: consts.ErrBufferedQueryCleared,
_ = q.setCompletionState(&QueryCompletionState{
Type: QueryCompletionTypeFailed,
Err: consts.ErrBufferedQueryCleared,
})
r.failed[id] = q
}
Expand Down
Loading

0 comments on commit 231655c

Please sign in to comment.