Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

receive: Optimized receive; fixed long term overallocation #3334

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/armon/go-metrics v0.3.3
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.1.1
github.com/chromedp/cdproto v0.0.0-20200424080200-0de008e41fa0
github.com/chromedp/chromedp v0.5.3
github.com/cortexproject/cortex v1.3.1-0.20200923145333-8587ea61fe17
Expand Down
24 changes: 17 additions & 7 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
package receive

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"io/ioutil"
"io"
stdlog "log"
"net"
"net/http"
Expand Down Expand Up @@ -282,13 +283,17 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
span, ctx := tracing.StartSpan(r.Context(), "receive_http")
defer span.Finish()

compressed, err := ioutil.ReadAll(r.Body)
compressed := &bytes.Buffer{}
if r.ContentLength >= 0 {
compressed.Grow(int(r.ContentLength))
}
_, err := io.Copy(compressed, r.Body)
Copy link
Contributor

@yeya24 yeya24 Oct 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious how much does it optimize? It is not very clear compared to a simple one line ioutil.Readall

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will comment (:

if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

reqBuf, err := snappy.Decode(nil, compressed)
reqBuf, err := snappy.Decode(nil, compressed.Bytes())
if err != nil {
level.Error(h.logger).Log("msg", "snappy decode error", "err", err)
http.Error(w, err.Error(), http.StatusBadRequest)
Expand All @@ -314,6 +319,10 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) {
if len(tenant) == 0 {
tenant = h.options.DefaultTenantID
}
if len(tenant) == 0 {
http.Error(w, "no tenant ID supplied", http.StatusBadRequest)
return
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference between this if and line 319?


err = h.handleRequest(ctx, rep, tenant, &wreq)
switch err {
Expand Down Expand Up @@ -401,9 +410,10 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
}
}()

logger := log.With(h.logger, "tenant", tenant)
// Avoid log.With extra allocations for rare log lines.
logTags := []interface{}{"tenant", tenant}
if id, ok := middleware.RequestIDFromContext(pctx); ok {
logger = log.With(logger, "request-id", id)
logTags = append(logTags, "request-id", id)
}

ec := make(chan error)
Expand Down Expand Up @@ -524,7 +534,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
b.attempt++
dur := h.expBackoff.ForAttempt(b.attempt)
b.nextAllowed = time.Now().Add(dur)
level.Debug(h.logger).Log("msg", "target unavailable backing off", "for", dur)
level.Debug(h.logger).Log(append(logTags, "msg", "msg", "target unavailable backing off", "for", dur)...)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
level.Debug(h.logger).Log(append(logTags, "msg", "msg", "target unavailable backing off", "for", dur)...)
level.Debug(h.logger).Log(append(logTags, "msg", "target unavailable backing off", "for", dur)...)

} else {
h.peerStates[endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))}
}
Expand Down Expand Up @@ -553,7 +563,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, replicas ma
go func() {
for err := range ec {
if err != nil {
level.Debug(logger).Log("msg", "request failed, but not needed to achieve quorum", "err", err)
level.Debug(h.logger).Log(append(logTags, "msg", "request failed, but not needed to achieve quorum", "err", err)...)
}
}
}()
Expand Down