Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storeApi: add component information to gRPC's Info response (resolves #739) #750

Merged
merged 1 commit into from
Feb 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/thanos/downsample.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"github.com/improbable-eng/thanos/pkg/component"
"os"
"path"
"path/filepath"
Expand Down Expand Up @@ -35,7 +36,7 @@ func registerDownsample(m map[string]setupFunc, app *kingpin.Application, name s
objStoreConfig := regCommonObjStoreFlags(cmd, "", true)

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
return runDownsample(g, logger, reg, *dataDir, objStoreConfig, name)
return runDownsample(g, logger, reg, *dataDir, objStoreConfig)
}
}

Expand All @@ -45,14 +46,13 @@ func runDownsample(
reg *prometheus.Registry,
dataDir string,
objStoreConfig *pathOrContent,
component string,
) error {
confContentYaml, err := objStoreConfig.Content()
if err != nil {
return err
}

bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Downsample.String())
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,15 @@ import (
"path"
"time"

"github.com/improbable-eng/thanos/pkg/extprom"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/discovery/dns"
"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/query"
"github.com/improbable-eng/thanos/pkg/query/api"
"github.com/improbable-eng/thanos/pkg/runutil"
Expand Down Expand Up @@ -306,7 +306,7 @@ func runQuery(
)
proxy = store.NewProxyStore(logger, func(context.Context) ([]store.Client, error) {
return stores.Get(), nil
}, selectorLset)
}, component.Query, selectorLset)
queryableCreator = query.NewQueryableCreator(logger, proxy, replicaLabel)
engine = promql.NewEngine(
promql.EngineOpts{
Expand Down Expand Up @@ -446,7 +446,7 @@ func runQuery(
if err != nil {
return errors.Wrapf(err, "listen gRPC on address")
}
logger := log.With(logger, "component", "query")
logger := log.With(logger, "component", component.Query.String())

opts, err := defaultGRPCServerOpts(logger, reg, tracer, srvCert, srvKey, srvClientCA)
if err != nil {
Expand Down
9 changes: 4 additions & 5 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/improbable-eng/thanos/pkg/alert"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/discovery/dns"
"github.com/improbable-eng/thanos/pkg/extprom"
Expand Down Expand Up @@ -159,7 +160,6 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application, name string)
peer,
objStoreConfig,
tsdbOpts,
name,
alertQueryURL,
*alertExcludeLabels,
*queries,
Expand Down Expand Up @@ -193,7 +193,6 @@ func runRule(
peer cluster.Peer,
objStoreConfig *pathOrContent,
tsdbOpts *tsdb.Options,
component string,
alertQueryURL *url.URL,
alertExcludeLabels []string,
queryAddrs []string,
Expand Down Expand Up @@ -520,9 +519,9 @@ func runRule(
if err != nil {
return errors.Wrap(err, "listen API address")
}
logger := log.With(logger, "component", "store")
logger := log.With(logger, "component", component.Rule.String())

store := store.NewTSDBStore(logger, reg, db, lset)
store := store.NewTSDBStore(logger, reg, db, component.Rule, lset)

opts, err := defaultGRPCServerOpts(logger, reg, tracer, cert, key, clientCA)
if err != nil {
Expand Down Expand Up @@ -593,7 +592,7 @@ func runRule(
if uploads {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Rule.String())
if err != nil {
return err
}
Expand Down
9 changes: 4 additions & 5 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/promclient"
"github.com/improbable-eng/thanos/pkg/reloader"
Expand Down Expand Up @@ -77,7 +78,6 @@ func registerSidecar(m map[string]setupFunc, app *kingpin.Application, name stri
objStoreConfig,
peer,
rl,
name,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here - let's just put constant in place everywhere in sidecar - no point to dynamically eval this variable

)
}
}
Expand All @@ -97,7 +97,6 @@ func runSidecar(
objStoreConfig *pathOrContent,
peer cluster.Peer,
reloader *reloader.Reloader,
component string,
) error {
var m = &promMetadata{
promURL: promURL,
Expand Down Expand Up @@ -196,12 +195,12 @@ func runSidecar(
if err != nil {
return errors.Wrap(err, "listen API address")
}
logger := log.With(logger, "component", "sidecar")
logger := log.With(logger, "component", component.Sidecar.String())

var client http.Client

promStore, err := store.NewPrometheusStore(
logger, &client, promURL, m.Labels, m.Timestamps)
logger, &client, promURL, component.Sidecar, m.Labels, m.Timestamps)
if err != nil {
return errors.Wrap(err, "create Prometheus store")
}
Expand Down Expand Up @@ -236,7 +235,7 @@ func runSidecar(
if uploads {
// The background shipper continuously scans the data directory and uploads
// new blocks to Google Cloud Storage or an S3-compatible storage service.
bkt, err := client.NewBucket(logger, confContentYaml, reg, component)
bkt, err := client.NewBucket(logger, confContentYaml, reg, component.Sidecar.String())
if err != nil {
return err
}
Expand Down
12 changes: 4 additions & 8 deletions pkg/compact/downsample/downsample_test.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
package downsample

import (
"github.com/prometheus/tsdb"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"

"github.com/improbable-eng/thanos/pkg/block/metadata"

"github.com/prometheus/prometheus/pkg/value"

"github.com/prometheus/tsdb/chunks"

"time"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/testutil"
"github.com/prometheus/prometheus/pkg/value"
"github.com/prometheus/tsdb"
"github.com/prometheus/tsdb/chunkenc"
"github.com/prometheus/tsdb/chunks"
"github.com/prometheus/tsdb/index"
"github.com/prometheus/tsdb/labels"
)
Expand Down
87 changes: 87 additions & 0 deletions pkg/component/component.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package component

import (
"strings"

"github.com/improbable-eng/thanos/pkg/store/storepb"
)

// StoreAPI is a component that implements Thanos' gRPC StoreAPI.
type StoreAPI interface {
adrien-f marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go wants comments like this for exported functions:

Suggested change
type StoreAPI interface {
// StoreAPI is ...
type StoreAPI interface {

https://blog.golang.org/godoc-documenting-go-code

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exaclty this ^

implementsStoreAPI()
String() string
ToProto() storepb.StoreType
}

// Source is a Thanos component that produce blocks of metrics.
type Source interface {
producesBlocks()
String() string
}

// SourceStoreAPI is a component that implements Thanos' gRPC StoreAPI
// and produce blocks of metrics.
type SourceStoreAPI interface {
implementsStoreAPI()
producesBlocks()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, we could use embedding here. For example:

type SourceStoreAPI interface {
    Source
    StoreAPI
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I guess it will cause diamond problem as described here: https://joaodlf.com/go-the-diamond-problem.html

(with String()) - that's fine then - let's leave your version as it is.

String() string
ToProto() storepb.StoreType
}

type component struct {
name string
}

func (c component) String() string { return c.name }

type storeAPI struct {
component
}

func (storeAPI) implementsStoreAPI() {}

func (s sourceStoreAPI) ToProto() storepb.StoreType {
return storepb.StoreType(storepb.StoreType_value[strings.ToUpper(s.String())])
}

func (s storeAPI) ToProto() storepb.StoreType {
return storepb.StoreType(storepb.StoreType_value[strings.ToUpper(s.String())])
}

type source struct {
component
}

func (source) producesBlocks() {}

type sourceStoreAPI struct {
component
source
storeAPI
}

// FromProto converts from a gRPC StoreType to StoreAPI.
func FromProto(storeType storepb.StoreType) StoreAPI {
switch storeType {
case storepb.StoreType_QUERY:
return Query
case storepb.StoreType_RULE:
return Rule
case storepb.StoreType_SIDECAR:
return Sidecar
case storepb.StoreType_STORE:
return Store
default:
return nil
}
}

var (
Bucket = source{component: component{name: "bucket"}}
Compact = source{component: component{name: "compact"}}
Downsample = source{component: component{name: "downsample"}}
Query = sourceStoreAPI{component: component{name: "query"}}
Rule = sourceStoreAPI{component: component{name: "rule"}}
Sidecar = sourceStoreAPI{component: component{name: "sidecar"}}
Store = sourceStoreAPI{component: component{name: "store"}}
adrien-f marked this conversation as resolved.
Show resolved Hide resolved
)
17 changes: 11 additions & 6 deletions pkg/query/storeset.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/store"
"github.com/improbable-eng/thanos/pkg/store/storepb"
Expand All @@ -26,7 +27,7 @@ const (
type StoreSpec interface {
// Addr returns StoreAPI Address for the store spec. It is used as ID for store.
Addr() string
// Metadata returns current labels and min, max ranges for store.
// Metadata returns current labels, store type and min, max ranges for store.
// It can change for every call for this method.
// If metadata call fails we assume that store is no longer accessible and we should not use it.
// NOTE: It is implementation responsibility to retry until context timeout, but a caller responsibility to manage
Expand All @@ -38,9 +39,10 @@ type StoreStatus struct {
Name string
LastCheck time.Time
LastError error
Labels []storepb.Label
StoreType component.StoreAPI
MinTime int64
MaxTime int64
Labels []storepb.Label
}

type grpcStoreSpec struct {
Expand Down Expand Up @@ -159,9 +161,10 @@ type storeRef struct {
addr string

// Meta (can change during runtime).
labels []storepb.Label
minTime int64
maxTime int64
labels []storepb.Label
storeType component.StoreAPI
minTime int64
maxTime int64

logger log.Logger
}
Expand Down Expand Up @@ -305,6 +308,7 @@ func (s *StoreSet) getHealthyStores(ctx context.Context) map[string]*storeRef {
level.Warn(s.logger).Log("msg", "update of store node failed", "err", errors.Wrap(err, "initial store client info fetch"), "address", addr)
return
}
store.storeType = component.FromProto(resp.StoreType)
store.Update(resp.Labels, resp.MinTime, resp.MaxTime)
}

Expand Down Expand Up @@ -340,9 +344,10 @@ func (s *StoreSet) updateStoreStatus(store *storeRef, err error) {
now := time.Now()
s.storeStatuses[store.addr] = &StoreStatus{
Name: store.addr,
Labels: store.labels,
LastError: err,
LastCheck: now,
Labels: store.labels,
StoreType: store.storeType,
MinTime: store.minTime,
MaxTime: store.maxTime,
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/improbable-eng/thanos/pkg/block"
"github.com/improbable-eng/thanos/pkg/block/metadata"
"github.com/improbable-eng/thanos/pkg/compact/downsample"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/objstore"
"github.com/improbable-eng/thanos/pkg/pool"
"github.com/improbable-eng/thanos/pkg/runutil"
Expand Down Expand Up @@ -423,8 +424,9 @@ func (s *BucketStore) Info(context.Context, *storepb.InfoRequest) (*storepb.Info
mint, maxt := s.TimeRange()
// Store nodes hold global data and thus have no labels.
return &storepb.InfoResponse{
MinTime: mint,
MaxTime: maxt,
StoreType: component.Store.ToProto(),
MinTime: mint,
MaxTime: maxt,
}, nil
}

Expand Down