Skip to content

Commit

Permalink
Update replication processor/executor init logic (#2926)
Browse files Browse the repository at this point in the history
* Update replication processor/executor init logic
  • Loading branch information
yux0 committed Jun 3, 2022
1 parent 58d2964 commit 4c11b96
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 151 deletions.
7 changes: 5 additions & 2 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@ import (
"sync"
"time"

"github.com/uber-go/tally/v4"
"github.com/uber/tchannel-go"
"go.temporal.io/api/operatorservice/v1"
"go.temporal.io/api/workflowservice/v1"

"github.com/uber-go/tally/v4"
"github.com/uber/tchannel-go"
"go.uber.org/fx"
"google.golang.org/grpc"

Expand Down Expand Up @@ -65,6 +66,7 @@ import (
"go.temporal.io/server/common/searchattribute"
"go.temporal.io/server/service/frontend"
"go.temporal.io/server/service/history"
"go.temporal.io/server/service/history/replication"
"go.temporal.io/server/service/history/workflow"
"go.temporal.io/server/service/matching"
"go.temporal.io/server/service/worker"
Expand Down Expand Up @@ -547,6 +549,7 @@ func (c *temporalImpl) startHistory(
fx.Provide(workflow.NewTaskGeneratorProvider),
history.QueueProcessorModule,
history.Module,
replication.Module,
fx.Populate(&historyService, &clientBean, &namespaceRegistry),
fx.NopLogger)
err = app.Err()
Expand Down
18 changes: 0 additions & 18 deletions service/history/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,9 @@ import (
"go.uber.org/fx"
"google.golang.org/grpc"

"go.temporal.io/server/client"
"go.temporal.io/server/common"
"go.temporal.io/server/common/archiver/provider"
"go.temporal.io/server/common/clock"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/log"
Expand All @@ -55,7 +53,6 @@ import (
"go.temporal.io/server/service"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/replication"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
warchiver "go.temporal.io/server/service/worker/archiver"
Expand All @@ -76,7 +73,6 @@ var Module = fx.Options(
fx.Provide(PersistenceMaxQpsProvider),
fx.Provide(ServiceResolverProvider),
fx.Provide(EventNotifierProvider),
fx.Provide(ReplicationTaskFetcherFactoryProvider),
fx.Provide(ArchivalClientProvider),
fx.Provide(HistoryEngineFactoryProvider),
fx.Provide(HandlerProvider),
Expand Down Expand Up @@ -252,20 +248,6 @@ func EventNotifierProvider(
)
}

func ReplicationTaskFetcherFactoryProvider(
logger log.Logger,
config *configs.Config,
clusterMetadata cluster.Metadata,
clientBean client.Bean,
) replication.TaskFetcherFactory {
return replication.NewTaskFetcherFactory(
logger,
config,
clusterMetadata,
clientBean,
)
}

func ArchivalClientProvider(
archiverProvider provider.ArchiverProvider,
sdkClientFactory sdk.ClientFactory,
Expand Down
26 changes: 23 additions & 3 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type (
replicationAckMgr replication.AckManager
nDCReplicator nDCHistoryReplicator
nDCActivityReplicator nDCActivityReplicator
replicationProcessorMgr common.Daemon
eventNotifier events.Notifier
tokenSerializer common.TaskTokenSerializer
metricsClient metrics.Client
Expand Down Expand Up @@ -129,6 +130,8 @@ func NewEngineWithShardContext(
archivalClient archiver.Client,
eventSerializer serialization.Serializer,
queueProcessorFactories []queues.ProcessorFactory,
replicationTaskFetcherFactory replication.TaskFetcherFactory,
replicationTaskExecutorProvider replication.TaskExecutorProvider,
) shard.Engine {
currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()

Expand Down Expand Up @@ -213,8 +216,24 @@ func NewEngineWithShardContext(
)

historyEngImpl.workflowTaskHandler = newWorkflowTaskHandlerCallback(historyEngImpl)
historyEngImpl.replicationDLQHandler = replication.NewLazyDLQHandler(shard, workflowDeleteManager, historyCache, clientBean)

historyEngImpl.replicationDLQHandler = replication.NewLazyDLQHandler(
shard,
workflowDeleteManager,
historyCache,
clientBean,
replicationTaskExecutorProvider,
)
historyEngImpl.replicationProcessorMgr = replication.NewTaskProcessorManager(
config,
shard,
historyEngImpl,
historyCache,
workflowDeleteManager,
clientBean,
eventSerializer,
replicationTaskFetcherFactory,
replicationTaskExecutorProvider,
)
return historyEngImpl
}

Expand All @@ -236,6 +255,7 @@ func (e *historyEngineImpl) Start() {
for _, queueProcessor := range e.queueProcessors {
queueProcessor.Start()
}
e.replicationProcessorMgr.Start()

// failover callback will try to create a failover queue processor to scan all inflight tasks
// if domain needs to be failovered. However, in the multicursor queue logic, the scan range
Expand Down Expand Up @@ -264,7 +284,7 @@ func (e *historyEngineImpl) Stop() {
for _, queueProcessor := range e.queueProcessors {
queueProcessor.Stop()
}

e.replicationProcessorMgr.Stop()
// unset the failover callback
e.shard.GetNamespaceRegistry().UnregisterNamespaceChangeCallback(e)
}
Expand Down
25 changes: 15 additions & 10 deletions service/history/historyEngineFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/queues"
"go.temporal.io/server/service/history/replication"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
"go.temporal.io/server/service/worker/archiver"
Expand All @@ -43,16 +44,18 @@ type (
HistoryEngineFactoryParams struct {
fx.In

ClientBean client.Bean
MatchingClient resource.MatchingClient
SdkClientFactory sdk.ClientFactory
EventNotifier events.Notifier
Config *configs.Config
RawMatchingClient resource.MatchingRawClient
NewCacheFn workflow.NewCacheFn
ArchivalClient archiver.Client
EventSerializer serialization.Serializer
QueueProcessorFactories []queues.ProcessorFactory `group:"queueProcessorFactory"`
ClientBean client.Bean
MatchingClient resource.MatchingClient
SdkClientFactory sdk.ClientFactory
EventNotifier events.Notifier
Config *configs.Config
RawMatchingClient resource.MatchingRawClient
NewCacheFn workflow.NewCacheFn
ArchivalClient archiver.Client
EventSerializer serialization.Serializer
QueueProcessorFactories []queues.ProcessorFactory `group:"queueProcessorFactory"`
ReplicationTaskFetcherFactory replication.TaskFetcherFactory
ReplicationTaskExecutorProvider replication.TaskExecutorProvider
}

historyEngineFactory struct {
Expand All @@ -75,5 +78,7 @@ func (f *historyEngineFactory) CreateEngine(
f.ArchivalClient,
f.EventSerializer,
f.QueueProcessorFactories,
f.ReplicationTaskFetcherFactory,
f.ReplicationTaskExecutorProvider,
)
}
6 changes: 3 additions & 3 deletions service/history/nDCWorkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (r *nDCWorkflowImpl) happensAfter(
return false, err
}

return workflowHappensAfter(
return WorkflowHappensAfter(
thisLastWriteVersion,
thisLastEventTaskID,
thatLastWriteVersion,
Expand Down Expand Up @@ -169,7 +169,7 @@ func (r *nDCWorkflowImpl) suppressBy(
return workflow.TransactionPolicyActive, err
}

if workflowHappensAfter(
if WorkflowHappensAfter(
lastWriteVersion,
lastEventTaskID,
incomingLastWriteVersion,
Expand Down Expand Up @@ -283,7 +283,7 @@ func (r *nDCWorkflowImpl) zombiefyWorkflow() error {
)
}

func workflowHappensAfter(
func WorkflowHappensAfter(
thisLastWriteVersion int64,
thisLastEventTaskID int64,
thatLastWriteVersion int64,
Expand Down
8 changes: 4 additions & 4 deletions service/history/nDCWorkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s *nDCWorkflowSuite) TestHappensAfter_LargerVersion() {
thatLastWriteVersion := thisLastWriteVersion - 1
thatLastEventTaskID := int64(123)

s.True(workflowHappensAfter(
s.True(WorkflowHappensAfter(
thisLastWriteVersion,
thisLastEventTaskID,
thatLastWriteVersion,
Expand All @@ -142,7 +142,7 @@ func (s *nDCWorkflowSuite) TestHappensAfter_SmallerVersion() {
thatLastWriteVersion := thisLastWriteVersion + 1
thatLastEventTaskID := int64(23)

s.False(workflowHappensAfter(
s.False(WorkflowHappensAfter(
thisLastWriteVersion,
thisLastEventTaskID,
thatLastWriteVersion,
Expand All @@ -156,7 +156,7 @@ func (s *nDCWorkflowSuite) TestHappensAfter_SameVersion_SmallerTaskID() {
thatLastWriteVersion := thisLastWriteVersion
thatLastEventTaskID := thisLastEventTaskID + 1

s.False(workflowHappensAfter(
s.False(WorkflowHappensAfter(
thisLastWriteVersion,
thisLastEventTaskID,
thatLastWriteVersion,
Expand All @@ -170,7 +170,7 @@ func (s *nDCWorkflowSuite) TestHappensAfter_SameVersion_LatrgerTaskID() {
thatLastWriteVersion := thisLastWriteVersion
thatLastEventTaskID := thisLastEventTaskID - 1

s.True(workflowHappensAfter(
s.True(WorkflowHappensAfter(
thisLastWriteVersion,
thisLastEventTaskID,
thatLastWriteVersion,
Expand Down
30 changes: 0 additions & 30 deletions service/history/queueProcessorFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ var QueueProcessorModule = fx.Options(
Group: queues.ProcessorFactoryFxGroup,
Target: NewVisibilityQueueProcessorFactory,
},
fx.Annotated{
Group: queues.ProcessorFactoryFxGroup,
Target: NewReplicationQueueProcessorFactory,
},
),
)

Expand Down Expand Up @@ -307,32 +303,6 @@ func (f *visibilityQueueProcessorFactory) CreateProcessor(
)
}

func NewReplicationQueueProcessorFactory(
params replicationQueueProcessorFactoryParams,
) queues.ProcessorFactory {

return &replicationQueueProcessorFactory{
replicationQueueProcessorFactoryParams: params,
}
}

func (f *replicationQueueProcessorFactory) CreateProcessor(
shard shard.Context,
engine shard.Engine,
workflowCache workflow.Cache,
) queues.Processor {
return replication.NewTaskProcessorFactory(
f.ArchivalClient,
f.Config,
engine,
f.EventSerializer,
shard,
f.TaskFetcherFactory,
workflowCache,
f.ClientBean,
)
}

func newQueueProcessorHostRateLimiter(
hostRPS dynamicconfig.IntPropertyFn,
fallBackRPS dynamicconfig.IntPropertyFn,
Expand Down
42 changes: 22 additions & 20 deletions service/history/replication/dlq_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,14 @@ type (
}

dlqHandlerImpl struct {
taskExecutorsLock sync.Mutex
taskExecutors map[string]TaskExecutor
shard shard.Context
deleteManager workflow.DeleteManager
workflowCache workflow.Cache
resender xdc.NDCHistoryResender
logger log.Logger
taskExecutorsLock sync.Mutex
taskExecutors map[string]TaskExecutor
shard shard.Context
deleteManager workflow.DeleteManager
workflowCache workflow.Cache
resender xdc.NDCHistoryResender
taskExecutorProvider TaskExecutorProvider
logger log.Logger
}
)

Expand All @@ -87,13 +88,15 @@ func NewLazyDLQHandler(
deleteManager workflow.DeleteManager,
workflowCache workflow.Cache,
clientBean client.Bean,
taskExecutorProvider TaskExecutorProvider,
) DLQHandler {
return newDLQHandler(
shard,
deleteManager,
workflowCache,
clientBean,
make(map[string]TaskExecutor),
taskExecutorProvider,
)
}

Expand All @@ -103,6 +106,7 @@ func newDLQHandler(
workflowCache workflow.Cache,
clientBean client.Bean,
taskExecutors map[string]TaskExecutor,
taskExecutorProvider TaskExecutorProvider,
) *dlqHandlerImpl {

if taskExecutors == nil {
Expand Down Expand Up @@ -131,8 +135,9 @@ func newDLQHandler(
shard.GetConfig().StandbyTaskReReplicationContextTimeout,
shard.GetLogger(),
),
taskExecutors: taskExecutors,
logger: shard.GetLogger(),
taskExecutors: taskExecutors,
taskExecutorProvider: taskExecutorProvider,
logger: shard.GetLogger(),
}
}

Expand Down Expand Up @@ -334,17 +339,14 @@ func (r *dlqHandlerImpl) getOrCreateTaskExecutor(clusterName string) (TaskExecut
if err != nil {
return nil, err
}
taskExecutor := NewTaskExecutor(
clusterName,
r.shard,
r.shard.GetNamespaceRegistry(),
r.resender,
engine,
r.deleteManager,
r.workflowCache,
r.shard.GetMetricsClient(),
r.shard.GetLogger(),
)
taskExecutor := r.taskExecutorProvider(TaskExecutorParams{
RemoteCluster: clusterName,
Shard: r.shard,
HistoryResender: r.resender,
HistoryEngine: engine,
DeleteManager: r.deleteManager,
WorkflowCache: r.workflowCache,
})
r.taskExecutors[clusterName] = taskExecutor
return taskExecutor, nil
}
10 changes: 10 additions & 0 deletions service/history/replication/dlq_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,16 @@ func (s *dlqHandlerSuite) SetupTest() {
workflow.NewMockCache(s.controller),
s.mockClientBean,
s.taskExecutors,
func(params TaskExecutorParams) TaskExecutor {
return NewTaskExecutor(
params.RemoteCluster,
params.Shard,
params.HistoryResender,
params.HistoryEngine,
params.DeleteManager,
params.WorkflowCache,
)
},
)
}

Expand Down

0 comments on commit 4c11b96

Please sign in to comment.