forked from openshift/library-go
/
etcd_client.go
113 lines (95 loc) · 2.92 KB
/
etcd_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package encryption
import (
"bufio"
"context"
"fmt"
"os/exec"
"strconv"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
)
// EtcdClient is the minimal read-only view of etcd used by this package.
// It exposes a single Get call that takes clientv3 options and returns a
// clientv3 GetResponse; newEtcdClientInternal returns a clientv3 KV as an
// EtcdClient, and etcdWrapper implements it as well.
type EtcdClient interface {
// Get retrieves the value(s) stored under key, honoring ctx and any supplied clientv3 options.
Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)
}
// NewEtcdClient returns an EtcdClient backed by an etcdWrapper that holds on
// to kubeClient. No connection is established here; the wrapper builds its
// port-forward based etcd client on each Get call.
func NewEtcdClient(kubeClient kubernetes.Interface) EtcdClient {
	wrapper := new(etcdWrapper)
	wrapper.kubeClient = kubeClient
	return wrapper
}
// etcdWrapper implements EtcdClient. Its Get method builds a fresh
// port-forward based etcd client for every call instead of caching one.
type etcdWrapper struct {
// kubeClient is used to read the etcd CA bundle config map and the etcd
// client secret from the openshift-config namespace when building a client.
kubeClient kubernetes.Interface
}
// Get fetches key from etcd through a short-lived, port-forward based client.
//
// A brand new client is constructed on every invocation (and torn down before
// returning) so that lookups keep working across API server rollouts. An
// error building the client is wrapped and returned; otherwise the result of
// the underlying Get is returned as-is.
func (e *etcdWrapper) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
	kv, cleanup, buildErr := e.newEtcdClientInternal()
	if buildErr != nil {
		return nil, fmt.Errorf("failed to build port-forward based etcd client: %v", buildErr)
	}
	// Tear down the client's resources once the lookup has completed.
	defer cleanup()

	resp, getErr := kv.Get(ctx, key, opts...)
	return resp, getErr
}
// newEtcdClientInternal starts an `oc port-forward` to service/etcd in the
// openshift-etcd namespace and builds an etcd v3 client that connects to the
// forwarded local port over TLS. The TLS material comes from the
// "etcd-ca-bundle" config map and the "etcd-client" secret in the
// openshift-config namespace.
//
// On success it returns the client's KV interface and a cleanup function. The
// caller must invoke the cleanup function when done: it closes the etcd client
// and then stops the port-forward process.
//
// On failure the returned client and cleanup function are nil, and the
// port-forward process (if it was started) has already been stopped.
func (e *etcdWrapper) newEtcdClientInternal() (_ EtcdClient, _ func(), err error) {
	ctx, cancel := context.WithCancel(context.Background())
	cmd := exec.CommandContext(ctx, "oc", "port-forward", "service/etcd", ":2379", "-n", "openshift-etcd")

	stopPortForward := func() {
		cancel()
		_ = cmd.Wait() // wait to clean up resources but ignore returned error since cancel kills the process
	}

	// err is a named result so this cleanup observes every error return,
	// including the ones constructed inline with fmt.Errorf below. Without
	// that, an early return could leave the port-forward process running.
	defer func() {
		if err != nil {
			stopPortForward()
		}
	}()

	stdOut, err := cmd.StdoutPipe()
	if err != nil {
		return nil, nil, err
	}

	if err = cmd.Start(); err != nil {
		return nil, nil, err
	}

	// The first line of output announces the randomly chosen local port.
	scanner := bufio.NewScanner(stdOut)
	if !scanner.Scan() {
		// Scan returns false on EOF or on a read error; Err distinguishes them.
		if scanErr := scanner.Err(); scanErr != nil {
			return nil, nil, fmt.Errorf("failed to scan port forward std out: %v", scanErr)
		}
		return nil, nil, fmt.Errorf("failed to scan port forward std out")
	}
	output := scanner.Text()

	// Expected shape: "Forwarding from 127.0.0.1:<port> -> 2379".
	port := strings.TrimSuffix(strings.TrimPrefix(output, "Forwarding from 127.0.0.1:"), " -> 2379")
	if _, err = strconv.Atoi(port); err != nil {
		return nil, nil, fmt.Errorf("port forward output not in expected format: %s", output)
	}

	coreV1 := e.kubeClient.CoreV1()

	etcdConfigMap, err := coreV1.ConfigMaps("openshift-config").Get("etcd-ca-bundle", metav1.GetOptions{})
	if err != nil {
		return nil, nil, err
	}

	etcdSecret, err := coreV1.Secrets("openshift-config").Get("etcd-client", metav1.GetOptions{})
	if err != nil {
		return nil, nil, err
	}

	tlsConfig, err := restclient.TLSConfigFor(&restclient.Config{
		TLSClientConfig: restclient.TLSClientConfig{
			CertData: etcdSecret.Data[corev1.TLSCertKey],
			KeyData:  etcdSecret.Data[corev1.TLSPrivateKeyKey],
			CAData:   []byte(etcdConfigMap.Data["ca-bundle.crt"]),
		},
	})
	if err != nil {
		return nil, nil, err
	}

	etcdClient3, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"https://127.0.0.1:" + port},
		DialTimeout: 30 * time.Second,
		TLS:         tlsConfig,
	})
	if err != nil {
		return nil, nil, err
	}

	done := func() {
		// Close the client first so its connection is released while the
		// tunnel is still up; the error is ignored because the port-forward
		// is torn down immediately afterwards anyway.
		_ = etcdClient3.Close()
		stopPortForward()
	}

	return etcdClient3.KV, done, nil
}