forked from openshift/origin
/
etcd.go
111 lines (93 loc) · 3.29 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package etcd
import (
"fmt"
"time"
etcdconfig "github.com/coreos/etcd/config"
"github.com/coreos/etcd/etcd"
etcdclient "github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/openshift/origin/pkg/api/latest"
configapi "github.com/openshift/origin/pkg/cmd/server/api"
)
// RunEtcd starts an etcd server and runs it forever
func RunEtcd(etcdServerConfig *configapi.EtcdConfig) {
config := etcdconfig.New()
config.Addr = etcdServerConfig.Address
config.BindAddr = etcdServerConfig.ServingInfo.BindAddress
if configapi.UseTLS(etcdServerConfig.ServingInfo) {
config.CAFile = etcdServerConfig.ServingInfo.ClientCA
config.CertFile = etcdServerConfig.ServingInfo.ServerCert.CertFile
config.KeyFile = etcdServerConfig.ServingInfo.ServerCert.KeyFile
}
config.Peer.Addr = etcdServerConfig.PeerAddress
config.Peer.BindAddr = etcdServerConfig.PeerServingInfo.BindAddress
if configapi.UseTLS(etcdServerConfig.PeerServingInfo) {
config.Peer.CAFile = etcdServerConfig.PeerServingInfo.ClientCA
config.Peer.CertFile = etcdServerConfig.PeerServingInfo.ServerCert.CertFile
config.Peer.KeyFile = etcdServerConfig.PeerServingInfo.ServerCert.KeyFile
}
config.DataDir = etcdServerConfig.StorageDir
config.Name = "openshift.local"
server := etcd.New(config)
go util.Forever(func() {
glog.Infof("Started etcd at %s", config.Addr)
server.Run()
glog.Fatalf("etcd died, exiting.")
}, 500*time.Millisecond)
<-server.ReadyNotify()
}
// getAndTestEtcdClient creates an etcd client based on the provided config and waits
// until etcd server is reachable. It errors out and exits if the server cannot
// be reached for a certain amount of time.
func GetAndTestEtcdClient(etcdClientInfo configapi.EtcdConnectionInfo) (*etcdclient.Client, error) {
var etcdClient *etcdclient.Client
if len(etcdClientInfo.ClientCert.CertFile) > 0 {
tlsClient, err := etcdclient.NewTLSClient(
etcdClientInfo.URLs,
etcdClientInfo.ClientCert.CertFile,
etcdClientInfo.ClientCert.KeyFile,
etcdClientInfo.CA,
)
if err != nil {
return nil, err
}
etcdClient = tlsClient
} else if len(etcdClientInfo.CA) > 0 {
etcdClient = etcdclient.NewClient(etcdClientInfo.URLs)
err := etcdClient.AddRootCA(etcdClientInfo.CA)
if err != nil {
return nil, err
}
} else {
etcdClient = etcdclient.NewClient(etcdClientInfo.URLs)
}
for i := 0; ; i++ {
// TODO: make sure this works with etcd2 (root key may not exist)
_, err := etcdClient.Get("/", false, false)
if err == nil || tools.IsEtcdNotFound(err) {
break
}
if i > 100 {
return nil, fmt.Errorf("Could not reach etcd: %v", err)
}
time.Sleep(50 * time.Millisecond)
}
return etcdClient, nil
}
// newOpenShiftEtcdHelper returns an EtcdHelper for the provided arguments or an error if the version
// is incorrect.
func NewOpenShiftEtcdHelper(etcdClientInfo configapi.EtcdConnectionInfo) (helper tools.EtcdHelper, err error) {
// Connect and setup etcd interfaces
client, err := GetAndTestEtcdClient(etcdClientInfo)
if err != nil {
return tools.EtcdHelper{}, err
}
version := latest.Version
interfaces, err := latest.InterfacesFor(version)
if err != nil {
return helper, err
}
return tools.NewEtcdHelper(client, interfaces.Codec), nil
}