General enhancements to improve stability (#6)
* add helper functions to close servers and clients with nil checks

* add logging for open file descriptor usage to some tests

* ensure clients are always closed and fix deadlock on unsuccessful startup

* install lsof before running gobuild test
charless-splunk committed Apr 9, 2019
1 parent 570fa16 commit dbd1020
Showing 4 changed files with 101 additions and 35 deletions.
6 changes: 6 additions & 0 deletions .circleci/config.yml
@@ -66,6 +66,12 @@ jobs:
- restore_cache:
key: goexecutor-cache-{{ .Branch }}-{{ .Revision }}
- checkout
- run:
name: "apt-get update"
command: apt-get update
- run:
name: "install lsof"
command: apt-get install lsof
- run:
name: "gobuild test"
command: gobuild test
15 changes: 15 additions & 0 deletions embetcd/common.go
@@ -47,3 +47,18 @@ func URLSToStringSlice(urls []url.URL) []string {
}
return strs
}

// CloseServer closes an embetcd server with nil checks
func CloseServer(s *Server) {
// close etcd server if it was improperly created in previous loop iterations
if s != nil && s.Etcd != nil {
s.Etcd.Close()
}
}

// CloseClient closes an embetcd client with nil checks
func CloseClient(client *Client) {
if client != nil && client.Client != nil {
client.Close()
}
}
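
As a point of reference, here is a minimal sketch of how these nil-safe helpers are intended to be used in the retry loops elsewhere in this change. The retryClient function and its dial parameter are hypothetical and exist only for illustration:

```go
package embetcd

import "context"

// retryClient is an illustrative sketch (not part of this commit) showing the
// pattern the helpers enable: close any client left over from a failed
// previous attempt before retrying, relying on the nil checks inside CloseClient.
func retryClient(ctx context.Context, dial func(context.Context) (*Client, error)) (c *Client, err error) {
	for ctx.Err() == nil {
		// CloseClient is a no-op when c is nil, so the first pass is safe
		CloseClient(c)

		if c, err = dial(ctx); err == nil {
			return c, nil
		}
	}
	// the context was cancelled: release whatever the last attempt produced
	CloseClient(c)
	return nil, ctx.Err()
}
```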
80 changes: 47 additions & 33 deletions embetcd/server.go
@@ -228,9 +228,14 @@ func (s *Server) prepareForNewCluster(ctx context.Context) (err error) {
func (s *Server) prepareForExistingCluster(ctx context.Context) (err error) {
// create a temporary client
var tempcli *Client
defer CloseClient(tempcli)

// get an etcdclient to the cluster using the config file
for ctx.Err() == nil {
// close the temporary client if it was created in a previous iteration of the loop
CloseClient(tempcli)

// create the client
tempcli, err = s.config.GetClientFromConfig(ctx)
if err == nil {
// set up the temp cli for the cluster namespace
@@ -284,6 +289,8 @@ func (s *Server) startupValidation(cfg *Config) error {
func (s *Server) start(ctx context.Context, cfg *Config) (err error) {
// retry starting the etcd server until it succeeds
for ctx.Err() == nil {
CloseServer(s)

// remove the data dir because we require each server to be completely removed
// from the cluster before we can rejoin
// TODO: if we ever use snapshotting or want to restore a cluster this will need to be revised
@@ -339,7 +346,7 @@ func (s *Server) Start(ctx context.Context, cfg *Config) (err error) {
// This is a dedicated function for test coverage purposes.
func (s *Server) cleanUpStart(err error) {
if err != nil && s.isRunning() {
s.Shutdown(context.Background())
s.shutdown(context.Background())
}
}

@@ -353,7 +360,7 @@ func memberKeyRoutine(ctx context.Context, client *Client, lease *cli.LeaseGrant
client.Revoke(context.Background(), lease.ID)

// close the client
client.Close()
CloseClient(client)
}

// errorHandlerRoutine waits for errors to occur and attempts
@@ -425,7 +432,7 @@ func (s *Server) cleanCluster(ctx context.Context, members *Members, client *Cli
// clusterCleanupRoutine iteratively checks member health and removes bad members
func (s *Server) clusterCleanupRoutine(ctx context.Context, stopCh <-chan struct{}, ttl *time.Duration, cleanUpInterval *time.Duration, memberRemoveTimeout *time.Duration, gracePeriod *time.Duration, client *Client) {
// close the client on exit
defer client.Close()
defer CloseClient(client)

// set up ticker
ticker := time.NewTicker(DurationOrDefault(cleanUpInterval, DefaultCleanUpInterval))
@@ -527,7 +534,7 @@ func (s *Server) waitForShutdown(ctx context.Context, done chan struct{}) (err e
if s != nil && s.Etcd != nil && s.Etcd.Server != nil {
s.Server.HardStop()
// invoke close after hard stop to free up whatever port we're bound to
s.Close()
CloseServer(s)
}
}
case <-done:
@@ -571,47 +578,47 @@ func (s *Server) removeSelfFromCluster(ctx context.Context) (err error) {
}
}

// use a temporary client to try removing ourselves from the cluster
var tempcli *Client
if tempcli, err = s.getNamespacedClient(ctx, endpoints); tempcli != nil {
defer tempcli.Close()
}
defer CloseClient(tempcli)

// loop while the context hasn't closed
for ctx.Err() == nil {
// close the client if it existed from a previous loop iteration
CloseClient(tempcli)

// create a child context with its own timeout
timeout, cancel := context.WithTimeout(ctx, DurationOrDefault(s.config.DialTimeout, DefaultDialTimeout))

// use the temporary client to try removing ourselves from the cluster
var unlock func(context.Context) error
if unlock, err = tempcli.Lock(timeout, s.Server.Cfg.Name); err == nil {
_, err = tempcli.MemberRemove(ctx, uint64(s.Server.ID()))
unlock(timeout)
// mask the member not found err because it could mean a cluster clean up routine cleaned us up already
if err == nil || err == rpctypes.ErrMemberNotFound {
err = nil
cancel()
break
}
}
// use a temporary client to try removing ourselves from the cluster
tempcli, err = s.getNamespacedClient(ctx, endpoints)

// wait until the timeout to try again
<-timeout.Done()
if err == nil {
// create a child context with its own timeout
timeout, cancel := context.WithTimeout(ctx, DurationOrDefault(s.config.DialTimeout, DefaultDialTimeout))

// use the temporary client to try removing ourselves from the cluster
var unlock func(context.Context) error
if unlock, err = tempcli.Lock(timeout, s.Server.Cfg.Name); err == nil {
_, err = tempcli.MemberRemove(ctx, uint64(s.Server.ID()))
unlock(timeout)
// mask the member not found err because it could mean a cluster clean up routine cleaned us up already
if err == nil || err == rpctypes.ErrMemberNotFound {
err = nil
cancel()
break
}
}

// cancel the timeout context
cancel()
// wait for the timeout to try again
<-timeout.Done()

// cancel the timeout context
cancel()
}
}

return err
}

// Shutdown shuts down the server with a cancelable context
func (s *Server) Shutdown(ctx context.Context) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()

// shutdown shuts down the server with a cancelable context and without locking
func (s *Server) shutdown(ctx context.Context) (err error) {
if !s.isRunning() {
return ErrAlreadyStopped
}
@@ -633,7 +640,7 @@ func (s *Server) Shutdown(ctx context.Context) (err error) {
// if the etcd server stalls while shutting down or exceeds the shutdown context
go func() {
// close the server and signals routines to stop
s.Close()
CloseServer(s)

// wait for the running routines to stop
s.routineWg.Wait()
@@ -649,6 +656,13 @@ func (s *Server) Shutdown(ctx context.Context) (err error) {
return err
}

// Shutdown shuts down the server with a cancelable context
func (s *Server) Shutdown(ctx context.Context) (err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.shutdown(ctx)
}

// New returns a new etcd Server
func New() *Server {
return &Server{mutex: sync.RWMutex{}}
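
The split between the exported Shutdown and the new unexported shutdown above is what resolves the deadlock called out in the commit message: Shutdown takes s.mutex, while cleanUpStart can now call shutdown directly on an unsuccessful startup instead of re-acquiring a mutex that the startup path presumably still holds. A minimal, self-contained sketch of the same lock-split pattern follows; the service type and the Stop/stop names are assumptions, not the embetcd API:

```go
package example

import (
	"errors"
	"sync"
)

// service is an illustrative stand-in, not the embetcd Server type.
type service struct {
	mutex   sync.RWMutex
	running bool
}

// Stop is the exported entry point: it acquires the lock for external callers
// and delegates the real work to the unexported stop.
func (s *service) Stop() error {
	s.mutex.Lock()
	defer s.mutex.Unlock()
	return s.stop()
}

// stop assumes the caller already holds the lock. Internal code paths that
// fail while the lock is held (e.g. an unsuccessful startup) call this
// directly, because sync.RWMutex is not reentrant and calling Stop again
// from such a path would deadlock.
func (s *service) stop() error {
	if !s.running {
		return errors.New("already stopped")
	}
	s.running = false
	return nil
}
```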
35 changes: 33 additions & 2 deletions embetcd/server_test.go
@@ -1,11 +1,14 @@
package embetcd

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"net/url"
"os"
"os/exec"
"runtime"
"testing"
"time"

@@ -506,6 +509,8 @@ func TestNew(t *testing.T) {
}

// shutdown
t.Logf("Shutting down server %s. There are %d open file descriptors open.", server.config.Name, openFDCount(t))

if err := server.Shutdown(ctx); err != nil && err != etcdserver.ErrNotEnoughStartedMembers && err != rpctypes.ErrMemberNotEnoughStarted && err != rpctypes.ErrGRPCMemberNotEnoughStarted {
t.Errorf("error while closing etccd server '%s' %v", server.Config().Name, err)
} else {
@@ -522,9 +527,10 @@ func TestNew(t *testing.T) {
t.Errorf("Already stopped server did not return ErrAlreadyStopped when stop was called: %v", server)
}

t.Logf("Shutdown server %s. There are %d open file descriptors open.", server.config.Name, openFDCount(t))

// cleanUpStart should just run
server.cleanUpStart(fmt.Errorf("bogus error"))

cancel()
}
})
@@ -668,10 +674,13 @@ func TestMemberHaltsAndIsReAdded(t *testing.T) {
}

// stop a server abruptly
t.Log("Killing server in cluster: ", servers[0].Config().Name, servers[0].Server.ID())
t.Logf("Killing server in cluster: %s %v. There are %d open file descriptors.", servers[0].Config().Name, servers[0].Server.ID(), openFDCount(t))

servers[0].Server.HardStop() // hard stop does not clear the listener
servers[0].Close() // invoking close is what ultimately stops the etcd server from hogging the port

t.Logf("Kiled server: %s %v. There are %d open file descriptors.", servers[0].Config().Name, servers[0].Server.ID(), openFDCount(t))

// remove the server from the list of servers
servers = servers[1:]

@@ -915,3 +924,25 @@ func TestConfig_GetClientFromConfig(t *testing.T) {
t.Errorf("Config.GetClientFromConfig() want = (%v) got = (%v)", cli.ErrNoAvailableEndpoints, err)
}
}

// openFDCount returns the number of open file descriptors for the running process.
// This function is based on content from https://groups.google.com/forum/#!topic/golang-nuts/c0AnWXjzNIA
func openFDCount(t *testing.T) (lines int) {
if runtime.GOOS == "darwin" || runtime.GOOS == "linux" {
// get the pid for this process
pid := os.Getpid()

// build the command
lsofCmd := fmt.Sprintf("lsof -p %v", pid)

// execute the command in a shell and get the output
out, err := exec.Command("/bin/sh", "-c", lsofCmd).Output()
if err != nil {
t.Errorf("a problem occurred while checking the number of open file descriptors %v", err)
}

// count the number of lines returned from the lsof command
lines = bytes.Count(out, []byte("\n"))
}
return lines
}
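
For illustration, here is a hedged example of how a test might use openFDCount to watch descriptor usage around a resource it opens and closes. TestOpenFDCountGrows is hypothetical, not part of this commit, and only logs rather than asserts because lsof counts can fluctuate; it relies on the os, io/ioutil, and testing imports already present in server_test.go:

```go
func TestOpenFDCountGrows(t *testing.T) {
	before := openFDCount(t)

	// open a temporary file so the process holds one more descriptor
	f, err := ioutil.TempFile("", "fdcount")
	if err != nil {
		t.Fatal(err)
	}
	defer os.Remove(f.Name())

	t.Logf("open file descriptors before: %d, while file is open: %d", before, openFDCount(t))

	// release the descriptor again
	if err := f.Close(); err != nil {
		t.Error(err)
	}
}
```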
