Skip to content

Commit

Permalink
Fix tests after updating go-liftbridge
Browse files Browse the repository at this point in the history
  • Loading branch information
tylertreat committed Apr 23, 2020
1 parent f81271a commit 1e4e6c1
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 34 deletions.
16 changes: 8 additions & 8 deletions server/activity_test.go
Expand Up @@ -36,9 +36,9 @@ func TestActivityStreamCreateStream(t *testing.T) {

// The first message read back should be the creation of the activity
// stream partition.
msgs := make(chan lift.Message, 1)
msgs := make(chan *lift.Message, 1)
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, activityStream, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, activityStream, func(msg *lift.Message, err error) {
require.NoError(t, err)
msgs <- msg
cancel()
Expand Down Expand Up @@ -91,9 +91,9 @@ func TestActivityStreamDeleteStream(t *testing.T) {
require.NoError(t, client.DeleteStream(context.Background(), stream))

// The first message read back should be the stream deletion.
msgs := make(chan lift.Message, 1)
msgs := make(chan *lift.Message, 1)
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, activityStream, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, activityStream, func(msg *lift.Message, err error) {
require.NoError(t, err)
msgs <- msg
cancel()
Expand Down Expand Up @@ -144,9 +144,9 @@ func TestActivityStreamPauseStream(t *testing.T) {
require.NoError(t, client.PauseStream(context.Background(), stream))

// The first message read back should be the stream pause.
msgs := make(chan lift.Message, 1)
msgs := make(chan *lift.Message, 1)
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, activityStream, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, activityStream, func(msg *lift.Message, err error) {
require.NoError(t, err)
msgs <- msg
cancel()
Expand Down Expand Up @@ -203,9 +203,9 @@ func TestActivityStreamResumeStream(t *testing.T) {
require.NoError(t, err)

// The first message read back should be the stream resume.
msgs := make(chan lift.Message, 1)
msgs := make(chan *lift.Message, 1)
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, activityStream, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, activityStream, func(msg *lift.Message, err error) {
require.NoError(t, err)
msgs <- msg
cancel()
Expand Down
12 changes: 6 additions & 6 deletions server/api_test.go
Expand Up @@ -26,7 +26,7 @@ type message struct {
Offset int64
}

func assertMsg(t *testing.T, expected *message, msg lift.Message) {
func assertMsg(t *testing.T, expected *message, msg *lift.Message) {
require.Equal(t, expected.Offset, msg.Offset())
require.Equal(t, expected.Key, msg.Key())
require.Equal(t, expected.Value, msg.Value())
Expand Down Expand Up @@ -335,7 +335,7 @@ func TestSubscribeStreamNotLeader(t *testing.T) {

// Subscribe on the follower.
err = client2.Subscribe(context.Background(), name,
func(msg lift.Message, err error) {
func(msg *lift.Message, err error) {
require.NoError(t, err)
fmt.Println("receiving msg")
}, lift.ReadISRReplica())
Expand Down Expand Up @@ -458,7 +458,7 @@ func TestStreamReceiveMsgFromReplica(t *testing.T) {
defer client2.Close()

// Subscribe on the follower.
err = client2.Subscribe(context.Background(), name, func(msg lift.Message, err error) {
err = client2.Subscribe(context.Background(), name, func(msg *lift.Message, err error) {
require.NoError(t, err)
//expect := expected[i]
//assertMsg(t, expect, msg)
Expand Down Expand Up @@ -514,7 +514,7 @@ func TestStreamReceiveMsgFromReplica(t *testing.T) {
i = num
ch1 = make(chan struct{})
err = client3.Subscribe(context.Background(), name,
func(msg lift.Message, err error) {
func(msg *lift.Message, err error) {
require.NoError(t, err)
expect := expected[i]
assertMsg(t, expect, msg)
Expand Down Expand Up @@ -568,7 +568,7 @@ func TestStreamPublishSubscribe(t *testing.T) {
i := 0
ch1 := make(chan struct{})
ch2 := make(chan struct{})
err = client.Subscribe(context.Background(), name, func(msg lift.Message, err error) {
err = client.Subscribe(context.Background(), name, func(msg *lift.Message, err error) {
require.NoError(t, err)
expect := expected[i]
assertMsg(t, expect, msg)
Expand Down Expand Up @@ -624,7 +624,7 @@ func TestStreamPublishSubscribe(t *testing.T) {
i = num
ch1 = make(chan struct{})
err = client2.Subscribe(context.Background(), name,
func(msg lift.Message, err error) {
func(msg *lift.Message, err error) {
require.NoError(t, err)
expect := expected[i]
assertMsg(t, expect, msg)
Expand Down
6 changes: 3 additions & 3 deletions server/replicator_test.go
Expand Up @@ -143,7 +143,7 @@ func TestStreamLeaderFailover(t *testing.T) {
i := 0
ch := make(chan struct{})
err = client.Subscribe(context.Background(), name,
func(msg lift.Message, err error) {
func(msg *lift.Message, err error) {
if i == num && err != nil {
return
}
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestStreamLeaderFailover(t *testing.T) {
i = 0
ch = make(chan struct{})
err = client.Subscribe(context.Background(), name,
func(msg lift.Message, err error) {
func(msg *lift.Message, err error) {
if i == num && err != nil {
return
}
Expand Down Expand Up @@ -440,7 +440,7 @@ func TestCommitOnRestart(t *testing.T) {
i := 0
ch := make(chan struct{})
err = client.Subscribe(context.Background(), name,
func(msg lift.Message, err error) {
func(msg *lift.Message, err error) {
if i == num*2 && err != nil {
return
}
Expand Down
34 changes: 17 additions & 17 deletions server/server_test.go
Expand Up @@ -541,7 +541,7 @@ func TestSubscribeOffsetOverflow(t *testing.T) {
// starting at offset 5.
gotMsg := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, name, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
require.NoError(t, err)
require.Equal(t, int64(5), msg.Offset())
close(gotMsg)
Expand Down Expand Up @@ -592,7 +592,7 @@ func TestSubscribeOffsetOverflowEmptyStream(t *testing.T) {
// starting at offset 0.
gotMsg := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, name, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
require.NoError(t, err)
require.Equal(t, int64(0), msg.Offset())
close(gotMsg)
Expand Down Expand Up @@ -658,7 +658,7 @@ func TestSubscribeOffsetUnderflow(t *testing.T) {
// Subscribe with underflowed offset. This should set the offset to 1.
gotMsg := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, name, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
require.NoError(t, err)
require.Equal(t, int64(1), msg.Offset())
close(gotMsg)
Expand Down Expand Up @@ -717,9 +717,9 @@ func TestStreamRetentionBytes(t *testing.T) {
forceLogClean(t, subject, name, s1)

// The first message read back should have offset 87.
msgs := make(chan lift.Message, 1)
msgs := make(chan *lift.Message, 1)
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, name, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
require.NoError(t, err)
msgs <- msg
cancel()
Expand Down Expand Up @@ -778,9 +778,9 @@ func TestStreamRetentionMessages(t *testing.T) {
forceLogClean(t, subject, name, s1)

// The first message read back should have offset 5.
msgs := make(chan lift.Message, 1)
msgs := make(chan *lift.Message, 1)
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, name, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
require.NoError(t, err)
msgs <- msg
cancel()
Expand Down Expand Up @@ -840,9 +840,9 @@ func TestStreamRetentionAge(t *testing.T) {

// We expect all segments but the last to be truncated due to age, so the
// first message read back should have offset 99.
msgs := make(chan lift.Message, 1)
msgs := make(chan *lift.Message, 1)
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, name, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
require.NoError(t, err)
msgs <- msg
cancel()
Expand Down Expand Up @@ -904,7 +904,7 @@ func TestSubscribeEarliest(t *testing.T) {
// Subscribe with EARLIEST. This should start reading from offset 1.
gotMsg := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
client.Subscribe(ctx, name, func(msg lift.Message, err error) {
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
require.NoError(t, err)
require.Equal(t, int64(1), msg.Offset())
close(gotMsg)
Expand Down Expand Up @@ -958,7 +958,7 @@ func TestSubscribeLatest(t *testing.T) {
// Subscribe with LATEST. This should start reading from offset 2.
gotMsg := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
client.Subscribe(ctx, name, func(msg lift.Message, err error) {
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
require.NoError(t, err)
require.Equal(t, int64(2), msg.Offset())
close(gotMsg)
Expand Down Expand Up @@ -1013,7 +1013,7 @@ func TestSubscribeNewOnly(t *testing.T) {
// offset 5.
gotMsg := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
err = client.Subscribe(ctx, name, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
require.NoError(t, err)
require.Equal(t, int64(5), msg.Offset())
close(gotMsg)
Expand Down Expand Up @@ -1082,7 +1082,7 @@ func TestSubscribeStartTime(t *testing.T) {
// Subscribe with TIMESTAMP 25. This should start reading from offset 3.
gotMsg := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
client.Subscribe(ctx, name, func(msg lift.Message, err error) {
client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
select {
case <-gotMsg:
return
Expand Down Expand Up @@ -1392,9 +1392,9 @@ func TestPauseStreamAllPartitions(t *testing.T) {
_, err = client.Publish(ctx, name, []byte("hello"))
require.NoError(t, err)

msgs := make(chan lift.Message, 1)
msgs := make(chan *lift.Message, 1)
ctx, cancel = context.WithCancel(context.Background())
err = client.Subscribe(ctx, name, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
require.NoError(t, err)
msgs <- msg
cancel()
Expand Down Expand Up @@ -1461,9 +1461,9 @@ func TestPauseStreamSomePartitions(t *testing.T) {
_, err = client.Publish(ctx, name, []byte("hello"), lift.ToPartition(1))
require.NoError(t, err)

msgs := make(chan lift.Message, 1)
msgs := make(chan *lift.Message, 1)
ctx, cancel = context.WithCancel(context.Background())
err = client.Subscribe(ctx, name, func(msg lift.Message, err error) {
err = client.Subscribe(ctx, name, func(msg *lift.Message, err error) {
require.NoError(t, err)
msgs <- msg
cancel()
Expand Down

0 comments on commit 1e4e6c1

Please sign in to comment.