Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug 1805809: create logic for golang ETCD_INITIAL_CLUSTER #27

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile.openshift
Expand Up @@ -13,6 +13,7 @@ ENTRYPOINT ["/usr/bin/etcd"]

COPY --from=builder /go/src/github.com/coreos/etcd/bin/etcd /usr/bin/
COPY --from=builder /go/src/github.com/coreos/etcd/bin/etcdctl /usr/bin/
COPY --from=builder /go/src/github.com/coreos/etcd/bin/discover-etcd-initial-cluster /usr/bin/

LABEL io.k8s.display-name="etcd server" \
io.k8s.description="etcd is a distributed key-value store which stores the persistent master state for Kubernetes and OpenShift." \
Expand Down
1 change: 1 addition & 0 deletions Dockerfile.rhel
Expand Up @@ -13,6 +13,7 @@ ENTRYPOINT ["/usr/bin/etcd"]

COPY --from=builder /go/src/github.com/coreos/etcd/bin/etcd /usr/bin/
COPY --from=builder /go/src/github.com/coreos/etcd/bin/etcdctl /usr/bin/
COPY --from=builder /go/src/github.com/coreos/etcd/bin/discover-etcd-initial-cluster /usr/bin/

LABEL io.k8s.display-name="etcd server" \
io.k8s.description="etcd is a distributed key-value store which stores the persistent master state for Kubernetes and OpenShift." \
Expand Down
15 changes: 15 additions & 0 deletions build
Expand Up @@ -64,6 +64,20 @@ etcd_build() {
-o "${out}/etcdctl" ${REPO_PATH}/etcdctl || return
}


openshift_tools_build() {
out="bin"
if [[ -n "${BINDIR}" ]]; then out="${BINDIR}"; fi
toggle_failpoints_default

# Static compilation is useful when etcd is run in a container. $GO_BUILD_FLAGS is OK
# shellcheck disable=SC2086
CGO_ENABLED=0 go build $GO_BUILD_FLAGS \
-installsuffix cgo \
-ldflags "$GO_LDFLAGS" \
-o "${out}/discover-etcd-initial-cluster" "github.com/coreos/etcd/openshift-tools/discover-etcd-initial-cluster" || return
}

tools_build() {
out="bin"
if [[ -n "${BINDIR}" ]]; then out="${BINDIR}"; fi
Expand Down Expand Up @@ -91,4 +105,5 @@ fi
# only build when called directly, not sourced
if echo "$0" | grep "build$" >/dev/null; then
etcd_build
openshift_tools_build
fi
35 changes: 35 additions & 0 deletions openshift-tools/discover-etcd-initial-cluster/main.go
@@ -0,0 +1,35 @@
package main

import (
goflag "flag"
"fmt"
"math/rand"
"os"
"strings"
"time"

discover_etcd_initial_cluster "github.com/coreos/etcd/openshift-tools/pkg/discover-etcd-initial-cluster"
"github.com/spf13/pflag"
)

// copy from `utilflag "k8s.io/component-base/cli/flag"`
// WordSepNormalizeFunc changes all flags that contain "_" separators
func WordSepNormalizeFunc(f *pflag.FlagSet, name string) pflag.NormalizedName {
if strings.Contains(name, "_") {
return pflag.NormalizedName(strings.Replace(name, "_", "-", -1))
}
return pflag.NormalizedName(name)
}

func main() {
rand.Seed(time.Now().UTC().UnixNano())

pflag.CommandLine.SetNormalizeFunc(WordSepNormalizeFunc)
pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)

command := discover_etcd_initial_cluster.NewDiscoverEtcdInitialClusterCommand()
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
}
226 changes: 226 additions & 0 deletions openshift-tools/pkg/discover-etcd-initial-cluster/initial-cluster.go
@@ -0,0 +1,226 @@
package discover_etcd_initial_cluster

import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/coreos/etcd/etcdserver/etcdserverpb"

"github.com/coreos/etcd/pkg/transport"
"google.golang.org/grpc"

"github.com/coreos/etcd/clientv3"

"github.com/spf13/pflag"

"github.com/spf13/cobra"
)

type DiscoverEtcdInitialClusterOptions struct {
// TargetPeerURLHost is the host portion of the peer URL. It is used to match on. (either IP or hostname)
TargetPeerURLHost string
// TargetName is the name to assign to this peer if we create it
TargetName string

// CABundleFile is the file to use to trust the etcd server
CABundleFile string
// ClientCertFile is the client cert to use to authenticate this binary to etcd
ClientCertFile string
// ClientKeyFile is the client key to use to authenticate this binary to etcd
ClientKeyFile string
// Endpoints is a list of all the endpoints to use to try to contact etcd
Endpoints []string

// MemberDir is the directory created when etcd starts the first time
MemberDir string
}

func NewDiscoverEtcdInitialCluster() *DiscoverEtcdInitialClusterOptions {
return &DiscoverEtcdInitialClusterOptions{}
}

func NewDiscoverEtcdInitialClusterCommand() *cobra.Command {
o := NewDiscoverEtcdInitialCluster()

cmd := &cobra.Command{
Use: "discover-etcd-initial-cluster",
Short: "output the value for ETCD_INITIAL_CLUSTER in openshift etcd static pod",
Long: `output the value for ETCD_INITIAL_CLUSTER in openshift etcd static pod

1. If --data-dir exists, output a marker value and exit.
2. It tries to contact every available etcd to get a list of member.
3. Check each member to see if any one of them is the target.
4. If so, and if it is started, use the member list to create the ETCD_INITIAL_CLUSTER value and print it out.
5. If so, and if it it not started, use the existing member list and append the target value to create the ETCD_INITIAL_CLUSTER value and print it out.
6. If not, try again until either you have it or you time out.
`,
Run: func(cmd *cobra.Command, args []string) {
if err := o.Validate(); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}

if err := o.Run(); err != nil {
fmt.Fprint(os.Stderr, err)
os.Exit(1)
}
},
}
o.BindFlags(cmd.Flags())

return cmd
}

func (o *DiscoverEtcdInitialClusterOptions) BindFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.CABundleFile, "cacert", o.CABundleFile, "file to use to verify the identity of the etcd server")
flags.StringVar(&o.ClientCertFile, "cert", o.ClientCertFile, "client cert to use to authenticate this binary to etcd")
flags.StringVar(&o.ClientKeyFile, "key", o.ClientKeyFile, "client key to use to authenticate this binary to etcd")
flags.StringSliceVar(&o.Endpoints, "endpoints", o.Endpoints, "list of all the endpoints to use to try to contact etcd")
flags.StringVar(&o.MemberDir, "data-dir", o.MemberDir, "file to stat for existence of /var/lib/etcd/member")
flags.StringVar(&o.TargetPeerURLHost, "target-peer-url-host", o.TargetPeerURLHost, "host portion of the peer URL. It is used to match on. (either IP or hostname)")
flags.StringVar(&o.TargetName, "target-name", o.TargetName, "name to assign to this peer if we create it")
}

func (o *DiscoverEtcdInitialClusterOptions) Validate() error {
if len(o.CABundleFile) == 0 {
return fmt.Errorf("missing --cacert")
}
if len(o.ClientCertFile) == 0 {
return fmt.Errorf("missing --cert")
}
if len(o.ClientKeyFile) == 0 {
return fmt.Errorf("missing --key")
}
if len(o.Endpoints) == 0 {
return fmt.Errorf("missing --endpoints")
}
if len(o.MemberDir) == 0 {
return fmt.Errorf("missing --data-dir")
}
if len(o.TargetPeerURLHost) == 0 {
return fmt.Errorf("missing --target-peer-url-host")
}
if len(o.TargetName) == 0 {
return fmt.Errorf("missing --target-name")
}
return nil
}

func (o *DiscoverEtcdInitialClusterOptions) Run() error {
_, err := os.Stat(o.MemberDir)
switch {
case os.IsNotExist(err):
// do nothing. This just means we fall through to the polling logic

case err == nil:
fmt.Printf(o.TargetName)
return nil

case err != nil:
return err
}

client, err := o.getClient()
if err != nil {
return err
}
defer client.Close()

var targetMember *etcdserverpb.Member
var allMembers []*etcdserverpb.Member
for i := 0; i < 10; i++ {
fmt.Fprintf(os.Stderr, "#### attempt %d\n", i)
targetMember, allMembers, err = o.checkForTarget(client)

for _, member := range allMembers {
fmt.Fprintf(os.Stderr, " member=%v\n", stringifyMember(member))
}
fmt.Fprintf(os.Stderr, " target=%v, err=%v\n", stringifyMember(targetMember), err)

// we're done because we found what we want.
if targetMember != nil && err == nil {
break
}

fmt.Fprintf(os.Stderr, "#### sleeping...\n")
time.Sleep(1 * time.Second)
}
if err != nil {
return err
}
if targetMember == nil {
return fmt.Errorf("timed out")
}

etcdInitialClusterEntries := []string{}
for _, member := range allMembers {
if len(member.Name) == 0 { // this is the signal for whether or not a given peer is started
continue
}
etcdInitialClusterEntries = append(etcdInitialClusterEntries, fmt.Sprintf("%s=%s", member.Name, member.PeerURLs[0]))
}
if len(targetMember.Name) == 0 {
etcdInitialClusterEntries = append(etcdInitialClusterEntries, fmt.Sprintf("%s=%s", o.TargetName, targetMember.PeerURLs[0]))
}
fmt.Printf(strings.Join(etcdInitialClusterEntries, ","))

return nil
}

func stringifyMember(member *etcdserverpb.Member) string {
if member == nil {
return "nil"
}

return fmt.Sprintf("{name=%q, peerURLs=[%s}, clientURLs=[%s]", member.Name, strings.Join(member.PeerURLs, ","), strings.Join(member.ClientURLs, ","))
}

func (o *DiscoverEtcdInitialClusterOptions) checkForTarget(client *clientv3.Client) (*etcdserverpb.Member, []*etcdserverpb.Member, error) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

memberResponse, err := client.MemberList(ctx)
if err != nil {
return nil, nil, err
}

var targetMember *etcdserverpb.Member
for i := range memberResponse.Members {
member := memberResponse.Members[i]
for _, peerURL := range member.PeerURLs {
if strings.Contains(peerURL, o.TargetPeerURLHost) {
targetMember = member
}
}
}

return targetMember, memberResponse.Members, err
}

func (o *DiscoverEtcdInitialClusterOptions) getClient() (*clientv3.Client, error) {
dialOptions := []grpc.DialOption{
grpc.WithBlock(), // block until the underlying connection is up
}

tlsInfo := transport.TLSInfo{
CertFile: o.ClientCertFile,
KeyFile: o.ClientKeyFile,
TrustedCAFile: o.CABundleFile,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, err
}

cfg := &clientv3.Config{
DialOptions: dialOptions,
Endpoints: o.Endpoints,
DialTimeout: 15 * time.Second,
TLS: tlsConfig,
}

return clientv3.New(*cfg)
}