-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
sidecar: Add /api/v1/flush
endpoint
#7359
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package v1 | ||
|
||
import ( | ||
"fmt" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/opentracing/opentracing-go" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/common/route" | ||
"github.com/thanos-io/thanos/pkg/promclient" | ||
"github.com/thanos-io/thanos/pkg/shipper" | ||
|
||
"github.com/thanos-io/thanos/pkg/api" | ||
extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" | ||
"github.com/thanos-io/thanos/pkg/logging" | ||
) | ||
|
||
// SidecarAPI is a very simple API used by Thanos Sidecar. | ||
type SidecarAPI struct { | ||
baseAPI *api.BaseAPI | ||
client *promclient.Client | ||
shipper *shipper.Shipper | ||
promURL *url.URL | ||
dataDir string | ||
logger log.Logger | ||
reg prometheus.Registerer | ||
} | ||
|
||
// NewSidecarAPI creates an Thanos Sidecar API. | ||
func NewSidecarAPI( | ||
logger log.Logger, | ||
reg prometheus.Registerer, | ||
client *promclient.Client, | ||
shipper *shipper.Shipper, | ||
dataDir string, | ||
promURL *url.URL, | ||
) *SidecarAPI { | ||
return &SidecarAPI{ | ||
baseAPI: api.NewBaseAPI(logger, true, nil), | ||
logger: logger, | ||
client: client, | ||
reg: reg, | ||
shipper: shipper, | ||
dataDir: dataDir, | ||
promURL: promURL, | ||
} | ||
} | ||
|
||
func (s *SidecarAPI) Register(r *route.Router, tracer opentracing.Tracer, logger log.Logger, ins extpromhttp.InstrumentationMiddleware, logMiddleware *logging.HTTPServerMiddleware) { | ||
s.baseAPI.Register(r, tracer, logger, ins, logMiddleware) | ||
|
||
instr := api.GetInstr(tracer, logger, ins, logMiddleware, true) | ||
r.Post("/flush", instr("flush", s.flush)) | ||
} | ||
|
||
// flushResponse is the payload returned by the flush endpoint.
type flushResponse struct {
	// BlocksUploaded is the count returned by the shipper's Sync call.
	BlocksUploaded int `json:"blocksUploaded"`
}
|
||
func (s *SidecarAPI) flush(r *http.Request) (interface{}, []error, *api.ApiError, func()) { | ||
dir, err := s.client.Snapshot(r.Context(), s.promURL, false) | ||
|
||
if err != nil { | ||
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: fmt.Errorf("failed to snapshot: %w", err)}, func() {} | ||
} | ||
|
||
snapshotDir := s.dataDir + "/" + dir | ||
|
||
s.shipper.SetDirectoryToSync(snapshotDir) | ||
uploaded, err := s.shipper.Sync(r.Context()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prometheus might produce a block in the middle of the snapshot call. I think you will have to disable the other syncer before doing anything. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the concern that it'd produce an overlapping block? Or that there's a potential race between two shippers |
||
if err := os.RemoveAll(snapshotDir); err != nil { | ||
s.logger.Log("failed to remove snapshot directory", err.Error()) | ||
} | ||
if err != nil { | ||
return nil, nil, &api.ApiError{Typ: api.ErrorInternal, Err: fmt.Errorf("failed to upload head block: %w", err)}, func() {} | ||
} | ||
return &flushResponse{BlocksUploaded: uploaded}, nil, nil, func() {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't we immediately shutdown Prometheus after this is done? Otherwise, the same risk of overlapping blocks appears. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good point - if the endpoint is only meant to be called once before shutting down, does it even need to be an endpoint? Should it just be the default behavior of the sidecar shutting down? That (I think) should eliminate the concern of both overlapping blocks and two shippers racing against one another. Although the endpoint alleviates concerns around ordering shutdown, prometheus-operator could hit the endpoint and then just delete the statefulset. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would love it if all of this would be handled automatically by Sidecar when it is shutting down, but that would entail deeper integration between Sidecar & Prometheus. In Sidecar, we should only care about the head block and avoid overlaps. If only there were a way to tell Prometheus to trim data from the head block that had been uploaded by Sidecar already. :( Maybe an alternative would be to shut down Prometheus and then read the HEAD block ourselves from Sidecar? We could produce a block from it, upload it, and then trim the head. What do you think about such idea? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I quite like that idea - I played around with it locally and it does work! 
But there's a few issues with it, mostly that opening a ReadOnlyDB reads in the WAL before it can be persisted to disk, which depending on the size can be very time consuming & memory intense. If we did want to take this approach, I think there'd have to be a finalizer on the prometheus-owned statefulsets, and a controller that would remove the finalizer. The controller would create a pod that mounts the volume, opens a ReadOnlyDB, and then persists to disk & uploads. But this is quite a lot of orchestration. The more I think about this, the more I think persist-head-to-block needs to be an endpoint on prometheus/tsdb that we consume. There's quite an extensive thread here about it prometheus-junkyard/tsdb#346 |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package v1 | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net/http" | ||
"net/url" | ||
"testing" | ||
"time" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/thanos-io/thanos/pkg/clientconfig" | ||
"github.com/thanos-io/thanos/pkg/promclient" | ||
"github.com/thanos-io/thanos/pkg/runutil" | ||
"github.com/thanos-io/thanos/pkg/shipper" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/prometheus/prometheus/model/labels" | ||
"github.com/thanos-io/objstore" | ||
|
||
"github.com/efficientgo/core/testutil" | ||
baseAPI "github.com/thanos-io/thanos/pkg/api" | ||
"github.com/thanos-io/thanos/pkg/block/metadata" | ||
"github.com/thanos-io/thanos/pkg/testutil/custom" | ||
"github.com/thanos-io/thanos/pkg/testutil/e2eutil" | ||
) | ||
|
||
func TestMain(m *testing.M) { | ||
custom.TolerantVerifyLeakMain(m) | ||
} | ||
|
||
func TestFlushEndpoint(t *testing.T) { | ||
ctx := context.Background() | ||
|
||
logger := log.NewNopLogger() | ||
|
||
prom, err := e2eutil.NewPrometheus() | ||
testutil.Ok(t, err) | ||
|
||
prom.SetConfig(fmt.Sprintf(` | ||
global: | ||
external_labels: | ||
region: eu-west | ||
scrape_configs: | ||
- job_name: 'myself' | ||
# Quick scrapes for test purposes. | ||
scrape_interval: 1s | ||
scrape_timeout: 1s | ||
static_configs: | ||
- targets: ['%s'] | ||
`, prom.Addr())) | ||
testutil.Ok(t, prom.Start(ctx, logger)) | ||
defer func() { testutil.Ok(t, prom.Stop()) }() | ||
|
||
promURL, err := url.Parse("http://" + prom.Addr()) | ||
testutil.Ok(t, err) | ||
httpClient, err := clientconfig.NewHTTPClient(clientconfig.NewDefaultHTTPClientConfig(), "thanos-sidecar") | ||
testutil.Ok(t, err) | ||
promClient := promclient.NewClient(httpClient, logger, "thanos-sidecar") | ||
|
||
// Waits for targets to be present in prometheus, so we know there will be active chunks in the head block | ||
// to snapshot | ||
testutil.Ok(t, runutil.Retry(1*time.Second, ctx.Done(), func() error { | ||
targets, err := promClient.TargetsInGRPC(ctx, promURL, "") | ||
testutil.Ok(t, err) | ||
if len(targets.ActiveTargets) > 0 { | ||
return nil | ||
} | ||
return errors.New("empty targets response from Prometheus") | ||
})) | ||
|
||
bkt := objstore.WithNoopInstr(objstore.NewInMemBucket()) | ||
externalLabels := labels.FromStrings("external", "label") | ||
s := shipper.New(logger, nil, prom.Dir(), bkt, func() labels.Labels { return externalLabels }, metadata.TestSource, nil, false, metadata.NoneFunc, shipper.DefaultMetaFilename) | ||
now := time.Now() | ||
|
||
api := &SidecarAPI{ | ||
baseAPI: &baseAPI.BaseAPI{ | ||
Now: func() time.Time { return now }, | ||
}, | ||
logger: logger, | ||
promURL: promURL, | ||
dataDir: prom.Dir(), | ||
client: promClient, | ||
shipper: s, | ||
} | ||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "", nil) | ||
testutil.Ok(t, err) | ||
|
||
res, _, err, _ := api.flush(req) | ||
r := res.(*flushResponse) | ||
testutil.Assert(t, r.BlocksUploaded > 0) | ||
testutil.Equals(t, (*baseAPI.ApiError)(nil), err) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't `/api/v1/snapshot` just create hardlinks to existing blocks? See https://github.com/prometheus/prometheus/blob/main/tsdb/block.go#L687-L696. You will have to somehow filter out the non-head blocks here. I'm not sure how to do that. Alternatively, Prometheus could have a new parameter to take a snapshot of the head only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`shipper.Sync` only uploads blocks that don't already exist in the bucket: https://github.com/thanos-io/thanos/blob/main/pkg/shipper/shipper.go#L284-L289