/
embed.go
120 lines (100 loc) · 2.4 KB
/
embed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package etcd
import (
"errors"
"fmt"
"net/url"
"os"
"strconv"
"strings"
"time"
"github.com/rafalb8/go-storage/internal/iter"
"github.com/rafalb8/go-storage/internal/net"
"go.etcd.io/etcd/server/v3/embed"
)
var (
localIP = net.LocalIP()
)
type etcdEmbed struct {
cfg *embed.Config
embed *embed.Etcd
}
func embedCfg(etcd *Etcd, peers []string, token, dir string, test bool) error {
cfg := embed.NewConfig()
// Basic
cfg.Dir = dir
nodeIdx, err := thisPeerPosition(peers)
if err != nil {
return err
}
cfg.Name = "node" + nodeIdx
// Logger
cfg.Logger = "zap"
if test {
cfg.LogLevel = "error"
}
// Peers
cfg.AdvertisePeerUrls = parseIPs([]string{localIP}, peerURL)
cfg.ListenPeerUrls = cfg.AdvertisePeerUrls
cfg.AdvertiseClientUrls = parseIPs([]string{localIP}, clientURL)
cfg.ListenClientUrls = cfg.AdvertiseClientUrls
// Cluster cfg
cfg.InitialClusterToken = token
cfg.InitialCluster = parseInitialCluster(peers)
cfg.ClusterState = embed.ClusterStateFlagNew
if entries, _ := os.ReadDir(cfg.Dir); len(entries) > 0 {
cfg.ClusterState = embed.ClusterStateFlagExisting
}
etcd.lg.Info("Cluster State:", cfg.ClusterState)
etcd.lg.Debug(fmt.Sprintf("%+v", cfg))
// Set cfg
etcd.server = &etcdEmbed{cfg: cfg}
return nil
}
func setupEmbed(etcd *Etcd) error {
e := etcd.server
var err error
e.embed, err = embed.StartEtcd(e.cfg)
if err != nil {
return err
}
select {
case <-e.embed.Server.ReadyNotify():
etcd.lg.Info("Server is ready!")
return nil
case <-time.After(60 * time.Second):
e.embed.Server.Stop() // trigger a shutdown
return errors.New("server took too long to start")
case err := <-e.embed.Err():
return err
}
}
func clientURL(ip string) string {
return "http://" + ip + ":2379"
}
func peerURL(ip string) string {
return "http://" + ip + ":2380"
}
func thisPeerPosition(peers []string) (string, error) {
for i, v := range peers {
if v == localIP {
return strconv.Itoa(i), nil
}
}
return "", fmt.Errorf("position of %s not found in %s", localIP, peers)
}
func parseIPs(ips []string, format func(string) string) []url.URL {
return iter.MapSlice(ips, func(ip string) url.URL {
u, err := url.Parse(format(ip))
if err != nil {
return url.URL{}
}
return *u
})
}
func parseInitialCluster(ips []string) string {
i := -1
return strings.Join(iter.MapSlice(ips, func(ip string) string {
i++
return fmt.Sprintf("node%d=%s", i, peerURL(ip))
}), ",")
}