From 52f6bc9e17c9287d6c7dba0f6fb562f939770e38 Mon Sep 17 00:00:00 2001 From: David Eads Date: Thu, 20 Feb 2020 17:04:39 -0500 Subject: [PATCH] codify the initial cluster check as golang code --- .../initial-cluster.go | 182 +++++++++++++++--- 1 file changed, 158 insertions(+), 24 deletions(-) diff --git a/openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster.go b/openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster.go index 0e9a8166f31..736a14a3338 100644 --- a/openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster.go +++ b/openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster.go @@ -1,10 +1,19 @@ package discover_etcd_initial_cluster import ( + "context" "fmt" "os" + "strings" "time" + "github.com/coreos/etcd/etcdserver/etcdserverpb" + + "github.com/coreos/etcd/pkg/transport" + "google.golang.org/grpc" + + "github.com/coreos/etcd/clientv3" + "github.com/spf13/pflag" "github.com/spf13/cobra" @@ -13,6 +22,8 @@ import ( type DiscoverEtcdInitialClusterOptions struct { // TargetPeerURLHost is the host portion of the peer URL. It is used to match on. (either IP or hostname) TargetPeerURLHost string + // TargetName is the name to assign to this peer if we create it + TargetName string // CABundleFile is the file to use to trust the etcd server CABundleFile string @@ -21,24 +32,14 @@ type DiscoverEtcdInitialClusterOptions struct { // ClientKeyFile is the client key to use to authenticate this binary to etcd ClientKeyFile string // Endpoints is a list of all the endpoints to use to try to contact etcd - Endpoints string + Endpoints []string - // Revision is the revision value for the static pod - Revision string - // PreviousEtcdInitialClusterDir is the directory to store the previous etcd initial cluster value - PreviousEtcdInitialClusterDir string - - // TotalTimeToWait is the total time to wait before reporting failure and dumping logs - TotalTimeToWait time.Duration - // TimeToWaitBeforeUsingPreviousValue is the time to wait before checking to see if we have a previous value. - TimeToWaitBeforeUsingPreviousValue time.Duration + // MemberDir is the directory created when etcd starts the first time + MemberDir string } func NewDiscoverEtcdInitialCluster() *DiscoverEtcdInitialClusterOptions { - return &DiscoverEtcdInitialClusterOptions{ - TotalTimeToWait: 30 * time.Second, - TimeToWaitBeforeUsingPreviousValue: 10 * time.Second, - } + return &DiscoverEtcdInitialClusterOptions{} } func NewDiscoverEtcdInitialClusterCommand() *cobra.Command { @@ -49,13 +50,12 @@ func NewDiscoverEtcdInitialClusterCommand() *cobra.Command { Short: "output the value for ETCD_INITIAL_CLUSTER in openshift etcd static pod", Long: `output the value for ETCD_INITIAL_CLUSTER in openshift etcd static pod -1. It tries to contact every available etcd to get a list of member. -2. Check each member to see if any one of them is the target. -3. If so, and if it is started, use the member list to create the ETCD_INITIAL_CLUSTER value and print it out. -4. If so, and if it it not started, use the existing member list and append the target value to create the ETCD_INITIAL_CLUSTER value and print it out. -5. If not, try again until either you have it or you have to check a cache. -6. If you have to check a cache and it is present, return -7. If the cache is not present, keep trying to contact etcd until total timeout is met. +1. If --data-dir exists, output a marker value and exit. +2. It tries to contact every available etcd to get a list of member. +3. Check each member to see if any one of them is the target. +4. If so, and if it is started, use the member list to create the ETCD_INITIAL_CLUSTER value and print it out. +5. If so, and if it it not started, use the existing member list and append the target value to create the ETCD_INITIAL_CLUSTER value and print it out. +6. If not, try again until either you have it or you time out. `, Run: func(cmd *cobra.Command, args []string) { if err := o.Validate(); err != nil { @@ -78,15 +78,149 @@ func (o *DiscoverEtcdInitialClusterOptions) BindFlags(flags *pflag.FlagSet) { flags.StringVar(&o.CABundleFile, "cacert", o.CABundleFile, "file to use to verify the identity of the etcd server") flags.StringVar(&o.ClientCertFile, "cert", o.ClientCertFile, "client cert to use to authenticate this binary to etcd") flags.StringVar(&o.ClientKeyFile, "key", o.ClientKeyFile, "client key to use to authenticate this binary to etcd") - flags.StringVar(&o.Endpoints, "endpoints", o.Endpoints, "list of all the endpoints to use to try to contact etcd") - flags.StringVar(&o.Revision, "revision", o.Revision, "revision value for the static pod") - flags.StringVar(&o.PreviousEtcdInitialClusterDir, "memory-dir", o.PreviousEtcdInitialClusterDir, "directory to store the previous etcd initial cluster value") + flags.StringSliceVar(&o.Endpoints, "endpoints", o.Endpoints, "list of all the endpoints to use to try to contact etcd") + flags.StringVar(&o.MemberDir, "data-dir", o.MemberDir, "file to stat for existence of /var/lib/etcd/member") + flags.StringVar(&o.TargetPeerURLHost, "target-peer-url-host", o.TargetPeerURLHost, "host portion of the peer URL. It is used to match on. (either IP or hostname)") + flags.StringVar(&o.TargetName, "target-name", o.TargetName, "name to assign to this peer if we create it") } func (o *DiscoverEtcdInitialClusterOptions) Validate() error { + if len(o.CABundleFile) == 0 { + return fmt.Errorf("missing --cacert") + } + if len(o.ClientCertFile) == 0 { + return fmt.Errorf("missing --cert") + } + if len(o.ClientKeyFile) == 0 { + return fmt.Errorf("missing --key") + } + if len(o.Endpoints) == 0 { + return fmt.Errorf("missing --endpoints") + } + if len(o.MemberDir) == 0 { + return fmt.Errorf("missing --data-dir") + } + if len(o.TargetPeerURLHost) == 0 { + return fmt.Errorf("missing --target-peer-url-host") + } + if len(o.TargetName) == 0 { + return fmt.Errorf("missing --target-name") + } return nil } func (o *DiscoverEtcdInitialClusterOptions) Run() error { + _, err := os.Stat(o.MemberDir) + switch { + case os.IsNotExist(err): + // do nothing. This just means we fall through to the polling logic + + case err == nil: + fmt.Printf(o.TargetName) + return nil + + case err != nil: + return err + } + + client, err := o.getClient() + if err != nil { + return err + } + defer client.Close() + + var targetMember *etcdserverpb.Member + var allMembers []*etcdserverpb.Member + for i := 0; i < 10; i++ { + fmt.Fprintf(os.Stderr, "#### attempt %d\n", i) + targetMember, allMembers, err = o.checkForTarget(client) + + for _, member := range allMembers { + fmt.Fprintf(os.Stderr, " member=%v\n", stringifyMember(member)) + } + fmt.Fprintf(os.Stderr, " target=%v, err=%v\n", stringifyMember(targetMember), err) + + // we're done because we found what we want. + if targetMember != nil && err == nil { + break + } + + fmt.Fprintf(os.Stderr, "#### sleeping...\n") + time.Sleep(1 * time.Second) + } + if err != nil { + return err + } + if targetMember == nil { + return fmt.Errorf("timed out") + } + + etcdInitialClusterEntries := []string{} + for _, member := range allMembers { + if len(member.Name) == 0 { // this is the signal for whether or not a given peer is started + continue + } + etcdInitialClusterEntries = append(etcdInitialClusterEntries, fmt.Sprintf("%s=%s", member.Name, member.PeerURLs[0])) + } + if len(targetMember.Name) == 0 { + etcdInitialClusterEntries = append(etcdInitialClusterEntries, fmt.Sprintf("%s=%s", o.TargetName, targetMember.PeerURLs[0])) + } + fmt.Printf(strings.Join(etcdInitialClusterEntries, ",")) + return nil } + +func stringifyMember(member *etcdserverpb.Member) string { + if member == nil { + return "nil" + } + + return fmt.Sprintf("{name=%q, peerURLs=[%s}, clientURLs=[%s]", member.Name, strings.Join(member.PeerURLs, ","), strings.Join(member.ClientURLs, ",")) +} + +func (o *DiscoverEtcdInitialClusterOptions) checkForTarget(client *clientv3.Client) (*etcdserverpb.Member, []*etcdserverpb.Member, error) { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + memberResponse, err := client.MemberList(ctx) + if err != nil { + return nil, nil, err + } + + var targetMember *etcdserverpb.Member + for i := range memberResponse.Members { + member := memberResponse.Members[i] + for _, peerURL := range member.PeerURLs { + if strings.Contains(peerURL, o.TargetPeerURLHost) { + targetMember = member + } + } + } + + return targetMember, memberResponse.Members, err +} + +func (o *DiscoverEtcdInitialClusterOptions) getClient() (*clientv3.Client, error) { + dialOptions := []grpc.DialOption{ + grpc.WithBlock(), // block until the underlying connection is up + } + + tlsInfo := transport.TLSInfo{ + CertFile: o.ClientCertFile, + KeyFile: o.ClientKeyFile, + TrustedCAFile: o.CABundleFile, + } + tlsConfig, err := tlsInfo.ClientConfig() + if err != nil { + return nil, err + } + + cfg := &clientv3.Config{ + DialOptions: dialOptions, + Endpoints: o.Endpoints, + DialTimeout: 15 * time.Second, + TLS: tlsConfig, + } + + return clientv3.New(*cfg) +}