From 68961ffeddadb155ae93b5fc03f15e53e7df0290 Mon Sep 17 00:00:00 2001 From: Neil Twigg Date: Tue, 21 Feb 2023 14:47:07 +0000 Subject: [PATCH] Refactor `ipQueue` to use generics, reduce allocations --- server/consumer.go | 7 +- server/events.go | 5 +- server/ipqueue.go | 46 +++++------ server/ipqueue_test.go | 50 ++++++------ server/jetstream.go | 2 +- server/jetstream_api.go | 14 ++-- server/jetstream_cluster.go | 22 +++--- server/monitor.go | 5 +- server/mqtt.go | 20 +++-- server/raft.go | 151 +++++++++++++++++------------------- server/sendq.go | 7 +- server/server.go | 4 +- server/stream.go | 44 +++++------ 13 files changed, 180 insertions(+), 197 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 5c07d870d5..d53a30a730 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -307,7 +307,7 @@ type consumer struct { ptail *proposal // Ack queue - ackMsgs *ipQueue + ackMsgs *ipQueue[*jsAckMsg] // For stream signaling. sigSub *subscription @@ -743,7 +743,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri } } // Create ackMsgs queue now that we have a consumer name - o.ackMsgs = s.newIPQueue(fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name)) + o.ackMsgs = newIPQueue[*jsAckMsg](s, fmt.Sprintf("[ACC:%s] consumer '%s' on stream '%s' ackMsgs", accName, o.name, mset.cfg.Name)) // Create our request waiting queue. if o.isPullMode() { @@ -3168,8 +3168,7 @@ func (o *consumer) processInboundAcks(qch chan struct{}) { select { case <-o.ackMsgs.ch: acks := o.ackMsgs.pop() - for _, acki := range acks { - ack := acki.(*jsAckMsg) + for _, ack := range acks { o.processAck(ack.subject, ack.reply, ack.hdr, ack.msg) ack.returnToPool() } diff --git a/server/events.go b/server/events.go index d38a7118c1..755cd0e632 100644 --- a/server/events.go +++ b/server/events.go @@ -92,7 +92,7 @@ type internal struct { sweeper *time.Timer stmr *time.Timer replies map[string]msgHandler - sendq *ipQueue // of *pubMsg + sendq *ipQueue[*pubMsg] resetCh chan struct{} wg sync.WaitGroup sq *sendq @@ -332,8 +332,7 @@ RESET: select { case <-sendq.ch: msgs := sendq.pop() - for _, pmi := range msgs { - pm := pmi.(*pubMsg) + for _, pm := range msgs { if pm.si != nil { pm.si.Name = servername pm.si.Domain = domain diff --git a/server/ipqueue.go b/server/ipqueue.go index 4f288e82bf..fe091e4431 100644 --- a/server/ipqueue.go +++ b/server/ipqueue.go @@ -21,11 +21,11 @@ import ( const ipQueueDefaultMaxRecycleSize = 4 * 1024 // This is a generic intra-process queue. -type ipQueue struct { +type ipQueue[T any] struct { inprogress int64 sync.RWMutex ch chan struct{} - elts []interface{} + elts []T pos int pool *sync.Pool mrs int @@ -47,12 +47,12 @@ func ipQueue_MaxRecycleSize(max int) ipQueueOpt { } } -func (s *Server) newIPQueue(name string, opts ...ipQueueOpt) *ipQueue { +func newIPQueue[T any](s *Server, name string, opts ...ipQueueOpt) *ipQueue[T] { qo := ipQueueOpts{maxRecycleSize: ipQueueDefaultMaxRecycleSize} for _, o := range opts { o(&qo) } - q := &ipQueue{ + q := &ipQueue[T]{ ch: make(chan struct{}, 1), mrs: qo.maxRecycleSize, pool: &sync.Pool{}, @@ -66,7 +66,7 @@ func (s *Server) newIPQueue(name string, opts ...ipQueueOpt) *ipQueue { // Add the element `e` to the queue, notifying the queue channel's `ch` if the // entry is the first to be added, and returns the length of the queue after // this element is added. 
-func (q *ipQueue) push(e interface{}) int { +func (q *ipQueue[T]) push(e T) int { var signal bool q.Lock() l := len(q.elts) - q.pos @@ -76,10 +76,10 @@ func (q *ipQueue) push(e interface{}) int { if eltsi != nil { // Reason we use pointer to slice instead of slice is explained // here: https://staticcheck.io/docs/checks#SA6002 - q.elts = (*(eltsi.(*[]interface{})))[:0] + q.elts = (*(eltsi.(*[]T)))[:0] } if cap(q.elts) == 0 { - q.elts = make([]interface{}, 0, 32) + q.elts = make([]T, 0, 32) } } q.elts = append(q.elts, e) @@ -103,8 +103,8 @@ func (q *ipQueue) push(e interface{}) int { // something, but by the time it calls `pop()`, the drain() would have // emptied the queue. So the caller should never assume that pop() will // return a slice of 1 or more, it could return `nil`. -func (q *ipQueue) pop() []interface{} { - var elts []interface{} +func (q *ipQueue[T]) pop() []T { + var elts []T q.Lock() if q.pos == 0 { elts = q.elts @@ -117,23 +117,23 @@ func (q *ipQueue) pop() []interface{} { return elts } -func (q *ipQueue) resetAndReturnToPool(elts *[]interface{}) { - for i, l := 0, len(*elts); i < l; i++ { - (*elts)[i] = nil - } +func (q *ipQueue[T]) resetAndReturnToPool(elts *[]T) { + (*elts) = (*elts)[:0] q.pool.Put(elts) } // Returns the first element from the queue, if any. See comment above // regarding calling after being notified that there is something and -// the use of drain(). In short, the caller should always expect that -// pop() or popOne() may return `nil`. -func (q *ipQueue) popOne() interface{} { +// the use of drain(). In short, the caller should always check the +// boolean return value to ensure that the value is genuine and not a +// default empty value. +func (q *ipQueue[T]) popOne() (T, bool) { q.Lock() l := len(q.elts) - q.pos if l < 1 { q.Unlock() - return nil + var empty T + return empty, false } e := q.elts[q.pos] q.pos++ @@ -150,7 +150,7 @@ func (q *ipQueue) popOne() interface{} { q.elts, q.pos = nil, 0 } q.Unlock() - return e + return e, true } // After a pop(), the slice can be recycled for the next push() when @@ -159,7 +159,7 @@ func (q *ipQueue) popOne() interface{} { // of the slice. // Reason we use pointer to slice instead of slice is explained // here: https://staticcheck.io/docs/checks#SA6002 -func (q *ipQueue) recycle(elts *[]interface{}) { +func (q *ipQueue[T]) recycle(elts *[]T) { // If invoked with a nil list, nothing to do. if elts == nil || *elts == nil { return @@ -179,7 +179,7 @@ func (q *ipQueue) recycle(elts *[]interface{}) { } // Returns the current length of the queue. -func (q *ipQueue) len() int { +func (q *ipQueue[T]) len() int { q.RLock() l := len(q.elts) - q.pos q.RUnlock() @@ -190,7 +190,7 @@ func (q *ipQueue) len() int { // Note that this could cause a reader go routine that has been // notified that there is something in the queue (reading from queue's `ch`) // may then get nothing if `drain()` is invoked before the `pop()` or `popOne()`. -func (q *ipQueue) drain() { +func (q *ipQueue[T]) drain() { if q == nil { return } @@ -213,13 +213,13 @@ func (q *ipQueue) drain() { // For that reason, the queue maintains a count of elements returned through // the pop() API. When the caller will call q.recycle(), this count will // be reduced by the size of the slice returned by pop(). -func (q *ipQueue) inProgress() int64 { +func (q *ipQueue[T]) inProgress() int64 { return atomic.LoadInt64(&q.inprogress) } // Remove this queue from the server's map of ipQueues. // All ipQueue operations (such as push/pop/etc..) are still possible. 
-func (q *ipQueue) unregister() { +func (q *ipQueue[T]) unregister() { if q == nil { return } diff --git a/server/ipqueue_test.go b/server/ipqueue_test.go index acf6f088fc..ba0e9f34b4 100644 --- a/server/ipqueue_test.go +++ b/server/ipqueue_test.go @@ -21,7 +21,7 @@ import ( func TestIPQueueBasic(t *testing.T) { s := &Server{} - q := s.newIPQueue("test") + q := newIPQueue[int](s, "test") // Check that pool has been created if q.pool == nil { t.Fatal("Expected pool to have been created") @@ -42,7 +42,7 @@ func TestIPQueueBasic(t *testing.T) { } // Try to change the max recycle size - q2 := s.newIPQueue("test2", ipQueue_MaxRecycleSize(10)) + q2 := newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10)) if q2.mrs != 10 { t.Fatalf("Expected max recycle size to be 10, got %v", q2.mrs) } @@ -82,14 +82,14 @@ func TestIPQueueBasic(t *testing.T) { t.Fatalf("Should have gotten 1 element, got %v", len(elts)) } q2.push(2) - if e := q2.popOne(); e.(int) != 2 { + if e, ok := q2.popOne(); !ok || e != 2 { t.Fatalf("popOne failed: %+v", e) } } func TestIPQueuePush(t *testing.T) { s := &Server{} - q := s.newIPQueue("test") + q := newIPQueue[int](s, "test") q.push(1) if l := q.len(); l != 1 { t.Fatalf("Expected len to be 1, got %v", l) @@ -115,7 +115,7 @@ func TestIPQueuePush(t *testing.T) { func TestIPQueuePop(t *testing.T) { s := &Server{} - q := s.newIPQueue("test") + q := newIPQueue[int](s, "test") q.push(1) <-q.ch elts := q.pop() @@ -154,14 +154,14 @@ func TestIPQueuePop(t *testing.T) { func TestIPQueuePopOne(t *testing.T) { s := &Server{} - q := s.newIPQueue("test") + q := newIPQueue[int](s, "test") q.push(1) <-q.ch - e := q.popOne() - if e == nil { + e, ok := q.popOne() + if !ok { t.Fatal("Got nil") } - if i := e.(int); i != 1 { + if i := e; i != 1 { t.Fatalf("Expected 1, got %v", i) } if l := q.len(); l != 0 { @@ -179,11 +179,11 @@ func TestIPQueuePopOne(t *testing.T) { } q.push(2) q.push(3) - e = q.popOne() - if e == nil { + e, ok = q.popOne() + if !ok { t.Fatal("Got nil") } - if i := e.(int); i != 2 { + if i := e; i != 2 { t.Fatalf("Expected 2, got %v", i) } if l := q.len(); l != 1 { @@ -195,11 +195,11 @@ func TestIPQueuePopOne(t *testing.T) { default: t.Fatalf("Should have been notified that there is more") } - e = q.popOne() - if e == nil { + e, ok = q.popOne() + if !ok { t.Fatal("Got nil") } - if i := e.(int); i != 3 { + if i := e; i != 3 { t.Fatalf("Expected 3, got %v", i) } if l := q.len(); l != 0 { @@ -213,26 +213,26 @@ func TestIPQueuePopOne(t *testing.T) { } // Calling it again now that we know there is nothing, we // should get nil. 
- if e = q.popOne(); e != nil { + if e, ok = q.popOne(); ok { t.Fatalf("Expected nil, got %v", e) } - q = s.newIPQueue("test2") + q = newIPQueue[int](s, "test2") q.push(1) q.push(2) // Capture current capacity q.RLock() c := cap(q.elts) q.RUnlock() - e = q.popOne() - if e == nil || e.(int) != 1 { + e, ok = q.popOne() + if !ok || e != 1 { t.Fatalf("Invalid value: %v", e) } if l := q.len(); l != 1 { t.Fatalf("Expected len to be 1, got %v", l) } values := q.pop() - if len(values) != 1 || values[0].(int) != 2 { + if len(values) != 1 || values[0] != 2 { t.Fatalf("Unexpected values: %v", values) } if cap(values) != c-1 { @@ -247,7 +247,7 @@ func TestIPQueuePopOne(t *testing.T) { func TestIPQueueMultiProducers(t *testing.T) { s := &Server{} - q := s.newIPQueue("test") + q := newIPQueue[int](s, "test") wg := sync.WaitGroup{} wg.Add(3) @@ -269,7 +269,7 @@ func TestIPQueueMultiProducers(t *testing.T) { case <-q.ch: values := q.pop() for _, v := range values { - m[v.(int)] = struct{}{} + m[v] = struct{}{} } q.recycle(&values) if n := q.inProgress(); n != 0 { @@ -285,7 +285,7 @@ func TestIPQueueMultiProducers(t *testing.T) { func TestIPQueueRecycle(t *testing.T) { s := &Server{} - q := s.newIPQueue("test") + q := newIPQueue[int](s, "test") total := 1000 for iter := 0; iter < 5; iter++ { var sz int @@ -317,7 +317,7 @@ func TestIPQueueRecycle(t *testing.T) { } } - q = s.newIPQueue("test2", ipQueue_MaxRecycleSize(10)) + q = newIPQueue[int](s, "test2", ipQueue_MaxRecycleSize(10)) for i := 0; i < 100; i++ { q.push(i) } @@ -357,7 +357,7 @@ func TestIPQueueRecycle(t *testing.T) { func TestIPQueueDrain(t *testing.T) { s := &Server{} - q := s.newIPQueue("test") + q := newIPQueue[int](s, "test") for iter, recycled := 0, false; iter < 5 && !recycled; iter++ { for i := 0; i < 100; i++ { q.push(i + 1) diff --git a/server/jetstream.go b/server/jetstream.go index 8ca10d47e9..78f4e00c7f 100644 --- a/server/jetstream.go +++ b/server/jetstream.go @@ -142,7 +142,7 @@ type jsAccount struct { store TemplateStore // From server - sendq *ipQueue // of *pubMsg + sendq *ipQueue[*pubMsg] // Usage/limits related fields that will be protected by usageMu usageMu sync.RWMutex diff --git a/server/jetstream_api.go b/server/jetstream_api.go index a44f444760..c14106c3cb 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -789,8 +789,7 @@ func (s *Server) processJSAPIRoutedRequests() { select { case <-queue.ch: reqs := queue.pop() - for _, req := range reqs { - r := req.(*jsAPIRoutedReq) + for _, r := range reqs { client.pa = r.pa start := time.Now() r.jsub.icb(r.sub, client, r.acc, r.subject, r.reply, r.msg) @@ -813,7 +812,7 @@ func (s *Server) setJetStreamExportSubs() error { // Start the go routine that will process API requests received by the // subscription below when they are coming from routes, etc.. - s.jsAPIRoutedReqs = s.newIPQueue("Routed JS API Requests") + s.jsAPIRoutedReqs = newIPQueue[*jsAPIRoutedReq](s, "Routed JS API Requests") s.startGoRoutine(s.processJSAPIRoutedRequests) // This is the catch all now for all JetStream API calls. @@ -3301,7 +3300,7 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC // For signaling to upper layers. 
resultCh := make(chan result, 1) - activeQ := s.newIPQueue(fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName)) // of int + activeQ := newIPQueue[int](s, fmt.Sprintf("[ACC:%s] stream '%s' restore", acc.Name, streamName)) // of int var total int @@ -3441,9 +3440,10 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC doneCh <- err return case <-activeQ.ch: - n := activeQ.popOne().(int) - total += n - notActive.Reset(activityInterval) + if n, ok := activeQ.popOne(); ok { + total += n + notActive.Reset(activityInterval) + } case <-notActive.C: err := fmt.Errorf("restore for stream '%s > %s' is stalled", acc, streamName) doneCh <- err diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index ba5a751240..c642b87635 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -1032,8 +1032,8 @@ func (js *jetStream) monitorCluster() { return case <-aq.ch: ces := aq.pop() - for _, cei := range ces { - if cei == nil { + for _, ce := range ces { + if ce == nil { // Signals we have replayed all of our metadata. js.clearMetaRecovering() @@ -1056,7 +1056,6 @@ func (js *jetStream) monitorCluster() { s.Debugf("Recovered JetStream cluster metadata") continue } - ce := cei.(*CommittedEntry) // FIXME(dlc) - Deal with errors. if didSnap, didRemoval, err := js.applyMetaEntries(ce.Entries, ru); err == nil { _, nb := n.Applied(ce.Index) @@ -1952,9 +1951,9 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps case <-aq.ch: var ne, nb uint64 ces := aq.pop() - for _, cei := range ces { + for _, ce := range ces { // No special processing needed for when we are caught up on restart. - if cei == nil { + if ce == nil { isRecovering = false // Check on startup if we should snapshot/compact. if _, b := n.Size(); b > compactSizeMin || n.NeedSnapshot() { @@ -1962,7 +1961,6 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps } continue } - ce := cei.(*CommittedEntry) // Apply our entries. if err := js.applyStreamEntries(mset, ce, isRecovering); err == nil { // Update our applied. @@ -4073,16 +4071,15 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) { return case <-aq.ch: ces := aq.pop() - for _, cei := range ces { + for _, ce := range ces { // No special processing needed for when we are caught up on restart. - if cei == nil { + if ce == nil { recovering = false if n.NeedSnapshot() { doSnapshot(true) } continue } - ce := cei.(*CommittedEntry) if err := js.applyConsumerEntries(o, ce, isLeader); err == nil { ne, nb := n.Applied(ce.Index) // If we have at least min entries to compact, go ahead and snapshot/compact. @@ -7234,7 +7231,7 @@ RETRY: s.sendInternalMsgLocked(mrec.reply, _EMPTY_, nil, err.Error()) } - msgsQ := s.newIPQueue(qname) // of *im + msgsQ := newIPQueue[*im](s, qname) defer msgsQ.unregister() // Send our catchup request here. @@ -7266,8 +7263,7 @@ RETRY: mrecs := msgsQ.pop() - for _, mreci := range mrecs { - mrec := mreci.(*im) + for _, mrec := range mrecs { msg := mrec.msg // Check for eof signaling. 
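Reviewer note: the jetstream_cluster.go call sites above all share the same drain pattern: wait on the queue's notification channel, pop() a typed batch, process it, then recycle() the slice, now without any interface{} assertions. Below is a minimal, self-contained sketch of that pattern; miniQueue, job and the constants used are illustrative stand-ins, not the patch's ipQueue, and the recycle()/sync.Pool step is omitted to keep it short.

package main

import (
	"fmt"
	"sync"
)

// miniQueue is an illustrative stand-in for the generic queue surface used by
// the call sites above: a typed slice guarded by a mutex plus a 1-deep
// notification channel, mirroring the push/pop contract.
type miniQueue[T any] struct {
	mu   sync.Mutex
	elts []T
	ch   chan struct{}
}

func newMiniQueue[T any]() *miniQueue[T] {
	return &miniQueue[T]{ch: make(chan struct{}, 1)}
}

// push appends an element and signals the channel only when the queue
// transitions from empty to non-empty.
func (q *miniQueue[T]) push(e T) {
	q.mu.Lock()
	signal := len(q.elts) == 0
	q.elts = append(q.elts, e)
	q.mu.Unlock()
	if signal {
		select {
		case q.ch <- struct{}{}:
		default:
		}
	}
}

// pop returns everything queued so far; it may return an empty slice if a
// concurrent consumer (or a drain) got there first.
func (q *miniQueue[T]) pop() []T {
	q.mu.Lock()
	elts := q.elts
	q.elts = nil
	q.mu.Unlock()
	return elts
}

type job struct{ id int }

func main() {
	q := newMiniQueue[*job]()
	done := make(chan struct{})

	go func() {
		defer close(done)
		for processed := 0; processed < 3; {
			<-q.ch
			// No type assertions: pop() already yields []*job.
			for _, j := range q.pop() {
				fmt.Println("processing job", j.id)
				processed++
			}
		}
	}()

	for i := 1; i <= 3; i++ {
		q.push(&job{id: i})
	}
	<-done
}

The 1-deep channel means a single signal can cover any number of queued elements, which is why pop() callers must tolerate an empty batch (see the drain() comments in ipqueue.go).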
@@ -7319,7 +7315,7 @@ RETRY:
 			msgsQ.recycle(&mrecs)
 		case <-notActive.C:
 			if mrecs := msgsQ.pop(); len(mrecs) > 0 {
-				mrec := mrecs[0].(*im)
+				mrec := mrecs[0]
 				notifyLeaderStopCatchup(mrec, errCatchupStalled)
 				msgsQ.recycle(&mrecs)
 			}
diff --git a/server/monitor.go b/server/monitor.go
index 3322a3eeee..5cda71c27b 100644
--- a/server/monitor.go
+++ b/server/monitor.go
@@ -1126,7 +1126,10 @@ func (s *Server) HandleIPQueuesz(w http.ResponseWriter, r *http.Request) {
 
 	s.ipQueues.Range(func(k, v interface{}) bool {
 		name := k.(string)
-		queue := v.(*ipQueue)
+		queue := v.(interface {
+			len() int
+			inProgress() int64
+		})
 		pending := queue.len()
 		inProgress := int(queue.inProgress())
 		if !all && (pending == 0 && inProgress == 0) {
diff --git a/server/mqtt.go b/server/mqtt.go
index 054523da11..f5494661c9 100644
--- a/server/mqtt.go
+++ b/server/mqtt.go
@@ -227,17 +227,17 @@ type mqttAccountSessionManager struct {
 	sl         *Sublist                    // sublist allowing to find retained messages for given subscription
 	retmsgs    map[string]*mqttRetainedMsg // retained messages
 	jsa        mqttJSA
-	rrmLastSeq uint64        // Restore retained messages expected last sequence
-	rrmDoneCh  chan struct{} // To notify the caller that all retained messages have been loaded
-	sp         *ipQueue      // of uint64. Used for cluster-wide processing of session records being persisted
-	domainTk   string        // Domain (with trailing "."), or possibly empty. This is added to session subject.
+	rrmLastSeq uint64           // Restore retained messages expected last sequence
+	rrmDoneCh  chan struct{}    // To notify the caller that all retained messages have been loaded
+	sp         *ipQueue[uint64] // Used for cluster-wide processing of session records being persisted
+	domainTk   string           // Domain (with trailing "."), or possibly empty. This is added to session subject.
} type mqttJSA struct { mu sync.Mutex id string c *client - sendq *ipQueue // of *mqttJSPubMsg + sendq *ipQueue[*mqttJSPubMsg] rplyr string replies sync.Map nuid *nuid.NUID @@ -979,11 +979,11 @@ func (s *Server) mqttCreateAccountSessionManager(acc *Account, quitCh chan struc id: id, c: c, rplyr: mqttJSARepliesPrefix + id + ".", - sendq: s.newIPQueue(qname + "send"), // of *mqttJSPubMsg + sendq: newIPQueue[*mqttJSPubMsg](s, qname+"send"), nuid: nuid.New(), quitCh: quitCh, }, - sp: s.newIPQueue(qname + "sp"), // of uint64 + sp: newIPQueue[uint64](s, qname+"sp"), } // TODO record domain name in as here @@ -1648,7 +1648,7 @@ func (as *mqttAccountSessionManager) sessPersistProcessing(closeCh chan struct{} case <-sp.ch: seqs := sp.pop() for _, seq := range seqs { - as.processSessPersistRecord(seq.(uint64)) + as.processSessPersistRecord(seq) } sp.recycle(&seqs) case <-closeCh: @@ -1747,9 +1747,7 @@ func (as *mqttAccountSessionManager) sendJSAPIrequests(s *Server, c *client, acc select { case <-sendq.ch: pmis := sendq.pop() - for _, pmi := range pmis { - r := pmi.(*mqttJSPubMsg) - + for _, r := range pmis { var nsize int msg := r.msg diff --git a/server/raft.go b/server/raft.go index 26280a2415..97b2f00ea2 100644 --- a/server/raft.go +++ b/server/raft.go @@ -63,7 +63,7 @@ type RaftNode interface { AdjustClusterSize(csz int) error AdjustBootClusterSize(csz int) error ClusterSize() int - ApplyQ() *ipQueue // of *CommittedEntry + ApplyQ() *ipQueue[*CommittedEntry] PauseApply() error ResumeApply() LeadChangeC() <-chan bool @@ -145,12 +145,12 @@ type raft struct { active time.Time llqrt time.Time lsut time.Time - term uint64 - pterm uint64 - pindex uint64 - commit uint64 - applied uint64 - leader string + term uint64 // The current vote term + pterm uint64 // Previous term from the last snapshot + pindex uint64 // Previous index from the last snapshot + commit uint64 // Sequence number of the most recent commit + applied uint64 // Sequence number of the most recently applied commit + leader string // The ID of the leader vote string hash string s *Server @@ -185,7 +185,7 @@ type raft struct { catchup *catchupState // For leader or server catching up a follower. - progress map[string]*ipQueue // of uint64 + progress map[string]*ipQueue[uint64] // For when we have paused our applyC. 
paused bool @@ -193,13 +193,13 @@ type raft struct { pobserver bool // Queues and Channels - prop *ipQueue // of *Entry - entry *ipQueue // of *appendEntry - resp *ipQueue // of *appendEntryResponse - apply *ipQueue // of *CommittedEntry - reqs *ipQueue // of *voteRequest - votes *ipQueue // of *voteResponse - stepdown *ipQueue // of string + prop *ipQueue[*Entry] + entry *ipQueue[*appendEntry] + resp *ipQueue[*appendEntryResponse] + apply *ipQueue[*CommittedEntry] + reqs *ipQueue[*voteRequest] + votes *ipQueue[*voteResponse] + stepdown *ipQueue[string] leadc chan bool quit chan struct{} @@ -394,13 +394,13 @@ func (s *Server) startRaftNode(accName string, cfg *RaftConfig) (RaftNode, error quit: make(chan struct{}), wtvch: make(chan struct{}, 1), wpsch: make(chan struct{}, 1), - reqs: s.newIPQueue(qpfx + "vreq"), // of *voteRequest - votes: s.newIPQueue(qpfx + "vresp"), // of *voteResponse - prop: s.newIPQueue(qpfx + "entry"), // of *Entry - entry: s.newIPQueue(qpfx + "appendEntry"), // of *appendEntry - resp: s.newIPQueue(qpfx + "appendEntryResponse"), // of *appendEntryResponse - apply: s.newIPQueue(qpfx + "committedEntry"), // of *CommittedEntry - stepdown: s.newIPQueue(qpfx + "stepdown"), // of string + reqs: newIPQueue[*voteRequest](s, qpfx+"vreq"), + votes: newIPQueue[*voteResponse](s, qpfx+"vresp"), + prop: newIPQueue[*Entry](s, qpfx+"entry"), + entry: newIPQueue[*appendEntry](s, qpfx+"appendEntry"), + resp: newIPQueue[*appendEntryResponse](s, qpfx+"appendEntryResponse"), + apply: newIPQueue[*CommittedEntry](s, qpfx+"committedEntry"), + stepdown: newIPQueue[string](s, qpfx+"stepdown"), accName: accName, leadc: make(chan bool, 1), observer: cfg.Observer, @@ -1438,9 +1438,9 @@ func (n *raft) UpdateKnownPeers(knownPeers []string) { } } -func (n *raft) ApplyQ() *ipQueue { return n.apply } // queue of *CommittedEntry -func (n *raft) LeadChangeC() <-chan bool { return n.leadc } -func (n *raft) QuitC() <-chan struct{} { return n.quit } +func (n *raft) ApplyQ() *ipQueue[*CommittedEntry] { return n.apply } +func (n *raft) LeadChangeC() <-chan bool { return n.leadc } +func (n *raft) QuitC() <-chan struct{} { return n.quit } func (n *raft) Created() time.Time { n.RLock() @@ -1486,7 +1486,9 @@ func (n *raft) shutdown(shouldDelete bool) { } // Unregistering ipQueues do not prevent them from push/pop // just will remove them from the central monitoring map - queues := []*ipQueue{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown} + queues := []interface { + unregister() + }{n.reqs, n.votes, n.prop, n.entry, n.resp, n.apply, n.stepdown} for _, q := range queues { q.unregister() } @@ -1718,27 +1720,12 @@ func (n *raft) setObserver(isObserver bool, extSt extensionState) { // Invoked when being notified that there is something in the entryc's queue func (n *raft) processAppendEntries() { aes := n.entry.pop() - for _, aei := range aes { - ae := aei.(*appendEntry) + for _, ae := range aes { n.processAppendEntry(ae, ae.sub) } n.entry.recycle(&aes) } -func convertVoteRequest(i interface{}) *voteRequest { - if i == nil { - return nil - } - return i.(*voteRequest) -} - -func convertVoteResponse(i interface{}) *voteResponse { - if i == nil { - return nil - } - return i.(*voteResponse) -} - func (n *raft) runAsFollower() { for { elect := n.electTimer() @@ -1773,11 +1760,14 @@ func (n *raft) runAsFollower() { n.resp.popOne() case <-n.reqs.ch: // Because of drain() it is possible that we get nil from popOne(). 
- n.processVoteRequest(convertVoteRequest(n.reqs.popOne())) + if voteReq, ok := n.reqs.popOne(); ok { + n.processVoteRequest(voteReq) + } case <-n.stepdown.ch: - newLeader := n.stepdown.popOne().(string) - n.switchToFollower(newLeader) - return + if newLeader, ok := n.stepdown.popOne(); ok { + n.switchToFollower(newLeader) + return + } } } } @@ -2062,8 +2052,7 @@ func (n *raft) runAsLeader() { return case <-n.resp.ch: ars := n.resp.pop() - for _, ari := range ars { - ar := ari.(*appendEntryResponse) + for _, ar := range ars { n.processAppendEntryResponse(ar) } n.resp.recycle(&ars) @@ -2073,8 +2062,7 @@ func (n *raft) runAsLeader() { es := n.prop.pop() sz := 0 - for i, bi := range es { - b := bi.(*Entry) + for i, b := range es { if b.Type == EntryRemovePeer { n.doRemovePeerAsLeader(string(b.Data)) } @@ -2100,8 +2088,8 @@ func (n *raft) runAsLeader() { } case <-n.votes.ch: // Because of drain() it is possible that we get nil from popOne(). - vresp := convertVoteResponse(n.votes.popOne()) - if vresp == nil { + vresp, ok := n.votes.popOne() + if !ok { continue } if vresp.term > n.Term() { @@ -2111,11 +2099,14 @@ func (n *raft) runAsLeader() { n.trackPeer(vresp.peer) case <-n.reqs.ch: // Because of drain() it is possible that we get nil from popOne(). - n.processVoteRequest(convertVoteRequest(n.reqs.popOne())) + if voteReq, ok := n.reqs.popOne(); ok { + n.processVoteRequest(voteReq) + } case <-n.stepdown.ch: - newLeader := n.stepdown.popOne().(string) - n.switchToFollower(newLeader) - return + if newLeader, ok := n.stepdown.popOne(); ok { + n.switchToFollower(newLeader) + return + } case <-n.entry.ch: n.processAppendEntries() } @@ -2185,7 +2176,7 @@ func (n *raft) loadFirstEntry() (ae *appendEntry, err error) { return n.loadEntry(state.FirstSeq) } -func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue /* of uint64 */) { +func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64]) { n.RLock() s, reply := n.s, n.areply peer, subj, last := ar.peer, ar.reply, n.pindex @@ -2259,19 +2250,20 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue /* of n.debug("Catching up for %q stalled", peer) return case <-indexUpdatesQ.ch: - index := indexUpdatesQ.popOne().(uint64) - // Update our activity timer. - timeout.Reset(activityInterval) - // Update outstanding total. - total -= om[index] - delete(om, index) - if next == 0 { - next = index - } - // Check if we are done. - if index > last || sendNext() { - n.debug("Finished catching up") - return + if index, ok := indexUpdatesQ.popOne(); ok { + // Update our activity timer. + timeout.Reset(activityInterval) + // Update outstanding total. + total -= om[index] + delete(om, index) + if next == 0 { + next = index + } + // Check if we are done. + if index > last || sendNext() { + n.debug("Finished catching up") + return + } } } } @@ -2309,7 +2301,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.debug("Being asked to catch up follower: %q", ar.peer) n.Lock() if n.progress == nil { - n.progress = make(map[string]*ipQueue) + n.progress = make(map[string]*ipQueue[uint64]) } else if q, ok := n.progress[ar.peer]; ok { n.debug("Will cancel existing entry for catching up %q", ar.peer) delete(n.progress, ar.peer) @@ -2353,7 +2345,7 @@ func (n *raft) catchupFollower(ar *appendEntryResponse) { n.debug("Our first entry [%d:%d] does not match request from follower [%d:%d]", ae.pterm, ae.pindex, ar.term, ar.index) } // Create a queue for delivering updates from responses. 
- indexUpdates := n.s.newIPQueue(fmt.Sprintf("[ACC:%s] RAFT '%s' indexUpdates", n.accName, n.group)) // of uint64 + indexUpdates := newIPQueue[uint64](n.s, fmt.Sprintf("[ACC:%s] RAFT '%s' indexUpdates", n.accName, n.group)) indexUpdates.push(ae.pindex) n.progress[ar.peer] = indexUpdates n.Unlock() @@ -2614,8 +2606,8 @@ func (n *raft) runAsCandidate() { return case <-n.votes.ch: // Because of drain() it is possible that we get nil from popOne(). - vresp := convertVoteResponse(n.votes.popOne()) - if vresp == nil { + vresp, ok := n.votes.popOne() + if !ok { continue } n.RLock() @@ -2646,11 +2638,14 @@ func (n *raft) runAsCandidate() { } case <-n.reqs.ch: // Because of drain() it is possible that we get nil from popOne(). - n.processVoteRequest(convertVoteRequest(n.reqs.popOne())) + if voteReq, ok := n.reqs.popOne(); ok { + n.processVoteRequest(voteReq) + } case <-n.stepdown.ch: - newLeader := n.stepdown.popOne().(string) - n.switchToFollower(newLeader) - return + if newLeader, ok := n.stepdown.popOne(); ok { + n.switchToFollower(newLeader) + return + } } } } diff --git a/server/sendq.go b/server/sendq.go index 49fcfb19af..2c4139710d 100644 --- a/server/sendq.go +++ b/server/sendq.go @@ -27,12 +27,12 @@ type outMsg struct { type sendq struct { mu sync.Mutex - q *ipQueue // of *outMsg + q *ipQueue[*outMsg] s *Server } func (s *Server) newSendQ() *sendq { - sq := &sendq{s: s, q: s.newIPQueue("SendQ")} + sq := &sendq{s: s, q: newIPQueue[*outMsg](s, "SendQ")} s.startGoRoutine(sq.internalLoop) return sq } @@ -56,8 +56,7 @@ func (sq *sendq) internalLoop() { return case <-q.ch: pms := q.pop() - for _, pmi := range pms { - pm := pmi.(*outMsg) + for _, pm := range pms { c.pa.subject = []byte(pm.subj) c.pa.size = len(pm.msg) + len(pm.hdr) c.pa.szb = []byte(strconv.Itoa(c.pa.size)) diff --git a/server/server.go b/server/server.go index 2ac9697398..782372a37f 100644 --- a/server/server.go +++ b/server/server.go @@ -290,7 +290,7 @@ type Server struct { syncOutSem chan struct{} // Queue to process JS API requests that come from routes (or gateways) - jsAPIRoutedReqs *ipQueue + jsAPIRoutedReqs *ipQueue[*jsAPIRoutedReq] } // For tracking JS nodes. @@ -1259,7 +1259,7 @@ func (s *Server) setSystemAccount(acc *Account) error { sid: 1, servers: make(map[string]*serverUpdate), replies: make(map[string]msgHandler), - sendq: s.newIPQueue("System sendQ"), // of *pubMsg + sendq: newIPQueue[*pubMsg](s, "System sendQ"), resetCh: make(chan struct{}), sq: s.newSendQ(), statsz: eventsHBInterval, diff --git a/server/stream.go b/server/stream.go index fffb39eb0e..0b4e9d8ebd 100644 --- a/server/stream.go +++ b/server/stream.go @@ -192,9 +192,9 @@ type stream struct { sid int pubAck []byte outq *jsOutQ - msgs *ipQueue // of *inMsg + msgs *ipQueue[*inMsg] store StreamStore - ackq *ipQueue // of uint64 + ackq *ipQueue[uint64] lseq uint64 lmsgId string consumers map[string]*consumer @@ -227,7 +227,7 @@ type stream struct { clsMu sync.RWMutex cList []*consumer sch chan struct{} - sigq *ipQueue // of *cMsg + sigq *ipQueue[*cMsg] csl *Sublist // TODO(dlc) - Hide everything below behind two pointers. 
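Reviewer note: popOne() moves from returning a bare interface{} (nil meaning empty) to the comma-ok form used by the raft call sites above, because with value element types such as uint64 there is no nil to hand back and the zero value can be a legitimate element. A standalone sketch of that convention, using a hypothetical popOne helper over a plain slice rather than the queue itself:

package main

import "fmt"

// popOne is an illustrative helper over a plain slice (not the queue's
// implementation) showing why the generic API returns (T, bool): with value
// element types such as uint64 there is no nil to return, and the zero value
// may be a perfectly valid queued element.
func popOne[T any](elts *[]T) (T, bool) {
	if len(*elts) == 0 {
		var empty T // zero value of T, not a real element
		return empty, false
	}
	e := (*elts)[0]
	*elts = (*elts)[1:]
	return e, true
}

func main() {
	// Note that 0 is a genuine queued value here; only the boolean tells the
	// caller when the queue is actually empty.
	indexes := []uint64{0, 7}
	for {
		idx, ok := popOne(&indexes)
		if !ok {
			fmt.Println("queue empty")
			break
		}
		fmt.Println("popped", idx)
	}
}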
@@ -260,7 +260,7 @@ type sourceInfo struct { sub *subscription dsub *subscription lbsub *subscription - msgs *ipQueue // of *inMsg + msgs *ipQueue[*inMsg] sseq uint64 dseq uint64 start time.Time @@ -439,19 +439,19 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt tier: tier, stype: cfg.Storage, consumers: make(map[string]*consumer), - msgs: s.newIPQueue(qpfx + "messages"), // of *inMsg + msgs: newIPQueue[*inMsg](s, qpfx+"messages"), qch: make(chan struct{}), uch: make(chan struct{}, 4), sch: make(chan struct{}, 1), } // Start our signaling routine to process consumers. - mset.sigq = s.newIPQueue(qpfx + "obs") // of *cMsg + mset.sigq = newIPQueue[*cMsg](s, qpfx+"obs") // of *cMsg go mset.signalConsumersLoop() // For no-ack consumers when we are interest retention. if cfg.Retention != LimitsPolicy { - mset.ackq = s.newIPQueue(qpfx + "acks") // of uint64 + mset.ackq = newIPQueue[uint64](s, qpfx+"acks") } // Check for RePublish. @@ -1906,8 +1906,7 @@ func (mset *stream) processMirrorMsgs(mirror *sourceInfo, ready *sync.WaitGroup) return case <-msgs.ch: ims := msgs.pop() - for _, imi := range ims { - im := imi.(*inMsg) + for _, im := range ims { if !mset.processInboundMirrorMsg(im) { break } @@ -2288,7 +2287,7 @@ func (mset *stream) setupMirrorConsumer() error { // delivering messages as soon as the consumer request is received. qname := fmt.Sprintf("[ACC:%s] stream mirror '%s' of '%s' msgs", mset.acc.Name, mset.cfg.Name, mset.cfg.Mirror.Name) // Create a new queue each time - mirror.msgs = mset.srv.newIPQueue(qname) /* of *inMsg */ + mirror.msgs = newIPQueue[*inMsg](mset.srv, qname) msgs := mirror.msgs sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { hdr, msg := c.msgParts(copyBytes(rmsg)) // Need to copy. @@ -2592,7 +2591,7 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T // delivering messages as soon as the consumer request is received. qname := fmt.Sprintf("[ACC:%s] stream source '%s' from '%s' msgs", mset.acc.Name, mset.cfg.Name, si.name) // Create a new queue each time - si.msgs = mset.srv.newIPQueue(qname) // of *inMsg + si.msgs = newIPQueue[*inMsg](mset.srv, qname) msgs := si.msgs sub, err := mset.subscribeInternal(deliverSubject, func(sub *subscription, c *client, _ *Account, subject, reply string, rmsg []byte) { hdr, msg := c.msgParts(copyBytes(rmsg)) // Need to copy. @@ -2699,8 +2698,7 @@ func (mset *stream) processSourceMsgs(si *sourceInfo, ready *sync.WaitGroup) { return case <-msgs.ch: ims := msgs.pop() - for _, imi := range ims { - im := imi.(*inMsg) + for _, im := range ims { if !mset.processInboundSourceMsg(si, im) { break } @@ -3489,7 +3487,7 @@ type inMsg struct { msg []byte } -func (mset *stream) queueInbound(ib *ipQueue, subj, rply string, hdr, msg []byte) { +func (mset *stream) queueInbound(ib *ipQueue[*inMsg], subj, rply string, hdr, msg []byte) { ib.push(&inMsg{subj, rply, hdr, msg}) } @@ -4156,8 +4154,7 @@ func (mset *stream) signalConsumersLoop() { return case <-sch: cms := msgs.pop() - for _, cm := range cms { - m := cm.(*cMsg) + for _, m := range cms { seq, subj := m.seq, m.subj m.returnToPool() // Signal all appropriate consumers. @@ -4248,7 +4245,7 @@ func (pm *jsPubMsg) size() int { // Queue of *jsPubMsg for sending internal system messages. 
type jsOutQ struct { - *ipQueue + *ipQueue[*jsPubMsg] } func (q *jsOutQ) sendMsg(subj string, msg []byte) { @@ -4289,7 +4286,7 @@ func (mset *stream) setupSendCapabilities() { return } qname := fmt.Sprintf("[ACC:%s] stream '%s' sendQ", mset.acc.Name, mset.cfg.Name) - mset.outq = &jsOutQ{mset.srv.newIPQueue(qname)} // of *jsPubMsg + mset.outq = &jsOutQ{newIPQueue[*jsPubMsg](mset.srv, qname)} go mset.internalLoop() } @@ -4325,7 +4322,7 @@ func (mset *stream) internalLoop() { // For the ack msgs queue for interest retention. var ( amch chan struct{} - ackq *ipQueue // of uint64 + ackq *ipQueue[uint64] ) if mset.ackq != nil { ackq, amch = mset.ackq, mset.ackq.ch @@ -4340,8 +4337,7 @@ func (mset *stream) internalLoop() { select { case <-outq.ch: pms := outq.pop() - for _, pmi := range pms { - pm := pmi.(*jsPubMsg) + for _, pm := range pms { c.pa.subject = []byte(pm.dsubj) c.pa.deliver = []byte(pm.subj) c.pa.size = len(pm.msg) + len(pm.hdr) @@ -4393,9 +4389,7 @@ func (mset *stream) internalLoop() { // This can possibly change now so needs to be checked here. isClustered := mset.IsClustered() ims := msgs.pop() - for _, imi := range ims { - im := imi.(*inMsg) - + for _, im := range ims { // If we are clustered we need to propose this message to the underlying raft group. if isClustered { mset.processClusteredInboundMsg(im.subj, im.rply, im.hdr, im.msg) @@ -4407,7 +4401,7 @@ func (mset *stream) internalLoop() { case <-amch: seqs := ackq.pop() for _, seq := range seqs { - mset.ackMsg(nil, seq.(uint64)) + mset.ackMsg(nil, seq) } ackq.recycle(&seqs) case <-qch:
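Reviewer note: once the queue is generic, instantiations such as *ipQueue[*inMsg] and *ipQueue[uint64] are unrelated concrete types, so the monitoring handler in monitor.go above can no longer assert registry values to a single *ipQueue and instead asserts the shared len()/inProgress() method set. A standalone sketch of that approach, with hypothetical queue/registry names:

package main

import (
	"fmt"
	"sync"
)

// queue is an illustrative stand-in: a name-keyed registry holding different
// generic instantiations can only recover them through a common method set,
// which is what the monitor.go hunk above asserts against.
type queue[T any] struct {
	mu   sync.Mutex
	elts []T
}

func (q *queue[T]) len() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.elts)
}

func main() {
	var registry sync.Map
	registry.Store("ints", &queue[int]{elts: []int{1, 2, 3}})
	registry.Store("strings", &queue[string]{elts: []string{"a"}})

	registry.Range(func(k, v interface{}) bool {
		// Assert the shared method set rather than a single concrete type.
		if q, ok := v.(interface{ len() int }); ok {
			fmt.Printf("%v: %d pending\n", k, q.len())
		}
		return true
	})
}

Any instantiation that provides the method set satisfies the anonymous interface, so adding new element types requires no changes to the monitoring code.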