Skip to content

Commit

Permalink
Merged the paging functionality into the existing StreamInfo(): if yo…
Browse files Browse the repository at this point in the history
…u pass a value in SubjectFilters in the streamInfoOpts it returns the subject details for the subjects that match (for all the matching subjects and doing the pagination if needed)
  • Loading branch information
jnmoyne committed Sep 14, 2022
1 parent 200b836 commit a51d3ea
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 55 deletions.
19 changes: 15 additions & 4 deletions js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,7 +1097,7 @@ func TestJetStreamConvertDirectMsgResponseToMsg(t *testing.T) {
}
}

func TestJetStreamStreamInfoSubjectDetails(t *testing.T) {
func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

Expand All @@ -1120,12 +1120,23 @@ func TestJetStreamStreamInfoSubjectDetails(t *testing.T) {
nc.Publish(fmt.Sprintf("test.%d", i), payload)
}

result, err := js.StreamContainedSubjects("TEST", &StreamInfoRequest{SubjectsFilter: "test.*"})
// Check that passing a filter returns the subject details
result, err := js.StreamInfo("TEST", &StreamInfoRequest{SubjectsFilter: ">"})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if len(result) != 100001 {
t.Fatalf("expected 100001 subjects in the stream, but got %d instead", len(result))
if len(result.State.Subjects) != 100001 {
t.Fatalf("expected 100001 subjects in the stream, but got %d instead", len(result.State.Subjects))
}

// Check that passing no filter does not return any subject details
result, err = js.StreamInfo("TEST")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if len(result.State.Subjects) != 0 {
t.Fatalf("expected 0 subjects details from StreamInfo, but got %d instead", len(result.State.Subjects))
}
}
64 changes: 13 additions & 51 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ type JetStreamManager interface {
// StreamInfo retrieves information from a stream.
StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error)

// StreamContainedSubjects queries the stream for the subjects it holds with optional filter
StreamContainedSubjects(stream string, opts ...JSOpt) (map[string]uint64, error)

// PurgeStream purges a stream messages.
PurgeStream(name string, opts ...JSOpt) error

Expand Down Expand Up @@ -709,66 +706,32 @@ func (js *js) StreamInfo(stream string, opts ...JSOpt) (*StreamInfo, error) {
if cancel != nil {
defer cancel()
}
var req []byte
if o.streamInfoOpts != nil {
if req, err = json.Marshal(o.streamInfoOpts); err != nil {
return nil, err
}
}
siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))

r, err := js.apiRequestWithContext(o.ctx, siSubj, req)
if err != nil {
return nil, err
}
var resp streamInfoResponse
if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
if resp.Error != nil {
if errors.Is(resp.Error, ErrStreamNotFound) {
return nil, ErrStreamNotFound
}
return nil, resp.Error
}

return resp.StreamInfo, nil
}

func (js *js) StreamContainedSubjects(stream string, opts ...JSOpt) (map[string]uint64, error) {
if err := checkStreamName(stream); err != nil {
return nil, err
}
o, cancel, err := getJSContextOpts(js.opts, opts...)
if err != nil {
return nil, err
}
if cancel != nil {
defer cancel()
}

var i int
var subjectMessagesMap map[string]uint64 = nil
var req []byte

if o.streamInfoOpts != nil && o.streamInfoOpts.SubjectsFilter == _EMPTY_ {
o.streamInfoOpts.SubjectsFilter = ">"
siOpts := StreamInfoRequest{}
if o.streamInfoOpts != nil {
siOpts.DeletedDetails = o.streamInfoOpts.DeletedDetails
siOpts.SubjectsFilter = o.streamInfoOpts.SubjectsFilter
}

for {
var req []byte

o.streamInfoOpts.Offset = i

if req, err = json.Marshal(o.streamInfoOpts); err != nil {
siOpts.Offset = i
if req, err = json.Marshal(&siOpts); err != nil {
return nil, err
}

r, err := js.apiRequestWithContext(o.ctx, js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream)), req)
siSubj := js.apiSubj(fmt.Sprintf(apiStreamInfoT, stream))

r, err := js.apiRequestWithContext(o.ctx, siSubj, req)
if err != nil {
return nil, err
}

var resp streamInfoResponse

if err := json.Unmarshal(r.Data, &resp); err != nil {
return nil, err
}
Expand Down Expand Up @@ -796,11 +759,10 @@ func (js *js) StreamContainedSubjects(stream string, opts ...JSOpt) (map[string]
i++
}
if i == total {
break
resp.StreamInfo.State.Subjects = subjectMessagesMap
return resp.StreamInfo, nil
}
}

return subjectMessagesMap, nil
}

// StreamInfo shows config and current state for this stream.
Expand Down

0 comments on commit a51d3ea

Please sign in to comment.