Skip to content

Commit

Permalink
js: Add GetMsg API to JetStreamManager
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Feb 7, 2021
1 parent b0da8a2 commit 0422a80
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 3 deletions.
3 changes: 3 additions & 0 deletions js.go
Expand Up @@ -73,6 +73,9 @@ const (
// apiStreamListT is the endpoint that will return all detailed stream information
apiStreamList = "STREAM.LIST"

// apiMsgGetT is the endpoint to get a message.
apiMsgGetT = "STREAM.MSG.GET.%s"

// apiMsgDeleteT is the endpoint to remove a message.
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"
)
Expand Down
51 changes: 51 additions & 0 deletions jsm.go
Expand Up @@ -41,6 +41,9 @@ type JetStreamManager interface {
// NewStreamLister is used to return pages of StreamInfo objects.
NewStreamLister() *StreamLister

// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
GetMsg(name string, seq uint64) (*StreamMsg, error)

// DeleteMsg erases a message from a Stream.
DeleteMsg(name string, seq uint64) error

Expand Down Expand Up @@ -462,6 +465,54 @@ func (js *js) DeleteStream(name string) error {
return nil
}

type apiMsgGetRequest struct {
Seq uint64 `json:"seq"`
}

// StreamMsg is a raw message stored in JetStream.
type StreamMsg struct {
Subject string `json:"subject"`
Sequence uint64 `json:"seq"`
Header []byte `json:"hdrs,omitempty"`
Data []byte `json:"data,omitempty"`
Time time.Time `json:"time"`
}

// apiMsgGetResponse is the response for a Stream get request.
type apiMsgGetResponse struct {
apiResponse
Message *StreamMsg `json:"message,omitempty"`
Success bool `json:"success,omitempty"`
}

// GetMsg retrieves a raw stream message stored in JetStream by sequence number.
func (js *js) GetMsg(name string, seq uint64) (*StreamMsg, error) {
if name == _EMPTY_ {
return nil, ErrStreamNameRequired
}

req, err := json.Marshal(&apiMsgGetRequest{Seq: seq})
if err != nil {
return nil, err
}

dsSubj := js.apiSubj(fmt.Sprintf(apiMsgGetT, name))
r, err := js.nc.Request(dsSubj, req, js.wait)
if err != nil {
return nil, err
}

var resp apiMsgGetResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
return nil, errors.New(resp.Error.Description)
}

return resp.Message, nil
}

type msgDeleteRequest struct {
Seq uint64 `json:"seq"`
}
Expand Down
7 changes: 4 additions & 3 deletions nats.go
Expand Up @@ -486,7 +486,7 @@ type Conn struct {
respRand *rand.Rand // Used for generating suffix
}

// A Subscription represents interest in a given subject.
// Subscription represents interest in a given subject.
type Subscription struct {
mu sync.Mutex
sid int64
Expand Down Expand Up @@ -530,7 +530,8 @@ type Subscription struct {
dropped int
}

// Msg is a structure used by Subscribers and PublishMsg().
// Msg represents a message delivered by NATS. This structure is used
// by Subscribers and PublishMsg().
type Msg struct {
Subject string
Reply string
Expand Down Expand Up @@ -2807,7 +2808,7 @@ func (nc *Conn) Publish(subj string, data []byte) error {
return nc.publish(subj, _EMPTY_, nil, data)
}

// Used to create a new message for publishing that will use headers.
// NewMsg creates a message for publishing that will use headers.
func NewMsg(subject string) *Msg {
return &Msg{
Subject: subject,
Expand Down
175 changes: 175 additions & 0 deletions test/js_test.go
Expand Up @@ -816,6 +816,164 @@ func TestJetStreamManagement(t *testing.T) {
})
}

func TestJetStreamManagement_GetMsg(t *testing.T) {
t.Run("1-node", func(t *testing.T) {
withJSServer(t, testJetStreamManagement_GetMsg)
})
t.Run("3-node", func(t *testing.T) {
withJSCluster(t, "GET", 3, testJetStreamManagement_GetMsg)
})
}

func testJetStreamManagement_GetMsg(t *testing.T, srvs ...*jsServer) {
s := srvs[0]
nc, err := nats.Connect(s.ClientURL())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer nc.Close()

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

_, err = js.AddStream(&nats.StreamConfig{
Name: "foo",
Subjects: []string{"foo.A", "foo.B", "foo.C"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for i := 0; i < 5; i++ {
msg := nats.NewMsg("foo.A")
data := fmt.Sprintf("A:%d", i)
msg.Data = []byte(data)
msg.Header.Add("X-Nats-Test-Data", data)
js.PublishMsg(msg)
js.Publish("foo.B", []byte(fmt.Sprintf("B:%d", i)))
js.Publish("foo.C", []byte(fmt.Sprintf("C:%d", i)))
}

var originalSeq uint64
t.Run("get message", func(t *testing.T) {
expected := 5
msgs := make([]*nats.Msg, 0)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

sub, err := js.Subscribe("foo.C", func(msg *nats.Msg) {
msgs = append(msgs, msg)
if len(msgs) == expected {
cancel()
}
})
if err != nil {
t.Fatal(err)
}
<-ctx.Done()
sub.Unsubscribe()

got := len(msgs)
if got != expected {
t.Fatalf("Expected: %d, got: %d", expected, got)
}

msg := msgs[3]
meta, err := msg.MetaData()
if err != nil {
t.Fatal(err)
}
originalSeq = meta.Stream

// Get the same message using JSM.
fetchedMsg, err := js.GetMsg("foo", originalSeq)
if err != nil {
t.Fatal(err)
}

expectedData := "C:3"
if string(fetchedMsg.Data) != expectedData {
t.Errorf("Expected: %v, got: %v", expectedData, string(fetchedMsg.Data))
}
})

t.Run("get deleted message", func(t *testing.T) {
err := js.DeleteMsg("foo", originalSeq)
if err != nil {
t.Fatal(err)
}

si, err := js.StreamInfo("foo")
if err != nil {
t.Fatal(err)
}
expected := 14
if int(si.State.Msgs) != expected {
t.Errorf("Expected %d msgs, got: %d", expected, si.State.Msgs)
}

// There should be only 4 messages since one deleted.
expected = 4
msgs := make([]*nats.Msg, 0)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

sub, err := js.Subscribe("foo.C", func(msg *nats.Msg) {
msgs = append(msgs, msg)

if len(msgs) == expected {
cancel()
}
})
if err != nil {
t.Fatal(err)
}
<-ctx.Done()
sub.Unsubscribe()

msg := msgs[3]
meta, err := msg.MetaData()
if err != nil {
t.Fatal(err)
}
newSeq := meta.Stream

// First message removed
if newSeq <= originalSeq {
t.Errorf("Expected %d to be higher sequence than %d",
newSeq, originalSeq)
}

// Try to fetch the same message which should be gone.
_, err = js.GetMsg("foo", originalSeq)
if err == nil || err.Error() != `deleted message` {
t.Errorf("Expected deleted message error, got: %v", err)
}
})

t.Run("get message with headers", func(t *testing.T) {
streamMsg, err := js.GetMsg("foo", 4)
if err != nil {
t.Fatal(err)
}
if streamMsg.Sequence != 4 {
t.Errorf("Expected %v, got: %v", 4, streamMsg.Sequence)
}
got := string(streamMsg.Header)
expected := "NATS/1.0\r\nX-Nats-Test-Data: A:1\r\n\r\n"
if got != expected {
t.Errorf("Expected %q, got: %q", expected, got)
}

expectedData := "A:1"
if string(streamMsg.Data) != expectedData {
t.Errorf("Expected %v, got: %v", expectedData, string(streamMsg.Data))
}
})
}

func TestJetStreamManagement_DeleteMsg(t *testing.T) {
s := RunBasicJetStreamServer()
defer s.Shutdown()
Expand Down Expand Up @@ -2323,6 +2481,23 @@ func setupJSClusterWithSize(t *testing.T, clusterName string, size int) []*jsSer
return nodes
}

func withJSServer(t *testing.T, tfn func(t *testing.T, srvs ...*jsServer)) {
t.Helper()

opts := natsserver.DefaultTestOptions
opts.Port = -1
opts.JetStream = true
s := &jsServer{Server: RunServerWithOptions(opts), myopts: &opts}

defer func() {
if config := s.JetStreamConfig(); config != nil {
os.RemoveAll(config.StoreDir)
}
s.Shutdown()
}()
tfn(t, s)
}

func withJSCluster(t *testing.T, clusterName string, size int, tfn func(t *testing.T, srvs ...*jsServer)) {
t.Helper()

Expand Down

0 comments on commit 0422a80

Please sign in to comment.