Commit d3f78de

Merge pull request #2833 from nats-io/si_subjects

Added in ability to get per subject details via StreamInfo.

derekcollison committed Feb 2, 2022
2 parents 2ed7a81 + 5da0453 commit d3f78de
Showing 8 changed files with 419 additions and 25 deletions.
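
The feature added here is a new subjects_filter field on the stream info request (JSApiStreamInfoRequest). When a filter is supplied, the server returns per-subject message counts as part of the stream's state, refusing with the new error code 10117 if more than 100,000 subjects would be reported. A minimal client-side sketch of exercising this over the raw JetStream API follows; the connection URL, the stream name ORDERS, and the filter orders.> are illustrative assumptions, and the response is printed raw rather than decoded since the response-side fields are not part of this diff.

// Illustrative sketch: query stream info with the subjects_filter field
// introduced by this commit. Stream name, filter, and URL are assumptions.
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	// Matches the JSApiStreamInfoRequest shape: {"subjects_filter": "..."}.
	req := []byte(`{"subjects_filter": "orders.>"}`)

	// $JS.API.STREAM.INFO.<stream> is the stream info API subject.
	resp, err := nc.Request("$JS.API.STREAM.INFO.ORDERS", req, 2*time.Second)
	if err != nil {
		log.Fatal(err)
	}

	// Per-subject counts appear in the stream state when the filter matches;
	// err_code 10117 is returned if the 100,000 subject cap would be exceeded.
	fmt.Println(string(resp.Data))
}
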
12 changes: 11 additions & 1 deletion server/errors.json
@@ -1148,5 +1148,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSStreamInfoMaxSubjectsErr",
"code": 500,
"error_code": 10117,
"description": "subject details would exceed maximum allowed",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
58 changes: 50 additions & 8 deletions server/filestore.go
@@ -1,4 +1,4 @@
// Copyright 2019-2021 The NATS Authors
// Copyright 2019-2022 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@@ -89,6 +89,7 @@ type fileStore struct {
aek cipher.AEAD
lmb *msgBlock
blks []*msgBlock
psmc map[string]uint64
hh hash.Hash64
qch chan struct{}
cfs []*consumerFileStore
@@ -280,6 +281,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
fs := &fileStore{
fcfg: fcfg,
cfg: FileStreamInfo{Created: created, StreamConfig: cfg},
psmc: make(map[string]uint64),
prf: prf,
qch: make(chan struct{}),
}
@@ -879,6 +881,13 @@ func (fs *fileStore) recoverMsgs() error {
}
fs.state.Msgs += mb.msgs
fs.state.Bytes += mb.bytes
// Walk the fss for this mb and fill in fs.psmc
for subj, ss := range mb.fss {
if len(subj) > 0 {
fs.psmc[subj] += ss.Msgs
}
}

} else {
return err
}
@@ -988,6 +997,7 @@ func (fs *fileStore) expireMsgsOnRecover() {
mb.msgs--
purged++
// Update fss
fs.removePerSubject(sm.subj)
mb.removeSeqPerSubject(sm.subj, seq)
}

@@ -1528,6 +1538,11 @@ func (fs *fileStore) storeRawMsg(subj string, hdr, msg []byte, seq uint64, ts in
return err
}

// Adjust top level tracking of per subject msg counts.
if len(subj) > 0 {
fs.psmc[subj]++
}

// Adjust first if needed.
now := time.Unix(0, ts).UTC()
if fs.state.Msgs == 0 {
@@ -1724,6 +1739,19 @@ func (fs *fileStore) EraseMsg(seq uint64) (bool, error) {
return fs.removeMsg(seq, true, true)
}

// Convenience function to remove per subject tracking at the filestore level.
// Lock should be held.
func (fs *fileStore) removePerSubject(subj string) {
if len(subj) == 0 {
return
}
if n, ok := fs.psmc[subj]; ok && n == 1 {
delete(fs.psmc, subj)
} else if ok {
fs.psmc[subj]--
}
}
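
Paired with the increment in storeRawMsg above, removePerSubject keeps fs.psmc as a live count of stored messages per subject and drops the entry when the last message on a subject is removed, so len(fs.psmc) doubles as the number of active subjects reported as NumSubjects further down. A standalone sketch of that bookkeeping pattern, with names that are illustrative rather than the server's:

// Sketch of the per-subject count bookkeeping that fs.psmc implements.
package main

import "fmt"

type subjectCounts map[string]uint64

// add mirrors the increment performed when a message is stored.
func (sc subjectCounts) add(subj string) {
	if len(subj) > 0 {
		sc[subj]++
	}
}

// remove mirrors removePerSubject: delete at one, otherwise decrement.
func (sc subjectCounts) remove(subj string) {
	if n, ok := sc[subj]; ok && n == 1 {
		delete(sc, subj)
	} else if ok {
		sc[subj]--
	}
}

func main() {
	sc := make(subjectCounts)
	sc.add("orders.new")
	sc.add("orders.new")
	sc.add("orders.done")
	sc.remove("orders.done")
	// len(sc) plays the role of NumSubjects: only subjects with live messages remain.
	fmt.Println(len(sc), sc["orders.new"]) // 1 2
}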

// Remove a message, optionally rewriting the mb file.
func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error) {
fsLock := func() {
@@ -1811,6 +1839,7 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
mb.bytes -= msz

// If we are tracking multiple subjects here make sure we update that accounting.
fs.removePerSubject(sm.subj)
mb.removeSeqPerSubject(sm.subj, seq)

var shouldWriteIndex, firstSeqNeedsUpdate bool
@@ -3274,19 +3303,18 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
mb.llseq = seq
}

// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
if !hashChecked {
hh = mb.hh // This will force the hash check in msgFromBuf.
mb.cache.idx[seq-mb.cache.fseq] = (bi | hbit)
}

li := int(bi) - mb.cache.off
if li >= len(mb.cache.buf) {
return nil, errPartialCache
}
buf := mb.cache.buf[li:]

// We use the high bit to denote we have already checked the checksum.
var hh hash.Hash64
if !hashChecked {
hh = mb.hh // This will force the hash check in msgFromBuf.
}

// Parse from the raw buffer.
subj, hdr, msg, mseq, ts, err := msgFromBuf(buf, hh)
if err != nil {
@@ -3297,6 +3325,11 @@ func (mb *msgBlock) cacheLookup(seq uint64) (*fileStoredMsg, error) {
return nil, fmt.Errorf("sequence numbers for cache load did not match, %d vs %d", seq, mseq)
}

// Clear the check bit here after we know all is good.
if !hashChecked {
mb.cache.idx[seq-mb.cache.fseq] = (bi | hbit)
}

return &fileStoredMsg{subj, hdr, msg, seq, ts, mb, int64(bi)}, nil
}
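
The cacheLookup change above concerns when the "checksum already verified" flag is recorded. Each cache index entry packs a buffer offset together with a high bit (hbit); previously the bit was set before the message was parsed, so a failed hash check or sequence mismatch could still leave the entry marked as verified, whereas now the bit is set only after msgFromBuf succeeds and the sequence matches. A small sketch of the offset-plus-flag encoding, with an assumed 32-bit index width and an assumed value for hbit (the real constant is defined elsewhere in filestore.go):

// Sketch of packing a "verified" flag into the high bit of a cache index
// entry; the 32-bit width and the hbit value are assumptions here.
package main

import "fmt"

const hbit = 1 << 31 // high bit marks "checksum already verified"

func main() {
	var idx [1]uint32

	offset := uint32(4096) // byte offset of the record in the cache buffer
	idx[0] = offset        // stored unverified at first

	// ...parse and hash-check the record; only then mark it verified.
	if verified := true; verified {
		idx[0] = offset | hbit
	}

	// Readers recover the offset and the flag independently.
	fmt.Println(idx[0]&^hbit, idx[0]&hbit != 0) // 4096 true
}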

@@ -3451,6 +3484,7 @@ func (fs *fileStore) FastState(state *StreamState) {
}
}
state.Consumers = len(fs.cfs)
state.NumSubjects = len(fs.psmc)
fs.mu.RUnlock()
}

@@ -3459,7 +3493,9 @@ func (fs *fileStore) State() StreamState {
fs.mu.RLock()
state := fs.state
state.Consumers = len(fs.cfs)
state.NumSubjects = len(fs.psmc)
state.Deleted = nil // make sure.

for _, mb := range fs.blks {
mb.mu.Lock()
fseq := mb.first.seq
@@ -3819,6 +3855,7 @@ func (fs *fileStore) PurgeEx(subject string, sequence, keep uint64) (purged uint
mb.msgs--
mb.bytes -= rl
// FSS updates.
fs.removePerSubject(sm.subj)
mb.removeSeqPerSubject(sm.subj, seq)
// Check for first message.
if seq == mb.first.seq {
@@ -3924,6 +3961,9 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {

fs.lmb.writeIndexInfo()

// Clear any per subject tracking.
fs.psmc = make(map[string]uint64)

cb := fs.scb
fs.mu.Unlock()

@@ -3988,6 +4028,7 @@ func (fs *fileStore) Compact(seq uint64) (uint64, error) {
smb.msgs--
purged++
// Update fss
fs.removePerSubject(sm.subj)
smb.removeSeqPerSubject(sm.subj, mseq)
}
}
@@ -4243,6 +4284,7 @@ func (mb *msgBlock) generatePerSubjectInfo() error {
if mb.fss == nil {
mb.fss = make(map[string]*SimpleState)
}

fseq, lseq := mb.first.seq, mb.last.seq
for seq := fseq; seq <= lseq; seq++ {
if sm, _ := mb.cacheLookup(seq); sm != nil && len(sm.subj) > 0 {
26 changes: 24 additions & 2 deletions server/jetstream_api.go
@@ -321,8 +321,12 @@ type JSApiStreamDeleteResponse struct {

const JSApiStreamDeleteResponseType = "io.nats.jetstream.api.v1.stream_delete_response"

// Maximum number of subject details we will send in the stream info.
const JSMaxSubjectDetails = 100_000

type JSApiStreamInfoRequest struct {
DeletedDetails bool `json:"deleted_details,omitempty"`
DeletedDetails bool `json:"deleted_details,omitempty"`
SubjectsFilter string `json:"subjects_filter,omitempty"`
}

type JSApiStreamInfoResponse struct {
@@ -1697,14 +1701,15 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
}

var details bool
var subjects string
if !isEmptyRequest(msg) {
var req JSApiStreamInfoRequest
if err := json.Unmarshal(msg, &req); err != nil {
resp.Error = NewJSInvalidJSONError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
details = req.DeletedDetails
details, subjects = req.DeletedDetails, req.SubjectsFilter
}

mset, err := acc.lookupStream(streamName)
@@ -1730,6 +1735,23 @@ func (s *Server) jsStreamInfoRequest(sub *subscription, c *client, a *Account, s
resp.StreamInfo.Sources = mset.sourcesInfo()
}

// Check if they have asked for subject details.
if subjects != _EMPTY_ {
if mss := mset.store.SubjectsState(subjects); len(mss) > 0 {
if len(mss) > JSMaxSubjectDetails {
resp.StreamInfo = nil
resp.Error = NewJSStreamInfoMaxSubjectsError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
sd := make(map[string]uint64, len(mss))
for subj, ss := range mss {
sd[subj] = ss.Msgs
}
resp.StreamInfo.State.Subjects = sd
}

}
// Check for out of band catchups.
if mset.hasCatchupPeers() {
mset.checkClusterInfo(resp.StreamInfo)
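
On the server side, the handler above asks the store for per-subject state matching the filter, rejects the request with the new 10117 error when more than JSMaxSubjectDetails (100,000) subjects match, and otherwise flattens each subject's SimpleState down to its message count before attaching the map to the stream state. A toy version of that flattening step, using a stand-in SimpleState with only the field this code needs:

// Illustrative only: collapse a per-subject state map into the plain
// subject -> message count map that ends up in the stream info response.
package main

import (
	"errors"
	"fmt"
)

type SimpleState struct {
	Msgs uint64
}

const maxSubjectDetails = 100_000 // mirrors JSMaxSubjectDetails

func subjectDetails(mss map[string]*SimpleState) (map[string]uint64, error) {
	if len(mss) > maxSubjectDetails {
		// Corresponds to error_code 10117 in errors.json.
		return nil, errors.New("subject details would exceed maximum allowed")
	}
	sd := make(map[string]uint64, len(mss))
	for subj, ss := range mss {
		sd[subj] = ss.Msgs
	}
	return sd, nil
}

func main() {
	mss := map[string]*SimpleState{
		"orders.new":  {Msgs: 3},
		"orders.done": {Msgs: 1},
	}
	sd, err := subjectDetails(mss)
	fmt.Println(sd, err) // map[orders.done:1 orders.new:3] <nil>
}
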
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
@@ -260,6 +260,9 @@ const (
// JSStreamHeaderExceedsMaximumErr header size exceeds maximum allowed of 64k
JSStreamHeaderExceedsMaximumErr ErrorIdentifier = 10097

// JSStreamInfoMaxSubjectsErr subject details would exceed maximum allowed
JSStreamInfoMaxSubjectsErr ErrorIdentifier = 10117

// JSStreamInvalidConfigF Stream configuration validation error string ({err})
JSStreamInvalidConfigF ErrorIdentifier = 10052

@@ -438,6 +441,7 @@ var (
JSStreamExternalDelPrefixOverlapsErrF: {Code: 400, ErrCode: 10022, Description: "stream external delivery prefix {prefix} overlaps with stream subject {subject}"},
JSStreamGeneralErrorF: {Code: 500, ErrCode: 10051, Description: "{err}"},
JSStreamHeaderExceedsMaximumErr: {Code: 400, ErrCode: 10097, Description: "header size exceeds maximum allowed of 64k"},
JSStreamInfoMaxSubjectsErr: {Code: 500, ErrCode: 10117, Description: "subject details would exceed maximum allowed"},
JSStreamInvalidConfigF: {Code: 500, ErrCode: 10052, Description: "{err}"},
JSStreamInvalidErr: {Code: 500, ErrCode: 10096, Description: "stream not valid"},
JSStreamInvalidExternalDeliverySubjErrF: {Code: 400, ErrCode: 10024, Description: "stream external delivery prefix {prefix} must not contain wildcards"},
@@ -1445,6 +1449,16 @@ func NewJSStreamHeaderExceedsMaximumError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSStreamHeaderExceedsMaximumErr]
}

// NewJSStreamInfoMaxSubjectsError creates a new JSStreamInfoMaxSubjectsErr error: "subject details would exceed maximum allowed"
func NewJSStreamInfoMaxSubjectsError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSStreamInfoMaxSubjectsErr]
}

// NewJSStreamInvalidConfigError creates a new JSStreamInvalidConfigF error: "{err}"
func NewJSStreamInvalidConfigError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)