Skip to content

Commit

Permalink
Perf optimize NDC RPC replication (#1392)
Browse files Browse the repository at this point in the history
* Rename ReadLevel to MinTaskID, MaxReadLevel to MaxTaskID
* Add hint in replication task source to reduce unnecessary DB reads
* Add periodic sanity check in replication task source in case of timeout errors
* Treat the last replication task ID and the replication task pagination ID differently
  • Loading branch information
wxing1292 committed Mar 27, 2021
1 parent 32419f6 commit eed9144
Show file tree
Hide file tree
Showing 21 changed files with 462 additions and 152 deletions.
8 changes: 4 additions & 4 deletions common/persistence/cassandra/cassandraPersistence.go
Expand Up @@ -1894,8 +1894,8 @@ func (d *cassandraPersistence) GetReplicationTasks(
rowTypeReplicationWorkflowID,
rowTypeReplicationRunID,
defaultVisibilityTimestamp,
request.ReadLevel,
request.MaxReadLevel,
request.MinTaskID,
request.MaxTaskID,
).PageSize(request.BatchSize).PageState(request.NextPageToken)

return d.populateGetReplicationTasksResponse(query, "GetReplicationTasks")
Expand Down Expand Up @@ -2556,8 +2556,8 @@ func (d *cassandraPersistence) GetReplicationTasksFromDLQ(
request.SourceClusterName,
rowTypeDLQRunID,
defaultVisibilityTimestamp,
request.ReadLevel,
request.ReadLevel+int64(request.BatchSize),
request.MinTaskID,
request.MinTaskID+int64(request.BatchSize),
).PageSize(request.BatchSize).PageState(request.NextPageToken)

return d.populateGetReplicationTasksResponse(query, "GetReplicationTasksFromDLQ")
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/dataInterfaces.go
Expand Up @@ -662,8 +662,8 @@ type (

// GetReplicationTasksRequest is used to read tasks from the replication task queue
GetReplicationTasksRequest struct {
ReadLevel int64
MaxReadLevel int64
MinTaskID int64
MaxTaskID int64
BatchSize int
NextPageToken []byte
}
Expand Down Expand Up @@ -2143,8 +2143,8 @@ func NewGetReplicationTasksFromDLQRequest(
return &GetReplicationTasksFromDLQRequest{
SourceClusterName: sourceClusterName,
GetReplicationTasksRequest: GetReplicationTasksRequest{
ReadLevel: readLevel,
MaxReadLevel: maxReadLevel,
MinTaskID: readLevel,
MaxTaskID: maxReadLevel,
BatchSize: batchSize,
NextPageToken: nextPageToken,
},
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/persistence-tests/persistenceTestBase.go
Expand Up @@ -980,8 +980,8 @@ func (s *TestBase) GetReplicationTasks(batchSize int, getAll bool) ([]*persisten
Loop:
for {
response, err := s.ExecutionManager.GetReplicationTasks(&persistence.GetReplicationTasksRequest{
ReadLevel: s.GetReplicationReadLevel(),
MaxReadLevel: int64(math.MaxInt64),
MinTaskID: s.GetReplicationReadLevel(),
MaxTaskID: int64(math.MaxInt64),
BatchSize: batchSize,
NextPageToken: token,
})
Expand Down Expand Up @@ -1034,8 +1034,8 @@ func (s *TestBase) GetReplicationTasksFromDLQ(
return s.ExecutionManager.GetReplicationTasksFromDLQ(&persistence.GetReplicationTasksFromDLQRequest{
SourceClusterName: sourceCluster,
GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
ReadLevel: readLevel,
MaxReadLevel: maxReadLevel,
MinTaskID: readLevel,
MaxTaskID: maxReadLevel,
BatchSize: pageSize,
NextPageToken: pageToken,
},
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/sql/sql_execution_tasks.go
Expand Up @@ -319,7 +319,7 @@ func (m *sqlExecutionManager) GetReplicationTasks(

switch err {
case nil:
return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
return m.populateGetReplicationTasksResponse(rows, request.MaxTaskID)
case sql.ErrNoRows:
return &p.GetReplicationTasksResponse{}, nil
default:
Expand All @@ -330,15 +330,15 @@ func (m *sqlExecutionManager) GetReplicationTasks(
func getReadLevels(
request *p.GetReplicationTasksRequest,
) (readLevel int64, maxReadLevelInclusive int64, err error) {
readLevel = request.ReadLevel
readLevel = request.MinTaskID
if len(request.NextPageToken) > 0 {
readLevel, err = deserializePageToken(request.NextPageToken)
if err != nil {
return 0, 0, err
}
}

maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxReadLevel)
maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxTaskID)
return readLevel, maxReadLevelInclusive, nil
}

Expand Down Expand Up @@ -476,7 +476,7 @@ func (m *sqlExecutionManager) GetReplicationTasksFromDLQ(

switch err {
case nil:
return m.populateGetReplicationDLQTasksResponse(rows, request.MaxReadLevel)
return m.populateGetReplicationDLQTasksResponse(rows, request.MaxTaskID)
case sql.ErrNoRows:
return &p.GetReplicationTasksResponse{}, nil
default:
Expand Down
4 changes: 0 additions & 4 deletions service/history/configs/config.go
Expand Up @@ -205,8 +205,6 @@ type Config struct {
ReplicationTaskProcessorNoTaskRetryWait dynamicconfig.DurationPropertyFnWithShardIDFilter
ReplicationTaskProcessorCleanupInterval dynamicconfig.DurationPropertyFnWithShardIDFilter
ReplicationTaskProcessorCleanupJitterCoefficient dynamicconfig.FloatPropertyFnWithShardIDFilter
ReplicationTaskProcessorStartWait dynamicconfig.DurationPropertyFnWithShardIDFilter
ReplicationTaskProcessorStartWaitJitterCoefficient dynamicconfig.FloatPropertyFnWithShardIDFilter
ReplicationTaskProcessorHostQPS dynamicconfig.FloatPropertyFn
ReplicationTaskProcessorShardQPS dynamicconfig.FloatPropertyFn

Expand Down Expand Up @@ -349,8 +347,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ReplicatorProcessorMaxRedispatchQueueSize: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxRedispatchQueueSize, 10000),
ReplicatorProcessorEnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.ReplicatorProcessorEnablePriorityTaskProcessor, false),
ReplicatorProcessorFetchTasksBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 25),
ReplicationTaskProcessorStartWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorStartWait, 5*time.Second),
ReplicationTaskProcessorStartWaitJitterCoefficient: dc.GetFloat64PropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorStartWaitJitterCoefficient, 0.9),
ReplicationTaskProcessorHostQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorHostQPS, 1500),
ReplicationTaskProcessorShardQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorShardQPS, 30),

Expand Down
1 change: 1 addition & 0 deletions service/history/handler.go
Expand Up @@ -1260,6 +1260,7 @@ func (h *Handler) GetReplicationMessages(ctx context.Context, request *historyse
tasks, err := engine.GetReplicationMessages(
ctx,
request.GetClusterName(),
token.GetLastProcessedMessageId(),
token.GetLastRetrievedMessageId(),
)
if err != nil {
Expand Down
46 changes: 30 additions & 16 deletions service/history/historyEngine.go
Expand Up @@ -2496,23 +2496,32 @@ func (e *historyEngineImpl) NotifyNewTransferTasks(
}
}

func (e *historyEngineImpl) NotifyNewVisibilityTasks(
func (e *historyEngineImpl) NotifyNewTimerTasks(
tasks []persistence.Task,
) {

if len(tasks) > 0 && e.visibilityProcessor != nil {
e.visibilityProcessor.NotifyNewTask(tasks)
if len(tasks) > 0 {
task := tasks[0]
clusterName := e.clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion())
e.timerProcessor.NotifyNewTimers(clusterName, tasks)
}
}

func (e *historyEngineImpl) NotifyNewTimerTasks(
func (e *historyEngineImpl) NotifyNewReplicationTasks(
tasks []persistence.Task,
) {

if len(tasks) > 0 {
task := tasks[0]
clusterName := e.clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion())
e.timerProcessor.NotifyNewTimers(clusterName, tasks)
if len(tasks) > 0 && e.replicatorProcessor != nil {
e.replicatorProcessor.NotifyNewTasks(tasks)
}
}

func (e *historyEngineImpl) NotifyNewVisibilityTasks(
tasks []persistence.Task,
) {

if len(tasks) > 0 && e.visibilityProcessor != nil {
e.visibilityProcessor.NotifyNewTask(tasks)
}
}

Expand Down Expand Up @@ -2779,27 +2788,32 @@ func getWorkflowAlreadyStartedError(errMsg string, createRequestID string, workf
func (e *historyEngineImpl) GetReplicationMessages(
ctx context.Context,
pollingCluster string,
lastReadMessageID int64,
ackMessageID int64,
queryMessageID int64,
) (*replicationspb.ReplicationMessages, error) {

scope := metrics.HistoryGetReplicationMessagesScope
sw := e.metricsClient.StartTimer(scope, metrics.GetReplicationMessagesForShardLatency)
defer sw.Stop()

replicationMessages, err := e.replicatorProcessor.getTasks(
if ackMessageID != persistence.EmptyQueueMessageID {
if err := e.shard.UpdateClusterReplicationLevel(
pollingCluster,
ackMessageID,
); err != nil {
e.logger.Error("error updating replication level for shard", tag.Error(err), tag.OperationFailed)
}
}

replicationMessages, err := e.replicatorProcessor.paginateTasks(
ctx,
pollingCluster,
lastReadMessageID,
queryMessageID,
)
if err != nil {
e.logger.Error("Failed to retrieve replication messages.", tag.Error(err))
return nil, err
}

// Set cluster status for sync shard info
replicationMessages.SyncShardStatus = &replicationspb.SyncShardStatus{
StatusTime: timestamp.TimePtr(e.timeSource.Now()),
}
e.logger.Debug("Successfully fetched replication messages.", tag.Counter(len(replicationMessages.ReplicationTasks)))
return replicationMessages, nil
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/replicationDLQHandler.go
Expand Up @@ -117,8 +117,8 @@ func (r *replicationDLQHandlerImpl) readMessagesWithAckLevel(
resp, err := r.shard.GetExecutionManager().GetReplicationTasksFromDLQ(&persistence.GetReplicationTasksFromDLQRequest{
SourceClusterName: sourceCluster,
GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
ReadLevel: ackLevel,
MaxReadLevel: lastMessageID,
MinTaskID: ackLevel,
MaxTaskID: lastMessageID,
BatchSize: pageSize,
NextPageToken: pageToken,
},
Expand Down
8 changes: 4 additions & 4 deletions service/history/replicationDLQHandler_test.go
Expand Up @@ -172,8 +172,8 @@ func (s *replicationDLQHandlerSuite) TestReadMessages_OK() {
s.executionManager.EXPECT().GetReplicationTasksFromDLQ(&persistence.GetReplicationTasksFromDLQRequest{
SourceClusterName: s.sourceCluster,
GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
ReadLevel: persistence.EmptyQueueMessageID,
MaxReadLevel: lastMessageID,
MinTaskID: persistence.EmptyQueueMessageID,
MaxTaskID: lastMessageID,
BatchSize: pageSize,
NextPageToken: pageToken,
},
Expand Down Expand Up @@ -255,8 +255,8 @@ func (s *replicationDLQHandlerSuite) TestMergeMessages() {
s.executionManager.EXPECT().GetReplicationTasksFromDLQ(&persistence.GetReplicationTasksFromDLQRequest{
SourceClusterName: s.sourceCluster,
GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
ReadLevel: persistence.EmptyQueueMessageID,
MaxReadLevel: lastMessageID,
MinTaskID: persistence.EmptyQueueMessageID,
MaxTaskID: lastMessageID,
BatchSize: pageSize,
NextPageToken: pageToken,
},
Expand Down
45 changes: 30 additions & 15 deletions service/history/replicationTaskProcessor.go
Expand Up @@ -54,10 +54,8 @@ import (
)

const (
dropSyncShardTaskTimeThreshold = 10 * time.Minute
replicationTimeout = 30 * time.Second
taskErrorRetryBackoffCoefficient = 1.2
taskErrorRetryMaxInterval = 5 * time.Second
dropSyncShardTaskTimeThreshold = 10 * time.Minute
replicationTimeout = 30 * time.Second
)

var (
Expand Down Expand Up @@ -88,6 +86,8 @@ type (
minTxAckedTaskID int64
// recv side
maxRxProcessedTaskID int64
maxRxReceivedTaskID int64
rxTaskBackoff time.Duration

requestChan chan<- *replicationTaskRequest
syncShardChan chan *replicationspb.SyncShardStatus
Expand Down Expand Up @@ -150,6 +150,7 @@ func NewReplicationTaskProcessor(
shutdownChan: make(chan struct{}),
minTxAckedTaskID: persistence.EmptyQueueMessageID,
maxRxProcessedTaskID: persistence.EmptyQueueMessageID,
maxRxReceivedTaskID: persistence.EmptyQueueMessageID,
}
}

Expand Down Expand Up @@ -198,6 +199,9 @@ func (p *ReplicationTaskProcessorImpl) eventLoop() {
))
defer cleanupTimer.Stop()

replicationTimer := time.NewTimer(0)
defer replicationTimer.Stop()

var syncShardTask *replicationspb.SyncShardStatus
for {
select {
Expand Down Expand Up @@ -226,37 +230,42 @@ func (p *ReplicationTaskProcessorImpl) eventLoop() {
case <-p.shutdownChan:
return

default:
case <-replicationTimer.C:
if err := p.pollProcessReplicationTasks(); err != nil {
p.logger.Error("unable to process replication tasks", tag.Error(err))
}
replicationTimer.Reset(p.rxTaskBackoff)
}
}
}

func (p *ReplicationTaskProcessorImpl) pollProcessReplicationTasks() error {
taskIterator := collection.NewPagingIterator(p.paginationFn)
func (p *ReplicationTaskProcessorImpl) pollProcessReplicationTasks() (retError error) {
defer func() {
if retError != nil {
p.maxRxReceivedTaskID = p.maxRxProcessedTaskID
p.rxTaskBackoff = p.config.ReplicationTaskFetcherErrorRetryWait()
}
}()

count := 0
taskIterator := collection.NewPagingIterator(p.paginationFn)
for taskIterator.HasNext() && !p.isStopped() {
task, err := taskIterator.Next()
if err != nil {
return err
}

count++
replicationTask := task.(*replicationspb.ReplicationTask)
if err = p.applyReplicationTask(replicationTask); err != nil {
return err
}
p.maxRxProcessedTaskID = replicationTask.GetSourceTaskId()
}

// TODO there should be better handling of remote not having replication tasks
// & make the application of replication task evenly distributed (in terms of time)
// stream / long poll API worth considering
if count == 0 {
time.Sleep(p.config.ReplicationTaskProcessorNoTaskRetryWait(p.shard.GetShardID()))
if !p.isStopped() {
// all tasks fetched successfully processed
// setting the receiver side max processed task ID to max received task ID
// since task ID is not contiguous
p.maxRxProcessedTaskID = p.maxRxReceivedTaskID
}

return nil
Expand Down Expand Up @@ -415,7 +424,7 @@ func (p *ReplicationTaskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []
token: &replicationspb.ReplicationToken{
ShardId: p.shard.GetShardID(),
LastProcessedMessageId: p.maxRxProcessedTaskID,
LastRetrievedMessageId: p.maxRxProcessedTaskID,
LastRetrievedMessageId: p.maxRxReceivedTaskID,
},
respChan: respChan,
}
Expand All @@ -438,6 +447,12 @@ func (p *ReplicationTaskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []
for _, task := range resp.GetReplicationTasks() {
tasks = append(tasks, task)
}
p.maxRxReceivedTaskID = resp.GetLastRetrievedMessageId()
if resp.GetHasMore() {
p.rxTaskBackoff = time.Duration(0)
} else {
p.rxTaskBackoff = p.config.ReplicationTaskProcessorNoTaskRetryWait(p.shard.GetShardID())
}
return tasks, nil, nil

case <-p.shutdownChan:
Expand Down

0 comments on commit eed9144

Please sign in to comment.