Skip to content

Commit

Permalink
fix: race problem in timeout mono (#42)
Browse files Browse the repository at this point in the history
* fix: race problem in timeout mono

* add ut

* fix ut

* set codecov
  • Loading branch information
jjeffcaii committed Jun 27, 2022
1 parent 7f7286f commit a7bd3e9
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 26 deletions.
4 changes: 4 additions & 0 deletions codecov.yml
@@ -0,0 +1,4 @@
coverage:
range: 70..100
round: down
precision: 2
58 changes: 32 additions & 26 deletions mono/timeout.go
Expand Up @@ -2,6 +2,8 @@ package mono

import (
"context"
"math"
"sync/atomic"
"time"

"github.com/jjeffcaii/reactor-go"
Expand All @@ -13,37 +15,56 @@ type monoTimeout struct {
timeout time.Duration
}

func newMonoTimeout(source reactor.RawPublisher, timeout time.Duration) *monoTimeout {
return &monoTimeout{
source: source,
timeout: timeout,
}
}

func (m *monoTimeout) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
m.source.SubscribeWith(ctx, &timeoutSubscriber{
actual: s,
timeout: m.timeout,
done: make(chan struct{}),
})
}

type timeoutSubscriber struct {
actual reactor.Subscriber
timeout time.Duration
done chan struct{}
closed int32
}

func (t *timeoutSubscriber) OnComplete() {
select {
case <-t.done:
default:
if atomic.CompareAndSwapInt32(&t.closed, 0, math.MaxInt32) || atomic.CompareAndSwapInt32(&t.closed, 1, math.MaxInt32) {
close(t.done)
t.actual.OnComplete()
}
}

func (t *timeoutSubscriber) OnError(err error) {
select {
case <-t.done:
hooks.Global().OnErrorDrop(err)
default:
if atomic.CompareAndSwapInt32(&t.closed, 0, -1) {
close(t.done)
t.actual.OnError(err)
return
}

// item is emitted before error reach, should be processed as completed.
if atomic.CompareAndSwapInt32(&t.closed, 1, -1) {
close(t.done)
t.actual.OnComplete()
}

hooks.Global().OnErrorDrop(err)
}

func (t *timeoutSubscriber) OnNext(any reactor.Any) {
select {
case <-t.done:
hooks.Global().OnNextDrop(any)
default:
if atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
t.actual.OnNext(any)
} else {
hooks.Global().OnNextDrop(any)
}
}

Expand All @@ -59,18 +80,3 @@ func (t *timeoutSubscriber) OnSubscribe(ctx context.Context, subscription reacto
}()
t.actual.OnSubscribe(ctx, subscription)
}

func (m *monoTimeout) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
m.source.SubscribeWith(ctx, &timeoutSubscriber{
actual: s,
timeout: m.timeout,
done: make(chan struct{}),
})
}

func newMonoTimeout(source reactor.RawPublisher, timeout time.Duration) *monoTimeout {
return &monoTimeout{
source: source,
timeout: timeout,
}
}
85 changes: 85 additions & 0 deletions mono/timeout_test.go
Expand Up @@ -2,6 +2,7 @@ package mono

import (
"context"
"io"
"testing"
"time"

Expand Down Expand Up @@ -33,3 +34,87 @@ func TestMonoTimeout_SubscribeWith(t *testing.T) {
newMonoTimeout(source, 20*time.Millisecond).SubscribeWith(context.Background(), s)
time.Sleep(30 * time.Millisecond)
}

func TestTimeoutSubscriber(t *testing.T) {
t.Run("ErrorCompleteNext", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ms := NewMockSubscription(ctrl)

ms.EXPECT().Request(gomock.Any()).AnyTimes()
ms.EXPECT().Cancel().AnyTimes()

msub := NewMockSubscriber(ctrl)
msub.EXPECT().OnSubscribe(gomock.Any(), gomock.Any()).Do(MockRequestInfinite).Times(1)
msub.EXPECT().OnError(gomock.Any()).Times(1)
msub.EXPECT().OnNext(gomock.Any()).Times(0)
msub.EXPECT().OnComplete().Times(0)

sub := &timeoutSubscriber{
actual: msub,
done: make(chan struct{}),
}

sub.OnSubscribe(context.Background(), ms)

sub.OnError(io.EOF)
sub.OnComplete()
sub.OnNext(1)
})

t.Run("CompleteErrorNext", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ms := NewMockSubscription(ctrl)

ms.EXPECT().Request(gomock.Any()).AnyTimes()
ms.EXPECT().Cancel().AnyTimes()

msub := NewMockSubscriber(ctrl)
msub.EXPECT().OnSubscribe(gomock.Any(), gomock.Any()).Do(MockRequestInfinite).Times(1)
msub.EXPECT().OnError(gomock.Any()).Times(0)
msub.EXPECT().OnNext(gomock.Any()).Times(0)
msub.EXPECT().OnComplete().Times(1)

sub := &timeoutSubscriber{
actual: msub,
done: make(chan struct{}),
}

sub.OnSubscribe(context.Background(), ms)

sub.OnComplete()
sub.OnError(io.EOF)
sub.OnNext(1)
})

t.Run("NextErrorComplete", func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

ms := NewMockSubscription(ctrl)

ms.EXPECT().Request(gomock.Any()).AnyTimes()
ms.EXPECT().Cancel().AnyTimes()

msub := NewMockSubscriber(ctrl)
msub.EXPECT().OnSubscribe(gomock.Any(), gomock.Any()).Do(MockRequestInfinite).Times(1)
msub.EXPECT().OnError(gomock.Any()).Times(0)
msub.EXPECT().OnNext(gomock.Any()).Times(1)
msub.EXPECT().OnComplete().Times(1)

sub := &timeoutSubscriber{
actual: msub,
done: make(chan struct{}),
}

sub.OnSubscribe(context.Background(), ms)

sub.OnNext(1)
sub.OnError(io.EOF)
sub.OnComplete()
})

}

0 comments on commit a7bd3e9

Please sign in to comment.