Skip to content

Commit

Permalink
Wire replication stream E2E (#4097)
Browse files Browse the repository at this point in the history
* Add replication stream API &UT
  • Loading branch information
wxing1292 committed Mar 29, 2023
1 parent fca3662 commit 8f030db
Show file tree
Hide file tree
Showing 27 changed files with 1,349 additions and 309 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ update-linters:

update-mockgen:
@printf $(COLOR) "Install/update mockgen tool..."
@go install github.com/golang/mock/mockgen@v1.6.0
@go install github.com/golang/mock/mockgen@v1.7.0-rc.1

update-proto-plugins:
@printf $(COLOR) "Install/update proto plugins..."
Expand Down
5 changes: 5 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,9 @@ const (

// keys for history

// EnableReplicationStream turn on replication stream
EnableReplicationStream = "history.enableReplicationStream"

// HistoryRPS is request rate per second for each history host
HistoryRPS = "history.rps"
// HistoryPersistenceMaxQPS is the max qps history host can query DB
Expand Down Expand Up @@ -649,6 +652,8 @@ const (
ReplicationTaskProcessorShardQPS = "history.ReplicationTaskProcessorShardQPS"
// ReplicationBypassCorruptedData is the flag to bypass corrupted workflow data in source cluster
ReplicationBypassCorruptedData = "history.ReplicationBypassCorruptedData"
// ReplicationProcessorSchedulerWorkerCount is the replication task executor worker count
ReplicationProcessorSchedulerWorkerCount = "history.ReplicationProcessorSchedulerWorkerCount"

// keys for worker

Expand Down
4 changes: 4 additions & 0 deletions common/rpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/rpc/interceptor"
serviceerrors "go.temporal.io/server/common/serviceerror"
)

Expand Down Expand Up @@ -92,6 +93,9 @@ func Dial(hostName string, tlsConfig *tls.Config, logger log.Logger, interceptor
errorInterceptor,
)...,
),
grpc.WithChainStreamInterceptor(
interceptor.StreamErrorInterceptor,
),
grpc.WithDefaultServiceConfig(DefaultServiceConfig),
grpc.WithDisableServiceConfig(),
grpc.WithConnectParams(cp),
Expand Down
12 changes: 2 additions & 10 deletions common/tasks/fifo_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,7 @@ type (
status int32
options *FIFOSchedulerOptions

monitor Monitor[T]
logger log.Logger
logger log.Logger

tasksChan chan T
shutdownChan chan struct{}
Expand All @@ -66,16 +65,14 @@ type (

// NewFIFOScheduler creates a new FIFOScheduler
func NewFIFOScheduler[T Task](
scheduleMoniter Monitor[T],
options *FIFOSchedulerOptions,
logger log.Logger,
) *FIFOScheduler[T] {
return &FIFOScheduler[T]{
status: common.DaemonStatusInitialized,
options: options,

monitor: scheduleMoniter,
logger: logger,
logger: logger,

tasksChan: make(chan T, options.QueueSize),
shutdownChan: make(chan struct{}),
Expand All @@ -91,7 +88,6 @@ func (f *FIFOScheduler[T]) Start() {
return
}

f.monitor.Start()
f.startWorkers(f.options.WorkerCount())

f.shutdownWG.Add(1)
Expand All @@ -113,8 +109,6 @@ func (f *FIFOScheduler[T]) Stop() {
// must be called after the close of the shutdownChan
f.drainTasks()

f.monitor.Stop()

go func() {
if success := common.AwaitWaitGroup(&f.shutdownWG, time.Minute); !success {
f.logger.Warn("fifo scheduler timed out waiting for workers")
Expand Down Expand Up @@ -225,8 +219,6 @@ func (f *FIFOScheduler[T]) processTask(
func (f *FIFOScheduler[T]) executeTask(
task T,
) {
f.monitor.RecordStart(task)

operation := func() error {
if err := task.Execute(); err != nil {
return task.HandleErr(err)
Expand Down
7 changes: 0 additions & 7 deletions common/tasks/fifo_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ type (
scheduler *FIFOScheduler[*MockTask]
retryPolicy backoff.RetryPolicy
}

noopMonitor[T Task] struct{}
)

func TestFIFOSchedulerSuite(t *testing.T) {
Expand Down Expand Up @@ -220,15 +218,10 @@ func (s *fifoSchedulerSuite) TestStartStopWorkers() {

func (s *fifoSchedulerSuite) newTestProcessor() *FIFOScheduler[*MockTask] {
return NewFIFOScheduler[*MockTask](
&noopMonitor[*MockTask]{},
&FIFOSchedulerOptions{
QueueSize: 1,
WorkerCount: dynamicconfig.GetIntPropertyFn(1),
},
log.NewNoopLogger(),
)
}

func (m *noopMonitor[T]) Start() {}
func (m *noopMonitor[T]) Stop() {}
func (m *noopMonitor[T]) RecordStart(T) {}
40 changes: 0 additions & 40 deletions common/tasks/monitor.go

This file was deleted.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/gogo/status v1.1.1
github.com/golang-jwt/jwt/v4 v4.4.3
github.com/golang/mock v1.6.0
github.com/golang/mock v1.7.0-rc.1
github.com/google/go-cmp v0.5.9
github.com/google/uuid v1.3.0
github.com/iancoleman/strcase v0.2.0
Expand Down
4 changes: 3 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -684,8 +684,9 @@ github.com/golang/mock v1.4.1/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt
github.com/golang/mock v1.4.3/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/golang/mock v1.7.0-rc.1 h1:YojYx61/OLFsiv6Rw1Z96LpldJIy31o+UHmwAUMJ6/U=
github.com/golang/mock v1.7.0-rc.1/go.mod h1:s42URUywIqd+OcERslBJvOjepvNymP31m3q8d/GkuRs=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
Expand Down Expand Up @@ -1409,6 +1410,7 @@ golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.8/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.3.0/go.mod h1:/rWhSS2+zyEVwoJf8YAX6L2f0ntZ7Kn/mGgAWcipA5k=
Expand Down
Loading

0 comments on commit 8f030db

Please sign in to comment.