Skip to content

Commit

Permalink
Merge pull request #28 from deads2k/etcd-initial-cluster-02-filecheck
Browse files Browse the repository at this point in the history
bug 1805807: create logic for golang ETCD_INITIAL_CLUSTER
  • Loading branch information
openshift-merge-robot committed Feb 21, 2020
2 parents dcbb2e8 + 52f6bc9 commit 60eed1d
Showing 1 changed file with 158 additions and 24 deletions.
182 changes: 158 additions & 24 deletions openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package discover_etcd_initial_cluster

import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/coreos/etcd/etcdserver/etcdserverpb"

"github.com/coreos/etcd/pkg/transport"
"google.golang.org/grpc"

"github.com/coreos/etcd/clientv3"

"github.com/spf13/pflag"

"github.com/spf13/cobra"
Expand All @@ -13,6 +22,8 @@ import (
type DiscoverEtcdInitialClusterOptions struct {
// TargetPeerURLHost is the host portion of the peer URL. It is used to match on. (either IP or hostname)
TargetPeerURLHost string
// TargetName is the name to assign to this peer if we create it
TargetName string

// CABundleFile is the file to use to trust the etcd server
CABundleFile string
Expand All @@ -21,24 +32,14 @@ type DiscoverEtcdInitialClusterOptions struct {
// ClientKeyFile is the client key to use to authenticate this binary to etcd
ClientKeyFile string
// Endpoints is a list of all the endpoints to use to try to contact etcd
Endpoints string
Endpoints []string

// Revision is the revision value for the static pod
Revision string
// PreviousEtcdInitialClusterDir is the directory to store the previous etcd initial cluster value
PreviousEtcdInitialClusterDir string

// TotalTimeToWait is the total time to wait before reporting failure and dumping logs
TotalTimeToWait time.Duration
// TimeToWaitBeforeUsingPreviousValue is the time to wait before checking to see if we have a previous value.
TimeToWaitBeforeUsingPreviousValue time.Duration
// MemberDir is the directory created when etcd starts the first time
MemberDir string
}

func NewDiscoverEtcdInitialCluster() *DiscoverEtcdInitialClusterOptions {
return &DiscoverEtcdInitialClusterOptions{
TotalTimeToWait: 30 * time.Second,
TimeToWaitBeforeUsingPreviousValue: 10 * time.Second,
}
return &DiscoverEtcdInitialClusterOptions{}
}

func NewDiscoverEtcdInitialClusterCommand() *cobra.Command {
Expand All @@ -49,13 +50,12 @@ func NewDiscoverEtcdInitialClusterCommand() *cobra.Command {
Short: "output the value for ETCD_INITIAL_CLUSTER in openshift etcd static pod",
Long: `output the value for ETCD_INITIAL_CLUSTER in openshift etcd static pod
1. It tries to contact every available etcd to get a list of member.
2. Check each member to see if any one of them is the target.
3. If so, and if it is started, use the member list to create the ETCD_INITIAL_CLUSTER value and print it out.
4. If so, and if it it not started, use the existing member list and append the target value to create the ETCD_INITIAL_CLUSTER value and print it out.
5. If not, try again until either you have it or you have to check a cache.
6. If you have to check a cache and it is present, return
7. If the cache is not present, keep trying to contact etcd until total timeout is met.
1. If --data-dir exists, output a marker value and exit.
2. It tries to contact every available etcd to get a list of member.
3. Check each member to see if any one of them is the target.
4. If so, and if it is started, use the member list to create the ETCD_INITIAL_CLUSTER value and print it out.
5. If so, and if it it not started, use the existing member list and append the target value to create the ETCD_INITIAL_CLUSTER value and print it out.
6. If not, try again until either you have it or you time out.
`,
Run: func(cmd *cobra.Command, args []string) {
if err := o.Validate(); err != nil {
Expand All @@ -78,15 +78,149 @@ func (o *DiscoverEtcdInitialClusterOptions) BindFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.CABundleFile, "cacert", o.CABundleFile, "file to use to verify the identity of the etcd server")
flags.StringVar(&o.ClientCertFile, "cert", o.ClientCertFile, "client cert to use to authenticate this binary to etcd")
flags.StringVar(&o.ClientKeyFile, "key", o.ClientKeyFile, "client key to use to authenticate this binary to etcd")
flags.StringVar(&o.Endpoints, "endpoints", o.Endpoints, "list of all the endpoints to use to try to contact etcd")
flags.StringVar(&o.Revision, "revision", o.Revision, "revision value for the static pod")
flags.StringVar(&o.PreviousEtcdInitialClusterDir, "memory-dir", o.PreviousEtcdInitialClusterDir, "directory to store the previous etcd initial cluster value")
flags.StringSliceVar(&o.Endpoints, "endpoints", o.Endpoints, "list of all the endpoints to use to try to contact etcd")
flags.StringVar(&o.MemberDir, "data-dir", o.MemberDir, "file to stat for existence of /var/lib/etcd/member")
flags.StringVar(&o.TargetPeerURLHost, "target-peer-url-host", o.TargetPeerURLHost, "host portion of the peer URL. It is used to match on. (either IP or hostname)")
flags.StringVar(&o.TargetName, "target-name", o.TargetName, "name to assign to this peer if we create it")
}

func (o *DiscoverEtcdInitialClusterOptions) Validate() error {
if len(o.CABundleFile) == 0 {
return fmt.Errorf("missing --cacert")
}
if len(o.ClientCertFile) == 0 {
return fmt.Errorf("missing --cert")
}
if len(o.ClientKeyFile) == 0 {
return fmt.Errorf("missing --key")
}
if len(o.Endpoints) == 0 {
return fmt.Errorf("missing --endpoints")
}
if len(o.MemberDir) == 0 {
return fmt.Errorf("missing --data-dir")
}
if len(o.TargetPeerURLHost) == 0 {
return fmt.Errorf("missing --target-peer-url-host")
}
if len(o.TargetName) == 0 {
return fmt.Errorf("missing --target-name")
}
return nil
}

func (o *DiscoverEtcdInitialClusterOptions) Run() error {
_, err := os.Stat(o.MemberDir)
switch {
case os.IsNotExist(err):
// do nothing. This just means we fall through to the polling logic

case err == nil:
fmt.Printf(o.TargetName)
return nil

case err != nil:
return err
}

client, err := o.getClient()
if err != nil {
return err
}
defer client.Close()

var targetMember *etcdserverpb.Member
var allMembers []*etcdserverpb.Member
for i := 0; i < 10; i++ {
fmt.Fprintf(os.Stderr, "#### attempt %d\n", i)
targetMember, allMembers, err = o.checkForTarget(client)

for _, member := range allMembers {
fmt.Fprintf(os.Stderr, " member=%v\n", stringifyMember(member))
}
fmt.Fprintf(os.Stderr, " target=%v, err=%v\n", stringifyMember(targetMember), err)

// we're done because we found what we want.
if targetMember != nil && err == nil {
break
}

fmt.Fprintf(os.Stderr, "#### sleeping...\n")
time.Sleep(1 * time.Second)
}
if err != nil {
return err
}
if targetMember == nil {
return fmt.Errorf("timed out")
}

etcdInitialClusterEntries := []string{}
for _, member := range allMembers {
if len(member.Name) == 0 { // this is the signal for whether or not a given peer is started
continue
}
etcdInitialClusterEntries = append(etcdInitialClusterEntries, fmt.Sprintf("%s=%s", member.Name, member.PeerURLs[0]))
}
if len(targetMember.Name) == 0 {
etcdInitialClusterEntries = append(etcdInitialClusterEntries, fmt.Sprintf("%s=%s", o.TargetName, targetMember.PeerURLs[0]))
}
fmt.Printf(strings.Join(etcdInitialClusterEntries, ","))

return nil
}

func stringifyMember(member *etcdserverpb.Member) string {
if member == nil {
return "nil"
}

return fmt.Sprintf("{name=%q, peerURLs=[%s}, clientURLs=[%s]", member.Name, strings.Join(member.PeerURLs, ","), strings.Join(member.ClientURLs, ","))
}

func (o *DiscoverEtcdInitialClusterOptions) checkForTarget(client *clientv3.Client) (*etcdserverpb.Member, []*etcdserverpb.Member, error) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

memberResponse, err := client.MemberList(ctx)
if err != nil {
return nil, nil, err
}

var targetMember *etcdserverpb.Member
for i := range memberResponse.Members {
member := memberResponse.Members[i]
for _, peerURL := range member.PeerURLs {
if strings.Contains(peerURL, o.TargetPeerURLHost) {
targetMember = member
}
}
}

return targetMember, memberResponse.Members, err
}

func (o *DiscoverEtcdInitialClusterOptions) getClient() (*clientv3.Client, error) {
dialOptions := []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
}

tlsInfo := transport.TLSInfo{
CertFile: o.ClientCertFile,
KeyFile: o.ClientKeyFile,
TrustedCAFile: o.CABundleFile,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}

cfg := &clientv3.Config{
DialOptions: dialOptions,
Endpoints: o.Endpoints,
DialTimeout: 15 * time.Second,
TLS: tlsConfig,
}

return clientv3.New(*cfg)
}

0 comments on commit 60eed1d

Please sign in to comment.