Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sidecar: Add /api/v1/flush endpoint #7359

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#7359](https://github.com/thanos-io/thanos/pull/7359) Sidecar: Add `/api/v1/flush` HTTP endpoint that snapshots the Prometheus head block and uploads it to object storage.
- [#7317](https://github.com/thanos-io/thanos/pull/7317) Tracing: allow specifying resource attributes for the OTLP configuration.

### Changed
Expand Down
21 changes: 21 additions & 0 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"sync"
"time"

"github.com/prometheus/common/route"
sidecarAPI "github.com/thanos-io/thanos/pkg/api/sidecar"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"

extflag "github.com/efficientgo/tools/extkingpin"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand Down Expand Up @@ -401,6 +405,23 @@ func runSidecar(
}, func(error) {
cancel()
})

ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)
opts := []logging.Option{logging.WithDecider(func(_ string, _ error) logging.Decision {
return logging.NoLogCall
})}
logMiddleware := logging.NewHTTPServerMiddleware(logger, opts...)
uploadCompactedFunc := func() bool { return conf.shipper.uploadCompacted }

flushShipper := shipper.New(logger, nil, "", bkt, m.Labels, metadata.SidecarSource,
uploadCompactedFunc, conf.shipper.allowOutOfOrderUpload, metadata.HashFunc(conf.shipper.hashFunc), conf.shipper.metaFileName)

router := route.New()
api := sidecarAPI.NewSidecarAPI(logger, reg, m.client, flushShipper, conf.tsdb.path, m.promURL)

api.Register(router.WithPrefix("/api/v1"), tracer, logger, ins, logMiddleware)

srv.Handle("/", router)
}

level.Info(logger).Log("msg", "starting sidecar")
Expand Down
84 changes: 84 additions & 0 deletions pkg/api/sidecar/v1.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package v1

import (
"fmt"
"net/http"
"net/url"
"os"

"github.com/go-kit/log"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/shipper"

"github.com/thanos-io/thanos/pkg/api"
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/logging"
)

// SidecarAPI holds the dependencies of the HTTP API served by Thanos Sidecar.
type SidecarAPI struct {
	baseAPI *api.BaseAPI
	logger  log.Logger
	reg     prometheus.Registerer

	// client and promURL are used by the flush handler to request a snapshot from Prometheus.
	client  *promclient.Client
	promURL *url.URL

	// dataDir is prefixed to the snapshot name returned by Prometheus to locate the snapshot on disk.
	dataDir string
	// shipper is pointed at the snapshot directory by the flush handler and then synced.
	shipper *shipper.Shipper
}

// NewSidecarAPI creates a Thanos Sidecar API.
//
// The given logger, registerer, Prometheus client, shipper, data directory and
// Prometheus URL are stored on the returned value as-is; the embedded base API
// is built with api.NewBaseAPI using the same logger.
func NewSidecarAPI(
	logger log.Logger,
	reg prometheus.Registerer,
	client *promclient.Client,
	shipper *shipper.Shipper,
	dataDir string,
	promURL *url.URL,
) *SidecarAPI {
	sidecar := new(SidecarAPI)

	sidecar.baseAPI = api.NewBaseAPI(logger, true, nil)
	sidecar.logger = logger
	sidecar.reg = reg

	sidecar.client = client
	sidecar.promURL = promURL

	sidecar.dataDir = dataDir
	sidecar.shipper = shipper

	return sidecar
}

// Register first delegates to the base API's Register with the same arguments,
// then adds a POST route at "/flush" on r. The flush handler is wrapped by the
// decorator obtained from api.GetInstr under the name "flush".
func (s *SidecarAPI) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger, ins extpromhttp.InstrumentationMiddleware, logMiddleware *logging.HTTPServerMiddleware) {
	s.baseAPI.Register(r, tracer, logger, ins, logMiddleware)

	wrap := api.GetInstr(tracer, logger, ins, logMiddleware, true)
	flushHandler := wrap("flush", s.flush)
	r.Post("/flush", flushHandler)
}

// flushResponse is the payload returned by the flush handler on success.
type flushResponse struct {
	// BlocksUploaded is the count reported by the shipper's Sync call.
	BlocksUploaded int `json:"blocksUploaded"`
}

func (s *SidecarAPI) flush(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't /api/v1/snapshot just create hardlinks to existing blocks? https://github.com/prometheus/prometheus/blob/main/tsdb/block.go#L687-L696. You will have to somehow filter out the non-head block here. I'm not sure how to do that.

Alternatively, Prometheus could have a new parameter to take a snapshot of the head only.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shipper.Sync only uploads blocks that don't already exist in the bucket
https://github.com/thanos-io/thanos/blob/main/pkg/shipper/shipper.go#L284-L289

dir, err := s.client.Snapshot(r.Context(), s.promURL, false)

if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: fmt.Errorf("failed to snapshot: %w", err)}, func() {}
}

snapshotDir := s.dataDir + "/" + dir

s.shipper.SetDirectoryToSync(snapshotDir)
uploaded, err := s.shipper.Sync(r.Context())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prometheus might produce a block in the middle of the snapshot call. I think you will have to disable the other syncer before doing anything.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the concern that it'd produce an overlapping block? Or that there's a potential race between two shippers Syncing at the same time?

if err := os.RemoveAll(snapshotDir); err != nil {
s.logger.Log("failed to remove snapshot directory", err.Error())
}
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: fmt.Errorf("failed to upload head block: %w", err)}, func() {}
}
return &flushResponse{BlocksUploaded: uploaded}, nil, nil, func() {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we immediately shut down Prometheus after this is done? Otherwise, the same risk of overlapping blocks appears.

Copy link
Author

@Nashluffy Nashluffy May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point - if the endpoint is only meant to be called once before shutting down, does it even need to be an endpoint? Should it just be the default behavior of the sidecar shutting down? That (I think) should eliminate the concern of both overlapping blocks and two shippers racing against one another.

Although the endpoint alleviates concerns around ordering shutdown, prometheus-operator could hit the endpoint and then just delete the statefulset.

Copy link
Member

@GiedriusS GiedriusS May 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would love it if all of this would be handled automatically by Sidecar when it is shutting down, but that would entail deeper integration between Sidecar & Prometheus. In Sidecar, we should only care about the head block and avoid overlaps. If only there were a way to tell Prometheus to trim data from the head block that had been uploaded by Sidecar already. :(

Maybe an alternative would be to shut down Prometheus and then read the HEAD block ourselves from Sidecar? We could produce a block from it, upload it, and then trim the head. What do you think about such an idea?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I quite like that idea - I played around with it locally and it does work!
https://gist.github.com/Nashluffy/097e7df7d0a90b0cdefd2b87fb3129c8

But there are a few issues with it, mostly that opening a ReadOnlyDB reads in the WAL before it can be persisted to disk, which, depending on the size, can be very time-consuming and memory-intensive. If we did want to take this approach, I think there'd have to be a finalizer on the prometheus-owned statefulsets, and a controller that would remove the finalizer. The controller would create a pod that mounts the volume, opens a ReadOnlyDB, and then persists to disk & uploads. But this is quite a lot of orchestration.

The more I think about this, the more I think persist-head-to-block needs to be an endpoint on prometheus/tsdb that we consume. There's quite an extensive thread here about it prometheus-junkyard/tsdb#346

}
97 changes: 97 additions & 0 deletions pkg/api/sidecar/v1_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package v1

import (
"context"
"fmt"
"net/http"
"net/url"
"testing"
"time"

"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/clientconfig"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/shipper"

"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/objstore"

"github.com/efficientgo/core/testutil"
baseAPI "github.com/thanos-io/thanos/pkg/api"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/testutil/custom"
"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
)

// TestMain hands control of this package's test run to custom.TolerantVerifyLeakMain.
func TestMain(m *testing.M) {
	custom.TolerantVerifyLeakMain(m)
}

// TestFlushEndpoint starts a Prometheus instance that scrapes itself, waits
// until it reports at least one active target, and then calls the flush handler
// directly, expecting no API error and at least one uploaded block in an
// in-memory bucket.
func TestFlushEndpoint(t *testing.T) {
	ctx := context.Background()

	logger := log.NewNopLogger()

	prom, err := e2eutil.NewPrometheus()
	testutil.Ok(t, err)

	// The nesting below matters: external_labels belongs under global, and the
	// scrape settings belong to the single scrape_configs entry.
	prom.SetConfig(fmt.Sprintf(`
global:
  external_labels:
    region: eu-west
scrape_configs:
- job_name: 'myself'
  # Quick scrapes for test purposes.
  scrape_interval: 1s
  scrape_timeout: 1s
  static_configs:
  - targets: ['%s']
`, prom.Addr()))
	testutil.Ok(t, prom.Start(ctx, logger))
	defer func() { testutil.Ok(t, prom.Stop()) }()

	promURL, err := url.Parse("http://" + prom.Addr())
	testutil.Ok(t, err)
	httpClient, err := clientconfig.NewHTTPClient(clientconfig.NewDefaultHTTPClientConfig(), "thanos-sidecar")
	testutil.Ok(t, err)
	promClient := promclient.NewClient(httpClient, logger, "thanos-sidecar")

	// Wait for targets to be present in Prometheus, so we know there will be
	// active chunks in the head block to snapshot. The wait is bounded so that a
	// Prometheus that never reports a target fails the test instead of hanging it.
	waitCtx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	testutil.Ok(t, runutil.Retry(1*time.Second, waitCtx.Done(), func() error {
		targets, err := promClient.TargetsInGRPC(waitCtx, promURL, "")
		if err != nil {
			// Return the error so the call is retried rather than failing the
			// test on the first transient failure.
			return err
		}
		if len(targets.ActiveTargets) > 0 {
			return nil
		}
		return errors.New("empty targets response from Prometheus")
	}))

	bkt := objstore.WithNoopInstr(objstore.NewInMemBucket())
	externalLabels := labels.FromStrings("external", "label")
	s := shipper.New(logger, nil, prom.Dir(), bkt, func() labels.Labels { return externalLabels }, metadata.TestSource, nil, false, metadata.NoneFunc, shipper.DefaultMetaFilename)
	now := time.Now()

	api := &SidecarAPI{
		baseAPI: &baseAPI.BaseAPI{
			Now: func() time.Time { return now },
		},
		logger:  logger,
		promURL: promURL,
		dataDir: prom.Dir(),
		client:  promClient,
		shipper: s,
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "", nil)
	testutil.Ok(t, err)

	// Use a dedicated variable for the API error: storing a nil *ApiError in the
	// error-typed err above would produce a non-nil interface value.
	res, _, apiErr, _ := api.flush(req)
	// Check the error before touching the response; on failure res is nil and an
	// unchecked type assertion would panic instead of reporting the cause.
	testutil.Equals(t, (*baseAPI.ApiError)(nil), apiErr)

	r, ok := res.(*flushResponse)
	testutil.Assert(t, ok, "unexpected response type %T", res)
	testutil.Assert(t, r.BlocksUploaded > 0)
}
7 changes: 7 additions & 0 deletions pkg/shipper/shipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,13 @@ func (s *Shipper) Sync(ctx context.Context) (uploaded int, err error) {
return uploaded, nil
}

// SetDirectoryToSync replaces the directory stored in s.dir while holding the
// shipper's mutex.
//
// NOTE(review): only s.dir is changed here; s.metadataFilePath keeps its
// previous value — confirm that this is intended.
func (s *Shipper) SetDirectoryToSync(dir string) {
	s.mtx.Lock()
	s.dir = dir
	s.mtx.Unlock()
}

func (s *Shipper) UploadedBlocks() map[ulid.ULID]struct{} {
meta, err := ReadMetaFile(s.metadataFilePath)
if err != nil {
Expand Down