
Commit

Add unit test for replication task fetcher (#6009)
Shaddoll committed May 10, 2024
1 parent 10604c3 commit 684836a
Showing 5 changed files with 169 additions and 17 deletions.
6 changes: 6 additions & 0 deletions common/backoff/jitter.go
@@ -35,6 +35,9 @@ func JitDuration(duration time.Duration, coefficient float64) time.Duration {
// JitInt64 returns a random number in [(1-coefficient)*input, (1+coefficient)*input), inclusive of the lower bound and exclusive of the upper
func JitInt64(input int64, coefficient float64) int64 {
validateCoefficient(coefficient)
if coefficient == 0 {
return input
}

base := int64(float64(input) * (1 - coefficient))
addon := rand.Int63n(2 * (input - base))
@@ -44,6 +47,9 @@ func JitInt64(input int64, coefficient float64) int64 {
// JitFloat64 returns a random number in [(1-coefficient)*input, (1+coefficient)*input), inclusive of the lower bound and exclusive of the upper
func JitFloat64(input float64, coefficient float64) float64 {
validateCoefficient(coefficient)
if coefficient == 0 {
return input
}

base := input * (1 - coefficient)
addon := rand.Float64() * 2 * (input - base)
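The new guard matters because math/rand's Int63n panics when its argument is not positive: with a zero coefficient, base equals input and 2*(input-base) is 0. Below is a minimal standalone sketch of the guarded logic, illustrative only and not the Cadence package itself:

package main

import (
	"fmt"
	"math/rand"
)

// jitInt64 mirrors the guarded jitter logic above: without the early return,
// a zero coefficient would call rand.Int63n(0), which panics.
func jitInt64(input int64, coefficient float64) int64 {
	if coefficient == 0 {
		return input
	}
	base := int64(float64(input) * (1 - coefficient))
	addon := rand.Int63n(2 * (input - base)) // random offset in [0, 2*coefficient*input)
	return base + addon
}

func main() {
	fmt.Println(jitInt64(1000, 0.1)) // jittered value in [900, 1100)
	fmt.Println(jitInt64(1000, 0))   // exactly 1000, no panic
}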
22 changes: 22 additions & 0 deletions common/backoff/jitter_test.go
@@ -22,6 +22,7 @@ package backoff

import (
"log"
"math/rand"
"os"
"testing"
"time"
@@ -59,6 +60,13 @@ func (s *jitterSuite) TestJitInt64() {
}
}

func (s *jitterSuite) TestJitInt64WithZeroCoefficient() {
for i := 0; i < 1048576; i++ {
input := rand.Int63()
s.Equal(input, JitInt64(input, 0))
}
}

func (s *jitterSuite) TestJitFloat64() {
input := float64(1048576.1048576)
coefficient := float64(0.16)
@@ -72,6 +80,13 @@ func (s *jitterSuite) TestJitFloat64() {
}
}

func (s *jitterSuite) TestJitFloat64WithZeroCoefficient() {
for i := 0; i < 1048576; i++ {
input := rand.Float64()
s.Equal(input, JitFloat64(input, 0))
}
}

func (s *jitterSuite) TestJitDuration() {
input := time.Duration(1099511627776)
coefficient := float64(0.1)
@@ -84,3 +99,10 @@ func (s *jitterSuite) TestJitDuration() {
s.True(result < upperBound)
}
}

func (s *jitterSuite) TestJitDurationWithZeroCoefficient() {
for i := 0; i < 1048576; i++ {
input := time.Duration(rand.Int63())
s.Equal(input, JitDuration(input, 0))
}
}
41 changes: 26 additions & 15 deletions service/history/replication/task_fetcher.go
@@ -22,13 +22,15 @@ package replication

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/uber/cadence/client"
"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
@@ -40,7 +42,6 @@ import (
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination task_fetcher_mock.go -self_package github.com/uber/cadence/service/history/replication

// TODO: reuse the interface and implementation defined in history/task package

const (
fetchTaskRequestTimeout = 60 * time.Second
requestChanBufferSize = 1000
@@ -72,10 +73,13 @@ type (
logger log.Logger
remotePeer admin.Client
rateLimiter *quotas.DynamicRateLimiter
timeSource clock.TimeSource
requestChan chan *request
ctx context.Context
cancelCtx context.CancelFunc
stoppedCh chan struct{}
wg sync.WaitGroup

fetchAndDistributeTasksFn func(map[int32]*request) error
}

// taskFetchersImpl is a group of fetchers, one per source DC.
@@ -155,19 +159,21 @@ func newReplicationTaskFetcher(
sourceFrontend admin.Client,
) TaskFetcher {
ctx, cancel := context.WithCancel(context.Background())
return &taskFetcherImpl{
fetcher := &taskFetcherImpl{
status: common.DaemonStatusInitialized,
config: config,
logger: logger.WithTags(tag.ClusterName(sourceCluster)),
remotePeer: sourceFrontend,
currentCluster: currentCluster,
sourceCluster: sourceCluster,
rateLimiter: quotas.NewDynamicRateLimiter(config.ReplicationTaskProcessorHostQPS.AsFloat64()),
timeSource: clock.NewRealTimeSource(),
requestChan: make(chan *request, requestChanBufferSize),
ctx: ctx,
cancelCtx: cancel,
stoppedCh: make(chan struct{}),
}
fetcher.fetchAndDistributeTasksFn = fetcher.fetchAndDistributeTasks
return fetcher
}
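The constructor now wires fetchAndDistributeTasksFn to the real method so that tests can swap in a stub. A rough sketch of this test-seam pattern, using illustrative names rather than the actual fetcher types:

package main

import "fmt"

// worker calls its work routine through a function field so tests can replace
// the real method with a stub, mirroring fetchAndDistributeTasksFn above.
type worker struct {
	doWorkFn func() error
}

func newWorker() *worker {
	w := &worker{}
	w.doWorkFn = w.doWork // default to the real implementation
	return w
}

func (w *worker) doWork() error { return nil }

func main() {
	w := newWorker()
	w.doWorkFn = func() error { return fmt.Errorf("stubbed") } // what a test would do
	fmt.Println(w.doWorkFn())
}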

// Start starts the fetcher
@@ -176,7 +182,10 @@ func (f *taskFetcherImpl) Start() {
return
}

// NOTE: we have never run the production service with ReplicationTaskFetcherParallelism larger than 1,
// and the behavior is undefined if we do. We should consider making this config a boolean.
for i := 0; i < f.config.ReplicationTaskFetcherParallelism(); i++ {
f.wg.Add(1)
go f.fetchTasks()
}
f.logger.Info("Replication task fetcher started.", tag.Counter(f.config.ReplicationTaskFetcherParallelism()))
@@ -189,40 +198,42 @@ func (f *taskFetcherImpl) Stop() {
}

f.cancelCtx()
// TODO: remove this config and disable non-graceful shutdown
if f.config.ReplicationTaskFetcherEnableGracefulSyncShutdown() {
f.logger.Debug("Replication task fetcher is waiting on stoppedCh before shutting down")
<-f.stoppedCh
if !common.AwaitWaitGroup(&f.wg, 10*time.Second) {
f.logger.Warn("Replication task fetcher timed out on shutdown.")
} else {
f.logger.Info("Replication task fetcher graceful shutdown completed.")
}
}
f.logger.Info("Replication task fetcher stopped.")
}
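Stop now bounds its wait on the fetcher goroutines with a timeout via common.AwaitWaitGroup. A standalone sketch of that wait-with-timeout pattern, assuming the helper simply races wg.Wait against a timer (illustrative, not the repository's actual source):

package main

import (
	"sync"
	"time"
)

// awaitWaitGroup waits for wg up to timeout and reports whether it finished in
// time. On timeout the waiting goroutine is abandoned and exits once wg drains.
func awaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(10 * time.Millisecond)
	}()
	_ = awaitWaitGroup(&wg, time.Second) // true: the goroutine finished in time
}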

// fetchTasks collects getReplicationTasks requests from shards and sends an aggregated request to the source frontend.
func (f *taskFetcherImpl) fetchTasks() {
timer := time.NewTimer(backoff.JitDuration(
defer f.wg.Done()
timer := f.timeSource.NewTimer(backoff.JitDuration(
f.config.ReplicationTaskFetcherAggregationInterval(),
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
))
defer timer.Stop()
defer close(f.stoppedCh)

requestByShard := make(map[int32]*request)
for {
select {
case request := <-f.requestChan:
// Here we only add the request to the map; we wait until the timer fires to send it to the remote cluster.
if req, ok := requestByShard[request.token.GetShardID()]; ok && req != request {
// since this replication task fetcher is per host
// and replication task processor is per shard
// during shard movement, duplicated requests can appear
// if shard moved from this host, to this host.
f.logger.Error("Get replication task request already exist for shard.", tag.ShardID(int(request.token.GetShardID())))
// since this replication task fetcher is per host and the replication task processor is per shard,
// duplicated requests can appear during shard movement if a shard moves off this host and back onto it.
f.logger.Info("Get replication task request already exist for shard.", tag.ShardID(int(request.token.GetShardID())))
close(req.respChan)
}
requestByShard[request.token.GetShardID()] = request

case <-timer.C:
case <-timer.Chan():
// When the timer fires, we collect all the requests we have so far and attempt to send them to the remote cluster.
err := f.fetchAndDistributeTasks(requestByShard)
err := f.fetchAndDistributeTasksFn(requestByShard)
if err != nil {
if _, ok := err.(*types.ServiceBusyError); ok {
// slow down replication when source cluster is busy
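Replacing time.NewTimer with f.timeSource.NewTimer is what lets TestLifecycle below drive the loop by advancing a mocked clock instead of sleeping. A minimal sketch of that injectable-clock idea, with illustrative types rather than the real cadence/common/clock API:

package main

import (
	"fmt"
	"time"
)

// timerSource is the narrow interface the loop needs; production code uses a
// real clock, tests substitute a fake one they can advance manually.
type timerSource interface {
	NewTimer(d time.Duration) <-chan time.Time
}

type realSource struct{}

func (realSource) NewTimer(d time.Duration) <-chan time.Time { return time.NewTimer(d).C }

// fakeSource fires pending timers only when the test advances its notion of now.
type fakeSource struct {
	now       time.Time
	deadlines []time.Time
	channels  []chan time.Time
}

func (f *fakeSource) NewTimer(d time.Duration) <-chan time.Time {
	ch := make(chan time.Time, 1)
	f.deadlines = append(f.deadlines, f.now.Add(d))
	f.channels = append(f.channels, ch)
	return ch
}

func (f *fakeSource) Advance(d time.Duration) {
	f.now = f.now.Add(d)
	for i, deadline := range f.deadlines {
		if !f.now.Before(deadline) {
			select {
			case f.channels[i] <- f.now:
			default: // already fired
			}
		}
	}
}

func main() {
	src := &fakeSource{now: time.Unix(0, 0)}
	timer := src.NewTimer(time.Second)
	src.Advance(2 * time.Second)
	fmt.Println("fired at:", <-timer) // deterministic, no real sleeping
}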
116 changes: 114 additions & 2 deletions service/history/replication/task_fetcher_test.go
@@ -22,13 +22,17 @@ package replication

import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/goleak"

"github.com/uber/cadence/client/admin"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/common/types"
@@ -67,8 +71,9 @@ func (s *taskFetcherSuite) SetupTest() {

s.mockResource = resource.NewTest(s.T(), s.controller, metrics.History)
s.frontendClient = s.mockResource.RemoteAdminClient
logger := log.NewNoop()
logger := testlogger.New(s.T())
s.config = config.NewForTest()
s.config.ReplicationTaskFetcherTimerJitterCoefficient = dynamicconfig.GetFloatPropertyFn(0.0) // set jitter to 0 so timer intervals are deterministic in tests

s.taskFetcher = newReplicationTaskFetcher(
logger,
@@ -140,3 +145,110 @@ func (s *taskFetcherSuite) TestFetchAndDistributeTasks() {
respToken := <-respChan
s.Equal(messageByShared[0], respToken)
}

func (s *taskFetcherSuite) TestLifecycle() {
defer goleak.VerifyNone(s.T())
mockTimeSource := clock.NewMockedTimeSourceAt(time.Now())
s.taskFetcher.timeSource = mockTimeSource
respChan0 := make(chan *types.ReplicationMessages, 1)
respChan1 := make(chan *types.ReplicationMessages, 1)
respChan2 := make(chan *types.ReplicationMessages, 1)
respChan3 := make(chan *types.ReplicationMessages, 1)
req0 := &request{
token: &types.ReplicationToken{
ShardID: 0,
LastRetrievedMessageID: 100,
LastProcessedMessageID: 10,
},
respChan: respChan0,
}
req1 := &request{
token: &types.ReplicationToken{
ShardID: 1,
LastRetrievedMessageID: 10,
LastProcessedMessageID: 1,
},
respChan: respChan1,
}
req2 := &request{
token: &types.ReplicationToken{
ShardID: 0,
LastRetrievedMessageID: 10,
LastProcessedMessageID: 1,
},
respChan: respChan2,
}
req3 := &request{
token: &types.ReplicationToken{
ShardID: 1,
LastRetrievedMessageID: 11,
LastProcessedMessageID: 2,
},
respChan: respChan3,
}
fetchAndDistributeTasksFnCall := 0
fetchAndDistributeTasksSyncChan := []chan struct{}{make(chan struct{}), make(chan struct{}), make(chan struct{}), make(chan struct{})}
s.taskFetcher.fetchAndDistributeTasksFn = func(requestByShard map[int32]*request) error {
defer func() {
fetchAndDistributeTasksFnCall++
close(fetchAndDistributeTasksSyncChan[fetchAndDistributeTasksFnCall-1])
}()
if fetchAndDistributeTasksFnCall == 0 {
s.Equal(map[int32]*request{1: req1, 0: req2}, requestByShard)
return &types.ServiceBusyError{}
} else if fetchAndDistributeTasksFnCall == 1 {
s.Equal(map[int32]*request{1: req3, 0: req2}, requestByShard)
return &types.InternalServiceError{}
} else if fetchAndDistributeTasksFnCall == 2 {
s.Equal(map[int32]*request{1: req3, 0: req2}, requestByShard)
for shard := range requestByShard {
delete(requestByShard, shard)
}
return nil
} else if fetchAndDistributeTasksFnCall == 3 {
s.Equal(map[int32]*request{}, requestByShard)
return nil
}
return nil
}
s.taskFetcher.Start()
defer s.taskFetcher.Stop()

requestChan := s.taskFetcher.GetRequestChan()
// send 3 replication requests to the fetcher
requestChan <- req0
requestChan <- req1
requestChan <- req2
_, open := <-respChan0 // block until duplicate replication task is read from fetcher's request channel
s.False(open)

// process the existing replication requests and return service busy error
s.Equal(0, fetchAndDistributeTasksFnCall)
mockTimeSource.Advance(s.config.ReplicationTaskFetcherAggregationInterval())
_, open = <-fetchAndDistributeTasksSyncChan[0] // block until fetchAndDistributeTasksFn is called
s.False(open)
s.Equal(1, fetchAndDistributeTasksFnCall)

// send a new duplicate replication request to fetcher
requestChan <- req3
_, open = <-respChan1 // block until duplicate replication task is read from fetcher's request channel
s.False(open)

// process the existing replication requests and return non-service busy error
mockTimeSource.Advance(s.config.ReplicationTaskFetcherServiceBusyWait())
_, open = <-fetchAndDistributeTasksSyncChan[1] // block until fetchAndDistributeTasksFn is called
s.False(open)
s.Equal(2, fetchAndDistributeTasksFnCall)

// process the existing replication requests and return success
mockTimeSource.Advance(s.config.ReplicationTaskFetcherErrorRetryWait())
_, open = <-fetchAndDistributeTasksSyncChan[2] // block until fetchAndDistributeTasksFn is called
s.False(open)
s.Equal(3, fetchAndDistributeTasksFnCall)

// process empty requests and return success
mockTimeSource.Advance(s.config.ReplicationTaskFetcherAggregationInterval())
_, open = <-fetchAndDistributeTasksSyncChan[3] // block until fetchAndDistributeTasksFn is called
s.False(open)
s.Equal(4, fetchAndDistributeTasksFnCall)
}
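The deferred goleak.VerifyNone at the top of TestLifecycle is what actually verifies the graceful shutdown: the test fails if any goroutine started by the fetcher is still running when the test returns. Typical usage looks roughly like this (hypothetical test name):

package replication

import (
	"testing"

	"go.uber.org/goleak"
)

func TestNoGoroutineLeak(t *testing.T) {
	// goleak inspects running goroutines when the deferred call executes and
	// fails the test if unexpected ones remain.
	defer goleak.VerifyNone(t)

	// start and stop the component under test here; anything it leaks fails the test
}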
1 change: 1 addition & 0 deletions service/history/replication/task_hydrator_test.go
@@ -595,6 +595,7 @@ func TestMutableStateLoader_GetMutableState(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, expectedMS, ms)
assert.NotNil(t, release)
release(nil)
}

func TestImmediateTaskHydrator(t *testing.T) {