Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf optimize NDC RPC replication, reducing DB calls #1392

Merged
merged 3 commits into from
Mar 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -1894,8 +1894,8 @@ func (d *cassandraPersistence) GetReplicationTasks(
rowTypeReplicationWorkflowID,
rowTypeReplicationRunID,
defaultVisibilityTimestamp,
request.ReadLevel,
request.MaxReadLevel,
request.MinTaskID,
request.MaxTaskID,
).PageSize(request.BatchSize).PageState(request.NextPageToken)

return d.populateGetReplicationTasksResponse(query, "GetReplicationTasks")
Expand Down Expand Up @@ -2556,8 +2556,8 @@ func (d *cassandraPersistence) GetReplicationTasksFromDLQ(
request.SourceClusterName,
rowTypeDLQRunID,
defaultVisibilityTimestamp,
request.ReadLevel,
request.ReadLevel+int64(request.BatchSize),
request.MinTaskID,
request.MinTaskID+int64(request.BatchSize),
).PageSize(request.BatchSize).PageState(request.NextPageToken)

return d.populateGetReplicationTasksResponse(query, "GetReplicationTasksFromDLQ")
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,8 +662,8 @@ type (

// GetReplicationTasksRequest is used to read tasks from the replication task queue
GetReplicationTasksRequest struct {
ReadLevel int64
MaxReadLevel int64
MinTaskID int64
MaxTaskID int64
BatchSize int
NextPageToken []byte
}
Expand Down Expand Up @@ -2143,8 +2143,8 @@ func NewGetReplicationTasksFromDLQRequest(
return &GetReplicationTasksFromDLQRequest{
SourceClusterName: sourceClusterName,
GetReplicationTasksRequest: GetReplicationTasksRequest{
ReadLevel: readLevel,
MaxReadLevel: maxReadLevel,
MinTaskID: readLevel,
MaxTaskID: maxReadLevel,
BatchSize: batchSize,
NextPageToken: nextPageToken,
},
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,8 +980,8 @@ func (s *TestBase) GetReplicationTasks(batchSize int, getAll bool) ([]*persisten
Loop:
for {
response, err := s.ExecutionManager.GetReplicationTasks(&persistence.GetReplicationTasksRequest{
ReadLevel: s.GetReplicationReadLevel(),
MaxReadLevel: int64(math.MaxInt64),
MinTaskID: s.GetReplicationReadLevel(),
MaxTaskID: int64(math.MaxInt64),
BatchSize: batchSize,
NextPageToken: token,
})
Expand Down Expand Up @@ -1034,8 +1034,8 @@ func (s *TestBase) GetReplicationTasksFromDLQ(
return s.ExecutionManager.GetReplicationTasksFromDLQ(&persistence.GetReplicationTasksFromDLQRequest{
SourceClusterName: sourceCluster,
GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
ReadLevel: readLevel,
MaxReadLevel: maxReadLevel,
MinTaskID: readLevel,
MaxTaskID: maxReadLevel,
BatchSize: pageSize,
NextPageToken: pageToken,
},
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/sql/sql_execution_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func (m *sqlExecutionManager) GetReplicationTasks(

switch err {
case nil:
return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
return m.populateGetReplicationTasksResponse(rows, request.MaxTaskID)
case sql.ErrNoRows:
return &p.GetReplicationTasksResponse{}, nil
default:
Expand All @@ -330,15 +330,15 @@ func (m *sqlExecutionManager) GetReplicationTasks(
func getReadLevels(
request *p.GetReplicationTasksRequest,
) (readLevel int64, maxReadLevelInclusive int64, err error) {
readLevel = request.ReadLevel
readLevel = request.MinTaskID
if len(request.NextPageToken) > 0 {
readLevel, err = deserializePageToken(request.NextPageToken)
if err != nil {
return 0, 0, err
}
}

maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxReadLevel)
maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxTaskID)
return readLevel, maxReadLevelInclusive, nil
}

Expand Down Expand Up @@ -476,7 +476,7 @@ func (m *sqlExecutionManager) GetReplicationTasksFromDLQ(

switch err {
case nil:
return m.populateGetReplicationDLQTasksResponse(rows, request.MaxReadLevel)
return m.populateGetReplicationDLQTasksResponse(rows, request.MaxTaskID)
case sql.ErrNoRows:
return &p.GetReplicationTasksResponse{}, nil
default:
Expand Down
4 changes: 0 additions & 4 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,6 @@ type Config struct {
ReplicationTaskProcessorNoTaskRetryWait dynamicconfig.DurationPropertyFnWithShardIDFilter
ReplicationTaskProcessorCleanupInterval dynamicconfig.DurationPropertyFnWithShardIDFilter
ReplicationTaskProcessorCleanupJitterCoefficient dynamicconfig.FloatPropertyFnWithShardIDFilter
ReplicationTaskProcessorStartWait dynamicconfig.DurationPropertyFnWithShardIDFilter
ReplicationTaskProcessorStartWaitJitterCoefficient dynamicconfig.FloatPropertyFnWithShardIDFilter
ReplicationTaskProcessorHostQPS dynamicconfig.FloatPropertyFn
ReplicationTaskProcessorShardQPS dynamicconfig.FloatPropertyFn

Expand Down Expand Up @@ -349,8 +347,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
ReplicatorProcessorMaxRedispatchQueueSize: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxRedispatchQueueSize, 10000),
ReplicatorProcessorEnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.ReplicatorProcessorEnablePriorityTaskProcessor, false),
ReplicatorProcessorFetchTasksBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 25),
ReplicationTaskProcessorStartWait: dc.GetDurationPropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorStartWait, 5*time.Second),
ReplicationTaskProcessorStartWaitJitterCoefficient: dc.GetFloat64PropertyFilteredByShardID(dynamicconfig.ReplicationTaskProcessorStartWaitJitterCoefficient, 0.9),
ReplicationTaskProcessorHostQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorHostQPS, 1500),
ReplicationTaskProcessorShardQPS: dc.GetFloat64Property(dynamicconfig.ReplicationTaskProcessorShardQPS, 30),

Expand Down
1 change: 1 addition & 0 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1260,6 +1260,7 @@ func (h *Handler) GetReplicationMessages(ctx context.Context, request *historyse
tasks, err := engine.GetReplicationMessages(
ctx,
request.GetClusterName(),
token.GetLastProcessedMessageId(),
token.GetLastRetrievedMessageId(),
)
if err != nil {
Expand Down
46 changes: 30 additions & 16 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2496,23 +2496,32 @@ func (e *historyEngineImpl) NotifyNewTransferTasks(
}
}

func (e *historyEngineImpl) NotifyNewVisibilityTasks(
func (e *historyEngineImpl) NotifyNewTimerTasks(
tasks []persistence.Task,
) {

if len(tasks) > 0 && e.visibilityProcessor != nil {
e.visibilityProcessor.NotifyNewTask(tasks)
if len(tasks) > 0 {
task := tasks[0]
clusterName := e.clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion())
e.timerProcessor.NotifyNewTimers(clusterName, tasks)
}
}

func (e *historyEngineImpl) NotifyNewTimerTasks(
func (e *historyEngineImpl) NotifyNewReplicationTasks(
tasks []persistence.Task,
) {

if len(tasks) > 0 {
task := tasks[0]
clusterName := e.clusterMetadata.ClusterNameForFailoverVersion(task.GetVersion())
e.timerProcessor.NotifyNewTimers(clusterName, tasks)
if len(tasks) > 0 && e.replicatorProcessor != nil {
e.replicatorProcessor.NotifyNewTasks(tasks)
}
}

func (e *historyEngineImpl) NotifyNewVisibilityTasks(
tasks []persistence.Task,
) {

if len(tasks) > 0 && e.visibilityProcessor != nil {
e.visibilityProcessor.NotifyNewTask(tasks)
}
}

Expand Down Expand Up @@ -2779,27 +2788,32 @@ func getWorkflowAlreadyStartedError(errMsg string, createRequestID string, workf
func (e *historyEngineImpl) GetReplicationMessages(
ctx context.Context,
pollingCluster string,
lastReadMessageID int64,
ackMessageID int64,
queryMessageID int64,
) (*replicationspb.ReplicationMessages, error) {

scope := metrics.HistoryGetReplicationMessagesScope
sw := e.metricsClient.StartTimer(scope, metrics.GetReplicationMessagesForShardLatency)
defer sw.Stop()

replicationMessages, err := e.replicatorProcessor.getTasks(
if ackMessageID != persistence.EmptyQueueMessageID {
if err := e.shard.UpdateClusterReplicationLevel(
pollingCluster,
ackMessageID,
); err != nil {
e.logger.Error("error updating replication level for shard", tag.Error(err), tag.OperationFailed)
}
}

replicationMessages, err := e.replicatorProcessor.paginateTasks(
ctx,
pollingCluster,
lastReadMessageID,
queryMessageID,
)
if err != nil {
e.logger.Error("Failed to retrieve replication messages.", tag.Error(err))
return nil, err
}

// Set cluster status for sync shard info
replicationMessages.SyncShardStatus = &replicationspb.SyncShardStatus{
StatusTime: timestamp.TimePtr(e.timeSource.Now()),
}
e.logger.Debug("Successfully fetched replication messages.", tag.Counter(len(replicationMessages.ReplicationTasks)))
return replicationMessages, nil
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/replicationDLQHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func (r *replicationDLQHandlerImpl) readMessagesWithAckLevel(
resp, err := r.shard.GetExecutionManager().GetReplicationTasksFromDLQ(&persistence.GetReplicationTasksFromDLQRequest{
SourceClusterName: sourceCluster,
GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
ReadLevel: ackLevel,
MaxReadLevel: lastMessageID,
MinTaskID: ackLevel,
MaxTaskID: lastMessageID,
BatchSize: pageSize,
NextPageToken: pageToken,
},
Expand Down
8 changes: 4 additions & 4 deletions service/history/replicationDLQHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ func (s *replicationDLQHandlerSuite) TestReadMessages_OK() {
s.executionManager.EXPECT().GetReplicationTasksFromDLQ(&persistence.GetReplicationTasksFromDLQRequest{
SourceClusterName: s.sourceCluster,
GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
ReadLevel: persistence.EmptyQueueMessageID,
MaxReadLevel: lastMessageID,
MinTaskID: persistence.EmptyQueueMessageID,
MaxTaskID: lastMessageID,
BatchSize: pageSize,
NextPageToken: pageToken,
},
Expand Down Expand Up @@ -255,8 +255,8 @@ func (s *replicationDLQHandlerSuite) TestMergeMessages() {
s.executionManager.EXPECT().GetReplicationTasksFromDLQ(&persistence.GetReplicationTasksFromDLQRequest{
SourceClusterName: s.sourceCluster,
GetReplicationTasksRequest: persistence.GetReplicationTasksRequest{
ReadLevel: persistence.EmptyQueueMessageID,
MaxReadLevel: lastMessageID,
MinTaskID: persistence.EmptyQueueMessageID,
MaxTaskID: lastMessageID,
BatchSize: pageSize,
NextPageToken: pageToken,
},
Expand Down
45 changes: 30 additions & 15 deletions service/history/replicationTaskProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,8 @@ import (
)

const (
dropSyncShardTaskTimeThreshold = 10 * time.Minute
replicationTimeout = 30 * time.Second
taskErrorRetryBackoffCoefficient = 1.2
taskErrorRetryMaxInterval = 5 * time.Second
dropSyncShardTaskTimeThreshold = 10 * time.Minute
replicationTimeout = 30 * time.Second
)

var (
Expand Down Expand Up @@ -88,6 +86,8 @@ type (
minTxAckedTaskID int64
// recv side
maxRxProcessedTaskID int64
maxRxReceivedTaskID int64
rxTaskBackoff time.Duration

requestChan chan<- *replicationTaskRequest
syncShardChan chan *replicationspb.SyncShardStatus
Expand Down Expand Up @@ -150,6 +150,7 @@ func NewReplicationTaskProcessor(
shutdownChan: make(chan struct{}),
minTxAckedTaskID: persistence.EmptyQueueMessageID,
maxRxProcessedTaskID: persistence.EmptyQueueMessageID,
maxRxReceivedTaskID: persistence.EmptyQueueMessageID,
}
}

Expand Down Expand Up @@ -198,6 +199,9 @@ func (p *ReplicationTaskProcessorImpl) eventLoop() {
))
defer cleanupTimer.Stop()

replicationTimer := time.NewTimer(0)
defer replicationTimer.Stop()

var syncShardTask *replicationspb.SyncShardStatus
for {
select {
Expand Down Expand Up @@ -226,37 +230,42 @@ func (p *ReplicationTaskProcessorImpl) eventLoop() {
case <-p.shutdownChan:
return

default:
case <-replicationTimer.C:
if err := p.pollProcessReplicationTasks(); err != nil {
p.logger.Error("unable to process replication tasks", tag.Error(err))
}
replicationTimer.Reset(p.rxTaskBackoff)
}
}
}

func (p *ReplicationTaskProcessorImpl) pollProcessReplicationTasks() error {
taskIterator := collection.NewPagingIterator(p.paginationFn)
func (p *ReplicationTaskProcessorImpl) pollProcessReplicationTasks() (retError error) {
defer func() {
if retError != nil {
p.maxRxReceivedTaskID = p.maxRxProcessedTaskID
p.rxTaskBackoff = p.config.ReplicationTaskFetcherErrorRetryWait()
}
}()

count := 0
taskIterator := collection.NewPagingIterator(p.paginationFn)
for taskIterator.HasNext() && !p.isStopped() {
task, err := taskIterator.Next()
if err != nil {
return err
}

count++
replicationTask := task.(*replicationspb.ReplicationTask)
if err = p.applyReplicationTask(replicationTask); err != nil {
return err
}
p.maxRxProcessedTaskID = replicationTask.GetSourceTaskId()
}

// TODO there should be better handling of remote not having replication tasks
// & make the application of replication task evenly distributed (in terms of time)
// stream / long poll API worth considering
if count == 0 {
time.Sleep(p.config.ReplicationTaskProcessorNoTaskRetryWait(p.shard.GetShardID()))
if !p.isStopped() {
// all tasks fetched successfully processed
// setting the receiver side max processed task ID to max received task ID
// since task ID is not contiguous
p.maxRxProcessedTaskID = p.maxRxReceivedTaskID
}

return nil
Expand Down Expand Up @@ -415,7 +424,7 @@ func (p *ReplicationTaskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []
token: &replicationspb.ReplicationToken{
ShardId: p.shard.GetShardID(),
LastProcessedMessageId: p.maxRxProcessedTaskID,
LastRetrievedMessageId: p.maxRxProcessedTaskID,
LastRetrievedMessageId: p.maxRxReceivedTaskID,
},
respChan: respChan,
}
Expand All @@ -438,6 +447,12 @@ func (p *ReplicationTaskProcessorImpl) paginationFn(_ []byte) ([]interface{}, []
for _, task := range resp.GetReplicationTasks() {
tasks = append(tasks, task)
}
p.maxRxReceivedTaskID = resp.GetLastRetrievedMessageId()
if resp.GetHasMore() {
p.rxTaskBackoff = time.Duration(0)
} else {
p.rxTaskBackoff = p.config.ReplicationTaskProcessorNoTaskRetryWait(p.shard.GetShardID())
}
return tasks, nil, nil

case <-p.shutdownChan:
Expand Down