Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to use fmt.Appendf in stream.go and consumer.go #5100

Merged
merged 1 commit into from Feb 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 8 additions & 8 deletions server/consumer.go
Expand Up @@ -3100,7 +3100,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
// Since we can't send that message to the requestor, we need to
// notify that we are closing the request.
const maxBytesT = "NATS/1.0 409 Message Size Exceeds MaxBytes\r\n%s: %d\r\n%s: %d\r\n\r\n"
hdr := []byte(fmt.Sprintf(maxBytesT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
hdr := fmt.Appendf(nil, maxBytesT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
// Remove the current one, no longer valid due to max bytes limit.
o.waiting.removeCurrent()
Expand All @@ -3123,7 +3123,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}
} else {
// We do check for expiration in `processWaiting`, but it is possible to hit the expiry here, and not there.
hdr := []byte(fmt.Sprintf("NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
o.waiting.removeCurrent()
if o.node != nil {
Expand All @@ -3135,7 +3135,7 @@ func (o *consumer) nextWaiting(sz int) *waitingRequest {
}
if wr.interest != wr.reply {
const intExpT = "NATS/1.0 408 Interest Expired\r\n%s: %d\r\n%s: %d\r\n\r\n"
hdr := []byte(fmt.Sprintf(intExpT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
hdr := fmt.Appendf(nil, intExpT, JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
// Remove the current one, no longer valid.
Expand Down Expand Up @@ -3208,7 +3208,7 @@ func (o *consumer) processNextMsgRequest(reply string, msg []byte) {
}

sendErr := func(status int, description string) {
hdr := []byte(fmt.Sprintf("NATS/1.0 %d %s\r\n\r\n", status, description))
hdr := fmt.Appendf(nil, "NATS/1.0 %d %s\r\n\r\n", status, description)
o.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}

Expand Down Expand Up @@ -3600,7 +3600,7 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
wr := wq.reqs[rp]
// Check expiration.
if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) {
hdr := []byte(fmt.Sprintf("NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b))
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
remove(wr, rp)
i++
Expand Down Expand Up @@ -4033,7 +4033,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {

// If given request fulfilled batch size, but there are still pending bytes, send information about it.
if wrn <= 0 && wrb > 0 {
o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, []byte(fmt.Sprintf(JsPullRequestRemainingBytesT, JSPullRequestPendingMsgs, wrn, JSPullRequestPendingBytes, wrb)), nil, nil, 0))
o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, fmt.Appendf(nil, JsPullRequestRemainingBytesT, JSPullRequestPendingMsgs, wrn, JSPullRequestPendingBytes, wrb), nil, nil, 0))
}
// Reset our idle heartbeat timer if set.
if hb != nil {
Expand Down Expand Up @@ -4096,10 +4096,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) {
func (o *consumer) sendIdleHeartbeat(subj string) {
const t = "NATS/1.0 100 Idle Heartbeat\r\n%s: %d\r\n%s: %d\r\n\r\n"
sseq, dseq := o.sseq-1, o.dseq-1
hdr := []byte(fmt.Sprintf(t, JSLastConsumerSeq, dseq, JSLastStreamSeq, sseq))
hdr := fmt.Appendf(nil, t, JSLastConsumerSeq, dseq, JSLastStreamSeq, sseq)
if fcp := o.fcid; fcp != _EMPTY_ {
// Add in that we are stalled on flow control here.
addOn := []byte(fmt.Sprintf("%s: %s\r\n\r\n", JSConsumerStalled, fcp))
addOn := fmt.Appendf(nil, "%s: %s\r\n\r\n", JSConsumerStalled, fcp)
hdr = append(hdr[:len(hdr)-LEN_CR_LF], []byte(addOn)...)
}
o.outq.send(newJSPubMsg(subj, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
Expand Down
10 changes: 5 additions & 5 deletions server/stream.go
Expand Up @@ -4164,7 +4164,7 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {

if isBatchRequest {
if len(hdr) == 0 {
hdr = []byte(fmt.Sprintf(dgb, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano), np, lseq))
hdr = fmt.Appendf(nil, dgb, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano), np, lseq)
} else {
hdr = copyBytes(hdr)
hdr = genHeader(hdr, JSStream, name)
Expand All @@ -4178,7 +4178,7 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
np--
} else {
if len(hdr) == 0 {
hdr = []byte(fmt.Sprintf(dg, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano)))
hdr = fmt.Appendf(nil, dg, name, sm.subj, sm.seq, ts.Format(time.RFC3339Nano))
} else {
hdr = copyBytes(hdr)
hdr = genHeader(hdr, JSStream, name)
Expand All @@ -4204,7 +4204,7 @@ func (mset *stream) getDirectRequest(req *JSApiMsgGetRequest, reply string) {
if mset.lastSeq() > validThrough {
np, _ = store.NumPending(seq, req.NextFor, false)
}
hdr := []byte(fmt.Sprintf("NATS/1.0 204 EOB\r\nNats-Num-Pending: %d\r\nNats-Last-Sequence: %d\r\n\r\n", np, lseq))
hdr := fmt.Appendf(nil, "NATS/1.0 204 EOB\r\nNats-Num-Pending: %d\r\nNats-Last-Sequence: %d\r\n\r\n", np, lseq)
mset.outq.send(newJSPubMsg(reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
}
}
Expand Down Expand Up @@ -4685,10 +4685,10 @@ func (mset *stream) processJetStreamMsg(subject, reply string, hdr, msg []byte,
const ht = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %s\r\nNats-Last-Sequence: %d\r\n\r\n"
const htho = "NATS/1.0\r\nNats-Stream: %s\r\nNats-Subject: %s\r\nNats-Sequence: %d\r\nNats-Time-Stamp: %s\r\nNats-Last-Sequence: %d\r\nNats-Msg-Size: %d\r\n\r\n"
if !thdrsOnly {
hdr = []byte(fmt.Sprintf(ht, name, subject, seq, tsStr, tlseq))
hdr = fmt.Appendf(nil, ht, name, subject, seq, tsStr, tlseq)
rpMsg = copyBytes(msg)
} else {
hdr = []byte(fmt.Sprintf(htho, name, subject, seq, tsStr, tlseq, len(msg)))
hdr = fmt.Appendf(nil, htho, name, subject, seq, tsStr, tlseq, len(msg))
}
} else {
// Slow path.
Expand Down