Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement tiered replication stream receiver #5856

Merged
merged 15 commits into from
May 14, 2024
493 changes: 253 additions & 240 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2089,6 +2089,17 @@ that task will be sent to DLQ.`,
512,
`ReplicationProcessorSchedulerWorkerCount is the replication task executor worker count`,
)
// ReplicationLowPriorityProcessorSchedulerWorkerCount is a global int setting registered under the key
// "history.ReplicationLowPriorityProcessorSchedulerWorkerCount" with a default of 128.
// Per its description, it is the worker count of the low priority replication task executor.
ReplicationLowPriorityProcessorSchedulerWorkerCount = NewGlobalIntSetting(
"history.ReplicationLowPriorityProcessorSchedulerWorkerCount",
128,
`ReplicationLowPriorityProcessorSchedulerWorkerCount is the low priority replication task executor worker count`,
)
// ReplicationLowPriorityTaskParallelism is a global int setting registered under the key
// "history.ReplicationLowPriorityTaskParallelism" with a default of 4.
// Per its description, it is the number of executions' low priority replication tasks that can be
// processed in parallel.
// NOTE(review): values below 1 are presumably not meaningful for a parallelism degree — confirm that
// consumers guard against them.
ReplicationLowPriorityTaskParallelism = NewGlobalIntSetting(
"history.ReplicationLowPriorityTaskParallelism",
4,
`ReplicationLowPriorityTaskParallelism is the number of executions' low priority replication tasks that can be processed in parallel`,
)

EnableEagerNamespaceRefresher = NewGlobalBoolSetting(
"history.EnableEagerNamespaceRefresher",
false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ message WorkflowReplicationMessages {
// This can be different than the last taskId in the above list, because sender can decide to skip tasks (e.g. for completed workflows).
int64 exclusive_high_watermark = 2;
google.protobuf.Timestamp exclusive_high_watermark_time = 3;
temporal.server.api.enums.v1.TaskPriority priority = 4;
}

message ReplicationTaskInfo {
Expand Down
19 changes: 12 additions & 7 deletions service/history/configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,14 @@ type Config struct {
ReplicationEnableDLQMetrics dynamicconfig.BoolPropertyFn
ReplicationEnableUpdateWithNewTaskMerge dynamicconfig.BoolPropertyFn

ReplicationStreamSyncStatusDuration dynamicconfig.DurationPropertyFn
ReplicationProcessorSchedulerQueueSize dynamicconfig.IntPropertyFn
ReplicationProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn
EnableReplicationEagerRefreshNamespace dynamicconfig.BoolPropertyFn
EnableReplicationTaskBatching dynamicconfig.BoolPropertyFn
EnableReplicateLocalGeneratedEvent dynamicconfig.BoolPropertyFn
ReplicationStreamSyncStatusDuration dynamicconfig.DurationPropertyFn
ReplicationProcessorSchedulerQueueSize dynamicconfig.IntPropertyFn
ReplicationProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn
ReplicationLowPriorityProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn
ReplicationLowPriorityTaskParallelism dynamicconfig.IntPropertyFn
EnableReplicationEagerRefreshNamespace dynamicconfig.BoolPropertyFn
EnableReplicationTaskBatching dynamicconfig.BoolPropertyFn
EnableReplicateLocalGeneratedEvent dynamicconfig.BoolPropertyFn

// The following are used by consistent query
MaxBufferedQueryCount dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -394,7 +396,8 @@ func NewConfig(
EventsCacheTTL: dynamicconfig.EventsCacheTTL.Get(dc),
EnableHostLevelEventsCache: dynamicconfig.EnableHostLevelEventsCache.Get(dc),

RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range
RangeSizeBits: 20, // 20 bits for sequencer, 2^20 sequence number for any range

AcquireShardInterval: dynamicconfig.AcquireShardInterval.Get(dc),
AcquireShardConcurrency: dynamicconfig.AcquireShardConcurrency.Get(dc),
ShardIOConcurrency: dynamicconfig.ShardIOConcurrency.Get(dc),
Expand Down Expand Up @@ -482,6 +485,8 @@ func NewConfig(
ReplicationStreamSyncStatusDuration: dynamicconfig.ReplicationStreamSyncStatusDuration.Get(dc),
ReplicationProcessorSchedulerQueueSize: dynamicconfig.ReplicationProcessorSchedulerQueueSize.Get(dc),
ReplicationProcessorSchedulerWorkerCount: dynamicconfig.ReplicationProcessorSchedulerWorkerCount.Get(dc),
ReplicationLowPriorityProcessorSchedulerWorkerCount: dynamicconfig.ReplicationLowPriorityProcessorSchedulerWorkerCount.Get(dc),
ReplicationLowPriorityTaskParallelism: dynamicconfig.ReplicationLowPriorityTaskParallelism.Get(dc),
EnableReplicationEagerRefreshNamespace: dynamicconfig.EnableEagerNamespaceRefresher.Get(dc),
EnableReplicationTaskBatching: dynamicconfig.EnableReplicationTaskBatching.Get(dc),
EnableReplicateLocalGeneratedEvent: dynamicconfig.EnableReplicateLocalGeneratedEvents.Get(dc),
Expand Down
28 changes: 15 additions & 13 deletions service/history/replication/executable_task_tool_box.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,20 @@ type (
// ProcessToolBox groups the dependencies handed to replication task processing.
// It embeds fx.In, so its fields are populated by fx dependency injection; fields carrying a
// `name` tag are resolved against the value provided under that name.
ProcessToolBox struct {
fx.In

Config *configs.Config
ClusterMetadata cluster.Metadata
ClientBean client.Bean
ShardController shard.Controller
NamespaceCache namespace.Registry
EagerNamespaceRefresher EagerNamespaceRefresher
NDCHistoryResender xdc.NDCHistoryResender
TaskScheduler ctasks.Scheduler[TrackableExecutableTask]
MetricsHandler metrics.Handler
Logger log.Logger
EventSerializer serialization.Serializer
DLQWriter DLQWriter
HistoryEventsHandler eventhandler.HistoryEventsHandler
Config *configs.Config
ClusterMetadata cluster.Metadata
ClientBean client.Bean
ShardController shard.Controller
NamespaceCache namespace.Registry
EagerNamespaceRefresher EagerNamespaceRefresher
NDCHistoryResender xdc.NDCHistoryResender
// HighPriorityTaskScheduler is the scheduler injected under the fx name "HighPriorityTaskScheduler".
HighPriorityTaskScheduler ctasks.Scheduler[TrackableExecutableTask] `name:"HighPriorityTaskScheduler"`
// LowPriorityTaskScheduler is the scheduler injected under the fx name "LowPriorityTaskScheduler".
// TODO: consider replacing the two schedulers with a single TaskScheduler,
// i.e. InterleavedWeightedRoundRobinScheduler.
LowPriorityTaskScheduler ctasks.Scheduler[TrackableExecutableTask] `name:"LowPriorityTaskScheduler"`
MetricsHandler metrics.Handler
Logger log.Logger
EventSerializer serialization.Serializer
DLQWriter DLQWriter
HistoryEventsHandler eventhandler.HistoryEventsHandler
}
)
44 changes: 42 additions & 2 deletions service/history/replication/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ package replication

import (
"context"
"math/rand"
"strconv"

"github.com/dgryski/go-farm"
historypb "go.temporal.io/api/history/v1"
historyspb "go.temporal.io/server/api/history/v1"
"go.temporal.io/server/common/definition"
Expand Down Expand Up @@ -56,7 +59,14 @@ var Module = fx.Provide(
NewExecutionManagerDLQWriter,
replicationTaskConverterFactoryProvider,
replicationTaskExecutorProvider,
replicationStreamSchedulerProvider,
fx.Annotated{
Name: "HighPriorityTaskScheduler",
Target: replicationStreamHighPrioritySchedulerProvider,
},
fx.Annotated{
Name: "LowPriorityTaskScheduler",
Target: replicationStreamLowPrioritySchedulerProvider,
},
executableTaskConverterProvider,
streamReceiverMonitorProvider,
ndcHistoryResenderProvider,
Expand Down Expand Up @@ -122,7 +132,7 @@ func replicationTaskExecutorProvider() TaskExecutorProvider {
}
}

func replicationStreamSchedulerProvider(
func replicationStreamHighPrioritySchedulerProvider(
config *configs.Config,
logger log.Logger,
queueFactory ctasks.SequentialTaskQueueFactory[TrackableExecutableTask],
Expand All @@ -141,6 +151,36 @@ func replicationStreamSchedulerProvider(
return scheduler
}

func replicationStreamLowPrioritySchedulerProvider(
config *configs.Config,
logger log.Logger,
lc fx.Lifecycle,
) ctasks.Scheduler[TrackableExecutableTask] {
queueFactory := func(task TrackableExecutableTask) ctasks.SequentialTaskQueue[TrackableExecutableTask] {
return NewSequentialTaskQueue(task)
}
taskQueueHashFunc := func(item interface{}) uint32 {
workflowKey, ok := item.(definition.WorkflowKey)
if !ok {
return 0
}

idBytes := []byte(workflowKey.NamespaceID + "_" + workflowKey.WorkflowID + "_" + strconv.Itoa(rand.Intn(config.ReplicationLowPriorityTaskParallelism())))
xwduan marked this conversation as resolved.
Show resolved Hide resolved
return farm.Fingerprint32(idBytes)
}
scheduler := ctasks.NewSequentialScheduler[TrackableExecutableTask](
&ctasks.SequentialSchedulerOptions{
QueueSize: config.ReplicationProcessorSchedulerQueueSize(),
WorkerCount: config.ReplicationLowPriorityProcessorSchedulerWorkerCount,
},
taskQueueHashFunc,
queueFactory,
logger,
)
lc.Append(fx.StartStopHook(scheduler.Start, scheduler.Stop))
return scheduler
}

func sequentialTaskQueueFactoryProvider(
logger log.Logger,
metricsHandler metrics.Handler,
Expand Down
Loading
Loading