Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write tests for replication task processor main loop #6010

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions service/history/replication/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
"github.com/uber/cadence/service/history/shard"
)

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination task_executor_mock.go -self_package github.com/uber/cadence/service/history/replication

type (
// TaskExecutor is the executor for replication task
TaskExecutor interface {
Expand Down
73 changes: 73 additions & 0 deletions service/history/replication/task_executor_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions service/history/replication/task_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,7 @@ func (p *taskProcessorImpl) processSingleTask(replicationTask *types.Replication
func (p *taskProcessorImpl) processTaskOnce(replicationTask *types.ReplicationTask) error {
ts := p.shard.GetTimeSource()
startTime := ts.Now()
scope, err := p.taskExecutor.execute(
replicationTask,
false)
scope, err := p.taskExecutor.execute(replicationTask, false)

if err != nil {
p.updateFailureMetric(scope, err)
Expand Down
179 changes: 172 additions & 7 deletions service/history/replication/task_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package replication
import (
"context"
"encoding/json"
"errors"
"testing"
"time"

Expand All @@ -39,6 +40,7 @@ import (
"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/metrics"
Expand Down Expand Up @@ -68,8 +70,9 @@ type (
adminClient *admin.MockClient
executionManager *mocks.ExecutionManager
requestChan chan *request

taskProcessor *taskProcessorImpl
taskFetcher *fakeTaskFetcher
taskExecutor *MockTaskExecutor
taskProcessor *taskProcessorImpl
}
)

Expand Down Expand Up @@ -113,30 +116,32 @@ func (s *taskProcessorSuite) SetupTest() {
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
s.requestChan = make(chan *request, 10)

taskFetcher := &fakeTaskFetcher{
s.taskFetcher = &fakeTaskFetcher{
sourceCluster: "standby",
requestChan: s.requestChan,
rateLimiter: quotas.NewDynamicRateLimiter(func() float64 { return 100 }),
}

s.taskExecutor = NewMockTaskExecutor(s.controller)

s.taskProcessor = NewTaskProcessor(
s.mockShard,
s.mockEngine,
s.config,
metricsClient,
taskFetcher,
nil,
s.taskFetcher,
s.taskExecutor,
).(*taskProcessorImpl)
}

func (s *taskProcessorSuite) TearDownTest() {
s.mockShard.Finish(s.T())
goleak.VerifyNone(s.T())
}

func (s *taskProcessorSuite) TestStartStop() {
s.taskProcessor.Start()
s.taskProcessor.Stop()
goleak.VerifyNone(s.T())
}

func (s *taskProcessorSuite) TestProcessResponse_NoTask() {
Expand All @@ -149,7 +154,7 @@ func (s *taskProcessorSuite) TestProcessResponse_NoTask() {
s.Equal(int64(100), s.taskProcessor.lastRetrievedMessageID)
}

func (s *taskProcessorSuite) TestSendFetchMessageRequest() {
func (s *taskProcessorSuite) TestProcessorLoop_RequestChanPopulated() {
// start the process loop so it poppulates requestChan
s.taskProcessor.wg.Add(1)
go s.taskProcessor.processorLoop()
Expand All @@ -167,6 +172,166 @@ func (s *taskProcessorSuite) TestSendFetchMessageRequest() {
s.NotNil(requestMessage.respChan)
}

func (s *taskProcessorSuite) TestProcessorLoop_RespChanClosed() {
// start the process loop
s.taskProcessor.wg.Add(1)
go s.taskProcessor.processorLoop()
defer close(s.taskProcessor.done)

// act like taskFetcher here and populate respChan of the request
requestMessage := <-s.requestChan
close(requestMessage.respChan)

// loop should have continued by now. validate by checking the new request
select {
case <-s.requestChan:
// expected
case <-time.After(50 * time.Millisecond):
s.Fail("new request not sent to requestChan")
}
}

func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteSuccess() {
taylanisikdemir marked this conversation as resolved.
Show resolved Hide resolved
// taskExecutor will fail to execute the task
// returning a non-retriable task to keep mocking simpler
s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, nil).Times(1)

// domain name will be fetched
s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes()

// start the process loop
s.taskProcessor.wg.Add(1)
go s.taskProcessor.processorLoop()

// act like taskFetcher here and populate respChan of the request
requestMessage := <-s.requestChan
requestMessage.respChan <- &types.ReplicationMessages{
LastRetrievedMessageID: 100,
ReplicationTasks: []*types.ReplicationTask{
{
TaskType: types.ReplicationTaskTypeSyncActivity.Ptr(),
SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: testRunID,
ScheduledID: testScheduleID,
},
SourceTaskID: testTaskID,
},
},
}

// wait a bit and terminate the loop
time.Sleep(50 * time.Millisecond)
close(s.taskProcessor.done)
}

func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteFailed_PutDLQSuccess() {
// taskExecutor will fail to execute the task
// returning a non-retriable task to keep mocking simpler
s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, &types.BadRequestError{}).Times(1)

// domain name will be fetched
s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes()

// task will be put into dlq
dlqReq := &persistence.PutReplicationTaskToDLQRequest{
SourceClusterName: "standby", // TODO move to a constant
TaskInfo: &persistence.ReplicationTaskInfo{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: testRunID,
TaskID: testTaskID,
TaskType: persistence.ReplicationTaskTypeSyncActivity,
ScheduledID: testScheduleID,
},
DomainName: testDomainName,
}
s.mockShard.Resource.ExecutionMgr.On("PutReplicationTaskToDLQ", mock.Anything, dlqReq).Return(nil).Times(1)

// start the process loop
s.taskProcessor.wg.Add(1)
go s.taskProcessor.processorLoop()

// act like taskFetcher here and populate respChan of the request
requestMessage := <-s.requestChan
requestMessage.respChan <- &types.ReplicationMessages{
LastRetrievedMessageID: 100,
ReplicationTasks: []*types.ReplicationTask{
{
TaskType: types.ReplicationTaskTypeSyncActivity.Ptr(),
SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: testRunID,
ScheduledID: testScheduleID,
},
SourceTaskID: testTaskID,
},
},
}

// wait a bit and terminate the loop
time.Sleep(50 * time.Millisecond)
close(s.taskProcessor.done)
}

func (s *taskProcessorSuite) TestProcessorLoop_TaskExecuteFailed_PutDLQFailed() {
// taskExecutor will fail to execute the task
// returning a non-retriable task to keep mocking simpler
s.taskExecutor.EXPECT().execute(gomock.Any(), false).Return(0, &types.BadRequestError{}).Times(1)

// domain name will be fetched
s.mockDomainCache.EXPECT().GetDomainName(testDomainID).Return(testDomainName, nil).AnyTimes()

// task will be put into dlq and will fail. It will be attempted 3 times. (first call + 2 retries based on policy overriden below)
dqlRetryPolicy := backoff.NewExponentialRetryPolicy(time.Millisecond)
dqlRetryPolicy.SetMaximumAttempts(2)
s.taskProcessor.dlqRetryPolicy = dqlRetryPolicy
dlqReq := &persistence.PutReplicationTaskToDLQRequest{
SourceClusterName: "standby", // TODO move to a constant
TaskInfo: &persistence.ReplicationTaskInfo{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: testRunID,
TaskID: testTaskID,
TaskType: persistence.ReplicationTaskTypeSyncActivity,
ScheduledID: testScheduleID,
},
DomainName: testDomainName,
}
s.mockShard.Resource.ExecutionMgr.
On("PutReplicationTaskToDLQ", mock.Anything, dlqReq).
Return(errors.New("failed to put to dlq")).
Times(3)

// start the process loop
s.taskProcessor.wg.Add(1)
go s.taskProcessor.processorLoop()

// act like taskFetcher here and populate respChan of the request
requestMessage := <-s.requestChan
requestMessage.respChan <- &types.ReplicationMessages{
LastRetrievedMessageID: 100,
ReplicationTasks: []*types.ReplicationTask{
{
TaskType: types.ReplicationTaskTypeSyncActivity.Ptr(),
SyncActivityTaskAttributes: &types.SyncActivityTaskAttributes{
DomainID: testDomainID,
WorkflowID: testWorkflowID,
RunID: testRunID,
ScheduledID: testScheduleID,
},
SourceTaskID: testTaskID,
},
},
}

// wait a bit and terminate the loop
time.Sleep(50 * time.Millisecond)
close(s.taskProcessor.done)
}

func (s *taskProcessorSuite) TestHandleSyncShardStatus() {
now := time.Now()
s.mockEngine.EXPECT().SyncShardStatus(gomock.Any(), &types.SyncShardStatusRequest{
Expand Down
Loading