Commit 6a085f3

Change HTML title

Signed-off-by: Prem Kumar <prmsrswt@gmail.com>

onprem committed Jul 28, 2020
2 parents 0b89b52 + 8d62d42 · commit 6a085f3
Showing 35 changed files with 781 additions and 166 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -13,17 +13,26 @@ We use *breaking* word for marking changes that are not backward compatible (rel

### Fixed

- [#2937](https://github.com/thanos-io/thanos/pull/2937) Receive: Fix auto-configuration of `--receive.local-endpoint`.
- [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
- [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`.
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts to be lowercase.
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive, Querier: Fixed Store API Series leaks that occurred on errors in receive and querier.
- [#2895](https://github.com/thanos-io/thanos/pull/2895) Compact: Fix increment of `thanos_compact_downsample_total` metric for downsample of 5m resolution blocks.
- [#2858](https://github.com/thanos-io/thanos/pull/2858) Store: Fix `--store.grpc.series-sample-limit` implementation. The limit is now applied to the sum of all samples fetched across all queried blocks via a single Series call, instead of applying it individually to each block (a standalone sketch of the difference follows this list).
- [#2936](https://github.com/thanos-io/thanos/pull/2936) Compact: Fix ReplicaLabelRemover panic when replicaLabels are not specified.
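
To make the [#2858](https://github.com/thanos-io/thanos/pull/2858) entry concrete, here is a minimal, hypothetical Go sketch of a limiter shared across every block touched by one Series call, as opposed to a fresh per-block budget; the `sampleLimiter` type and all names are illustrative and not the actual Thanos implementation.

```go
package main

import "fmt"

// sampleLimiter is a hypothetical budget shared by every block touched by one Series call.
type sampleLimiter struct {
	limit   int
	fetched int
}

// add accounts for samples fetched from one block and fails once the shared budget is exceeded.
func (l *sampleLimiter) add(n int) error {
	l.fetched += n
	if l.fetched > l.limit {
		return fmt.Errorf("exceeded sample limit of %d (fetched %d)", l.limit, l.fetched)
	}
	return nil
}

func main() {
	blocks := []int{400, 400, 400} // samples fetched per queried block

	// Per-block limits (the old behaviour, conceptually): 3x400 all pass a limit of 1000.
	// One shared limiter per Series call (the fix): the third block trips the limit.
	shared := &sampleLimiter{limit: 1000}
	for i, n := range blocks {
		if err := shared.add(n); err != nil {
			fmt.Printf("block %d: %v\n", i, err)
			return
		}
	}
	fmt.Println("all blocks within limit")
}
```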

### Added

- [#2832](https://github.com/thanos-io/thanos/pull/2832) ui: React: Add runtime and build info page.
- [#2305](https://github.com/thanos-io/thanos/pull/2305) Receive,Sidecar,Ruler: Propagate correct (stricter) MinTime for no-block TSDBs.
- [#2865](https://github.com/thanos-io/thanos/pull/2865) ui: Migrate Thanos Ruler UI to React.
- [#2892](https://github.com/thanos-io/thanos/pull/2892) Receive: Receiver now fails when the initial upload fails.

### Changed

- [#2893](https://github.com/thanos-io/thanos/pull/2893) Store: Rename metric `thanos_bucket_store_cached_postings_compression_time_seconds` to `thanos_bucket_store_cached_postings_compression_time_seconds_total` (see the naming-convention sketch after this list).
- [#2915](https://github.com/thanos-io/thanos/pull/2915) Receive,Ruler: Enable TSDB directory locking by default. Add a new flag (`--tsdb.no-lockfile`) to override behavior.
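
The [#2893](https://github.com/thanos-io/thanos/pull/2893) rename follows the Prometheus convention that counter names end in `_total`. A minimal sketch using prometheus/client_golang, with an illustrative metric name rather than the real Thanos one:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	// Counters should carry the _total suffix; gauges and histograms should not.
	compressionTime := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Name: "example_cached_postings_compression_time_seconds_total",
		Help: "Accumulated time spent compressing postings (illustrative name).",
	})
	compressionTime.Add(0.25)
}
```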

## [v0.14.0](https://github.com/thanos-io/thanos/releases/tag/v0.14.0) - 2020.07.10

6 changes: 6 additions & 0 deletions cmd/thanos/compact.go
@@ -303,6 +303,12 @@ func runCompact(
if err := sy.SyncMetas(ctx); err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}

for _, meta := range sy.Metas() {
groupKey := compact.DefaultGroupKey(meta.Thanos)
downsampleMetrics.downsamples.WithLabelValues(groupKey)
downsampleMetrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, downsampleMetrics, bkt, sy.Metas(), downsamplingDir); err != nil {
return errors.Wrap(err, "first pass of downsampling failed")
}
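
The loop added above pre-registers each group's label combination so both downsampling counters are exported at 0 before any work runs, which keeps `rate()` queries and absent-series alerts meaningful; the same pre-registration is added to `cmd/thanos/downsample.go` below. A standalone sketch of this prometheus/client_golang pattern, with illustrative metric and group names rather than the real Thanos ones:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()

	downsamples := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "example_downsample_total",
		Help: "Downsampling attempts per group (illustrative).",
	}, []string{"group"})

	// Pre-register every known label combination: WithLabelValues creates the
	// child counter at 0 without incrementing it.
	for _, group := range []string{"0@123", "300000@123"} {
		downsamples.WithLabelValues(group)
	}

	// Later, actual work increments the pre-created series.
	downsamples.WithLabelValues("0@123").Inc()

	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		for _, m := range mf.GetMetric() {
			fmt.Println(mf.GetName(), m.GetLabel(), m.GetCounter().GetValue())
		}
	}
}
```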
8 changes: 7 additions & 1 deletion cmd/thanos/downsample.go
@@ -106,6 +106,12 @@ func RunDownsample(
if err != nil {
return errors.Wrap(err, "sync before first pass of downsampling")
}

for _, meta := range metas {
groupKey := compact.DefaultGroupKey(meta.Thanos)
metrics.downsamples.WithLabelValues(groupKey)
metrics.downsampleFailures.WithLabelValues(groupKey)
}
if err := downsampleBucket(ctx, logger, metrics, bkt, metas, dataDir); err != nil {
return errors.Wrap(err, "downsampling failed")
}
@@ -231,7 +237,7 @@ func downsampleBucket(
continue
}
if err := processDownsampling(ctx, logger, bkt, m, dir, downsample.ResLevel2); err != nil {
metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos))
metrics.downsampleFailures.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
return errors.Wrap(err, "downsampling to 60 min")
}
metrics.downsamples.WithLabelValues(compact.DefaultGroupKey(m.Thanos)).Inc()
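
The one-character-class fix above points at a common Prometheus pitfall: `WithLabelValues(...)` only looks up or creates the child counter and does not increment it, so the failure metric previously stayed at 0. A tiny standalone sketch with illustrative names:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	failures := prometheus.NewCounterVec(prometheus.CounterOpts{
		Name: "example_downsample_failures_total",
		Help: "Failed downsamplings per group (illustrative).",
	}, []string{"group"})

	failures.WithLabelValues("0@123")       // only creates the child series; value stays 0 (the bug)
	failures.WithLabelValues("0@123").Inc() // actually counts the failure (the fix)

	m := &dto.Metric{}
	_ = failures.WithLabelValues("0@123").Write(m)
	fmt.Println(m.GetCounter().GetValue()) // prints 1
}
```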
115 changes: 68 additions & 47 deletions cmd/thanos/receive.go
@@ -15,13 +15,13 @@ import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/oklog/run"
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
kingpin "gopkg.in/alecthomas/kingpin.v2"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/extflag"
@@ -70,7 +70,7 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
refreshInterval := modelDuration(cmd.Flag("receive.hashrings-file-refresh-interval", "Refresh interval to re-read the hashring configuration file. (used as a fallback)").
Default("5m"))

local := cmd.Flag("receive.local-endpoint", "Endpoint of local receive node. Used to identify the local node in the hashring configuration.").String()
localEndpoint := cmd.Flag("receive.local-endpoint", "Endpoint of local receive node. Used to identify the local node in the hashring configuration.").String()

tenantHeader := cmd.Flag("receive.tenant-header", "HTTP header to determine tenant for write requests.").Default(receive.DefaultTenantHeader).String()

@@ -82,14 +82,14 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {

replicationFactor := cmd.Flag("receive.replication-factor", "How many times to replicate incoming write requests.").Default("1").Uint64()

forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for forward requests.").Default("5s").Hidden())
forwardTimeout := modelDuration(cmd.Flag("receive-forward-timeout", "Timeout for each forward request.").Default("5s").Hidden())

tsdbMinBlockDuration := modelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden())
tsdbMaxBlockDuration := modelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden())
ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool()

walCompression := cmd.Flag("tsdb.wal-compression", "Compress the tsdb WAL.").Default("true").Bool()
noLockFile := cmd.Flag("tsdb.no-lockfile", "Do not create lockfile in TSDB data directory. In any case, the lockfiles will be deleted on next startup.").Default("false").Bool()

ignoreBlockSize := cmd.Flag("shipper.ignore-unequal-block-size", "If true receive will not require min and max block size flags to be set to the same value. Only use this if you want to keep long retention and compaction enabled, as in the worst case it can result in ~2h data loss for your Thanos bucket storage.").Default("false").Hidden().Bool()
allowOutOfOrderUpload := cmd.Flag("shipper.allow-out-of-order-uploads",
"If true, shipper will skip failed block uploads in the given iteration and retry later. This means that some newer blocks might be uploaded sooner than older blocks."+
"This can trigger compaction without those blocks and as a result will create an overlap situation. Set it to true if you have vertical compaction enabled and wish to upload blocks as soon as possible without caring"+
@@ -114,20 +114,20 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
MinBlockDuration: int64(time.Duration(*tsdbMinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(*tsdbMaxBlockDuration) / time.Millisecond),
RetentionDuration: int64(time.Duration(*retention) / time.Millisecond),
NoLockfile: true,
NoLockfile: *noLockFile,
WALCompression: *walCompression,
}

// Local is empty, so try to generate a local endpoint
// based on the hostname and the listening port.
if *local == "" {
if *localEndpoint == "" {
hostname, err := os.Hostname()
if hostname == "" || err != nil {
return errors.New("--receive.local-endpoint is empty and host could not be determined.")
}
parts := strings.Split(*rwAddress, ":")
parts := strings.Split(*grpcBindAddr, ":")
port := parts[len(parts)-1]
*local = fmt.Sprintf("http://%s:%s/api/v1/receive", hostname, port)
*localEndpoint = fmt.Sprintf("%s:%s", hostname, port)
}

return runReceive(
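
The rewritten fallback above builds `--receive.local-endpoint` from the hostname and the gRPC bind port as plain `host:port`, instead of the old remote-write based `http://…/api/v1/receive` URL, so the generated value matches how peers appear in the hashring configuration. A standalone sketch of that derivation using only the standard library; the helper name and error wording are illustrative, not the Thanos code:

```go
package main

import (
	"fmt"
	"net"
	"os"
)

// defaultLocalEndpoint derives a "host:port" endpoint from the machine's
// hostname and the port of a gRPC bind address such as "0.0.0.0:10901".
func defaultLocalEndpoint(grpcBindAddr string) (string, error) {
	hostname, err := os.Hostname()
	if err != nil {
		return "", fmt.Errorf("determine hostname: %w", err)
	}
	if hostname == "" {
		return "", fmt.Errorf("local endpoint is empty and hostname could not be determined")
	}
	_, port, err := net.SplitHostPort(grpcBindAddr)
	if err != nil {
		return "", fmt.Errorf("parse gRPC bind address %q: %w", grpcBindAddr, err)
	}
	return net.JoinHostPort(hostname, port), nil
}

func main() {
	endpoint, err := defaultLocalEndpoint("0.0.0.0:10901")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(endpoint) // e.g. "my-host:10901"
}
```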
@@ -156,15 +156,15 @@ func registerReceive(m map[string]setupFunc, app *kingpin.Application) {
*ignoreBlockSize,
lset,
cw,
*local,
*localEndpoint,
*tenantHeader,
*defaultTenantID,
*tenantLabelName,
*replicaHeader,
*replicationFactor,
time.Duration(*forwardTimeout),
comp,
*allowOutOfOrderUpload,
comp,
)
}
}
@@ -202,8 +202,8 @@ func runReceive(
replicaHeader string,
replicationFactor uint64,
forwardTimeout time.Duration,
comp component.SourceStoreAPI,
allowOutOfOrderUpload bool,
comp component.SourceStoreAPI,
) error {
logger = log.With(logger, "component", "receive")
level.Warn(logger).Log("msg", "setting up receive")
@@ -294,6 +294,7 @@ func runReceive(

level.Debug(logger).Log("msg", "setting up tsdb")
{
log.With(logger, "component", "storage")
dbUpdatesStarted := promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_receive_multi_db_updates_attempted_total",
Help: "Number of Multi DB attempted reloads with flush and potential upload due to hashring changes",
@@ -303,6 +304,11 @@ func runReceive(
Help: "Number of Multi DB completed reloads with flush and potential upload due to hashring changes",
})

level.Debug(logger).Log("msg", "removing storage lock files if any")
if err := dbs.RemoveLockFilesIfAny(); err != nil {
return errors.Wrap(err, "remove storage lock files")
}

// TSDBs reload logic, listening on hashring changes.
cancel := make(chan struct{})
g.Add(func() error {
@@ -311,12 +317,17 @@ func runReceive(

// Before quitting, ensure the WAL is flushed and the DBs are closed.
defer func() {
level.Info(logger).Log("msg", "shutting down storage")
if err := dbs.Flush(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to flush storage")
level.Error(logger).Log("err", err, "msg", "failed to flush storage")
} else {
level.Info(logger).Log("msg", "storage is flushed successfully")
}
if err := dbs.Close(); err != nil {
level.Warn(logger).Log("err", err, "msg", "failed to close multi db")
level.Error(logger).Log("err", err, "msg", "failed to close storage")
return
}
level.Info(logger).Log("msg", "storage is closed")
}()

for {
@@ -328,7 +339,7 @@ func runReceive(
return nil
}
dbUpdatesStarted.Inc()
level.Info(logger).Log("msg", "updating Multi DB")
level.Info(logger).Log("msg", "updating storage")

if err := dbs.Flush(); err != nil {
return errors.Wrap(err, "flushing storage")
@@ -341,7 +352,7 @@ func runReceive(
<-uploadDone
}
statusProber.Ready()
level.Info(logger).Log("msg", "tsdb started, and server is ready to receive web requests")
level.Info(logger).Log("msg", "storage started, and server is ready to receive web requests")
dbUpdatesCompleted.Inc()
dbReady <- struct{}{}
}
@@ -394,7 +405,7 @@ func runReceive(
return nil
}
webHandler.Hashring(h)
msg := "hashring has changed; server is not ready to receive web requests."
msg := "hashring has changed; server is not ready to receive web requests"
statusProber.NotReady(errors.New(msg))
level.Info(logger).Log("msg", msg)
hashringChangedChan <- struct{}{}
@@ -489,57 +500,67 @@ func runReceive(
}

if upload {
level.Debug(logger).Log("msg", "upload enabled")
if err := dbs.Sync(context.Background()); err != nil {
level.Warn(logger).Log("msg", "initial upload failed", "err", err)
}
logger := log.With(logger, "component", "uploader")
upload := func(ctx context.Context) error {
level.Debug(logger).Log("msg", "upload starting")
start := time.Now()

if err := dbs.Sync(ctx); err != nil {
level.Warn(logger).Log("msg", "upload failed", "elapsed", time.Since(start), "err", err)
return err
}
level.Debug(logger).Log("msg", "upload done", "elapsed", time.Since(start))
return nil
}
{
// Run the uploader in a loop.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if err := dbs.Sync(ctx); err != nil {
level.Warn(logger).Log("msg", "interval upload failed", "err", err)
}

return nil
})
}, func(error) {
cancel()
})
level.Info(logger).Log("msg", "upload enabled, starting initial sync")
if err := upload(context.Background()); err != nil {
return errors.Wrap(err, "initial upload failed")
}
level.Info(logger).Log("msg", "initial sync done")
}

{
// Upload on demand.
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// Ensure we clean up everything properly.
defer func() {
runutil.CloseWithLogOnErr(logger, bkt, "bucket client")
}()

// Before quitting, ensure all blocks are uploaded.
defer func() {
<-uploadC
if err := dbs.Sync(context.Background()); err != nil {
level.Warn(logger).Log("msg", "on demnad upload failed", "err", err)
<-uploadC // Closed by storage routine when it's done.
level.Info(logger).Log("msg", "uploading the final cut block before exiting")
ctx, cancel := context.WithCancel(context.Background())
if err := dbs.Sync(ctx); err != nil {
cancel()
level.Error(logger).Log("msg", "the final upload failed", "err", err)
return
}
cancel()
level.Info(logger).Log("msg", "the final cut block was uploaded")
}()

defer close(uploadDone)

// Run the uploader in a loop.
tick := time.NewTicker(30 * time.Second)
defer tick.Stop()

for {
select {
case <-ctx.Done():
return nil
default:
}
select {
case <-ctx.Done():
return nil
case <-uploadC:
if err := dbs.Sync(ctx); err != nil {
level.Warn(logger).Log("err", err)
// Upload on demand.
if err := upload(ctx); err != nil {
level.Warn(logger).Log("msg", "on demand upload failed", "err", err)
}
uploadDone <- struct{}{}
case <-tick.C:
if err := upload(ctx); err != nil {
level.Warn(logger).Log("msg", "recurring upload failed", "err", err)
}
}
}
}, func(error) {
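
The uploader rework above wraps `dbs.Sync` in one `upload` closure, fails start-up when the initial upload fails (matching the [#2892](https://github.com/thanos-io/thanos/pull/2892) changelog entry), and then drives both on-demand triggers and a 30-second ticker from a single goroutine. A condensed, self-contained sketch of that select-loop shape, with a stub `sync` function standing in for the real storage sync and all names illustrative:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	// Stub standing in for dbs.Sync in the real code.
	sync := func(ctx context.Context) error {
		fmt.Println("uploading blocks at", time.Now().Format(time.RFC3339))
		return nil
	}

	upload := func(ctx context.Context) error {
		start := time.Now()
		if err := sync(ctx); err != nil {
			return fmt.Errorf("upload failed after %s: %w", time.Since(start), err)
		}
		return nil
	}

	// Fail fast if the initial upload does not succeed.
	if err := upload(context.Background()); err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	uploadC := make(chan struct{})    // on-demand trigger (e.g. hashring change)
	uploadDone := make(chan struct{}) // ack back to the requester
	tick := time.NewTicker(30 * time.Second)
	defer tick.Stop()

	go func() {
		uploadC <- struct{}{} // simulate one on-demand request
		<-uploadDone
		cancel()
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case <-uploadC: // upload on demand
			if err := upload(ctx); err != nil {
				fmt.Println("on demand upload failed:", err)
			}
			uploadDone <- struct{}{}
		case <-tick.C: // recurring upload
			if err := upload(ctx); err != nil {
				fmt.Println("recurring upload failed:", err)
			}
		}
	}
}
```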
