diff --git a/service/history/replication/task_executor.go b/service/history/replication/task_executor.go index 0c321e43996..85904488c9e 100644 --- a/service/history/replication/task_executor.go +++ b/service/history/replication/task_executor.go @@ -34,6 +34,8 @@ import ( "github.com/uber/cadence/service/history/shard" ) +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination task_executor_mock.go -self_package github.com/uber/cadence/service/history/replication + type ( // TaskExecutor is the executor for replication task TaskExecutor interface { diff --git a/service/history/replication/task_executor_mock.go b/service/history/replication/task_executor_mock.go new file mode 100644 index 00000000000..f6e38999c52 --- /dev/null +++ b/service/history/replication/task_executor_mock.go @@ -0,0 +1,73 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: task_executor.go + +// Package replication is a generated GoMock package. +package replication + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + + types "github.com/uber/cadence/common/types" +) + +// MockTaskExecutor is a mock of TaskExecutor interface. +type MockTaskExecutor struct { + ctrl *gomock.Controller + recorder *MockTaskExecutorMockRecorder +} + +// MockTaskExecutorMockRecorder is the mock recorder for MockTaskExecutor. +type MockTaskExecutorMockRecorder struct { + mock *MockTaskExecutor +} + +// NewMockTaskExecutor creates a new mock instance. +func NewMockTaskExecutor(ctrl *gomock.Controller) *MockTaskExecutor { + mock := &MockTaskExecutor{ctrl: ctrl} + mock.recorder = &MockTaskExecutorMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTaskExecutor) EXPECT() *MockTaskExecutorMockRecorder { + return m.recorder +} + +// execute mocks base method. +func (m *MockTaskExecutor) execute(replicationTask *types.ReplicationTask, forceApply bool) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "execute", replicationTask, forceApply) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// execute indicates an expected call of execute. +func (mr *MockTaskExecutorMockRecorder) execute(replicationTask, forceApply interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "execute", reflect.TypeOf((*MockTaskExecutor)(nil).execute), replicationTask, forceApply) +} diff --git a/service/history/replication/task_processor.go b/service/history/replication/task_processor.go index 3ff3d8098c9..aec99ab6103 100644 --- a/service/history/replication/task_processor.go +++ b/service/history/replication/task_processor.go @@ -443,9 +443,7 @@ func (p *taskProcessorImpl) processSingleTask(replicationTask *types.Replication func (p *taskProcessorImpl) processTaskOnce(replicationTask *types.ReplicationTask) error { ts := p.shard.GetTimeSource() startTime := ts.Now() - scope, err := p.taskExecutor.execute( - replicationTask, - false) + scope, err := p.taskExecutor.execute(replicationTask, false) if err != nil { p.updateFailureMetric(scope, err) diff --git a/service/history/replication/task_processor_test.go b/service/history/replication/task_processor_test.go index ae5d39b040e..cd4c2cfb216 100644 --- a/service/history/replication/task_processor_test.go +++ b/service/history/replication/task_processor_test.go @@ -23,6 +23,7 @@ package replication import ( "context" "encoding/json" + "errors" "testing" "time" @@ -39,6 +40,7 @@ import ( "github.com/uber/cadence/client/admin" "github.com/uber/cadence/client/frontend" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/metrics" @@ -68,8 +70,9 @@ type ( adminClient *admin.MockClient executionManager *mocks.ExecutionManager requestChan chan *request - - taskProcessor *taskProcessorImpl + taskFetcher *fakeTaskFetcher + taskExecutor *MockTaskExecutor + taskProcessor *taskProcessorImpl } ) @@ -113,30 +116,32 @@ func (s *taskProcessorSuite) SetupTest() { metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) s.requestChan = make(chan *request, 10) - taskFetcher := &fakeTaskFetcher{ + s.taskFetcher = &fakeTaskFetcher{ sourceCluster: "standby", requestChan: s.requestChan, rateLimiter: quotas.NewDynamicRateLimiter(func() float64 { return 100 }), } + s.taskExecutor = NewMockTaskExecutor(s.controller) + s.taskProcessor = NewTaskProcessor( s.mockShard, s.mockEngine, s.config, metricsClient, - taskFetcher, - nil, + s.taskFetcher, + s.taskExecutor, ).(*taskProcessorImpl) } func (s *taskProcessorSuite) TearDownTest() { s.mockShard.Finish(s.T()) + goleak.VerifyNone(s.T()) } func (s *taskProcessorSuite) TestStartStop() { s.taskProcessor.Start() s.taskProcessor.Stop() - goleak.VerifyNone(s.T()) } func (s *taskProcessorSuite) TestProcessResponse_NoTask() { @@ -149,7 +154,7 @@ func (s *taskProcessorSuite) TestProcessResponse_NoTask() { s.Equal(int64(100), s.taskProcessor.lastRetrievedMessageID) } -func (s *taskProcessorSuite) TestSendFetchMessageRequest() { +func (s *taskProcessorSuite) TestProcessorLoop_RequestChanPopulated() { // start the process loop so it poppulates requestChan s.taskProcessor.wg.Add(1) go s.taskProcessor.processorLoop() @@ -167,6 +172,166 @@ func (s *taskProcessorSuite) TestSendFetchMessageRequest() { s.NotNil(requestMessage.respChan) } +func (s *taskProcessorSuite) TestProcessorLoop_RespChanClosed() { + // start the process loop + s.taskProcessor.wg.Add(1) + go s.taskProcessor.processorLoop() + defer close(s.taskProcessor.done) + + // act like taskFetcher here and populate respChan of the request + requestMessage := <-s.requestChan + close(requestMessage.respChan) + + // loop should have continued by now. validate by checking the new request + select { + case <-s.requestChan: + // expected + case <-time.After(50 * time.Millisecond): + s.Fail("new request not sent to requestChan") + } +} + +func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteSuccess() { + // taskExecutor will fail to execute the task + // returning a non-retriable task to keep mocking simpler + s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, nil).Times(1) + + // domain name will be fetched + s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes() + + // start the process loop + s.taskProcessor.wg.Add(1) + go s.taskProcessor.processorLoop() + + // act like taskFetcher here and populate respChan of the request + requestMessage := <-s.requestChan + requestMessage.respChan <- &types.ReplicationMessages{ + LastRetrievedMessageID: 100, + ReplicationTasks: []*types.ReplicationTask{ + { + TaskType: types.ReplicationTaskTypeSyncActivity.Ptr(), + SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{ + DomainID: testDomainID, + WorkflowID: testWorkflowID, + RunID: testRunID, + ScheduledID: testScheduleID, + }, + SourceTaskID: testTaskID, + }, + }, + } + + // wait a bit and terminate the loop + time.Sleep(50 * time.Millisecond) + close(s.taskProcessor.done) +} + +func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteFailed_PutDLQSuccess() { + // taskExecutor will fail to execute the task + // returning a non-retriable task to keep mocking simpler + s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, &types.BadRequestError{}).Times(1) + + // domain name will be fetched + s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes() + + // task will be put into dlq + dlqReq := &persistence.PutReplicationTaskToDLQRequest{ + SourceClusterName: "standby", // TODO move to a constant + TaskInfo: &persistence.ReplicationTaskInfo{ + DomainID: testDomainID, + WorkflowID: testWorkflowID, + RunID: testRunID, + TaskID: testTaskID, + TaskType: persistence.ReplicationTaskTypeSyncActivity, + ScheduledID: testScheduleID, + }, + DomainName: testDomainName, + } + s.mockShard.Resource.ExecutionMgr.On("PutReplicationTaskToDLQ", mock.Anything, dlqReq).Return(nil).Times(1) + + // start the process loop + s.taskProcessor.wg.Add(1) + go s.taskProcessor.processorLoop() + + // act like taskFetcher here and populate respChan of the request + requestMessage := <-s.requestChan + requestMessage.respChan <- &types.ReplicationMessages{ + LastRetrievedMessageID: 100, + ReplicationTasks: []*types.ReplicationTask{ + { + TaskType: types.ReplicationTaskTypeSyncActivity.Ptr(), + SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{ + DomainID: testDomainID, + WorkflowID: testWorkflowID, + RunID: testRunID, + ScheduledID: testScheduleID, + }, + SourceTaskID: testTaskID, + }, + }, + } + + // wait a bit and terminate the loop + time.Sleep(50 * time.Millisecond) + close(s.taskProcessor.done) +} + +func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteFailed_PutDLQFailed() { + // taskExecutor will fail to execute the task + // returning a non-retriable task to keep mocking simpler + s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, &types.BadRequestError{}).Times(1) + + // domain name will be fetched + s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes() + + // task will be put into dlq and will fail. It will be attempted 3 times. (first call + 2 retries based on policy overriden below) + dqlRetryPolicy := backoff.NewExponentialRetryPolicy(time.Millisecond) + dqlRetryPolicy.SetMaximumAttempts(2) + s.taskProcessor.dlqRetryPolicy = dqlRetryPolicy + dlqReq := &persistence.PutReplicationTaskToDLQRequest{ + SourceClusterName: "standby", // TODO move to a constant + TaskInfo: &persistence.ReplicationTaskInfo{ + DomainID: testDomainID, + WorkflowID: testWorkflowID, + RunID: testRunID, + TaskID: testTaskID, + TaskType: persistence.ReplicationTaskTypeSyncActivity, + ScheduledID: testScheduleID, + }, + DomainName: testDomainName, + } + s.mockShard.Resource.ExecutionMgr. + On("PutReplicationTaskToDLQ", mock.Anything, dlqReq). + Return(errors.New("failed to put to dlq")). + Times(3) + + // start the process loop + s.taskProcessor.wg.Add(1) + go s.taskProcessor.processorLoop() + + // act like taskFetcher here and populate respChan of the request + requestMessage := <-s.requestChan + requestMessage.respChan <- &types.ReplicationMessages{ + LastRetrievedMessageID: 100, + ReplicationTasks: []*types.ReplicationTask{ + { + TaskType: types.ReplicationTaskTypeSyncActivity.Ptr(), + SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{ + DomainID: testDomainID, + WorkflowID: testWorkflowID, + RunID: testRunID, + ScheduledID: testScheduleID, + }, + SourceTaskID: testTaskID, + }, + }, + } + + // wait a bit and terminate the loop + time.Sleep(50 * time.Millisecond) + close(s.taskProcessor.done) +} + func (s *taskProcessorSuite) TestHandleSyncShardStatus() { now := time.Now() s.mockEngine.EXPECT().SyncShardStatus(gomock.Any(), &types.SyncShardStatusRequest{