diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ef910c64f..5fc8c89b27 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Added +- [#7359](https://github.com/thanos-io/thanos/pull/7359) Sidecar: Add HTTP flush endpoint for uploading what's in the head block - [#7317](https://github.com/thanos-io/thanos/pull/7317) Tracing: allow specifying resource attributes for the OTLP configuration. ### Changed diff --git a/cmd/thanos/sidecar.go b/cmd/thanos/sidecar.go index 95ad4ba693..c8d7b0af71 100644 --- a/cmd/thanos/sidecar.go +++ b/cmd/thanos/sidecar.go @@ -12,6 +12,10 @@ import ( "sync" "time" + "github.com/prometheus/common/route" + sidecarAPI "github.com/thanos-io/thanos/pkg/api/sidecar" + extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" + extflag "github.com/efficientgo/tools/extkingpin" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -401,6 +405,23 @@ func runSidecar( }, func(error) { cancel() }) + + ins := extpromhttp.NewInstrumentationMiddleware(reg, nil) + opts := []logging.Option{logging.WithDecider(func(_ string, _ error) logging.Decision { + return logging.NoLogCall + })} + logMiddleware := logging.NewHTTPServerMiddleware(logger, opts...) + uploadCompactedFunc := func() bool { return conf.shipper.uploadCompacted } + + flushShipper := shipper.New(logger, nil, "", bkt, m.Labels, metadata.SidecarSource, + uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName) + + router := route.New() + api := sidecarAPI.NewSidecarAPI(logger, reg, m.client, flushShipper, conf.tsdb.path, m.promURL) + + api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware) + + srv.Handle("/", router) } level.Info(logger).Log("msg", "starting sidecar") diff --git a/pkg/api/sidecar/v1.go b/pkg/api/sidecar/v1.go new file mode 100644 index 0000000000..32dcb324c6 --- /dev/null +++ b/pkg/api/sidecar/v1.go @@ -0,0 +1,84 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package v1 + +import ( + "fmt" + "net/http" + "net/url" + "os" + + "github.com/go-kit/log" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/route" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/shipper" + + "github.com/thanos-io/thanos/pkg/api" + extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" + "github.com/thanos-io/thanos/pkg/logging" +) + +// SidecarAPI is a very simple API used by Thanos Sidecar. +type SidecarAPI struct { + baseAPI *api.BaseAPI + client *promclient.Client + shipper *shipper.Shipper + promURL *url.URL + dataDir string + logger log.Logger + reg prometheus.Registerer +} + +// NewSidecarAPI creates an Thanos Sidecar API. +func NewSidecarAPI( + logger log.Logger, + reg prometheus.Registerer, + client *promclient.Client, + shipper *shipper.Shipper, + dataDir string, + promURL *url.URL, +) *SidecarAPI { + return &SidecarAPI{ + baseAPI: api.NewBaseAPI(logger, true, nil), + logger: logger, + client: client, + reg: reg, + shipper: shipper, + dataDir: dataDir, + promURL: promURL, + } +} + +func (s *SidecarAPI) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger, ins extpromhttp.InstrumentationMiddleware, logMiddleware *logging.HTTPServerMiddleware) { + s.baseAPI.Register(r, tracer, logger, ins, logMiddleware) + + instr := api.GetInstr(tracer, logger, ins, logMiddleware, true) + r.Post("/flush", instr("flush", s.flush)) +} + +type flushResponse struct { + BlocksUploaded int `json:"blocksUploaded"` +} + +func (s *SidecarAPI) flush(r *http.Request) (interface{}, []error, *api.ApiError, func()) { + dir, err := s.client.Snapshot(r.Context(), s.promURL, false) + + if err != nil { + return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: fmt.Errorf("failed to snapshot: %w", err)}, func() {} + } + + snapshotDir := s.dataDir + "/" + dir + + s.shipper.SetDirectoryToSync(snapshotDir) + uploaded, err := s.shipper.Sync(r.Context()) + if err := os.RemoveAll(snapshotDir); err != nil { + s.logger.Log("failed to remove snapshot directory", err.Error()) + } + if err != nil { + return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: fmt.Errorf("failed to upload head block: %w", err)}, func() {} + } + return &flushResponse{BlocksUploaded: uploaded}, nil, nil, func() {} +} diff --git a/pkg/api/sidecar/v1_test.go b/pkg/api/sidecar/v1_test.go new file mode 100644 index 0000000000..b515e6dd28 --- /dev/null +++ b/pkg/api/sidecar/v1_test.go @@ -0,0 +1,97 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package v1 + +import ( + "context" + "fmt" + "net/http" + "net/url" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/clientconfig" + "github.com/thanos-io/thanos/pkg/promclient" + "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/shipper" + + "github.com/go-kit/log" + "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/objstore" + + "github.com/efficientgo/core/testutil" + baseAPI "github.com/thanos-io/thanos/pkg/api" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/testutil/custom" + "github.com/thanos-io/thanos/pkg/testutil/e2eutil" +) + +func TestMain(m *testing.M) { + custom.TolerantVerifyLeakMain(m) +} + +func TestFlushEndpoint(t *testing.T) { + ctx := context.Background() + + logger := log.NewNopLogger() + + prom, err := e2eutil.NewPrometheus() + testutil.Ok(t, err) + + prom.SetConfig(fmt.Sprintf(` +global: + external_labels: + region: eu-west +scrape_configs: +- job_name: 'myself' + # Quick scrapes for test purposes. + scrape_interval: 1s + scrape_timeout: 1s + static_configs: + - targets: ['%s'] +`, prom.Addr())) + testutil.Ok(t, prom.Start(ctx, logger)) + defer func() { testutil.Ok(t, prom.Stop()) }() + + promURL, err := url.Parse("http://" + prom.Addr()) + testutil.Ok(t, err) + httpClient, err := clientconfig.NewHTTPClient(clientconfig.NewDefaultHTTPClientConfig(), "thanos-sidecar") + testutil.Ok(t, err) + promClient := promclient.NewClient(httpClient, logger, "thanos-sidecar") + + // Waits for targets to be present in prometheus, so we know there will be active chunks in the head block + // to snapshot + testutil.Ok(t, runutil.Retry(1*time.Second, ctx.Done(), func() error { + targets, err := promClient.TargetsInGRPC(ctx, promURL, "") + testutil.Ok(t, err) + if len(targets.ActiveTargets) > 0 { + return nil + } + return errors.New("empty targets response from Prometheus") + })) + + bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) + externalLabels := labels.FromStrings("external", "label") + s := shipper.New(logger, nil, prom.Dir(), bkt, func() labels.Labels { return externalLabels }, metadata.TestSource, nil, false, metadata.NoneFunc, shipper.DefaultMetaFilename) + now := time.Now() + + api := &SidecarAPI{ + baseAPI: &baseAPI.BaseAPI{ + Now: func() time.Time { return now }, + }, + logger: logger, + promURL: promURL, + dataDir: prom.Dir(), + client: promClient, + shipper: s, + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, "", nil) + testutil.Ok(t, err) + + res, _, err, _ := api.flush(req) + r := res.(*flushResponse) + testutil.Assert(t, r.BlocksUploaded > 0) + testutil.Equals(t, (*baseAPI.ApiError)(nil), err) +} diff --git a/pkg/shipper/shipper.go b/pkg/shipper/shipper.go index 6b7be6cd12..cde80e54c4 100644 --- a/pkg/shipper/shipper.go +++ b/pkg/shipper/shipper.go @@ -351,6 +351,13 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) { return uploaded, nil } +func (s *Shipper) SetDirectoryToSync(dir string) { + s.mtx.Lock() + defer s.mtx.Unlock() + + s.dir = dir +} + func (s *Shipper) UploadedBlocks() map[ulid.ULID]struct{} { meta, err := ReadMetaFile(s.metadataFilePath) if err != nil {