Skip to content

Commit

Permalink
Some test tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
willdot committed Jul 11, 2023
1 parent 15f1604 commit df4f603
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 15 deletions.
20 changes: 16 additions & 4 deletions send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ func TestSendNats(t *testing.T) {
Repeat: 1,
}

natsSub := setupNats(t)
consumerCtx, consumerCancel := context.WithCancel(context.Background())
defer consumerCancel()

natsSub := setupNats(t, consumerCtx)

err := send(cfg, mockFileReader)
require.NoError(t, err)
Expand Down Expand Up @@ -103,7 +106,10 @@ func TestSendRedis(t *testing.T) {
Repeat: 1,
}

redisSub := setupRedis(t)
consumerCtx, consumerCancel := context.WithCancel(context.Background())
defer consumerCancel()

redisSub := setupRedis(t, consumerCtx)

err := send(cfg, mockFileReader)
require.NoError(t, err)
Expand Down Expand Up @@ -136,7 +142,10 @@ func TestSendGooglePubSub(t *testing.T) {
Repeat: 1,
}

googlePubSub := setupGooglePubSub(t)
consumerCtx, consumerCancel := context.WithCancel(context.Background())
defer consumerCancel()

googlePubSub := setupGooglePubSub(t, consumerCtx)

err := send(cfg, mockFileReader)
require.NoError(t, err)
Expand Down Expand Up @@ -168,7 +177,10 @@ func TestSendKafka(t *testing.T) {
Repeat: 1,
}

kafkaConsumer := setupKakfa(t)
consumerCtx, consumerCancel := context.WithCancel(context.Background())
defer consumerCancel()

kafkaConsumer := setupKakfa(t, consumerCtx)

err := send(cfg, mockFileReader)
require.NoError(t, err)
Expand Down
33 changes: 22 additions & 11 deletions setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type natsSubscriber struct {
msgs chan *nats.Msg
}

func setupNats(t *testing.T) natsSubscriber {
func setupNats(t *testing.T, ctx context.Context) natsSubscriber {
nc, err := nats.Connect(nats_url)
require.NoError(t, err)

Expand All @@ -78,7 +78,13 @@ func setupNats(t *testing.T) natsSubscriber {

go func() {

Check failure on line 79 in setup_test.go

View workflow job for this annotation

GitHub Actions / build

SA2002: the goroutine calls T.Fatal, which must be called in the same goroutine as the test (staticcheck)
for {
msg, _ := sub.NextMsg(time.Second * 10)
msg, err := sub.NextMsgWithContext(ctx)
if err != nil {
if err == context.Canceled {
return
}
t.Fatal(err)

Check failure on line 86 in setup_test.go

View workflow job for this annotation

GitHub Actions / build

testinggoroutine: call to (*T).Fatal from a non-test goroutine (govet)
}
natsSub.msgs <- msg
}
}()
Expand All @@ -90,7 +96,7 @@ type redisSubscriber struct {
msgs chan *redis.Message
}

func setupRedis(t *testing.T) redisSubscriber {
func setupRedis(t *testing.T, ctx context.Context) redisSubscriber {
client := redis.NewClient(&redis.Options{
Addr: redis_url,
})
Expand All @@ -110,7 +116,10 @@ func setupRedis(t *testing.T) redisSubscriber {

go func() {
for {
msg, _ := subscriber.ReceiveMessage(context.Background())
msg, err := subscriber.ReceiveMessage(ctx)
if err != nil {
return
}
redisSub.msgs <- msg
}
}()
Expand Down Expand Up @@ -173,9 +182,8 @@ type googlePubSub struct {
msgs chan *pubsub.Message
}

func setupGooglePubSub(t *testing.T) googlePubSub {
func setupGooglePubSub(t *testing.T, ctx context.Context) googlePubSub {
t.Setenv("PUBSUB_EMULATOR_HOST", google_pub_sub_url)
ctx := context.Background()

client, err := pubsub.NewClient(ctx, test_project_id, option.WithoutAuthentication(), option.WithEndpoint(google_pub_sub_url))
require.NoError(t, err)
Expand Down Expand Up @@ -212,12 +220,11 @@ func setupGooglePubSub(t *testing.T) googlePubSub {
}

go func() {
err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
_ = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
msg.Ack()
pubSub.msgs <- msg
})

require.NoError(t, err)
}()

return pubSub
Expand All @@ -227,7 +234,7 @@ type kafkaConsumer struct {
msgs chan *sarama.ConsumerMessage
}

func setupKakfa(t *testing.T) kafkaConsumer {
func setupKakfa(t *testing.T, ctx context.Context) kafkaConsumer {
consumer, err := sarama.NewConsumer([]string{kafka_url}, sarama.NewConfig())
require.NoError(t, err)

Expand All @@ -253,8 +260,12 @@ func setupKakfa(t *testing.T) kafkaConsumer {

go func() {
for {
msg := <-pc.Messages()
kafkaConsumer.msgs <- msg
select {
case msg := <-pc.Messages():
kafkaConsumer.msgs <- msg
case <-ctx.Done():
return
}
}
}()

Expand Down

0 comments on commit df4f603

Please sign in to comment.