-
Notifications
You must be signed in to change notification settings - Fork 0
/
join.go
175 lines (159 loc) · 5.25 KB
/
join.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
"github.com/coreos/etcd/wal"
"github.com/juju/errors"
"github.com/pingcap/pd/pkg/etcdutil"
log "github.com/sirupsen/logrus"
)
// Owner-only permission bits, used in this file when persisting the
// join configuration under the data directory.
const (
	// privateDirMode allows only the owner to list, create and remove
	// entries inside the directory (rwx------).
	privateDirMode = 0700
	// privateFileMode allows only the owner to read and write the file
	// (rw-------).
	privateFileMode = 0600
)
// PrepareJoinCluster sends MemberAdd command to PD cluster, and fills in
// the initial cluster configuration (cfg.InitialCluster and
// cfg.InitialClusterState) that the embedded etcd should start with.
//
// TL;DR: The join functionality is safe. With data, join does nothing, w/o data
// and it is not a member of cluster, join does MemberAdd, it returns an
// error if PD tries to join itself, missing data or join a duplicated PD.
//
// If cfg.Join is empty, nothing is done and cfg is left unchanged.
//
// If a join file (<DataDir>/join) written by an earlier successful call
// exists, its contents are used as the initial cluster and no MemberAdd is
// issued again.
//
// Etcd automatically re-joins the cluster if there is a data directory. So
// next it checks if there is a data directory or not. If there is, it sets
// an empty initial cluster (etcd will get the correct configurations from
// the data directory.)
//
// If there is no data directory, there are following cases:
//
//   - A new PD joins an existing cluster.
//     What join does: MemberAdd, MemberList, then generate initial-cluster.
//
//   - A failed PD re-joins the previous cluster.
//     What join does: return an error. (etcd reports: raft log corrupted,
//     truncated, or lost?)
//
//   - A deleted PD joins the previous cluster.
//     What join does: MemberAdd, MemberList, then generate initial-cluster.
//     (it is not in the member list and there is no data, so
//     we can treat it as a new PD.)
//
// If there is a data directory, there are following special cases:
//
//   - A failed PD tries to join the previous cluster but it has been deleted
//     during its downtime.
//     What join does: return "" (etcd will connect to other peers and find
//     that the PD itself has been removed.)
//
//   - A deleted PD joins the previous cluster.
//     What join does: return "" (as etcd will read data directory and find
//     that the PD itself has been removed, so an empty string
//     is fine.)
//
// After a successful MemberAdd the generated initial cluster is written to
// the join file, so a later call reuses it instead of adding the member a
// second time.
func PrepareJoinCluster(cfg *Config) error {
	// Not configured to join anything: nothing to prepare.
	if cfg.Join == "" {
		return nil
	}
	// - A PD tries to join itself.
	if cfg.Join == cfg.AdvertiseClientUrls {
		return errors.New("join self is forbidden")
	}

	filePath := path.Join(cfg.DataDir, "join")
	// Read the join config persisted by a previous successful MemberAdd.
	if _, err := os.Stat(filePath); !os.IsNotExist(err) {
		s, err := ioutil.ReadFile(filePath)
		if err != nil {
			// Hand the failure back to the caller rather than exiting the
			// process from inside a function that returns an error.
			log.Errorf("read the join config meet error: %v", err)
			return errors.Trace(err)
		}
		cfg.InitialCluster = strings.TrimSpace(string(s))
		cfg.InitialClusterState = embed.ClusterStateFlagExisting
		return nil
	}

	if wal.Exist(path.Join(cfg.DataDir, "member")) {
		// Cases with data directory: leave the initial cluster empty and let
		// etcd recover the membership from its data directory.
		cfg.InitialCluster = ""
		cfg.InitialClusterState = embed.ClusterStateFlagExisting
		return nil
	}

	// Below are cases without data directory.
	tlsConfig, err := cfg.Security.ToTLSConfig()
	if err != nil {
		return errors.Trace(err)
	}
	client, err := clientv3.New(clientv3.Config{
		Endpoints:   splitCommaSeparatedURLs(cfg.Join),
		DialTimeout: etcdutil.DefaultDialTimeout,
		TLS:         tlsConfig,
	})
	if err != nil {
		return errors.Trace(err)
	}
	defer client.Close()

	listResp, err := etcdutil.ListEtcdMembers(client)
	if err != nil {
		return errors.Trace(err)
	}
	existed := false
	for _, m := range listResp.Members {
		if len(m.Name) == 0 {
			return errors.New("there is a member that has not joined successfully")
		}
		if m.Name == cfg.Name {
			existed = true
		}
	}
	// - A failed PD re-joins the previous cluster.
	if existed {
		return errors.New("missing data or join a duplicated pd")
	}

	// - A new PD joins an existing cluster.
	// - A deleted PD joins the previous cluster.
	// The advertise peer URLs may be a comma-separated list; MemberAdd needs
	// each URL as its own element, not the whole list as one string.
	addResp, err := etcdutil.AddEtcdMember(client, splitCommaSeparatedURLs(cfg.AdvertisePeerUrls))
	if err != nil {
		return errors.Trace(err)
	}

	listResp, err = etcdutil.ListEtcdMembers(client)
	if err != nil {
		return errors.Trace(err)
	}
	pds := make([]string, 0, len(listResp.Members))
	for _, memb := range listResp.Members {
		n := memb.Name
		// A member that has not started yet is listed with an empty name
		// (treated below as an unfinished join), so substitute our own name
		// for the member that was just added.
		if memb.ID == addResp.Member.ID {
			n = cfg.Name
		}
		if len(n) == 0 {
			return errors.New("there is a member that has not joined successfully")
		}
		for _, m := range memb.PeerURLs {
			pds = append(pds, fmt.Sprintf("%s=%s", n, m))
		}
	}
	cfg.InitialCluster = strings.Join(pds, ",")
	cfg.InitialClusterState = embed.ClusterStateFlagExisting

	// Persist the generated configuration so that a later call takes the
	// join-file path above instead of issuing another MemberAdd.
	// MkdirAll returns nil when the directory already exists.
	if err = os.MkdirAll(cfg.DataDir, privateDirMode); err != nil {
		return errors.Trace(err)
	}
	err = ioutil.WriteFile(filePath, []byte(cfg.InitialCluster), privateFileMode)
	return errors.Trace(err)
}

// splitCommaSeparatedURLs splits a comma-separated URL list, trimming
// whitespace around each entry and dropping empty entries.
func splitCommaSeparatedURLs(s string) []string {
	parts := strings.Split(s, ",")
	urls := make([]string, 0, len(parts))
	for _, p := range parts {
		if p = strings.TrimSpace(p); p != "" {
			urls = append(urls, p)
		}
	}
	return urls
}