Skip to content

Commit

Permalink
js: Add option to consume from Streams that are mirrors or sourced
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
Signed-off-by: Jaime Piña <jaime@synadia.com>
  • Loading branch information
wallyqs authored and nsurfer committed Mar 17, 2021
1 parent d70f82c commit fc28381
Show file tree
Hide file tree
Showing 2 changed files with 264 additions and 24 deletions.
42 changes: 28 additions & 14 deletions js.go
Expand Up @@ -541,10 +541,14 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
deliver = NewInbox()
}
} else {
// Find the stream mapped to the subject.
stream, err = js.lookupStreamBySubject(subj)
if err != nil {
return nil, err
// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
stream, err = js.lookupStreamBySubject(subj)
if err != nil {
return nil, err
}
} else {
stream = o.stream
}

// With an explicit durable name, then can lookup
Expand Down Expand Up @@ -860,6 +864,14 @@ func RateLimit(n uint64) SubOpt {
})
}

// BindStream binds a consumer to a stream explicitly based on a name.
func BindStream(name string) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.stream = name
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down Expand Up @@ -1040,11 +1052,12 @@ func (m *Msg) InProgress() error {

// MsgMetadata is the JetStream metadata associated with received messages.
type MsgMetaData struct {
Consumer uint64
Stream uint64
Delivered uint64
Pending uint64
Timestamp time.Time
Consumer uint64
Stream uint64
Delivered uint64
Pending uint64
Timestamp time.Time
StreamName string
}

// MetaData retrieves the metadata from a JetStream message.
Expand All @@ -1071,11 +1084,12 @@ func (m *Msg) MetaData() (*MsgMetaData, error) {
}

meta := &MsgMetaData{
Delivered: uint64(parseNum(tokens[4])),
Stream: uint64(parseNum(tokens[5])),
Consumer: uint64(parseNum(tokens[6])),
Timestamp: time.Unix(0, parseNum(tokens[7])),
Pending: uint64(parseNum(tokens[8])),
Delivered: uint64(parseNum(tokens[4])),
Stream: uint64(parseNum(tokens[5])),
Consumer: uint64(parseNum(tokens[6])),
Timestamp: time.Unix(0, parseNum(tokens[7])),
Pending: uint64(parseNum(tokens[8])),
StreamName: tokens[2],
}

return meta, nil
Expand Down
246 changes: 236 additions & 10 deletions test/js_test.go
Expand Up @@ -1769,9 +1769,15 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
t.Fatal(err)
}

toSend := 100
const (
toSend = 100
publishSubj = "TEST"
sourceName = "MY_SOURCE_TEST"
mirrorName = "MY_MIRROR_TEST"
)
for i := 0; i < toSend; i++ {
if _, err := js1.Publish("TEST", []byte("OK")); err != nil {
data := []byte(fmt.Sprintf("OK %d", i))
if _, err := js1.Publish(publishSubj, data); err != nil {
t.Fatalf("Unexpected publish error: %v", err)
}
}
Expand Down Expand Up @@ -1799,12 +1805,36 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
return nil
})
}
checkConsume := func(t *testing.T, js nats.JetStream, subject, stream string, want int) {
t.Helper()
sub, err := js.SubscribeSync(subject, nats.BindStream(stream))
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()

checkSubsPending(t, sub, want)

for i := 0; i < want; i++ {
msg, err := sub.NextMsg(time.Second)
if err != nil {
t.Fatal(err)
}
meta, err := msg.MetaData()
if err != nil {
t.Fatal(err)
}
if got, want := meta.StreamName, stream; got != want {
t.Fatalf("unexpected stream name, got=%q, want=%q", got, want)
}
}
}

_, err = js2.AddStream(&nats.StreamConfig{
Name: "MY_MIRROR_TEST",
Name: mirrorName,
Storage: nats.FileStorage,
Mirror: &nats.StreamSource{
Name: "TEST",
Name: publishSubj,
External: &nats.ExternalStream{
APIPrefix: "RI.JS.API",
DeliverPrefix: "RI.DELIVER.SYNC.MIRRORS",
Expand All @@ -1815,13 +1845,14 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
t.Fatal(err)
}
checkMsgCount(t, "MY_MIRROR_TEST")
checkConsume(t, js2, publishSubj, mirrorName, toSend)

_, err = js2.AddStream(&nats.StreamConfig{
Name: "MY_SOURCE_TEST",
Name: sourceName,
Storage: nats.FileStorage,
Sources: []*nats.StreamSource{
&nats.StreamSource{
Name: "TEST",
Name: publishSubj,
External: &nats.ExternalStream{
APIPrefix: "RI.JS.API",
DeliverPrefix: "RI.DELIVER.SYNC.SOURCES",
Expand All @@ -1833,6 +1864,7 @@ func TestJetStreamCrossAccountMirrorsAndSources(t *testing.T) {
t.Fatal(err)
}
checkMsgCount(t, "MY_SOURCE_TEST")
checkConsume(t, js2, publishSubj, sourceName, toSend)
}

func TestJetStreamAutoMaxAckPending(t *testing.T) {
Expand Down Expand Up @@ -3322,7 +3354,7 @@ func TestJetStream_ClusterPlacement(t *testing.T) {
}

func TestJetStreamStreamMirror(t *testing.T) {
withJSCluster(t, "MIRROR", 3, testJetStreamMirror_Source)
withJSServer(t, testJetStreamMirror_Source)
}

func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
Expand Down Expand Up @@ -3427,6 +3459,90 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
if got < totalMsgs {
t.Errorf("Expected %v, got: %v", totalMsgs, got)
}

t.Run("consume from mirror", func(t *testing.T) {
sub, err := js.SubscribeSync("origin", nats.BindStream("m1"))
if err != nil {
t.Fatal(err)
}

mmsgs := make([]*nats.Msg, 0)
for i := 0; i < totalMsgs; i++ {
msg, err := sub.NextMsg(2 * time.Second)
if err != nil {
t.Error(err)
}
meta, err := msg.MetaData()
if err != nil {
t.Error(err)
}
if meta.StreamName != "m1" {
t.Errorf("Expected m1, got: %v", meta.StreamName)
}
mmsgs = append(mmsgs, msg)
}
if len(mmsgs) != totalMsgs {
t.Errorf("Expected to consume %v msgs, got: %v", totalMsgs, len(mmsgs))
}
})
})

t.Run("consume from original source", func(t *testing.T) {
sub, err := js.SubscribeSync("origin")
defer sub.Unsubscribe()
if err != nil {
t.Fatal(err)
}
msg, err := sub.NextMsg(2 * time.Second)
if err != nil {
t.Error(err)
}
meta, err := msg.MetaData()
if err != nil {
t.Error(err)
}
if meta.StreamName != "origin" {
t.Errorf("Expected m1, got: %v", meta.StreamName)
}
})

t.Run("bind to non existing stream fails", func(t *testing.T) {
_, err := js.SubscribeSync("origin", nats.BindStream("foo"))
if err == nil {
t.Fatal("Unexpected success")
}
if err.Error() != `stream not found` {
t.Fatal("Expected stream not found error")
}
})

t.Run("bind to stream with wrong subject fails", func(t *testing.T) {
_, err := js.SubscribeSync("secret", nats.BindStream("origin"))
if err == nil {
t.Fatal("Unexpected success")
}
if err.Error() != `consumer filter subject is not a valid subset of the interest subjects` {
t.Fatal("Expected stream not found error")
}
})

t.Run("bind to origin stream", func(t *testing.T) {
// This would only avoid the stream names lookup.
sub, err := js.SubscribeSync("origin", nats.BindStream("origin"))
if err != nil {
t.Fatal(err)
}
msg, err := sub.NextMsg(2 * time.Second)
if err != nil {
t.Error(err)
}
meta, err := msg.MetaData()
if err != nil {
t.Error(err)
}
if meta.StreamName != "origin" {
t.Errorf("Expected m1, got: %v", meta.StreamName)
}
})

t.Run("get mirror info", func(t *testing.T) {
Expand Down Expand Up @@ -3514,6 +3630,19 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
if got != expected {
t.Errorf("Expected %v, got: %v", expected, got)
}

t.Run("consume from sourced stream", func(t *testing.T) {
// Cannot lookup subjects of a stream that is itself sourced.
t.SkipNow()
sub, err := js.SubscribeSync("origin", nats.BindStream("s1"))
if err != nil {
t.Error(err)
}
_, err = sub.NextMsg(2 * time.Second)
if err != nil {
t.Error(err)
}
})
})

t.Run("update stream with sources", func(t *testing.T) {
Expand Down Expand Up @@ -3545,16 +3674,103 @@ func testJetStreamMirror_Source(t *testing.T, nodes ...*jsServer) {
t.Errorf("Expected %v, got: %v", expected, got)
}

got = len(updated.Sources)
got = int(updated.Config.MaxMsgs)
expected = int(config.MaxMsgs)
if got != expected {
t.Errorf("Expected %v, got: %v", expected, got)
}
})

got = int(updated.Config.MaxMsgs)
expected = int(config.MaxMsgs)
t.Run("create sourced stream from origin", func(t *testing.T) {
sources := make([]*nats.StreamSource, 0)
sources = append(sources, &nats.StreamSource{Name: "origin"})
sources = append(sources, &nats.StreamSource{Name: "m1"})
streamName := "s2"
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Sources: sources,
Storage: nats.FileStorage,
Replicas: 1,
})
if err != nil {
t.Fatalf("Unexpected error creating stream: %v", err)
}

msgs := make([]*nats.RawStreamMsg, 0)

// Stored message sequences start at 1
startSequence := 1
expectedTotal := totalMsgs * 2

GetNextMsg:
for i := startSequence; i < expectedTotal+1; i++ {
var (
err error
seq = uint64(i)
msg *nats.RawStreamMsg
timeout = time.Now().Add(5 * time.Second)
)

Retry:
for time.Now().Before(timeout) {
msg, err = js.GetMsg(streamName, seq)
if err != nil {
time.Sleep(100 * time.Millisecond)
continue Retry
}
msgs = append(msgs, msg)
continue GetNextMsg
}
if err != nil {
t.Fatalf("Unexpected error fetching seq=%v: %v", seq, err)
}
}

got := len(msgs)
if got < expectedTotal {
t.Errorf("Expected %v, got: %v", expectedTotal, got)
}

si, err := js.StreamInfo(streamName)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
got = int(si.State.Msgs)
if got != expectedTotal {
t.Errorf("Expected %v, got: %v", expectedTotal, got)
}

got = len(si.Sources)
expected := 2
if got != expected {
t.Errorf("Expected %v, got: %v", expected, got)
}

t.Run("consume from sourced stream", func(t *testing.T) {
sub, err := js.SubscribeSync("origin", nats.BindStream(streamName))
if err != nil {
t.Fatal(err)
}

mmsgs := make([]*nats.Msg, 0)
for i := 0; i < totalMsgs; i++ {
msg, err := sub.NextMsg(2 * time.Second)
if err != nil {
t.Error(err)
}
meta, err := msg.MetaData()
if err != nil {
t.Error(err)
}
if meta.StreamName != streamName {
t.Errorf("Expected m1, got: %v", meta.StreamName)
}
mmsgs = append(mmsgs, msg)
}
if len(mmsgs) != totalMsgs {
t.Errorf("Expected to consume %v msgs, got: %v", totalMsgs, len(mmsgs))
}
})
})
}

Expand Down Expand Up @@ -4158,3 +4374,13 @@ func checkFor(t *testing.T, totalWait, sleepDur time.Duration, f func() error) {
t.Fatal(err.Error())
}
}

func checkSubsPending(t *testing.T, sub *nats.Subscription, numExpected int) {
t.Helper()
checkFor(t, 4*time.Second, 20*time.Millisecond, func() error {
if nmsgs, _, err := sub.Pending(); err != nil || nmsgs != numExpected {
return fmt.Errorf("Did not receive correct number of messages: %d vs %d", nmsgs, numExpected)
}
return nil
})
}

0 comments on commit fc28381

Please sign in to comment.