Skip to content

Commit

Permalink
Remove gossip (#1008)
Browse files Browse the repository at this point in the history
* Remove gossip

* Fix go.mod

* Remove gossip from tests

* Remove cluster pkg

* Update docs

* Fix tests

* Fixes after review

* Fixes after review

* Remove gossip mention from service-discovery doc
  • Loading branch information
povilasv authored and brancz committed May 28, 2019
1 parent 3f9cadd commit 37f89ad
Show file tree
Hide file tree
Showing 22 changed files with 44 additions and 1,598 deletions.
98 changes: 5 additions & 93 deletions cmd/thanos/flags.go
Expand Up @@ -3,16 +3,9 @@ package main
import (
"fmt"
"io/ioutil"
"net"
"strconv"
"strings"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)
Expand All @@ -23,7 +16,7 @@ func regGRPCFlags(cmd *kingpin.CmdClause) (
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
) {
grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components if you use gossip, 'grpc-advertise-address' is empty and you require cross-node connection.").
grpcBindAddr = cmd.Flag("grpc-address", "Listen ip:port address for gRPC endpoints (StoreAPI). Make sure this address is routable from other components.").
Default("0.0.0.0:10901").String()

grpcTLSSrvCert = cmd.Flag("grpc-server-tls-cert", "TLS Certificate for gRPC server, leave blank to disable TLS").Default("").String()
Expand All @@ -36,110 +29,29 @@ func regGRPCFlags(cmd *kingpin.CmdClause) (
grpcTLSSrvClientCA
}

// TODO(povilasv): we don't need this anymore.
func regCommonServerFlags(cmd *kingpin.CmdClause) (
grpcBindAddr *string,
httpBindAddr *string,
grpcTLSSrvCert *string,
grpcTLSSrvKey *string,
grpcTLSSrvClientCA *string,
peerFunc func(log.Logger, *prometheus.Registry, bool, string, bool) (cluster.Peer, error)) {

grpcTLSSrvClientCA *string) {
httpBindAddr = regHTTPAddrFlag(cmd)
grpcBindAddr, grpcTLSSrvCert, grpcTLSSrvKey, grpcTLSSrvClientCA = regGRPCFlags(cmd)
grpcAdvertiseAddr := cmd.Flag("grpc-advertise-address", "Deprecated(gossip will be removed from v0.5.0): Explicit (external) host:port address to advertise for gRPC StoreAPI in gossip cluster. If empty, 'grpc-address' will be used.").
String()

clusterBindAddr := cmd.Flag("cluster.address", "Deprecated(gossip will be removed from v0.5.0): Listen ip:port address for gossip cluster.").
Default("0.0.0.0:10900").String()

clusterAdvertiseAddr := cmd.Flag("cluster.advertise-address", "Deprecated(gossip will be removed from v0.5.0): Explicit (external) ip:port address to advertise for gossip in gossip cluster. Used internally for membership only.").
String()

peers := cmd.Flag("cluster.peers", "Deprecated(gossip will be removed from v0.5.0): Initial peers to join the cluster. It can be either <ip:port>, or <domain:port>. A lookup resolution is done only at the startup.").Strings()

gossipInterval := modelDuration(cmd.Flag("cluster.gossip-interval", "Deprecated(gossip will be removed from v0.5.0): Interval between sending gossip messages. By lowering this value (more frequent) gossip messages are propagated across the cluster more quickly at the expense of increased bandwidth. Default is used from a specified network-type.").
PlaceHolder("<gossip interval>"))

pushPullInterval := modelDuration(cmd.Flag("cluster.pushpull-interval", "Deprecated(gossip will be removed from v0.5.0): Interval for gossip state syncs. Setting this interval lower (more frequent) will increase convergence speeds across larger clusters at the expense of increased bandwidth usage. Default is used from a specified network-type.").
PlaceHolder("<push-pull interval>"))

refreshInterval := modelDuration(cmd.Flag("cluster.refresh-interval", "Deprecated(gossip will be removed from v0.5.0): Interval for membership to refresh cluster.peers state, 0 disables refresh.").Default(cluster.DefaultRefreshInterval.String()))

secretKey := cmd.Flag("cluster.secret-key", "Deprecated(gossip will be removed from v0.5.0): Initial secret key to encrypt cluster gossip. Can be one of AES-128, AES-192, or AES-256 in hexadecimal format.").HexBytes()

networkType := cmd.Flag("cluster.network-type",
fmt.Sprintf("Deprecated(gossip will be removed from v0.5.0): Network type with predefined peers configurations. Sets of configurations accounting the latency differences between network types: %s.",
strings.Join(cluster.NetworkPeerTypes, ", "),
),
).
Default(cluster.LanNetworkPeerType).
Enum(cluster.NetworkPeerTypes...)

gossipDisabled := cmd.Flag("cluster.disable", "Deprecated(gossip will be removed from v0.5.0): If true gossip will be disabled and no cluster related server will be started.").Default("true").Bool()

return grpcBindAddr,
httpBindAddr,
grpcTLSSrvCert,
grpcTLSSrvKey,
grpcTLSSrvClientCA,
func(logger log.Logger, reg *prometheus.Registry, waitIfEmpty bool, httpAdvertiseAddr string, queryAPIEnabled bool) (cluster.Peer, error) {
if *gossipDisabled {
level.Info(logger).Log("msg", "gossip is disabled")
return cluster.NewNoop(), nil
}

host, port, err := cluster.CalculateAdvertiseAddress(*grpcBindAddr, *grpcAdvertiseAddr)
if err != nil {
return nil, errors.Wrapf(err, "calculate advertise StoreAPI addr for gossip based on bindAddr: %s and advAddr: %s", *grpcBindAddr, *grpcAdvertiseAddr)
}

advStoreAPIAddress := net.JoinHostPort(host, strconv.Itoa(port))
if cluster.IsUnroutable(advStoreAPIAddress) {
level.Warn(logger).Log("msg", "this component advertises its gRPC StoreAPI on an unroutable address. This will not work cross-cluster", "addr", advStoreAPIAddress)
level.Warn(logger).Log("msg", "provide --grpc-address as routable ip:port or --grpc-advertise-address as a routable host:port")
}

level.Info(logger).Log("msg", "StoreAPI address that will be propagated through gossip", "address", advStoreAPIAddress)

advQueryAPIAddress := httpAdvertiseAddr
if queryAPIEnabled {
host, port, err := cluster.CalculateAdvertiseAddress(*httpBindAddr, advQueryAPIAddress)
if err != nil {
return nil, errors.Wrapf(err, "calculate advertise QueryAPI addr for gossip based on bindAddr: %s and advAddr: %s", *httpBindAddr, advQueryAPIAddress)
}

advQueryAPIAddress = net.JoinHostPort(host, strconv.Itoa(port))
if cluster.IsUnroutable(advQueryAPIAddress) {
level.Warn(logger).Log("msg", "this component advertises its HTTP QueryAPI on an unroutable address. This will not work cross-cluster", "addr", advQueryAPIAddress)
level.Warn(logger).Log("msg", "provide --http-address as routable ip:port or --http-advertise-address as a routable host:port")
}

level.Info(logger).Log("msg", "QueryAPI address that will be propagated through gossip", "address", advQueryAPIAddress)
}

return cluster.New(logger,
reg,
*clusterBindAddr,
*clusterAdvertiseAddr,
advStoreAPIAddress,
advQueryAPIAddress,
*peers,
waitIfEmpty,
time.Duration(*gossipInterval),
time.Duration(*pushPullInterval),
time.Duration(*refreshInterval),
*secretKey,
*networkType,
)
}
grpcTLSSrvClientCA
}

func regHTTPAddrFlag(cmd *kingpin.CmdClause) *string {
return cmd.Flag("http-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:10902").String()
}

func modelDuration(flags *kingpin.FlagClause) *model.Duration {
var value = new(model.Duration)
value := new(model.Duration)
flags.SetValue(value)

return value
Expand Down
57 changes: 1 addition & 56 deletions cmd/thanos/query.go
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/go-kit/kit/log/level"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/improbable-eng/thanos/pkg/cluster"
"github.com/improbable-eng/thanos/pkg/component"
"github.com/improbable-eng/thanos/pkg/discovery/cache"
"github.com/improbable-eng/thanos/pkg/discovery/dns"
Expand Down Expand Up @@ -46,10 +45,7 @@ import (
func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string) {
cmd := app.Command(name, "query node exposing PromQL enabled Query API with data retrieved from multiple store nodes")

grpcBindAddr, httpBindAddr, srvCert, srvKey, srvClientCA, newPeerFn := regCommonServerFlags(cmd)

httpAdvertiseAddr := cmd.Flag("http-advertise-address", "Explicit (external) host:port address to advertise for HTTP QueryAPI in gossip cluster. If empty, 'http-address' will be used.").
String()
grpcBindAddr, httpBindAddr, srvCert, srvKey, srvClientCA := regCommonServerFlags(cmd)

secure := cmd.Flag("grpc-client-tls-secure", "Use TLS when talking to the gRPC server").Default("false").Bool()
cert := cmd.Flag("grpc-client-tls-cert", "TLS Certificates to use to identify this client to the server").Default("").String()
Expand Down Expand Up @@ -102,10 +98,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
storeResponseTimeout := modelDuration(cmd.Flag("store.response-timeout", "If a Store doesn't send any data in this specified duration then a Store will be ignored and partial data will be returned if it's enabled. 0 disables timeout.").Default("0ms"))

m[name] = func(g *run.Group, logger log.Logger, reg *prometheus.Registry, tracer opentracing.Tracer, _ bool) error {
peer, err := newPeerFn(logger, reg, true, *httpAdvertiseAddr, true)
if err != nil {
return errors.Wrap(err, "new cluster peer")
}
selectorLset, err := parseFlagLabels(*selectorLabels)
if err != nil {
return errors.Wrap(err, "parse federation labels")
Expand Down Expand Up @@ -153,7 +145,6 @@ func registerQuery(m map[string]setupFunc, app *kingpin.Application, name string
time.Duration(*queryTimeout),
time.Duration(*storeResponseTimeout),
*replicaLabel,
peer,
selectorLset,
*stores,
*enableAutodownsampling,
Expand Down Expand Up @@ -271,7 +262,6 @@ func runQuery(
queryTimeout time.Duration,
storeResponseTimeout time.Duration,
replicaLabel string,
peer cluster.Peer,
selectorLset labels.Labels,
storeAddrs []string,
enableAutodownsampling bool,
Expand Down Expand Up @@ -305,16 +295,6 @@ func runQuery(
logger,
reg,
func() (specs []query.StoreSpec) {
// Add store specs from gossip.
for id, ps := range peer.PeerStates(cluster.PeerTypesStoreAPIs()...) {
if ps.StoreAPIAddr == "" {
level.Error(logger).Log("msg", "Gossip found peer that propagates empty address, ignoring.", "lset", fmt.Sprintf("%v", ps.Metadata.Labels))
continue
}

specs = append(specs, &gossipSpec{id: id, addr: ps.StoreAPIAddr, stateFetcher: peer})
}

// Add DNS resolved addresses from static flags and file SD.
for _, addr := range dnsProvider.Addresses() {
specs = append(specs, query.NewGRPCStoreSpec(addr))
Expand Down Expand Up @@ -388,21 +368,6 @@ func runQuery(
close(fileSDUpdates)
})
}
{
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
// New gossip cluster.
if err := peer.Join(cluster.PeerTypeQuery, cluster.PeerMetadata{}); err != nil {
return errors.Wrap(err, "join cluster")
}

<-ctx.Done()
return nil
}, func(error) {
cancel()
peer.Close(5 * time.Second)
})
}
// Periodically update the addresses from static flags and file SD by resolving them using DNS SD if necessary.
{
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -507,23 +472,3 @@ func removeDuplicateStoreSpecs(logger log.Logger, duplicatedStores prometheus.Co
}
return deduplicated
}

type gossipSpec struct {
id string
addr string

stateFetcher cluster.PeerStateFetcher
}

func (s *gossipSpec) Addr() string {
return s.addr
}

// Metadata method for gossip store tries get current peer state.
func (s *gossipSpec) Metadata(_ context.Context, _ storepb.StoreClient) (labels []storepb.Label, mint int64, maxt int64, err error) {
state, ok := s.stateFetcher.PeerState(s.id)
if !ok {
return nil, 0, 0, errors.Errorf("peer %s is no longer in gossip cluster", s.id)
}
return state.Metadata.Labels, state.Metadata.MinTime, state.Metadata.MaxTime, nil
}

0 comments on commit 37f89ad

Please sign in to comment.