diff --git a/server/stream.go b/server/stream.go index 068a2904e27..275c5b3e449 100644 --- a/server/stream.go +++ b/server/stream.go @@ -213,37 +213,42 @@ type ExternalStream struct { // Stream is a jetstream stream of messages. When we receive a message internally destined // for a Stream we will direct link from the client to this structure. type stream struct { - mu sync.RWMutex - js *jetStream - jsa *jsAccount - acc *Account - srv *Server - client *client - sysc *client - sid atomic.Uint64 - pubAck []byte - outq *jsOutQ - msgs *ipQueue[*inMsg] - gets *ipQueue[*directGetReq] - store StreamStore - ackq *ipQueue[uint64] - lseq uint64 - lmsgId string - consumers map[string]*consumer - numFilter int // number of filtered consumers - cfg StreamConfig - created time.Time - stype StorageType - tier string - ddmap map[string]*ddentry - ddarr []*ddentry - ddindex int - ddtmr *time.Timer - qch chan struct{} - mqch chan struct{} - active bool - ddloaded bool - closed atomic.Bool + mu sync.RWMutex // Read/write lock for the stream. + js *jetStream // The internal *jetStream for the account. + jsa *jsAccount // The JetStream account-level information. + acc *Account // The account this stream is defined in. + srv *Server // The server we are running in. + client *client // The internal JetStream client. + sysc *client // The internal JetStream system client. + + // The current last subscription ID for the subscriptions through `client`. + // Those subscriptions are for the subjects filters being listened to and captured by the stream. + sid atomic.Uint64 + + pubAck []byte // The template (prefix) to generate the pubAck responses for this stream quickly. + outq *jsOutQ // Queue of *jsPubMsg for sending messages. + msgs *ipQueue[*inMsg] // Intra-process queue for the ingress of messages. + gets *ipQueue[*directGetReq] // Intra-process queue for the direct get requests. + store StreamStore // The storage for this stream. + ackq *ipQueue[uint64] // Intra-process queue for acks. + lseq uint64 // The sequence number of the last message stored in the stream. + lmsgId string // The de-duplication message ID of the last message stored in the stream. + consumers map[string]*consumer // The consumers for this stream. + numFilter int // The number of filtered consumers. + cfg StreamConfig // The stream's config. + created time.Time // Time the stream was created. + stype StorageType // The storage type. + tier string // The tier is the number of replicas for the stream (e.g. "R1" or "R3"). + ddmap map[string]*ddentry // The dedupe map. + ddarr []*ddentry // The dedupe array. + ddindex int // The dedupe index. + ddtmr *time.Timer // The dedupe timer. + qch chan struct{} // The quit channel. + mqch chan struct{} // The monitor's quit channel. + active bool // Indicates that there are active internal subscriptions (for the subject filters) + // and/or mirror/sources consumers are scheduled to be established or already started. + ddloaded bool // set to true when the deduplication structures are been built. + closed atomic.Bool // Set to true when stop() is called on the stream. // Mirror mirror *sourceInfo @@ -256,7 +261,7 @@ type stream struct { // Indicates we have direct consumers. directs int - // For input subject transform + // For input subject transform. itr *subjectTransform // For republishing. @@ -264,10 +269,10 @@ type stream struct { // For processing consumers without main stream lock. clsMu sync.RWMutex - cList []*consumer - sch chan struct{} - sigq *ipQueue[*cMsg] - csl *Sublist // Consumer Sublist + cList []*consumer // Consumer list. + sch chan struct{} // Channel to signal consumers. + sigq *ipQueue[*cMsg] // Intra-process queue for the messages to signal to the consumers. + csl *Sublist // Consumer subscription list. // Leader will store seq/msgTrace in clustering mode. Used in applyStreamEntries // to know if trace event should be sent after processing. @@ -279,51 +284,58 @@ type stream struct { // TODO(dlc) - Hide everything below behind two pointers. // Clustered mode. - sa *streamAssignment - node RaftNode - catchup atomic.Bool - syncSub *subscription - infoSub *subscription - clMu sync.Mutex - clseq uint64 - clfs uint64 - inflight map[uint64]uint64 - leader string - lqsent time.Time - catchups map[string]uint64 - uch chan struct{} - compressOK bool - inMonitor bool + sa *streamAssignment // What the meta controller uses to assign streams to peers. + node RaftNode // Our RAFT node for the stream's group. + catchup atomic.Bool // Used to signal we are in catchup mode. + catchups map[string]uint64 // The number of messages that need to be caught per peer. + syncSub *subscription // Internal subscription for sync messages (on "$JSC.SYNC"). + infoSub *subscription // Internal subscription for stream info requests. + clMu sync.Mutex // The mutex for clseq and clfs. + clseq uint64 // The commit log current last sequence number. + clfs uint64 // The count of the commit log failed sequence numbers. + inflight map[uint64]uint64 // Inflight message sizes per clseq. + leader string // The current leader for the RAFT group. + lqsent time.Time // The time at which the last lost quorum advisory was sent. Used to rate limit. + uch chan struct{} // The channel to signal updates to the monitor routine. + compressOK bool // True if we can do message compression in RAFT and catchup logic + inMonitor bool // True if the monitor routine has been started. // Direct get subscription. directSub *subscription lastBySub *subscription - monitorWg sync.WaitGroup + monitorWg sync.WaitGroup // Wait group for the monitor routine. } type sourceInfo struct { - name string - iname string - cname string - sub *subscription - dsub *subscription + name string // The name of the stream being sourced. + iname string // The unique index name of this particular source. + cname string // The name of the current consumer for this source. + sub *subscription // The subscription to the consumer. + + // (mirrors only) The subscription to the direct get request subject for + // the source stream's name on the `_sys_` queue group. + dsub *subscription + + // (mirrors only) The subscription to the direct get last per subject request subject for + // the source stream's name on the `_sys_` queue group. lbsub *subscription - msgs *ipQueue[*inMsg] - sseq uint64 - dseq uint64 - start time.Time - lag uint64 - err *ApiError - fails int - last time.Time - lreq time.Time - qch chan struct{} - sip bool // setup in progress - wg sync.WaitGroup - sf string // subject filter - sfs []string // subject filters - trs []*subjectTransform // subject transforms + + msgs *ipQueue[*inMsg] // Intra-process queue for incoming messages. + sseq uint64 // Last stream message sequence number seen from the source. + dseq uint64 // Last delivery (i.e. consumer's) sequence number. + start time.Time // The time of the last message recorded in the stream doing the sourcing. + lag uint64 // 0 or number of messages pending (as last reported by the consumer) - 1. + err *ApiError // The API error that caused the last consumer setup to fail. + fails int // The number of times trying to setup the consumer failed. + last time.Time // Time the consumer was created or of last message it received. + lreq time.Time // The last time setupMirrorConsumer/setupSourceConsumer was called. + qch chan struct{} // Quit channel. + sip bool // Setup in progress. + wg sync.WaitGroup // WaitGroup for the consumer's go routine. + sf string // The subject filter. + sfs []string // The subject filters. + trs []*subjectTransform // The subject transforms. } // For mirrors and direct get @@ -371,9 +383,9 @@ const ( // Dedupe entry type ddentry struct { - id string - seq uint64 - ts int64 + id string // The unique message ID provided by the client. + seq uint64 // The sequence number of the message. + ts int64 // The timestamp of the message. } // Replicas Range @@ -698,7 +710,8 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt } // Composes the index name. Contains the stream name, subject filter, and transform destination -// when the stream is external we will use additional information in case the (external) stream names are the same. +// when the stream is external we will use the api prefix as part of the index name +// (as the same stream name could be used in multiple JS domains) func (ssi *StreamSource) composeIName() string { var iName = ssi.Name @@ -1834,7 +1847,7 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) } mset.setStartingSequenceForSources(needsStartingSeqNum) for iName := range neededCopy { - mset.setSourceConsumer(iName, mset.sources[iName].sseq+1, time.Time{}) + mset.setupSourceConsumer(iName, mset.sources[iName].sseq+1, time.Time{}) } } } @@ -2750,7 +2763,7 @@ func (mset *stream) retrySourceConsumer(iName string) { } } -// Same than setSourceConsumer but simply issue a debug statement indicating +// Same than setupSourceConsumer but simply issue a debug statement indicating // that there is a retry. // // Lock should be held. @@ -2759,8 +2772,8 @@ func (mset *stream) retrySourceConsumerAtSeq(iname string, seq uint64) { s.Debugf("Retrying source consumer for '%s > %s'", mset.acc.Name, mset.cfg.Name) - // setSourceConsumer will check that the source is still configured. - mset.setSourceConsumer(iname, seq, time.Time{}) + // setupSourceConsumer will check that the source is still configured. + mset.setupSourceConsumer(iname, seq, time.Time{}) } // Lock should be held. @@ -2800,13 +2813,13 @@ func (mset *stream) cancelSourceInfo(si *sourceInfo) { const sourceConsumerRetryThreshold = 2 * time.Second -// This will schedule a call to setSourceConsumer, taking into account the last -// time it was retried and determine the soonest setSourceConsumer can be called +// This will schedule a call to setupSourceConsumer, taking into account the last +// time it was retried and determine the soonest setupSourceConsumer can be called // without tripping the sourceConsumerRetryThreshold. // // Lock held on entry func (mset *stream) scheduleSetSourceConsumerRetry(si *sourceInfo, seq uint64, startTime time.Time) { - // We are trying to figure out how soon we can retry. setSourceConsumer will reject + // We are trying to figure out how soon we can retry. setupSourceConsumer will reject // a retry if last was done less than "sourceConsumerRetryThreshold" ago. next := sourceConsumerRetryThreshold - time.Since(si.lreq) if next < 0 { @@ -2822,7 +2835,7 @@ func (mset *stream) scheduleSetSourceConsumerRetry(si *sourceInfo, seq uint64, s mset.scheduleSetSourceConsumer(si.iname, seq, next, startTime) } -// Simply schedules setSourceConsumer at the given delay. +// Simply schedules setupSourceConsumer at the given delay. // // Lock held on entry func (mset *stream) scheduleSetSourceConsumer(iname string, seq uint64, delay time.Duration, startTime time.Time) { @@ -2841,12 +2854,12 @@ func (mset *stream) scheduleSetSourceConsumer(iname string, seq uint64, delay ti defer mset.mu.Unlock() delete(mset.sourceRetries, iname) - mset.setSourceConsumer(iname, seq, startTime) + mset.setupSourceConsumer(iname, seq, startTime) }) } // Lock should be held. -func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.Time) { +func (mset *stream) setupSourceConsumer(iname string, seq uint64, startTime time.Time) { // Ignore if closed. if mset.closed.Load() { return @@ -3130,8 +3143,8 @@ func (mset *stream) processSourceMsgs(si *sourceInfo, ready *sync.WaitGroup) { if stalled { mset.mu.Lock() // We don't need to schedule here, we are going to simply - // call setSourceConsumer with the current state+1. - mset.setSourceConsumer(iname, si.sseq+1, time.Time{}) + // call setupSourceConsumer with the current state+1. + mset.setupSourceConsumer(iname, si.sseq+1, time.Time{}) mset.mu.Unlock() } } @@ -3287,7 +3300,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { } // Retry in all type of errors. // This will make sure the source is still in mset.sources map, - // find the last sequence and then call setSourceConsumer. + // find the last sequence and then call setupSourceConsumer. mset.retrySourceConsumer(iname) } return false @@ -3552,7 +3565,7 @@ func (mset *stream) setupSourceConsumers() error { // Setup our consumers at the proper starting position. for _, ssi := range mset.cfg.Sources { if si := mset.sources[ssi.iname]; si != nil { - mset.setSourceConsumer(ssi.iname, si.sseq+1, time.Time{}) + mset.setupSourceConsumer(ssi.iname, si.sseq+1, time.Time{}) } }