Skip to content

Commit

Permalink
Refactor replication related components (#2816)
Browse files Browse the repository at this point in the history
* Refactor replication related components

* Fix merge changes
  • Loading branch information
yux0 committed May 10, 2022
1 parent be024bf commit 5da9b57
Show file tree
Hide file tree
Showing 23 changed files with 1,270 additions and 989 deletions.
13 changes: 7 additions & 6 deletions service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ import (
"go.temporal.io/server/service"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/replication"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
warchiver "go.temporal.io/server/service/worker/archiver"
Expand All @@ -78,7 +79,7 @@ var Module = fx.Options(
fx.Provide(PersistenceMaxQpsProvider),
fx.Provide(ServiceResolverProvider),
fx.Provide(EventNotifierProvider),
fx.Provide(ReplicationTaskFetchersProvider),
fx.Provide(ReplicationTaskFetcherFactoryProvider),
fx.Provide(ArchivalClientProvider),
fx.Provide(HistoryEngineFactoryProvider),
fx.Provide(HandlerProvider),
Expand Down Expand Up @@ -132,7 +133,7 @@ func HandlerProvider(
hostInfoProvider membership.HostInfoProvider,
shardController *shard.ControllerImpl,
eventNotifier events.Notifier,
replicationTaskFetchers ReplicationTaskFetchers,
replicationTaskFetcherFactory replication.TaskFetcherFactory,
) *Handler {
args := NewHandlerArgs{
config,
Expand All @@ -152,7 +153,7 @@ func HandlerProvider(
hostInfoProvider,
shardController,
eventNotifier,
replicationTaskFetchers,
replicationTaskFetcherFactory,
}
return NewHandler(args)
}
Expand Down Expand Up @@ -267,13 +268,13 @@ func EventNotifierProvider(
)
}

func ReplicationTaskFetchersProvider(
func ReplicationTaskFetcherFactoryProvider(
logger log.Logger,
config *configs.Config,
clusterMetadata cluster.Metadata,
clientBean client.Bean,
) ReplicationTaskFetchers {
return NewReplicationTaskFetchers(
) replication.TaskFetcherFactory {
return replication.NewTaskFetcherFactory(
logger,
config,
clusterMetadata,
Expand Down
121 changes: 61 additions & 60 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ import (
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/replication"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/tasks"
)
Expand All @@ -70,47 +71,47 @@ type (
Handler struct {
status int32

tokenSerializer common.TaskTokenSerializer
startWG sync.WaitGroup
config *configs.Config
eventNotifier events.Notifier
replicationTaskFetchers ReplicationTaskFetchers
logger log.Logger
throttledLogger log.Logger
persistenceExecutionManager persistence.ExecutionManager
persistenceShardManager persistence.ShardManager
historyServiceResolver membership.ServiceResolver
metricsClient metrics.Client
payloadSerializer serialization.Serializer
timeSource clock.TimeSource
namespaceRegistry namespace.Registry
saProvider searchattribute.Provider
saMapper searchattribute.Mapper
clusterMetadata cluster.Metadata
archivalMetadata archiver.ArchivalMetadata
hostInfoProvider membership.HostInfoProvider
controller *shard.ControllerImpl
tokenSerializer common.TaskTokenSerializer
startWG sync.WaitGroup
config *configs.Config
eventNotifier events.Notifier
replicationTaskFetcherFactory replication.TaskFetcherFactory
logger log.Logger
throttledLogger log.Logger
persistenceExecutionManager persistence.ExecutionManager
persistenceShardManager persistence.ShardManager
historyServiceResolver membership.ServiceResolver
metricsClient metrics.Client
payloadSerializer serialization.Serializer
timeSource clock.TimeSource
namespaceRegistry namespace.Registry
saProvider searchattribute.Provider
saMapper searchattribute.Mapper
clusterMetadata cluster.Metadata
archivalMetadata archiver.ArchivalMetadata
hostInfoProvider membership.HostInfoProvider
controller *shard.ControllerImpl
}

NewHandlerArgs struct {
Config *configs.Config
Logger log.Logger
ThrottledLogger log.Logger
PersistenceExecutionManager persistence.ExecutionManager
PersistenceShardManager persistence.ShardManager
HistoryServiceResolver membership.ServiceResolver
MetricsClient metrics.Client
PayloadSerializer serialization.Serializer
TimeSource clock.TimeSource
NamespaceRegistry namespace.Registry
SaProvider searchattribute.Provider
SaMapper searchattribute.Mapper
ClusterMetadata cluster.Metadata
ArchivalMetadata archiver.ArchivalMetadata
HostInfoProvider membership.HostInfoProvider
ShardController *shard.ControllerImpl
EventNotifier events.Notifier
ReplicationTaskFetchers ReplicationTaskFetchers
Config *configs.Config
Logger log.Logger
ThrottledLogger log.Logger
PersistenceExecutionManager persistence.ExecutionManager
PersistenceShardManager persistence.ShardManager
HistoryServiceResolver membership.ServiceResolver
MetricsClient metrics.Client
PayloadSerializer serialization.Serializer
TimeSource clock.TimeSource
NamespaceRegistry namespace.Registry
SaProvider searchattribute.Provider
SaMapper searchattribute.Mapper
ClusterMetadata cluster.Metadata
ArchivalMetadata archiver.ArchivalMetadata
HostInfoProvider membership.HostInfoProvider
ShardController *shard.ControllerImpl
EventNotifier events.Notifier
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
}
)

Expand Down Expand Up @@ -139,26 +140,26 @@ var (
// NewHandler creates a thrift handler for the history service
func NewHandler(args NewHandlerArgs) *Handler {
handler := &Handler{
status: common.DaemonStatusInitialized,
config: args.Config,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
logger: args.Logger,
throttledLogger: args.ThrottledLogger,
persistenceExecutionManager: args.PersistenceExecutionManager,
persistenceShardManager: args.PersistenceShardManager,
historyServiceResolver: args.HistoryServiceResolver,
metricsClient: args.MetricsClient,
payloadSerializer: args.PayloadSerializer,
timeSource: args.TimeSource,
namespaceRegistry: args.NamespaceRegistry,
saProvider: args.SaProvider,
saMapper: args.SaMapper,
clusterMetadata: args.ClusterMetadata,
archivalMetadata: args.ArchivalMetadata,
hostInfoProvider: args.HostInfoProvider,
controller: args.ShardController,
eventNotifier: args.EventNotifier,
replicationTaskFetchers: args.ReplicationTaskFetchers,
status: common.DaemonStatusInitialized,
config: args.Config,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
logger: args.Logger,
throttledLogger: args.ThrottledLogger,
persistenceExecutionManager: args.PersistenceExecutionManager,
persistenceShardManager: args.PersistenceShardManager,
historyServiceResolver: args.HistoryServiceResolver,
metricsClient: args.MetricsClient,
payloadSerializer: args.PayloadSerializer,
timeSource: args.TimeSource,
namespaceRegistry: args.NamespaceRegistry,
saProvider: args.SaProvider,
saMapper: args.SaMapper,
clusterMetadata: args.ClusterMetadata,
archivalMetadata: args.ArchivalMetadata,
hostInfoProvider: args.HostInfoProvider,
controller: args.ShardController,
eventNotifier: args.EventNotifier,
replicationTaskFetcherFactory: args.ReplicationTaskFetcherFactory,
}

// prevent us from trying to serve requests before shard controller is started and ready
Expand All @@ -176,7 +177,7 @@ func (h *Handler) Start() {
return
}

h.replicationTaskFetchers.Start()
h.replicationTaskFetcherFactory.Start()

// events notifier must starts before controller
h.eventNotifier.Start()
Expand All @@ -195,7 +196,7 @@ func (h *Handler) Stop() {
return
}

h.replicationTaskFetchers.Stop()
h.replicationTaskFetcherFactory.Stop()
h.controller.Stop()
h.eventNotifier.Stop()
}
Expand Down

0 comments on commit 5da9b57

Please sign in to comment.