Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove gossip #1008

Merged
merged 9 commits into from May 28, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
93 changes: 3 additions & 90 deletions cmd/thanos/flags.go
Expand Up @@ -3,16 +3,9 @@ package main
import (
"fmt"
"io/ioutil"
"net"
"strconv"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
Expand All @@ -36,102 +29,22 @@ func regGRPCFlags(cmd *kingpin.CmdClause) (
grpcTLSSrvClientCA
}

// TODO(povilasv): we don't need this anymore.
func regCommonServerFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
httpBindAddr *string,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (cluster.Peer, error)) {
grpcTLSSrvClientCA *string) {

httpBindAddr = regHTTPAddrFlag(cmd)
grpcBindAddr, grpcTLSSrvCert, grpcTLSSrvKey, grpcTLSSrvClientCA = regGRPCFlags(cmd)
grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Deprecated(gossip will be removed from v0.5.0): Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used.").
String()

clusterBindAddr := cmd.Flag("cluster.address", "Deprecated(gossip will be removed from v0.5.0): Listen ip:port address for gossip cluster.").
Default("0.0.0.0:10900").String()

clusterAdvertiseAddr := cmd.Flag("cluster.advertise-address", "Deprecated(gossip will be removed from v0.5.0): Explicit (external) ip:port address to advertise for gossip in gossip cluster. Used internally for membership only.").
String()

peers := cmd.Flag("cluster.peers", "Deprecated(gossip will be removed from v0.5.0): Initial peers to join the cluster. It can be either <ip:port>, or <domain:port>. A lookup resolution is done only at the startup.").Strings()

gossipInterval := modelDuration(cmd.Flag("cluster.gossip-interval", "Deprecated(gossip will be removed from v0.5.0): Interval between sending gossip messages. By lowering this value (more frequent) gossip messages are propagated across the cluster more quickly at the expense of increased bandwidth. Default is used from a specified network-type.").
PlaceHolder("<gossip interval>"))

pushPullInterval := modelDuration(cmd.Flag("cluster.pushpull-interval", "Deprecated(gossip will be removed from v0.5.0): Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage. Default is used from a specified network-type.").
PlaceHolder("<push-pull interval>"))

refreshInterval := modelDuration(cmd.Flag("cluster.refresh-interval", "Deprecated(gossip will be removed from v0.5.0): Interval for membership to refresh cluster.peers state, 0 disables refresh.").Default(cluster.DefaultRefreshInterval.String()))

secretKey := cmd.Flag("cluster.secret-key", "Deprecated(gossip will be removed from v0.5.0): Initial secret key to encrypt cluster gossip. Can be one of AES-128, AES-192, or AES-256 in hexadecimal format.").HexBytes()

networkType := cmd.Flag("cluster.network-type",
fmt.Sprintf("Deprecated(gossip will be removed from v0.5.0): Network type with predefined peers configurations. Sets of configurations accounting the latency differences between network types: %s.",
strings.Join(cluster.NetworkPeerTypes, ", "),
),
).
Default(cluster.LanNetworkPeerType).
Enum(cluster.NetworkPeerTypes...)

gossipDisabled := cmd.Flag("cluster.disable", "Deprecated(gossip will be removed from v0.5.0): If true gossip will be disabled and no cluster related server will be started.").Default("true").Bool()

return grpcBindAddr,
httpBindAddr,
grpcTLSSrvCert,
grpcTLSSrvKey,
grpcTLSSrvClientCA,
func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (cluster.Peer, error) {
if *gossipDisabled {
level.Info(logger).Log("msg", "gossip is disabled")
return cluster.NewNoop(), nil
}

host, port, err := cluster.CalculateAdvertiseAddress(*grpcBindAddr, *grpcAdvertiseAddr)
if err != nil {
return nil, errors.Wrapf(err, "calculate advertise StoreAPI addr for gossip based on bindAddr: %s and advAddr: %s", *grpcBindAddr, *grpcAdvertiseAddr)
}

advStoreAPIAddress := net.JoinHostPort(host, strconv.Itoa(port))
if cluster.IsUnroutable(advStoreAPIAddress) {
level.Warn(logger).Log("msg", "this component advertises its gRPC StoreAPI on an unroutable address. This will not work cross-cluster", "addr", advStoreAPIAddress)
level.Warn(logger).Log("msg", "provide --grpc-address as routable ip:port or --grpc-advertise-address as a routable host:port")
}

level.Info(logger).Log("msg", "StoreAPI address that will be propagated through gossip", "address", advStoreAPIAddress)

advQueryAPIAddress := httpAdvertiseAddr
if queryAPIEnabled {
host, port, err := cluster.CalculateAdvertiseAddress(*httpBindAddr, advQueryAPIAddress)
if err != nil {
return nil, errors.Wrapf(err, "calculate advertise QueryAPI addr for gossip based on bindAddr: %s and advAddr: %s", *httpBindAddr, advQueryAPIAddress)
}

advQueryAPIAddress = net.JoinHostPort(host, strconv.Itoa(port))
if cluster.IsUnroutable(advQueryAPIAddress) {
level.Warn(logger).Log("msg", "this component advertises its HTTP QueryAPI on an unroutable address. This will not work cross-cluster", "addr", advQueryAPIAddress)
level.Warn(logger).Log("msg", "provide --http-address as routable ip:port or --http-advertise-address as a routable host:port")
}

level.Info(logger).Log("msg", "QueryAPI address that will be propagated through gossip", "address", advQueryAPIAddress)
}

return cluster.New(logger,
reg,
*clusterBindAddr,
*clusterAdvertiseAddr,
advStoreAPIAddress,
advQueryAPIAddress,
*peers,
waitIfEmpty,
time.Duration(*gossipInterval),
time.Duration(*pushPullInterval),
time.Duration(*refreshInterval),
*secretKey,
*networkType,
)
}
grpcTLSSrvClientCA
}

func regHTTPAddrFlag(cmd *kingpin.CmdClause) *string {
Expand Down
57 changes: 1 addition & 56 deletions cmd/thanos/query.go
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/discovery/dns"
Expand Down Expand Up @@ -46,10 +45,7 @@ import (
func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")

grpcBindAddr, httpBindAddr, srvCert, srvKey, srvClientCA, newPeerFn := regCommonServerFlags(cmd)

httpAdvertiseAddr := cmd.Flag("http-advertise-address", "Explicit (external) host:port address to advertise for HTTP QueryAPI in gossip cluster. If empty, 'http-address' will be used.").
String()
grpcBindAddr, httpBindAddr, srvCert, srvKey, srvClientCA := regCommonServerFlags(cmd)

secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool()
cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String()
Expand Down Expand Up @@ -102,10 +98,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
peer, err := newPeerFn(logger, reg, true, *httpAdvertiseAddr, true)
if err != nil {
return errors.Wrap(err, "new cluster peer")
}
selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
return errors.Wrap(err, "parse federation labels")
Expand Down Expand Up @@ -153,7 +145,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*replicaLabel,
peer,
selectorLset,
*stores,
*enableAutodownsampling,
Expand Down Expand Up @@ -271,7 +262,6 @@ func runQuery(
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
replicaLabel string,
peer cluster.Peer,
selectorLset labels.Labels,
storeAddrs []string,
enableAutodownsampling bool,
Expand Down Expand Up @@ -305,16 +295,6 @@ func runQuery(
logger,
reg,
func() (specs []query.StoreSpec) {
// Add store specs from gossip.
for id, ps := range peer.PeerStates(cluster.PeerTypesStoreAPIs()...) {
if ps.StoreAPIAddr == "" {
level.Error(logger).Log("msg", "Gossip found peer that propagates empty address, ignoring.", "lset", fmt.Sprintf("%v", ps.Metadata.Labels))
continue
}

specs = append(specs, &gossipSpec{id: id, addr: ps.StoreAPIAddr, stateFetcher: peer})
}

// Add DNS resolved addresses from static flags and file SD.
for _, addr := range dnsProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr))
Expand Down Expand Up @@ -388,21 +368,6 @@ func runQuery(
close(fileSDUpdates)
})
}
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// New gossip cluster.
if err := peer.Join(cluster.PeerTypeQuery, cluster.PeerMetadata{}); err != nil {
return errors.Wrap(err, "join cluster")
}

<-ctx.Done()
return nil
}, func(error) {
cancel()
peer.Close(5 * time.Second)
})
}
// Periodically update the addresses from static flags and file SD by resolving them using DNS SD if necessary.
{
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -507,23 +472,3 @@ func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Co
}
return deduplicated
}

type gossipSpec struct {
id string
addr string

stateFetcher cluster.PeerStateFetcher
}

func (s *gossipSpec) Addr() string {
return s.addr
}

// Metadata method for gossip store tries get current peer state.
func (s *gossipSpec) Metadata(_ context.Context, _ storepb.StoreClient) (labels []storepb.Label, mint int64, maxt int64, err error) {
state, ok := s.stateFetcher.PeerState(s.id)
if !ok {
return nil, 0, 0, errors.Errorf("peer %s is no longer in gossip cluster", s.id)
}
return state.Metadata.Labels, state.Metadata.MinTime, state.Metadata.MaxTime, nil
}