Merge pull request #77 from openshift-cherrypick-robot/cherry-pick-73-to-openshift-4.6

[openshift-4.6] ETCD-178: Bug 1951823: openshift-tools: fix on off flow and add unit tests
openshift-merge-robot committed Apr 28, 2021
2 parents edb7871 + f02a478 commit 2cc1056
Showing 3 changed files with 383 additions and 123 deletions.
3 changes: 3 additions & 0 deletions go.mod
@@ -41,12 +41,15 @@ require (
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 // indirect
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect
google.golang.org/grpc v1.26.0
gopkg.in/cheggaaa/pb.v1 v1.0.25
gopkg.in/yaml.v2 v2.2.2
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect
sigs.k8s.io/yaml v1.1.0
)

277 changes: 154 additions & 123 deletions openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster.go
@@ -3,6 +3,7 @@ package discover_etcd_initial_cluster
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"strings"
@@ -21,7 +22,11 @@ import (
type DiscoverEtcdInitialClusterOptions struct {
// TargetPeerURLHost is the host portion of the peer URL (either IP or hostname); it is used for matching.
TargetPeerURLHost string
// TargetName is the name to assign to this peer if we create it
// TargetPeerURLScheme is the scheme of the peer URL.
TargetPeerURLScheme string
// TargetPeerURLPort is the port of the peer URL.
TargetPeerURLPort string
// TargetName is the name to assign to this peer if we create it.
TargetName string

// CABundleFile is the file to use to trust the etcd server
@@ -38,7 +43,10 @@ type DiscoverEtcdInitialClusterOptions struct {
}

func NewDiscoverEtcdInitialCluster() *DiscoverEtcdInitialClusterOptions {
return &DiscoverEtcdInitialClusterOptions{}
return &DiscoverEtcdInitialClusterOptions{
TargetPeerURLScheme: "https",
TargetPeerURLPort: "2380",
}
}

func NewDiscoverEtcdInitialClusterCommand() *cobra.Command {
@@ -49,12 +57,8 @@
Short: "output the value for ETCD_INITIAL_CLUSTER in openshift etcd static pod",
Long: `output the value for ETCD_INITIAL_CLUSTER in openshift etcd static pod
1. If --data-dir exists, output a marker value and exit.
2. It tries to contact every available etcd to get a list of members.
3. Check each member to see if any one of them is the target.
4. If so, and if it is started, use the member list to create the ETCD_INITIAL_CLUSTER value and print it out.
5. If so, and if it is not started, use the existing member list and append the target value to create the ETCD_INITIAL_CLUSTER value and print it out.
6. If not, try again until either you have it or you time out.
Please see docs for more details:
https://github.com/openshift/cluster-etcd-operator/tree/master/docs/discover-etcd-initial-cluster.md
`,
Run: func(cmd *cobra.Command, args []string) {
if err := o.Validate(); err != nil {
@@ -105,173 +109,200 @@ func (o *DiscoverEtcdInitialClusterOptions) Validate() error {
if len(o.TargetName) == 0 {
return fmt.Errorf("missing --target-name")
}
if len(o.TargetPeerURLPort) == 0 {
return fmt.Errorf("missing TargetPeerURLPort")
}
if len(o.TargetPeerURLScheme) == 0 {
return fmt.Errorf("missing TargetPeerURLScheme")
}
return nil
}
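The Validate checks above only assert that the new fields are set; Run below folds them into a single peer URL that is matched verbatim against the member list. A minimal standalone sketch of that construction, using the https/2380 defaults from NewDiscoverEtcdInitialCluster and a made-up host:

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Hypothetical values: the scheme and port are the defaults set in
	// NewDiscoverEtcdInitialCluster; the host is made up.
	target := url.URL{
		Scheme: "https",
		Host:   fmt.Sprintf("%s:%s", "10.0.0.5", "2380"),
	}
	fmt.Println(target.String()) // https://10.0.0.5:2380
}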

func (o *DiscoverEtcdInitialClusterOptions) Run() error {

// Temporary hack to work with the current pod.yaml
var memberDir string
if strings.HasSuffix(o.DataDir, "member") {
memberDir = o.DataDir
o.DataDir = filepath.Dir(o.DataDir)
} else {
memberDir = filepath.Join(o.DataDir, "member")
}

memberDirExists := false
_, err := os.Stat(memberDir)
switch {
case os.IsNotExist(err):
// do nothing. This just means we fall through to the polling logic

case err == nil:
fmt.Fprintf(os.Stderr, "memberDir %s is present on %s\n", memberDir, o.TargetName)
memberDirExists = true

case err != nil:
var dataDirExists bool
// check if dataDir structure exists
_, err := os.Stat(filepath.Join(o.DataDir, "member/snap"))
if err != nil && !os.IsNotExist(err) {
return err
}
if err == nil {
fmt.Fprintf(os.Stderr, "dataDir is present on %s\n", o.TargetName)
dataDirExists = true
}

client, err := o.getClient()
if err != nil && memberDirExists {
// we weren't able to get a client and need to return based on the previous memberDir so we can restart. This is the off-and-on-again flow.
fmt.Fprintf(os.Stderr, "Couldn't get client, but memberDir %s is present on %s, err=%s. Returning with no error.\n", memberDir, o.TargetName, err)
fmt.Printf(o.TargetName)

// Condition: create client fail with dataDir
// Possible reasons for this condition.
// 1.) single node etcd cluster
// 2.) transient networking problem
// 3.) on and off flow
// Result: start etcd with empty initial config
if err != nil && dataDirExists {
fmt.Fprintf(os.Stderr, "failed to create etcd client, but the server is already initialized as member %q before, starting as etcd member: %v", o.TargetName, err.Error())
return nil
} else if err != nil {
return err
}
// Condition: create client fail, no dataDir
// Possible reasons for the condition include a transient network partition.
// Result: return error and restart container
if err != nil {
return fmt.Errorf("failed to create etcd client: %v", err)
}
defer client.Close()

var targetMember *etcdserverpb.Member
var allMembers []*etcdserverpb.Member
target := url.URL{
Scheme: o.TargetPeerURLScheme,
Host: fmt.Sprintf("%s:%s", o.TargetPeerURLHost, o.TargetPeerURLPort),
}

for i := 0; i < 10; i++ {
fmt.Fprintf(os.Stderr, "#### attempt %d\n", i)
targetMember, allMembers, err = o.checkForTarget(client)

for _, member := range allMembers {
fmt.Fprintf(os.Stderr, " member=%v\n", stringifyMember(member))
// check member list on each iteration for changes
cluster, err := client.MemberList(context.TODO())
if err != nil {
fmt.Fprintf(os.Stderr, "member list request failed: %v", err)
continue
}
logCurrentMembership(cluster.Members)
targetMember, found := checkTargetMember(target, cluster.Members)

// Condition: unstarted member found, no dataDir
// This member is part of the cluster but has not yet started. We know this because the name is only
// populated at runtime, and this member does not have one.
// Result: populate initial cluster so etcd can communicate with peers during startup
if found && targetMember.Name == "" && !dataDirExists {
fmt.Print(getInitialCluster(o.TargetName, targetMember, cluster.Members))
return nil
}
fmt.Fprintf(os.Stderr, " target=%v, err=%v\n", stringifyMember(targetMember), err)

// we're done because we found what we want.
if targetMember != nil && err == nil {
break
// Condition: unstarted member found with dataDir
// This member is part of the cluster and has not yet started, yet it already has a dataDir.
// Result: archive old dataDir and return error which will restart container
if found && targetMember.Name == "" && dataDirExists {
archivedDir, err := archiveDataDir(o.DataDir)
if err != nil {
return err
}
return fmt.Errorf("member %q is unstarted but previous members dataDir exists: archiving to %q", target.String(), archivedDir)
}

fmt.Fprintf(os.Stderr, "#### sleeping...\n")
time.Sleep(1 * time.Second)
}
// Condition: started member found with dataDir
// Result: start etcd with empty initial config
if found && dataDirExists {
return nil
}

switch {
case targetMember == nil && memberDirExists:
// we weren't able to locate other members and need to return based on the previous memberDir so we can restart. This is again the off-and-on flow.
fmt.Fprintf(os.Stderr, "Couldn't get targetMember, but memberDir %s is present on %s. Returning with no error.\n", memberDir, o.TargetName)
fmt.Printf(o.TargetName)
return nil
// Condition: started member found, no dataDir
// A member is not actually gone forever unless it is removed from the cluster with MemberRemove or the dataDir is destroyed. Since
// this is the latter, do not let etcd start, and report the condition as an error.
// Result: return error and restart container
if found && !dataDirExists {
return fmt.Errorf("member %q dataDir has been destoyed and must be removed from the cluster", target.String())
}

case err != nil:
fmt.Fprintf(os.Stderr, "Couldn't get targetMember. Returning error.\n")
return err
// Condition: member not found with dataDir
// The member has been removed from the cluster, so etcd should not start; return an error. The dataDir will be archived once the operator
// scales up etcd.
// Result: return error and restart container
if !found && dataDirExists {
return fmt.Errorf("member %q is no longer a member of the cluster and should not start", target.String())
}

case targetMember == nil && !memberDirExists:
// our member has not been added to the cluster and we have no previous data to start based on.
return fmt.Errorf("timed out")
// Condition: member not found, no dataDir
// The member list does not reflect the target member as it is waiting to be scaled up.
// Result: retry
if !found && !dataDirExists {
fmt.Fprintf(os.Stderr, " member %q not found in member list, check operator logs for possible scaling problems\n", target.String())
}

case targetMember != nil && len(targetMember.Name) == 0 && memberDirExists:
// our member has been added to the cluster but has never been started, and a data directory exists. This means we have dirty data that we must remove.
fmt.Fprintf(os.Stderr, "Found targetMember but it is unstarted and memberDir exists. Archiving memberDir\n")
archiveDataDir(memberDir)
fmt.Fprintf(os.Stderr, "#### sleeping...\n")
time.Sleep(1 * time.Second)
}

default:
// a target member was found, but no exceptional circumstances apply.
return fmt.Errorf("timed out")
}
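The Condition/Result comments in Run above amount to a small decision matrix over whether the target member appears in the member list, whether it has started (its Name is populated), and whether a dataDir already exists on disk. A condensed sketch of that matrix as a hypothetical helper (decide is not part of the diff), assuming the semantics stated in the comments:

package main

import (
	"errors"
	"fmt"
)

// decide condenses the branch logic described above; "started" means the
// member's Name is populated in the member list.
func decide(found, started, dataDirExists bool) (startEtcd bool, err error) {
	switch {
	case found && !started && !dataDirExists:
		return true, nil // print ETCD_INITIAL_CLUSTER and start etcd
	case found && !started && dataDirExists:
		return false, errors.New("stale dataDir: archive it and restart the container")
	case found && started && dataDirExists:
		return true, nil // normal restart with empty initial config
	case found && started && !dataDirExists:
		return false, errors.New("dataDir destroyed: member must be removed from the cluster")
	case !found && dataDirExists:
		return false, errors.New("member removed from the cluster: do not start")
	default: // not found and no dataDir: waiting to be scaled up
		return false, nil // retry on the next loop iteration
	}
}

func main() {
	fmt.Println(decide(true, false, false)) // true <nil>
}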

func (o *DiscoverEtcdInitialClusterOptions) getClient() (*clientv3.Client, error) {
dialOptions := []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
}

etcdInitialClusterEntries := []string{}
for _, member := range allMembers {
if len(member.Name) == 0 { // this is the signal for whether or not a given peer is started
continue
}
for _, peerURL := range member.PeerURLs {
etcdInitialClusterEntries = append(etcdInitialClusterEntries, fmt.Sprintf("%s=%s", member.Name, peerURL))
}
tlsInfo := transport.TLSInfo{
CertFile: o.ClientCertFile,
KeyFile: o.ClientKeyFile,
TrustedCAFile: o.CABundleFile,
}
if len(targetMember.Name) == 0 {
fmt.Fprintf(os.Stderr, "Adding the unstarted member to the end %s\n", o.TargetName)
etcdInitialClusterEntries = append(etcdInitialClusterEntries, fmt.Sprintf("%s=%s", o.TargetName, targetMember.PeerURLs[0]))
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}

fmt.Printf(strings.Join(etcdInitialClusterEntries, ","))
cfg := &clientv3.Config{
DialOptions: dialOptions,
Endpoints: o.Endpoints,
DialTimeout: 2 * time.Second, // fail fast
TLS: tlsConfig,
}

return nil
return clientv3.New(*cfg)
}
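getClient dials with grpc.WithBlock() and a 2-second DialTimeout, so an unreachable cluster surfaces as an error quickly instead of hanging the container. A hedged in-package usage sketch; the endpoint and file values are all hypothetical:

// Hypothetical in-package helper showing how getClient is meant to be used;
// the endpoints and file paths are made up.
func exampleGetClient() error {
	o := &DiscoverEtcdInitialClusterOptions{
		CABundleFile:   "/etc/ssl/etcd/ca.crt",
		ClientCertFile: "/etc/ssl/etcd/client.crt",
		ClientKeyFile:  "/etc/ssl/etcd/client.key",
		Endpoints:      []string{"https://10.0.0.4:2379", "https://10.0.0.5:2379"},
	}
	client, err := o.getClient()
	if err != nil {
		// grpc.WithBlock() plus the 2-second DialTimeout make this fail fast
		return fmt.Errorf("failed to create etcd client: %v", err)
	}
	defer client.Close()
	return nil
}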

// TODO: instead of archiving, we should remove the directory to avoid any confusion with the backups.
func archiveDataDir(sourceDir string) error {
func archiveDataDir(dataDir string) (string, error) {
// for testing
if strings.HasPrefix(dataDir, "/tmp") {
return "/tmp-removed-archive", nil
}
sourceDir := filepath.Join(dataDir, "member")
targetDir := filepath.Join(sourceDir + "-removed-archive-" + time.Now().Format("2006-01-02-030405"))

fmt.Fprintf(os.Stderr, "attempting to archive %s to %s", sourceDir, targetDir)
if err := os.Rename(sourceDir, targetDir); err != nil {
return err
return "", err
}
return nil
return targetDir, nil
}
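The archive suffix uses Go's reference-time layout, where "2006-01-02-030405" renders as year-month-day followed by hour, minute, and second. A minimal standalone sketch of the rename target, with a hypothetical data dir:

package main

import (
	"fmt"
	"path/filepath"
	"time"
)

func main() {
	dataDir := "/var/lib/etcd" // hypothetical path
	sourceDir := filepath.Join(dataDir, "member")
	targetDir := sourceDir + "-removed-archive-" + time.Now().Format("2006-01-02-030405")
	fmt.Println(targetDir) // e.g. /var/lib/etcd/member-removed-archive-2021-04-28-031542
}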

func stringifyMember(member *etcdserverpb.Member) string {
if member == nil {
return "nil"
}

return fmt.Sprintf("{name=%q, peerURLs=[%s}, clientURLs=[%s]", member.Name, strings.Join(member.PeerURLs, ","), strings.Join(member.ClientURLs, ","))
}

func (o *DiscoverEtcdInitialClusterOptions) checkForTarget(client *clientv3.Client) (*etcdserverpb.Member, []*etcdserverpb.Member, error) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

memberResponse, err := client.MemberList(ctx)
if err != nil {
return nil, nil, err
}

var targetMember *etcdserverpb.Member
for i := range memberResponse.Members {
member := memberResponse.Members[i]
// checkTargetMember returns the target member, if it is part of the member list, and prints the member details to the etcd log.
func checkTargetMember(target url.URL, members []*etcdserverpb.Member) (*etcdserverpb.Member, bool) {
for _, member := range members {
for _, peerURL := range member.PeerURLs {
if peerURL == ("https://" + o.TargetPeerURLHost + ":2380") {
targetMember = member
if peerURL == target.String() {
fmt.Fprintf(os.Stderr, " target=%s\n", stringifyMember(member))
return member, true
}
}
}
if targetMember == nil {
return nil, nil, fmt.Errorf("peer %q not found in member list, check operator logs for possible scaling problems", "https://"+o.TargetPeerURLHost+":2380")
}

return targetMember, memberResponse.Members, nil
return nil, false
}
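Since the commit also adds unit tests, here is a sketch of how checkTargetMember could be exercised; the test name, member values, and the etcdserverpb import path (assumed from the etcd 3.4 layout) are all hypothetical. Matching is an exact string comparison, so scheme, host, and port must all agree:

package discover_etcd_initial_cluster

import (
	"net/url"
	"testing"

	"go.etcd.io/etcd/etcdserver/etcdserverpb"
)

func TestCheckTargetMemberSketch(t *testing.T) {
	target := url.URL{Scheme: "https", Host: "10.0.0.5:2380"}
	members := []*etcdserverpb.Member{
		{Name: "etcd-0", PeerURLs: []string{"https://10.0.0.4:2380"}},
		{Name: "", PeerURLs: []string{"https://10.0.0.5:2380"}}, // unstarted: no Name yet
	}
	member, found := checkTargetMember(target, members)
	if !found || member.Name != "" {
		t.Fatalf("expected to match the unstarted member, got found=%v", found)
	}
}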

func (o *DiscoverEtcdInitialClusterOptions) getClient() (*clientv3.Client, error) {
dialOptions := []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
// logCurrentMembership prints the current etcd membership to the etcd logs.
func logCurrentMembership(members []*etcdserverpb.Member) {
for _, member := range members {
fmt.Fprintf(os.Stderr, " member=%s\n", stringifyMember(member))
}
return
}

tlsInfo := transport.TLSInfo{
CertFile: o.ClientCertFile,
KeyFile: o.ClientKeyFile,
TrustedCAFile: o.CABundleFile,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
// getInitialCluster builds the comma-delimited initial cluster string in the format <peerName>=<peerURL>.
func getInitialCluster(targetName string, target *etcdserverpb.Member, members []*etcdserverpb.Member) string {
var initialCluster []string
for _, member := range members {
if member.Name == "" { // this is the signal for whether or not a given peer is started
continue
}
for _, peerURL := range member.PeerURLs {
initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", member.Name, peerURL))
}
}

cfg := &clientv3.Config{
DialOptions: dialOptions,
Endpoints: o.Endpoints,
DialTimeout: 15 * time.Second,
TLS: tlsConfig,
if target.Name == "" {
// Adding unstarted member to the end of list
initialCluster = append(initialCluster, fmt.Sprintf("%s=%s", targetName, target.PeerURLs[0]))
}

return clientv3.New(*cfg)
return strings.Join(initialCluster, ",")
}
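A companion sketch for getInitialCluster, in the same hypothetical test file as above, showing the exact shape of the ETCD_INITIAL_CLUSTER value: started members contribute name=peerURL pairs, and the unstarted target is appended under its intended name:

func TestGetInitialClusterSketch(t *testing.T) {
	members := []*etcdserverpb.Member{
		{Name: "etcd-0", PeerURLs: []string{"https://10.0.0.4:2380"}},
		{Name: "", PeerURLs: []string{"https://10.0.0.5:2380"}}, // unstarted target
	}
	got := getInitialCluster("etcd-1", members[1], members)
	want := "etcd-0=https://10.0.0.4:2380,etcd-1=https://10.0.0.5:2380"
	if got != want {
		t.Fatalf("got %q, want %q", got, want)
	}
}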
