Skip to content

Commit

Permalink
Improvements to joining an etcd cluster (#8)
Browse files Browse the repository at this point in the history
* ensure get server peers excludes member's peer urls

* add function to check for a string in a slice of strings

* remove tls configuration since etcd handles nil tls differently

* ensure that server start doesn't block and repeatedly retries joining
  • Loading branch information
charless-splunk authored Apr 29, 2019
1 parent bac7a59 commit 3f369f7
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 96 deletions.
68 changes: 33 additions & 35 deletions embetcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,30 +84,57 @@ func (c *Client) PutWithKeepAlive(ctx context.Context, key string, value string,
return lease, keepAlive, cancel, err
}

// arePeerURLSInCluster reports whether any of the given advertise peer URLs
// is already registered as a peer URL of an existing cluster member.
// It fetches the member list through the client, so the ctx passed in
// should carry an appropriate timeout.
func (c *Client) arePeerURLSInCluster(ctx context.Context, apURLS []url.URL) (areIn bool, err error) {
	var members *cli.MemberListResponse
	members, err = c.MemberList(ctx)
	if err != nil || members == nil || members.Members == nil {
		return areIn, err
	}

	apStrings := URLSToStringSlice(apURLS)
	for _, member := range members.Members {
		memberPURLs := member.GetPeerURLs()
		for _, u := range apStrings {
			if StringIsInStringSlice(u, memberPURLs) {
				// Found a match; return immediately. The previous version's
				// inner break still scanned every remaining member for nothing.
				return true, err
			}
		}
	}
	return areIn, err
}

// getServerPeers returns the peer urls for the cluster formatted for the initialCluster server configuration.
// The context that is passed in should have a configured timeout.
func (c *Client) getServerPeers(ctx context.Context, initialCluster string) (peers string, err error) {
func (c *Client) getServerPeers(ctx context.Context, initialCluster string, serverName *string, apURLS []url.URL, dialTimeout *time.Duration) (peers string, err error) {
var members *cli.MemberListResponse
var timeout context.Context
var cancel context.CancelFunc
defer CancelContext(cancel)

apURLStrings := URLSToStringSlice(apURLS)

for ctx.Err() == nil && (err == nil || err.Error() != etcdserver.ErrStopped.Error()) {
// initialize peers with the supplied initial cluster string
peers = initialCluster

// get the list of members
members, err = c.MemberList(ctx)
timeout, cancel = context.WithTimeout(ctx, DurationOrDefault(dialTimeout, DefaultDialTimeout))
members, err = c.MemberList(timeout)
cancel()

if err == nil {
// add members to the initial cluster
for _, member := range members.Members {

// if there's at least one peer url add it to the initial cluster
if pURLS := member.GetPeerURLs(); len(pURLS) > 0 {
// peers should already have this server's address so we can safely append ",%s=%s"
for _, url := range member.GetPeerURLs() {
peers = fmt.Sprintf("%s,%s=%s", peers, member.Name, url)
if !StringIsInStringSlice(url, apURLStrings) {
peers = fmt.Sprintf("%s,%s=%s", peers, member.Name, url)
}
}
}

}
break
}
Expand All @@ -126,6 +153,7 @@ func (c *Client) serverNameConflicts(ctx context.Context, name string) (conflict
if member.Name == name {
conflicts = true
err = ErrNameConflict
break
}
}
}
Expand All @@ -148,36 +176,6 @@ func (c *Client) clusterName(ctx context.Context) (name string, err error) {
return name, err
}

// addMemberToExistingCluster informs an etcd cluster that a server is about to be added to the cluster. The cluster
// can preemptively reject this addition if it violates quorum
func (c *Client) addMemberToExistingCluster(ctx context.Context, serverName string, apURLs []url.URL) (err error) {
// loop while the context hasn't closed
var conflict bool
for ctx.Err() == nil && (err == nil || err.Error() != etcdserver.ErrStopped.Error()) {

// Ensure that the server name does not already exist in the cluster.
// We want to ensure uniquely named cluster members.
// If this member died and is trying to rejoin, we want to retry until
// the cluster removes it or our parent context expires.
conflict, err = c.serverNameConflicts(ctx, serverName)

if !conflict && err == nil {

// add the member
_, err = c.MemberAdd(ctx, URLSToStringSlice(apURLs))

// break out of loop if we added ourselves cleanly
if err == nil {
break
}
}

time.Sleep(time.Second * 1)
}

return err
}

// NewClient returns a new etcd v3client wrapped with some helper functions
func NewClient(cfg cli.Config) (client *Client, err error) {
var etcdClient *cli.Client
Expand Down
11 changes: 11 additions & 0 deletions embetcd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,14 @@ func RevokeLease(ctx context.Context, client *Client, lease *cli.LeaseGrantRespo
client.Revoke(ctx, lease.ID)
}
}

// StringIsInStringSlice returns true if the given string is in the slice of strings
func StringIsInStringSlice(s string, strs []string) bool {
	// scan the slice and bail out on the first match
	for _, candidate := range strs {
		if candidate == s {
			return true
		}
	}
	return false
}
2 changes: 0 additions & 2 deletions embetcd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package embetcd

import (
"context"
"crypto/tls"
"time"

cli "github.com/coreos/etcd/clientv3"
Expand Down Expand Up @@ -31,7 +30,6 @@ func (c *Config) GetClientFromConfig(ctx context.Context) (*Client, error) {
return NewClient(cli.Config{
Endpoints: c.InitialCluster,
DialTimeout: DurationOrDefault(c.DialTimeout, DefaultDialTimeout),
TLS: &tls.Config{InsecureSkipVerify: true}, // insecure for now
AutoSyncInterval: DurationOrDefault(c.AutoSyncInterval, DefaultAutoSyncInterval),
Context: ctx, // pass in the context so the temp client closes with a cancelled context
})
Expand Down
169 changes: 110 additions & 59 deletions embetcd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
// DefaultUnhealthyTTL is the grace period to wait before removing an unhealthy member
DefaultUnhealthyTTL = time.Second * 15
// DefaultCleanUpInterval is the interval at which to poll for the health of the cluster
DefaultCleanUpInterval = time.Second * 15
DefaultCleanUpInterval = time.Second * 10
// DefaultStartUpGracePeriod is the grace period to wait for new cluster members to start up
// before they're subject to health checks
DefaultStartUpGracePeriod = time.Second * 60
Expand Down Expand Up @@ -159,16 +159,9 @@ func (s *Server) IsRunning() bool {
return s.isRunning()
}

// prepare a new cluster
func (s *Server) prepareForNewCluster(ctx context.Context) (err error) {
s.config.Config.InitialCluster = s.config.InitialClusterFromName(s.config.Name)
return err
}

// prepare for an existing cluster
func (s *Server) prepareForExistingCluster(ctx context.Context) (err error) {
// create a temporary client
func getClusterClientWithServerNamespace(ctx context.Context, cfg *Config) (*Client, error) {
var tempcli *Client
var err error
defer CloseClient(tempcli)

// get an etcdclient to the cluster using the config file
Expand All @@ -177,42 +170,127 @@ func (s *Server) prepareForExistingCluster(ctx context.Context) (err error) {
CloseClient(tempcli)

// create the client
tempcli, err = s.config.GetClientFromConfig(ctx)
tempcli, err = cfg.GetClientFromConfig(ctx)
if err == nil {
// set up the temp cli for the cluster namespace
setupClusterNamespace(tempcli)
break
}
}
return tempcli, err
}

// check for conflicting server names
if err == nil {
var clusterName string
if clusterName, err = tempcli.clusterName(ctx); err != nil || (clusterName != "" && clusterName != s.config.ClusterName) {
err = ErrClusterNameConflict
// startAsJoiner repeatedly prepares this server and starts it as a new member
// joining an existing etcd cluster, looping until the server comes up cleanly
// or ctx is cancelled.  tempcli must already be connected to the target
// cluster.  Each attempt wipes the local data directory, because a member must
// be fully removed from the cluster before it can rejoin.
func (s *Server) startAsJoiner(ctx context.Context, cfg *Config, tempcli *Client) (err error) {
	var timeout context.Context
	var cancel context.CancelFunc
	// cancel always holds the most recent attempt's cancel func; release the
	// last one when we return (each is also cancelled inline after use)
	defer CancelContext(cancel)

	// continually try to prepare and start the server
	for ctx.Err() == nil {
		// tear down any partially started server from a previous attempt
		CloseServer(s)

		// remove old data directory
		os.RemoveAll(cfg.Dir)

		// Ensure that the server name does not already exist in the cluster.
		// We want to ensure uniquely named cluster members.
		// If this member died and is trying to rejoin, we want to retry until
		// the cluster removes it or our parent context expires.
		timeout, cancel = context.WithTimeout(ctx, DurationOrDefault(cfg.DialTimeout, DefaultDialTimeout))
		_, err = tempcli.serverNameConflicts(timeout, cfg.Name)
		cancel()

		// no name conflict: build the initial-cluster string from the current
		// cluster members' peer urls (our own advertise urls excluded)
		if err == nil {
			serverName := cfg.Name
			s.config.Config.InitialCluster, err = tempcli.getServerPeers(ctx, cfg.InitialClusterFromName(cfg.Name), &serverName, cfg.APUrls, cfg.DialTimeout)
		}

		// Announce this server to the cluster only once (skip MemberAdd if our
		// peer urls are already registered).  This protects against errors while
		// joining an existing one-node cluster: once announced, the cluster is
		// configured for 2 members with only 1 started, and removing 1 member
		// from a 2-node cluster would break quorum — so we couldn't back out.
		if err == nil {
			timeout, cancel = context.WithTimeout(ctx, DurationOrDefault(cfg.DialTimeout, DefaultDialTimeout))
			var isIn bool
			isIn, err = tempcli.arePeerURLSInCluster(timeout, cfg.APUrls)
			cancel()
			if err == nil && !isIn {
				timeout, cancel = context.WithTimeout(ctx, DurationOrDefault(cfg.DialTimeout, DefaultDialTimeout))
				_, err = tempcli.MemberAdd(timeout, URLSToStringSlice(cfg.APUrls))
				cancel()
			}
		}

		// start the embedded etcd server with the initial cluster built above
		if err == nil {
			s.Etcd, err = embed.StartEtcd(cfg.Config)
		}

		// wait for the server to be ready or error out
		if err == nil && s.Etcd != nil {
			err = WaitForStructChOrErrCh(ctx, s.Etcd.Server.ReadyNotify(), s.Etcd.Err())
		}

		// break the loop if successful
		if err == nil {
			break
		}
	}
	return err
}

// starts the etcd server and joins an existing cluster
func (s *Server) join(ctx context.Context, cfg *Config) (err error) {
// create a temporary client
var tempcli *Client
defer CloseClient(tempcli)

tempcli, err = getClusterClientWithServerNamespace(ctx, cfg)

// get the peer address string for joining the cluster
if err == nil {
s.config.Config.InitialCluster, err = tempcli.getServerPeers(ctx, s.config.InitialClusterFromName(s.config.Name))
// check for conflicting cluster names
var clusterName string
if clusterName, err = tempcli.clusterName(ctx); err == nil && (clusterName != "" && clusterName != s.config.ClusterName) {
err = ErrClusterNameConflict
return
}
}

// announce to the cluster that we're going to add this server
// start as a joiner
if err == nil {
err = tempcli.addMemberToExistingCluster(ctx, s.config.Name, s.config.APUrls)
err = s.startAsJoiner(ctx, cfg, tempcli)
}

return err
}

// prepare will either prepare the server to start a new cluster or join an existing cluster
func (s *Server) prepare(ctx context.Context) (err error) {
// prepare the server to start
if s.config.ClusterState == embed.ClusterStateFlagNew {
err = s.prepareForNewCluster(ctx)
} else {
err = s.prepareForExistingCluster(ctx)
// seed starts the embedded etcd server as the seed (first) node of a new
// cluster, retrying until the server becomes ready or ctx is cancelled.
// Each attempt wipes the data directory so the node always starts fresh.
func (s *Server) seed(ctx context.Context, cfg *Config) (err error) {
	for ctx.Err() == nil {
		// tear down any partially started server from a previous attempt
		CloseServer(s)

		// remove old data directory; a seed node must start from scratch
		os.RemoveAll(cfg.Dir)

		// set the initial cluster string to this server alone
		s.config.Config.InitialCluster = s.config.InitialClusterFromName(s.config.Name)

		// Start the server unconditionally on every attempt.  The previous
		// version guarded this with `if err == nil`, but err was never reset
		// inside the loop, so after one failed attempt the loop busy-spun
		// until ctx expired without ever retrying StartEtcd.
		s.Etcd, err = embed.StartEtcd(cfg.Config)

		// wait for the server to be ready or error out
		if err == nil && s.Etcd != nil {
			err = WaitForStructChOrErrCh(ctx, s.Etcd.Server.ReadyNotify(), s.Etcd.Err())
		}

		// break the loop if the server came up cleanly
		if err == nil {
			break
		}
	}
	return err
}
Expand All @@ -228,27 +306,6 @@ func (s *Server) startupValidation(cfg *Config) error {
return cfg.Validate()
}

// start starts the etcd server after it has been prepared and config has been validated
// it will retry starting the etcd server until the context is cancelled
func (s *Server) start(ctx context.Context, cfg *Config) (err error) {
// retry starting the etcd server until it succeeds
for ctx.Err() == nil {
CloseServer(s)

// remove the data dir because we require each server to be completely removed
// from the cluster before we can rejoin
// TODO: if we ever use snapshotting or want to restore a cluster this will need to be revised
os.RemoveAll(cfg.Dir)

// create a context for this server
s.Etcd, err = embed.StartEtcd(cfg.Config)
if err == nil {
break
}
}
return err
}

// Start starts the server with the given config
func (s *Server) Start(ctx context.Context, cfg *Config) (err error) {
s.mutex.Lock()
Expand All @@ -262,17 +319,11 @@ func (s *Server) Start(ctx context.Context, cfg *Config) (err error) {
// save the config to the server for reference
s.config = cfg

// prepare the server to either start as a new cluster or join an existing cluster
err = s.prepare(ctx)

// start the server
if err == nil {
err = s.start(ctx, cfg)
}

// wait for the server to be ready or error out
if err == nil && s.Etcd != nil {
err = WaitForStructChOrErrCh(ctx, s.Etcd.Server.ReadyNotify(), s.Etcd.Err())
// start the etcd server as either a seeder or a joiner
if s.config.ClusterState == embed.ClusterStateFlagNew {
err = s.seed(ctx, cfg)
} else {
err = s.join(ctx, cfg)
}

// set the cluster name now that the cluster has started without error
Expand Down

0 comments on commit 3f369f7

Please sign in to comment.