Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simpler implementation, with no race condition #1

Merged
merged 1 commit into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ test_short:
go test ./... -short

test_race:
go test ./... -short -race
go test ./... -race

test_stress:
go test -v -tags=stress -timeout=45m ./...
Expand Down
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@ require (
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.1
github.com/vmihailenco/msgpack v4.0.4+incompatible
github.com/wk8/go-error-buffer v0.0.0-20230515211523-1bb61b128a10
)

require (
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/friendsofgo/errors v0.9.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.7 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,19 @@ github.com/Rican7/retry v0.3.1 h1:scY4IbO8swckzoA/11HgBwaZRJEyY9vaNJshcdhp1Mc=
github.com/Rican7/retry v0.3.1/go.mod h1:CxSDrhAyXmTMeEuRAnArMu1FHu48vtfjLREWqVl7Vw0=
github.com/ThreeDotsLabs/watermill v1.2.0 h1:TU3TML1dnQ/ifK09F2+4JQk2EKhmhXe7Qv7eb5ZpTS8=
github.com/ThreeDotsLabs/watermill v1.2.0/go.mod h1:IuVxGk/kgCN0cex2S94BLglUiB0PwOm8hbUhm6g2Nx4=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/friendsofgo/errors v0.9.2 h1:X6NYxef4efCBdwI7BgS820zFaN7Cphrmb+Pljdzjtgk=
github.com/friendsofgo/errors v0.9.2/go.mod h1:yCvFW5AkDIL9qn7suHVLiI/gH228n7PC4Pn44IGoTOI=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
Expand All @@ -26,8 +32,11 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8=
github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
Expand All @@ -46,6 +55,10 @@ github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKs
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI=
github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/wk8/go-error-buffer v0.0.0-20230515211523-1bb61b128a10 h1:YX7AEbO6/OLivzT2piLGjZyxv30cmMs2weQ587w/JQI=
github.com/wk8/go-error-buffer v0.0.0-20230515211523-1bb61b128a10/go.mod h1:N0jirnKcRGOtdZiyUpIq/yxrLNzZL7snWCjqTayQTaQ=
github.com/wk8/go-ordered-map/v2 v2.1.7 h1:aUZ1xBMdbvY8wnNt77qqo4nyT3y0pX4Usat48Vm+hik=
github.com/wk8/go-ordered-map/v2 v2.1.7/go.mod h1:9Xvgm2mV2kSq2SAm0Y608tBmu8akTzI7c2bz7/G7ZN4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
Expand All @@ -56,6 +69,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c=
Expand Down
24 changes: 1 addition & 23 deletions pkg/redisstream/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,16 @@ package redisstream

import (
"context"
"sync"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
)

type Publisher struct {
config PublisherConfig
client redis.UniversalClient
logger watermill.LoggerAdapter

closed bool
closeMutex sync.Mutex
}

// NewPublisher creates a new redis stream Publisher.
Expand All @@ -35,7 +30,6 @@ func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publ
config: config,
client: config.Client,
logger: logger,
closed: false,
}, nil
}

Expand Down Expand Up @@ -69,10 +63,6 @@ func (c *PublisherConfig) Validate() error {
// Publish is blocking and wait for redis response
// When one of messages delivery fails - function is interrupted.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
if p.closed {
return errors.New("publisher closed")
}

logFields := make(watermill.LogFields, 3)
logFields["topic"] = topic

Expand Down Expand Up @@ -108,17 +98,5 @@ func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
}

func (p *Publisher) Close() error {
p.closeMutex.Lock()
defer p.closeMutex.Unlock()

if p.closed {
return nil
}
p.closed = true

if err := p.client.Close(); err != nil {
return err
}

return nil
}
166 changes: 90 additions & 76 deletions pkg/redisstream/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"github.com/stretchr/testify/require"
)

// should be long enough to be robust even for CI boxes
const testInterval = 250 * time.Millisecond

func redisClient() (redis.UniversalClient, error) {
client := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Expand Down Expand Up @@ -64,8 +67,7 @@ func createPubSubWithConsumerGroup(t *testing.T, consumerGroup string) (message.
Client: redisClientOrFail(t),
Consumer: watermill.NewShortUUID(),
ConsumerGroup: consumerGroup,
BlockTime: 10 * time.Millisecond,
ClaimInterval: 3 * time.Second,
ClaimInterval: 10 * time.Millisecond,
MaxIdleTime: 5 * time.Second,
})
}
Expand Down Expand Up @@ -129,79 +131,7 @@ func TestSubscriber(t *testing.T) {
require.NoError(t, subscriber.Close())
}

func TestFanOut(t *testing.T) {
topic := watermill.NewShortUUID()

subscriber1, err := NewSubscriber(
SubscriberConfig{
Client: redisClientOrFail(t),
Consumer: watermill.NewShortUUID(),
ConsumerGroup: "",
},
watermill.NewStdLogger(true, false),
)
require.NoError(t, err)

subscriber2, err := NewSubscriber(
SubscriberConfig{
Client: redisClientOrFail(t),
Consumer: watermill.NewShortUUID(),
ConsumerGroup: "",
},
watermill.NewStdLogger(true, false),
)
require.NoError(t, err)

publisher, err := NewPublisher(
PublisherConfig{
Client: redisClientOrFail(t),
},
watermill.NewStdLogger(false, false),
)
require.NoError(t, err)
for i := 0; i < 10; i++ {
require.NoError(t, publisher.Publish(topic, message.NewMessage(watermill.NewShortUUID(), []byte("test"+strconv.Itoa(i)))))
}

messages1, err := subscriber1.Subscribe(context.Background(), topic)
require.NoError(t, err)
messages2, err := subscriber2.Subscribe(context.Background(), topic)
require.NoError(t, err)

// wait for initial XREAD before publishing messages to avoid message loss
time.Sleep(2 * DefaultBlockTime)
for i := 10; i < 50; i++ {
require.NoError(t, publisher.Publish(topic, message.NewMessage(watermill.NewShortUUID(), []byte("test"+strconv.Itoa(i)))))
}

for i := 10; i < 50; i++ {
msg := <-messages1
if msg == nil {
t.Fatal("msg nil")
}
t.Logf("subscriber 1: %v %v %v", msg.UUID, msg.Metadata, string(msg.Payload))
require.Equal(t, string(msg.Payload), "test"+strconv.Itoa(i))
msg.Ack()
}
for i := 10; i < 50; i++ {
msg := <-messages2
if msg == nil {
t.Fatal("msg nil")
}
t.Logf("subscriber 2: %v %v %v", msg.UUID, msg.Metadata, string(msg.Payload))
require.Equal(t, string(msg.Payload), "test"+strconv.Itoa(i))
msg.Ack()
}

require.NoError(t, publisher.Close())
require.NoError(t, subscriber1.Close())
require.NoError(t, subscriber2.Close())
}

func TestClaimIdle(t *testing.T) {
// should be long enough to be robust even for CI boxes
testInterval := 250 * time.Millisecond

topic := watermill.NewShortUUID()
consumerGroup := watermill.NewShortUUID()
testLogger := watermill.NewStdLogger(true, false)
Expand All @@ -228,7 +158,7 @@ func TestClaimIdle(t *testing.T) {
// handles loop variables in function literals
subID := subscriberID

suscriber, err := NewSubscriber(
subscriber, err := NewSubscriber(
SubscriberConfig{
Client: redisClientOrFail(t),
Consumer: strconv.Itoa(subID),
Expand Down Expand Up @@ -264,7 +194,7 @@ func TestClaimIdle(t *testing.T) {
router.AddNoPublisherHandler(
strconv.Itoa(subID),
topic,
suscriber,
subscriber,
func(msg *message.Message) error {
msgID, err := strconv.Atoi(string(msg.Payload))
require.NoError(t, err)
Expand Down Expand Up @@ -343,3 +273,87 @@ func TestClaimIdle(t *testing.T) {

assert.GreaterOrEqual(t, nMsgsWithRetries, 3)
}

// this test checks that even workers that are idle for a while will
// try to claim messages that have been idle for too long, which is not covered by TestClaimIdle
func TestMessagesGetClaimedEvenByIdleWorkers(t *testing.T) {
topic := watermill.NewShortUUID()
consumerGroup := watermill.NewShortUUID()
testLogger := watermill.NewStdLogger(true, false)

router, err := message.NewRouter(message.RouterConfig{
CloseTimeout: testInterval,
}, testLogger)
require.NoError(t, err)

receivedCh := make(chan int)
payload := message.Payload("coucou toi")

// let's create a few subscribers, that just wait for a while each time they receive anything
nSubscribers := 8
for subscriberID := 0; subscriberID < nSubscribers; subscriberID++ {
subID := subscriberID

subscriber, err := NewSubscriber(
SubscriberConfig{
Client: redisClientOrFail(t),
Consumer: strconv.Itoa(subID),
ConsumerGroup: consumerGroup,
ClaimInterval: testInterval,
MaxIdleTime: testInterval,
},
testLogger,
)
require.NoError(t, err)

router.AddNoPublisherHandler(
strconv.Itoa(subID),
topic,
subscriber,
func(msg *message.Message) error {
assert.Equal(t, msg.Payload, payload)

receivedCh <- subID
time.Sleep(time.Duration(nSubscribers+2) * testInterval)

return nil
},
)
}

runCtx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
require.NoError(t, router.Run(runCtx))
}()

// now let's push only one message
publisher, err := NewPublisher(
PublisherConfig{
Client: redisClientOrFail(t),
},
testLogger,
)
require.NoError(t, err)
msg := message.NewMessage(watermill.NewShortUUID(), payload)
require.NoError(t, publisher.Publish(topic, msg))

// it should get retried by all subscribers
seenSubscribers := make([]bool, nSubscribers)
for receivedCount := 0; receivedCount != nSubscribers; receivedCount++ {
select {
case subscriberID := <-receivedCh:
assert.False(t, seenSubscribers[subscriberID], "subscriber %d seen more than once", subscriberID)
seenSubscribers[subscriberID] = true

case <-time.After(time.Duration(nSubscribers) * 2 * testInterval):
t.Fatalf("timed out waiting for new messages, only received %d messages", receivedCount)
}
}

// shut everything down
cancel()
wg.Wait()
}