Skip to content

Commit

Permalink
Write tests for replication task processor main loop (#6010)
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir committed May 11, 2024
1 parent 795aec1 commit 957fd1a
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 10 deletions.
2 changes: 2 additions & 0 deletions service/history/replication/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/uber/cadence/service/history/shard"
)

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination task_executor_mock.go -self_package github.com/uber/cadence/service/history/replication

type (
// TaskExecutor is the executor for replication task
TaskExecutor interface {
Expand Down
73 changes: 73 additions & 0 deletions service/history/replication/task_executor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions service/history/replication/task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,7 @@ func (p *taskProcessorImpl) processSingleTask(replicationTask *types.Replication
func (p *taskProcessorImpl) processTaskOnce(replicationTask *types.ReplicationTask) error {
ts := p.shard.GetTimeSource()
startTime := ts.Now()
scope, err := p.taskExecutor.execute(
replicationTask,
false)
scope, err := p.taskExecutor.execute(replicationTask, false)

if err != nil {
p.updateFailureMetric(scope, err)
Expand Down
179 changes: 172 additions & 7 deletions service/history/replication/task_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package replication
import (
"context"
"encoding/json"
"errors"
"testing"
"time"

Expand All @@ -39,6 +40,7 @@ import (
"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -68,8 +70,9 @@ type (
adminClient *admin.MockClient
executionManager *mocks.ExecutionManager
requestChan chan *request

taskProcessor *taskProcessorImpl
taskFetcher *fakeTaskFetcher
taskExecutor *MockTaskExecutor
taskProcessor *taskProcessorImpl
}
)

Expand Down Expand Up @@ -113,30 +116,32 @@ func (s *taskProcessorSuite) SetupTest() {
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
s.requestChan = make(chan *request, 10)

taskFetcher := &fakeTaskFetcher{
s.taskFetcher = &fakeTaskFetcher{
sourceCluster: "standby",
requestChan: s.requestChan,
rateLimiter: quotas.NewDynamicRateLimiter(func() float64 { return 100 }),
}

s.taskExecutor = NewMockTaskExecutor(s.controller)

s.taskProcessor = NewTaskProcessor(
s.mockShard,
s.mockEngine,
s.config,
metricsClient,
taskFetcher,
nil,
s.taskFetcher,
s.taskExecutor,
).(*taskProcessorImpl)
}

func (s *taskProcessorSuite) TearDownTest() {
s.mockShard.Finish(s.T())
goleak.VerifyNone(s.T())
}

func (s *taskProcessorSuite) TestStartStop() {
s.taskProcessor.Start()
s.taskProcessor.Stop()
goleak.VerifyNone(s.T())
}

func (s *taskProcessorSuite) TestProcessResponse_NoTask() {
Expand All @@ -149,7 +154,7 @@ func (s *taskProcessorSuite) TestProcessResponse_NoTask() {
s.Equal(int64(100), s.taskProcessor.lastRetrievedMessageID)
}

func (s *taskProcessorSuite) TestSendFetchMessageRequest() {
func (s *taskProcessorSuite) TestProcessorLoop_RequestChanPopulated() {
// start the process loop so it poppulates requestChan
s.taskProcessor.wg.Add(1)
go s.taskProcessor.processorLoop()
Expand All @@ -167,6 +172,166 @@ func (s *taskProcessorSuite) TestSendFetchMessageRequest() {
s.NotNil(requestMessage.respChan)
}

func (s *taskProcessorSuite) TestProcessorLoop_RespChanClosed() {
// start the process loop
s.taskProcessor.wg.Add(1)
go s.taskProcessor.processorLoop()
defer close(s.taskProcessor.done)

// act like taskFetcher here and populate respChan of the request
requestMessage := <-s.requestChan
close(requestMessage.respChan)

// loop should have continued by now. validate by checking the new request
select {
case <-s.requestChan:
// expected
case <-time.After(50 * time.Millisecond):
s.Fail("new request not sent to requestChan")
}
}

func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteSuccess() {
// taskExecutor will fail to execute the task
// returning a non-retriable task to keep mocking simpler
s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, nil).Times(1)

// domain name will be fetched
s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes()

// start the process loop
s.taskProcessor.wg.Add(1)
go s.taskProcessor.processorLoop()

// act like taskFetcher here and populate respChan of the request
requestMessage := <-s.requestChan
requestMessage.respChan <- &types.ReplicationMessages{
LastRetrievedMessageID: 100,
ReplicationTasks: []*types.ReplicationTask{
{
TaskType: types.ReplicationTaskTypeSyncActivity.Ptr(),
SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: testRunID,
ScheduledID: testScheduleID,
},
SourceTaskID: testTaskID,
},
},
}

// wait a bit and terminate the loop
time.Sleep(50 * time.Millisecond)
close(s.taskProcessor.done)
}

func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteFailed_PutDLQSuccess() {
// taskExecutor will fail to execute the task
// returning a non-retriable task to keep mocking simpler
s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, &types.BadRequestError{}).Times(1)

// domain name will be fetched
s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes()

// task will be put into dlq
dlqReq := &persistence.PutReplicationTaskToDLQRequest{
SourceClusterName: "standby", // TODO move to a constant
TaskInfo: &persistence.ReplicationTaskInfo{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: testRunID,
TaskID: testTaskID,
TaskType: persistence.ReplicationTaskTypeSyncActivity,
ScheduledID: testScheduleID,
},
DomainName: testDomainName,
}
s.mockShard.Resource.ExecutionMgr.On("PutReplicationTaskToDLQ", mock.Anything, dlqReq).Return(nil).Times(1)

// start the process loop
s.taskProcessor.wg.Add(1)
go s.taskProcessor.processorLoop()

// act like taskFetcher here and populate respChan of the request
requestMessage := <-s.requestChan
requestMessage.respChan <- &types.ReplicationMessages{
LastRetrievedMessageID: 100,
ReplicationTasks: []*types.ReplicationTask{
{
TaskType: types.ReplicationTaskTypeSyncActivity.Ptr(),
SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: testRunID,
ScheduledID: testScheduleID,
},
SourceTaskID: testTaskID,
},
},
}

// wait a bit and terminate the loop
time.Sleep(50 * time.Millisecond)
close(s.taskProcessor.done)
}

func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteFailed_PutDLQFailed() {
// taskExecutor will fail to execute the task
// returning a non-retriable task to keep mocking simpler
s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, &types.BadRequestError{}).Times(1)

// domain name will be fetched
s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes()

// task will be put into dlq and will fail. It will be attempted 3 times. (first call + 2 retries based on policy overriden below)
dqlRetryPolicy := backoff.NewExponentialRetryPolicy(time.Millisecond)
dqlRetryPolicy.SetMaximumAttempts(2)
s.taskProcessor.dlqRetryPolicy = dqlRetryPolicy
dlqReq := &persistence.PutReplicationTaskToDLQRequest{
SourceClusterName: "standby", // TODO move to a constant
TaskInfo: &persistence.ReplicationTaskInfo{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: testRunID,
TaskID: testTaskID,
TaskType: persistence.ReplicationTaskTypeSyncActivity,
ScheduledID: testScheduleID,
},
DomainName: testDomainName,
}
s.mockShard.Resource.ExecutionMgr.
On("PutReplicationTaskToDLQ", mock.Anything, dlqReq).
Return(errors.New("failed to put to dlq")).
Times(3)

// start the process loop
s.taskProcessor.wg.Add(1)
go s.taskProcessor.processorLoop()

// act like taskFetcher here and populate respChan of the request
requestMessage := <-s.requestChan
requestMessage.respChan <- &types.ReplicationMessages{
LastRetrievedMessageID: 100,
ReplicationTasks: []*types.ReplicationTask{
{
TaskType: types.ReplicationTaskTypeSyncActivity.Ptr(),
SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: testRunID,
ScheduledID: testScheduleID,
},
SourceTaskID: testTaskID,
},
},
}

// wait a bit and terminate the loop
time.Sleep(50 * time.Millisecond)
close(s.taskProcessor.done)
}

func (s *taskProcessorSuite) TestHandleSyncShardStatus() {
now := time.Now()
s.mockEngine.EXPECT().SyncShardStatus(gomock.Any(), &types.SyncShardStatusRequest{
Expand Down

0 comments on commit 957fd1a

Please sign in to comment.