forked from etcd-io/etcd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
import_snap_command.go
128 lines (114 loc) · 2.93 KB
/
import_snap_command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package command
import (
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"sync"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/codegangsta/cli"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/coreos/go-etcd/etcd"
"github.com/coreos/etcd/store"
)
// set describes a single key write queued for the import workers: runSet
// receives these values from a channel and passes the three fields to the
// etcd client's Set call.
type set struct {
	// key is the full key path, copied from the snapshot node's Key.
	key string
	// value is the string payload, copied from the snapshot node's Value.
	value string
	// ttl is the node's TTL as reported by the snapshot store; runSet leaves 0
	// untouched (presumably "no expiry" - confirm against the etcd client).
	ttl int64
}
// NewImportSnapCommand returns the cli.Command that implements "import".
//
// The command accepts three flags:
//   - snap:   path of the snapshot file to import (string, default "").
//   - hidden: hidden key spaces to import in addition to "/" (repeatable).
//   - c:      number of concurrent clients used for the import (default 10).
//
// The command's action is handleImportSnap.
func NewImportSnapCommand() cli.Command {
	return cli.Command{
		Name:  "import",
		Usage: "import a snapshot to a cluster",
		Flags: []cli.Flag{
			// Help text previously read "vaild"; corrected to "valid".
			cli.StringFlag{Name: "snap", Value: "", Usage: "Path to the valid etcd 0.4.x snapshot."},
			cli.StringSliceFlag{Name: "hidden", Value: new(cli.StringSlice), Usage: "Hidden key spaces to import from snapshot"},
			cli.IntFlag{Name: "c", Value: 10, Usage: "Number of concurrent clients to import the data"},
		},
		Action: handleImportSnap,
	}
}
// handleImportSnap is the action of the "import" command. It reads the
// snapshot file named by the "snap" flag, recovers it into a fresh store,
// starts "c" worker goroutines (each with its own etcd client) and feeds them
// every key found under "/" plus every key under the key spaces listed in the
// "hidden" flag. When all workers have drained the queue it prints the number
// of keys that were queued.
//
// Problems with the flags or the snapshot file are printed with fmt.Printf
// and end the process via os.Exit(1). Endpoint, transport, cluster-sync and
// store lookup failures are passed to handleError, which is defined elsewhere
// (NOTE(review): the code below assumes handleError does not return).
func handleImportSnap(c *cli.Context) {
	snapPath := c.String("snap")
	if snapPath == "" {
		fmt.Printf("no snapshot file provided (use --snap)\n")
		os.Exit(1)
	}

	// With fewer than one worker nothing would ever receive from the
	// unbuffered channel created below, so the first queued key would block
	// forever. Reject such values before doing any work.
	concurrent := c.Int("c")
	if concurrent < 1 {
		fmt.Printf("invalid number of concurrent clients %d (-c must be at least 1)\n", concurrent)
		os.Exit(1)
	}

	d, err := ioutil.ReadFile(snapPath)
	if err != nil {
		// Include the underlying error so the user can tell a missing file
		// from a permission problem.
		fmt.Printf("cannot read snapshot file %s: %v\n", snapPath, err)
		os.Exit(1)
	}

	st := store.New()
	if err = st.Recovery(d); err != nil {
		fmt.Printf("cannot recover the snapshot file: %v\n", err)
		os.Exit(1)
	}

	endpoints, err := getEndpoints(c)
	if err != nil {
		handleError(ErrorFromEtcd, err)
	}
	tr, err := getTransport(c)
	if err != nil {
		handleError(ErrorFromEtcd, err)
	}

	wg := &sync.WaitGroup{}
	setc := make(chan set)
	fmt.Printf("starting to import snapshot %s with %d clients\n", snapPath, concurrent)
	for i := 0; i < concurrent; i++ {
		client := etcd.NewClient(endpoints)
		client.SetTransport(tr)
		if c.GlobalBool("debug") {
			go dumpCURL(client)
		}
		if ok := client.SyncCluster(); !ok {
			handleError(FailedToConnectToHost, errors.New("cannot sync with the cluster using endpoints "+strings.Join(endpoints, ", ")))
		}
		wg.Add(1)
		go runSet(client, setc, wg)
	}

	// Queue everything reachable from the root first.
	all, err := st.Get("/", true, true)
	if err != nil {
		handleError(ErrorFromEtcd, err)
	}
	n := copyKeys(all.Node, setc)

	// Then queue each explicitly requested hidden key space.
	for _, h := range c.StringSlice("hidden") {
		allh, err := st.Get(h, true, true)
		if err != nil {
			handleError(ErrorFromEtcd, err)
		}
		n += copyKeys(allh.Node, setc)
	}

	// Closing the channel ends each worker's receive loop; wait for all of
	// them to finish before reporting.
	close(setc)
	wg.Wait()
	fmt.Printf("finished importing %d keys\n", n)
}
// copyKeys walks the tree rooted at n and sends one set value on setc for
// every non-directory node, built from the node's Key, Value and TTL. Each
// directory visited is logged before its children are processed. The return
// value is the number of set values sent.
func copyKeys(n *store.NodeExtern, setc chan set) int {
	if n.Dir {
		log.Println("entering dir:", n.Key)
		total := 0
		for i := range n.Nodes {
			total += copyKeys(n.Nodes[i], setc)
		}
		return total
	}

	// Leaf node: queue exactly one write.
	setc <- set{key: n.Key, value: *n.Value, ttl: n.TTL}
	return 1
}
// runSet is the body of one import worker. It receives set values from setc
// until the channel is closed, writing each one with c.Set. A non-zero TTL
// below 300 seconds is raised to 300 seconds before the write. A failed write
// is reported with log.Fatalf. wg.Done is called once the channel has been
// drained.
func runSet(c *etcd.Client, setc chan set, wg *sync.WaitGroup) {
	// minTTL is the floor, in seconds, applied to non-zero TTLs.
	const minTTL = 5 * 60

	for {
		req, open := <-setc
		if !open {
			break
		}

		log.Println("copying key:", req.key)

		ttl := req.ttl
		if ttl != 0 && ttl < minTTL {
			log.Printf("extending key %s's ttl to 300 seconds", req.key)
			ttl = minTTL
		}

		if _, err := c.Set(req.key, req.value, uint64(ttl)); err != nil {
			log.Fatalf("failed to copy key: %v\n", err)
		}
	}

	wg.Done()
}