Skip to content

Commit

Permalink
Bugfix: do not create duplicated gRPC stream
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 committed May 19, 2023
1 parent 9712711 commit 19085dc
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 57 deletions.
2 changes: 0 additions & 2 deletions service/history/configs/config.go
Expand Up @@ -238,7 +238,6 @@ type Config struct {
ReplicationEnableDLQMetrics dynamicconfig.BoolPropertyFn

ReplicationStreamSyncStatusDuration dynamicconfig.DurationPropertyFn
ReplicationStreamMinReconnectDuration dynamicconfig.DurationPropertyFn
ReplicationProcessorSchedulerQueueSize dynamicconfig.IntPropertyFn
ReplicationProcessorSchedulerWorkerCount dynamicconfig.IntPropertyFn

Expand Down Expand Up @@ -417,7 +416,6 @@ func NewConfig(
ReplicationEnableDLQMetrics: dc.GetBoolProperty(dynamicconfig.ReplicationEnableDLQMetrics, true),

ReplicationStreamSyncStatusDuration: dc.GetDurationProperty(dynamicconfig.ReplicationStreamSyncStatusDuration, 1*time.Second),
ReplicationStreamMinReconnectDuration: dc.GetDurationProperty(dynamicconfig.ReplicationStreamMinReconnectDuration, 4*time.Second),
ReplicationProcessorSchedulerQueueSize: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerQueueSize, 128),
ReplicationProcessorSchedulerWorkerCount: dc.GetIntProperty(dynamicconfig.ReplicationProcessorSchedulerWorkerCount, 512),

Expand Down
7 changes: 7 additions & 0 deletions service/history/replication/bi_direction_stream.go
Expand Up @@ -63,6 +63,7 @@ type (
Send(Req) error
Recv() (<-chan StreamResp[Resp], error)
Close()
IsValid() bool
}
StreamResp[Resp any] struct {
Resp Resp
Expand Down Expand Up @@ -135,6 +136,12 @@ func (s *BiDirectionStreamImpl[Req, Resp]) Close() {
s.closeLocked()
}

func (s *BiDirectionStreamImpl[Req, Resp]) IsValid() bool {
s.Lock()
defer s.Unlock()
return s.status != streamStatusClosed
}

func (s *BiDirectionStreamImpl[Req, Resp]) closeLocked() {
if s.status == streamStatusClosed {
return
Expand Down
20 changes: 8 additions & 12 deletions service/history/replication/bi_direction_stream_test.go
Expand Up @@ -117,18 +117,21 @@ func (s *biDirectionStreamSuite) TestLazyInit() {
s.biDirectionStream.Unlock()
s.NoError(err)
s.Equal(s.streamClient, s.biDirectionStream.streamingClient)
s.True(s.biDirectionStream.IsValid())

s.biDirectionStream.Lock()
err = s.biDirectionStream.lazyInitLocked()
s.biDirectionStream.Unlock()
s.NoError(err)
s.Equal(s.streamClient, s.biDirectionStream.streamingClient)
s.True(s.biDirectionStream.IsValid())

s.biDirectionStream.Close()
s.biDirectionStream.Lock()
err = s.biDirectionStream.lazyInitLocked()
s.biDirectionStream.Unlock()
s.Error(err)
s.False(s.biDirectionStream.IsValid())
}

func (s *biDirectionStreamSuite) TestSend() {
Expand All @@ -140,9 +143,7 @@ func (s *biDirectionStreamSuite) TestSend() {
s.NoError(err)
}
s.Equal(reqs, s.streamClient.requests)
s.biDirectionStream.Lock()
defer s.biDirectionStream.Unlock()
s.Equal(streamStatusOpen, s.biDirectionStream.status)
s.True(s.biDirectionStream.IsValid())
}

func (s *biDirectionStreamSuite) TestSend_Err() {
Expand All @@ -152,9 +153,7 @@ func (s *biDirectionStreamSuite) TestSend_Err() {

err := s.biDirectionStream.Send(rand.Int())
s.Error(err)
s.biDirectionStream.Lock()
defer s.biDirectionStream.Unlock()
s.Equal(streamStatusClosed, s.biDirectionStream.status)
s.False(s.biDirectionStream.IsValid())
}

func (s *biDirectionStreamSuite) TestRecv() {
Expand All @@ -168,9 +167,7 @@ func (s *biDirectionStreamSuite) TestRecv() {
resps = append(resps, streamResp.Resp)
}
s.Equal(s.streamClient.responses, resps)
s.biDirectionStream.Lock()
defer s.biDirectionStream.Unlock()
s.Equal(streamStatusClosed, s.biDirectionStream.status)
s.False(s.biDirectionStream.IsValid())
}

func (s *biDirectionStreamSuite) TestRecv_Err() {
Expand All @@ -183,9 +180,8 @@ func (s *biDirectionStreamSuite) TestRecv_Err() {
s.Error(streamResp.Err)
_, ok := <-streamRespChan
s.False(ok)
s.biDirectionStream.Lock()
defer s.biDirectionStream.Unlock()
s.Equal(streamStatusClosed, s.biDirectionStream.status)
s.False(s.biDirectionStream.IsValid())

}

func (p *mockStreamClientProvider) Get(
Expand Down
48 changes: 6 additions & 42 deletions service/history/replication/stream_receiver.go
Expand Up @@ -26,7 +26,6 @@ package replication

import (
"context"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -61,10 +60,7 @@ type (
taskTracker ExecutableTaskTracker
shutdownChan channel.ShutdownOnce
logger log.Logger

sync.Mutex
streamCreationTime time.Time
stream Stream
stream Stream
}
)

Expand Down Expand Up @@ -94,8 +90,6 @@ func NewStreamReceiver(
taskTracker: taskTracker,
shutdownChan: channel.NewShutdownOnce(),
logger: logger,

streamCreationTime: time.Now().UTC(),
stream: newStream(
processToolBox,
clientShardKey,
Expand Down Expand Up @@ -150,9 +144,9 @@ func (r *StreamReceiver) sendEventLoop() {
select {
case <-timer.C:
timer.Reset(r.Config.ReplicationStreamSyncStatusDuration())
streamCreationTime, stream := r.getStream()
if err := r.ackMessage(stream); err != nil {
r.recreateStream(streamCreationTime)
if err := r.ackMessage(r.stream); err != nil {
r.logger.Error("StreamReceiver exit send loop", tag.Error(err))
return
}
case <-r.shutdownChan.Channel():
return
Expand All @@ -163,38 +157,8 @@ func (r *StreamReceiver) sendEventLoop() {
func (r *StreamReceiver) recvEventLoop() {
defer r.Stop()

for !r.shutdownChan.IsShutdown() {
streamCreationTime, stream := r.getStream()
_ = r.processMessages(stream)
r.recreateStream(streamCreationTime)
}
}

func (r *StreamReceiver) getStream() (time.Time, Stream) {
r.Lock()
defer r.Unlock()
return r.streamCreationTime, r.stream
}

func (r *StreamReceiver) recreateStream(
streamCreationTime time.Time,
) {
delay := streamCreationTime.Add(r.Config.ReplicationStreamMinReconnectDuration()).Sub(time.Now().UTC())
if delay > 0 {
select {
case <-time.After(delay):
case <-r.shutdownChan.Channel():
}
}

r.Lock()
defer r.Unlock()
r.streamCreationTime = time.Now().UTC()
r.stream = newStream(
r.ProcessToolBox,
r.clientShardKey,
r.serverShardKey,
)
err := r.processMessages(r.stream)
r.logger.Error("StreamReceiver exit recv loop", tag.Error(err))
}

func (r *StreamReceiver) ackMessage(
Expand Down
9 changes: 8 additions & 1 deletion service/history/replication/stream_receiver_test.go
Expand Up @@ -61,6 +61,7 @@ type (
mockStream struct {
requests []*adminservice.StreamWorkflowReplicationMessagesRequest
respChan chan StreamResp[*adminservice.StreamWorkflowReplicationMessagesResponse]
closed bool
}
mockScheduler struct {
tasks []TrackableExecutableTask
Expand Down Expand Up @@ -212,7 +213,13 @@ func (s *mockStream) Recv() (<-chan StreamResp[*adminservice.StreamWorkflowRepli
return s.respChan, nil
}

func (s *mockStream) Close() {}
func (s *mockStream) Close() {
s.closed = true
}

func (s *mockStream) IsValid() bool {
return !s.closed
}

func (s *mockScheduler) Submit(task TrackableExecutableTask) {
s.tasks = append(s.tasks, task)
Expand Down

0 comments on commit 19085dc

Please sign in to comment.