forked from openshift/origin
-
Notifications
You must be signed in to change notification settings - Fork 0
/
etcd.go
138 lines (121 loc) · 4.41 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package etcd
import (
"fmt"
"net"
"net/http"
"time"
etcdclient "github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/client"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
configapi "github.com/openshift/origin/pkg/cmd/server/api"
)
// RunEtcd assembles an etcd server configuration from etcdServerConfig and
// starts the server through startEtcd.
//
// Client and peer TLS settings are copied from ServingInfo and PeerServingInfo
// when configapi.UseTLS reports true for them. The listen and announce URLs are
// derived from the bind and advertised addresses. Every failure along the way
// (URL construction, URL resolution, server start) is reported with glog.Fatalf.
//
// On success a goroutine is spawned that logs the advertised address and then
// waits on the channel returned by startEtcd; RunEtcd itself returns without
// waiting on that channel.
func RunEtcd(etcdServerConfig *configapi.EtcdConfig) {
	clientServing := etcdServerConfig.ServingInfo
	peerServing := etcdServerConfig.PeerServingInfo

	cfg := &config{
		name:                defaultName,
		dir:                 etcdServerConfig.StorageDir,
		TickMs:              100,
		ElectionMs:          1000,
		maxSnapFiles:        5,
		maxWalFiles:         5,
		initialClusterToken: "etcd-cluster",
	}

	var err error

	// Client-facing listener: TLS material first, then the listen URLs that
	// are built from it.
	if configapi.UseTLS(clientServing) {
		cfg.clientTLSInfo.CAFile = clientServing.ClientCA
		cfg.clientTLSInfo.CertFile = clientServing.ServerCert.CertFile
		cfg.clientTLSInfo.KeyFile = clientServing.ServerCert.KeyFile
	}
	cfg.lcurls, err = urlsFromStrings(clientServing.BindAddress, cfg.clientTLSInfo)
	if err != nil {
		glog.Fatalf("Unable to build etcd client URLs: %v", err)
	}

	// Peer-facing listener, configured the same way.
	if configapi.UseTLS(peerServing) {
		cfg.peerTLSInfo.CAFile = peerServing.ClientCA
		cfg.peerTLSInfo.CertFile = peerServing.ServerCert.CertFile
		cfg.peerTLSInfo.KeyFile = peerServing.ServerCert.KeyFile
	}
	cfg.lpurls, err = urlsFromStrings(peerServing.BindAddress, cfg.peerTLSInfo)
	if err != nil {
		glog.Fatalf("Unable to build etcd peer URLs: %v", err)
	}

	// Announce URLs come from the advertised addresses rather than the bind
	// addresses.
	cfg.acurls, err = urlsFromStrings(etcdServerConfig.Address, cfg.clientTLSInfo)
	if err != nil {
		glog.Fatalf("Unable to build etcd announce client URLs: %v", err)
	}
	cfg.apurls, err = urlsFromStrings(etcdServerConfig.PeerAddress, cfg.peerTLSInfo)
	if err != nil {
		glog.Fatalf("Unable to build etcd announce peer URLs: %v", err)
	}

	err = cfg.resolveUrls()
	if err != nil {
		glog.Fatalf("Unable to resolve etcd URLs: %v", err)
	}

	// The initial cluster is this node alone, identified by its first announce
	// peer URL.
	// NOTE(review): indexing apurls[0] assumes urlsFromStrings produced at
	// least one URL - confirm.
	firstPeerURL := cfg.apurls[0].String()
	cfg.initialCluster = fmt.Sprintf("%s=%s", cfg.name, firstPeerURL)

	stopped, err := startEtcd(cfg)
	if err != nil {
		glog.Fatalf("Unable to start etcd: %v", err)
	}

	go func() {
		glog.Infof("Started etcd at %s", etcdServerConfig.Address)
		<-stopped
	}()
}
// GetAndTestEtcdClient builds an etcd client from etcdClientInfo using
// EtcdClient and then verifies it with TestEtcdClient, which keeps probing the
// server until it answers once or the retry budget is exhausted.
//
// It returns the working client, or a nil client together with the error from
// whichever of the two steps failed.
func GetAndTestEtcdClient(etcdClientInfo configapi.EtcdConnectionInfo) (*etcdclient.Client, error) {
	c, err := EtcdClient(etcdClientInfo)
	if err == nil {
		// Only probe the server when the client was actually constructed.
		err = TestEtcdClient(c)
	}
	if err != nil {
		return nil, err
	}
	return c, nil
}
// EtcdClient creates an etcd client based on the provided config.
//
// The HTTP transport is built with the Kubernetes client stack
// (client.TransportFor) using the client certificate, key and CA from
// etcdClientInfo, and is passed through DefaultEtcdClientTransport. The
// resulting transport is installed on a new client for etcdClientInfo.URLs.
//
// It returns an error if the transport cannot be constructed, or if the
// constructed transport is not an *http.Transport (the only type the etcd
// client's SetTransport accepts).
func EtcdClient(etcdClientInfo configapi.EtcdConnectionInfo) (*etcdclient.Client, error) {
	// etcd does a poor job of setting up the transport - use the Kube client stack
	transport, err := client.TransportFor(&client.Config{
		TLSClientConfig: client.TLSClientConfig{
			CertFile: etcdClientInfo.ClientCert.CertFile,
			KeyFile:  etcdClientInfo.ClientCert.KeyFile,
			CAFile:   etcdClientInfo.CA,
		},
		WrapTransport: DefaultEtcdClientTransport,
	})
	if err != nil {
		return nil, err
	}

	// TransportFor returns an http.RoundTripper; use a checked assertion so a
	// wrapped round tripper surfaces as an error instead of a panic.
	httpTransport, ok := transport.(*http.Transport)
	if !ok {
		return nil, fmt.Errorf("unexpected etcd client transport type %T, expected *http.Transport", transport)
	}

	etcdClient := etcdclient.NewClient(etcdClientInfo.URLs)
	etcdClient.SetTransport(httpTransport)
	return etcdClient, nil
}
// TestEtcdClient checks that etcdClient can talk to its server by repeatedly
// issuing a Get on "/".
//
// A nil error or an etcd "not found" error both count as a response and make
// the function return nil. Any other error triggers a retry after a 50ms
// pause. Once the attempt index has gone past 100 (i.e. after 102 failed
// attempts) the last error is returned wrapped in a "could not reach etcd"
// message.
func TestEtcdClient(etcdClient *etcdclient.Client) error {
	// Highest attempt index that is still followed by another retry.
	const lastRetriedAttempt = 100

	for attempt := 0; ; attempt++ {
		_, err := etcdClient.Get("/", false, false)
		switch {
		case err == nil, etcdstorage.IsEtcdNotFound(err):
			// The server answered, even if only to say the key is missing.
			return nil
		case attempt > lastRetriedAttempt:
			return fmt.Errorf("could not reach etcd: %v", err)
		}
		time.Sleep(50 * time.Millisecond)
	}
}
// DefaultEtcdClientTransport applies defaults suitable for infrastructure
// components to an etcd client transport.
//
// rt must be an *http.Transport; the type assertion is unchecked, so any other
// RoundTripper causes a panic. The transport is modified in place and the same
// value is returned:
//   - Dial is replaced with a net.Dialer using a 30s connect timeout and a 1s
//     keep-alive period.
//   - MaxIdleConnsPerHost is raised to 500.
func DefaultEtcdClientTransport(rt http.RoundTripper) http.RoundTripper {
	base := rt.(*http.Transport)

	// Lower the keep alive for connections.
	base.Dial = (&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 1 * time.Second,
	}).Dial

	// Because watches are very bursty, defends against long delays
	// in watch reconnections.
	base.MaxIdleConnsPerHost = 500

	return base
}