Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runutil: add Exhaust* fns, initial users #1302

Merged
merged 11 commits into from Jul 22, 2019
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,8 @@ We use *breaking* word for marking changes that are not backward compatible (rel

- [#1327](https://github.com/thanos-io/thanos/pull/1327) `/series` API end-point now properly returns an empty array just like Prometheus if there are no results

- [#1302](https://github.com/thanos-io/thanos/pull/1302) Thanos now efficiently reuses HTTP keep-alive connections

## [v0.6.0](https://github.com/thanos-io/thanos/releases/tag/v0.6.0) - 2019.07.18

### Added
Expand Down
2 changes: 1 addition & 1 deletion pkg/alert/alert.go
Expand Up @@ -374,7 +374,7 @@ func (s *Sender) sendOne(ctx context.Context, url string, b []byte) error {
if err != nil {
return errors.Wrapf(err, "send request to %q", url)
}
defer runutil.CloseWithLogOnErr(s.logger, resp.Body, "send one alert")
defer runutil.ExhaustCloseWithLogOnErr(s.logger, resp.Body, "send one alert")

if resp.StatusCode/100 != 2 {
return errors.Errorf("bad response status %v from %q", resp.Status, url)
Expand Down
2 changes: 1 addition & 1 deletion pkg/objstore/cos/cos.go
Expand Up @@ -145,7 +145,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}
if _, err := resp.Body.Read(nil); err != nil {
runutil.CloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close")
runutil.ExhaustCloseWithLogOnErr(b.logger, resp.Body, "cos get range obj close")
return nil, err
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/promclient/promclient.go
Expand Up @@ -68,7 +68,7 @@ func ExternalLabels(ctx context.Context, logger log.Logger, base *url.URL) (labe
if err != nil {
return nil, errors.Wrapf(err, "request flags against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -185,7 +185,7 @@ func ConfiguredFlags(ctx context.Context, logger log.Logger, base *url.URL) (Fla
if err != nil {
return Flags{}, errors.Wrapf(err, "request config against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -234,7 +234,7 @@ func Snapshot(ctx context.Context, logger log.Logger, base *url.URL, skipHead bo
if err != nil {
return "", errors.Wrapf(err, "request snapshot against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

b, err := ioutil.ReadAll(resp.Body)
if err != nil {
Expand Down Expand Up @@ -317,7 +317,7 @@ func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query s
if err != nil {
return nil, nil, errors.Wrapf(err, "perform GET request against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "query body")

// Decode only ResultType and load Result only as RawJson since we don't know
// structure of the Result yet.
Expand Down Expand Up @@ -452,7 +452,7 @@ func MetricValues(ctx context.Context, logger log.Logger, base *url.URL, perMetr
if err != nil {
return errors.Wrapf(err, "perform GET request against %s", u.String())
}
defer runutil.CloseWithLogOnErr(logger, resp.Body, "metrics body")
defer runutil.ExhaustCloseWithLogOnErr(logger, resp.Body, "metrics body")

if resp.StatusCode != http.StatusOK {
return errors.Errorf("server returned HTTP status %s", resp.Status)
Expand Down
2 changes: 1 addition & 1 deletion pkg/reloader/reloader.go
Expand Up @@ -329,7 +329,7 @@ func (r *Reloader) triggerReload(ctx context.Context) error {
if err != nil {
return errors.Wrap(err, "reload request failed")
}
defer runutil.CloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body")
defer runutil.ExhaustCloseWithLogOnErr(r.logger, resp.Body, "trigger reload resp body")

if resp.StatusCode != 200 {
return errors.Errorf("received non-200 response: %s; have you set `--web.enable-lifecycle` Prometheus flag?", resp.Status)
Expand Down
31 changes: 31 additions & 0 deletions pkg/runutil/runutil.go
Expand Up @@ -38,11 +38,18 @@
// // ...
//
// If Close() returns error, err will capture it and return by argument.
//
// The rununtil.Exhaust* family of functions provide the same functionality but
// they take an io.ReadCloser and they exhaust the whole reader before closing
// them. They are useful when trying to use http keep-alive connections because
// for the same connection to be re-used the whole response body needs to be
// exhausted.
package runutil

import (
"fmt"
"io"
"io/ioutil"
"os"
"time"

Expand Down Expand Up @@ -108,6 +115,16 @@ func CloseWithLogOnErr(logger log.Logger, closer io.Closer, format string, a ...
level.Warn(logger).Log("msg", "detected close error", "err", errors.Wrap(err, fmt.Sprintf(format, a...)))
}

// ExhaustCloseWithLogOnErr closes the io.ReadCloser with a log message on error but exhausts the reader before.
func ExhaustCloseWithLogOnErr(logger log.Logger, r io.ReadCloser, format string, a ...interface{}) {
_, err := io.Copy(ioutil.Discard, r)
if err != nil {
level.Warn(logger).Log("msg", "failed to exhaust reader, performance may be impeded", "err", err)
}

CloseWithLogOnErr(logger, r, format, a...)
}

// CloseWithErrCapture runs function and on error return error by argument including the given error (usually
// from caller function).
func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...interface{}) {
Expand All @@ -118,3 +135,17 @@ func CloseWithErrCapture(err *error, closer io.Closer, format string, a ...inter

*err = merr.Err()
}

// ExhaustCloseWithErrCapture closes the io.ReadCloser with error capture but exhausts the reader before.
func ExhaustCloseWithErrCapture(err *error, r io.ReadCloser, format string, a ...interface{}) {
_, copyErr := io.Copy(ioutil.Discard, r)

CloseWithErrCapture(err, r, format, a...)

// Prepend the io.Copy error.
merr := tsdberrors.MultiError{}
merr.Add(copyErr)
merr.Add(*err)

*err = merr.Err()
}
6 changes: 3 additions & 3 deletions pkg/store/prometheus.go
Expand Up @@ -258,7 +258,7 @@ func (p *PrometheusStore) promSeries(ctx context.Context, q prompb.Query) (*prom
return nil, errors.Wrap(err, "send request")
}
spanReqDo.Finish()
defer runutil.CloseWithLogOnErr(p.logger, presp.Body, "prom series request body")
defer runutil.ExhaustCloseWithLogOnErr(p.logger, presp.Body, "prom series request body")

if presp.StatusCode/100 != 2 {
return nil, errors.Errorf("request failed with code %s", presp.Status)
Expand Down Expand Up @@ -388,7 +388,7 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, _ *storepb.LabelNamesR
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label names request body")
defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label names request body")

if resp.StatusCode/100 != 2 {
return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status))
Expand Down Expand Up @@ -448,7 +448,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer runutil.CloseWithLogOnErr(p.logger, resp.Body, "label values request body")
defer runutil.ExhaustCloseWithLogOnErr(p.logger, resp.Body, "label values request body")

if resp.StatusCode/100 != 2 {
return nil, status.Error(codes.Internal, fmt.Sprintf("request Prometheus server failed, code %s", resp.Status))
Expand Down