This repository has been archived by the owner on Jul 4, 2022. It is now read-only.
/
client.go
110 lines (95 loc) · 2.77 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package etcd
import (
"context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/taku-k/polymerase/pkg/polypb"
)
type ClientAPI interface {
GetBackupMeta(key polypb.BackupMetaKey) (polypb.BackupMetaSlice, error)
PutBackupMeta(key polypb.BackupMetaKey, meta *polypb.BackupMeta) error
RemoveBackupMeta(key polypb.BackupMetaKey) error
GetNodeMeta(key polypb.NodeMetaKey) ([]*polypb.NodeMeta, error)
PutNodeMeta(key polypb.NodeMetaKey, meta *polypb.NodeMeta) error
RemoveNodeMeta(key polypb.NodeMetaKey) error
Close()
}
type Client struct {
cli *clientv3.Client
session *concurrency.Session
}
func NewClient(cfg clientv3.Config) (ClientAPI, error) {
cli, err := clientv3.New(cfg)
if err != nil {
return nil, errors.Wrapf(err, "Creating etcd client is failed")
}
session, err := concurrency.NewSession(cli)
if err != nil {
return nil, errors.Wrapf(err, "Creating session is failed")
}
return &Client{
cli: cli,
session: session,
}, nil
}
func (c *Client) GetBackupMeta(key polypb.BackupMetaKey) (polypb.BackupMetaSlice, error) {
res, err := c.cli.KV.Get(context.TODO(), string(key),
clientv3.WithPrefix(), clientv3.WithIgnoreLease())
if err != nil {
return nil, err
}
result := make(polypb.BackupMetaSlice, len(res.Kvs))
for i, kv := range res.Kvs {
meta := &polypb.BackupMeta{}
if err := proto.Unmarshal(kv.Value, meta); err != nil {
return nil, err
}
result[i] = meta
}
return result, nil
}
func (c *Client) PutBackupMeta(key polypb.BackupMetaKey, meta *polypb.BackupMeta) error {
data, err := meta.Marshal()
if err != nil {
return err
}
_, err = c.cli.KV.Put(context.TODO(), string(key), string(data))
return err
}
func (c *Client) RemoveBackupMeta(key polypb.BackupMetaKey) error {
_, err := c.cli.KV.Delete(context.TODO(), string(key), clientv3.WithPrefix())
return err
}
func (c *Client) GetNodeMeta(key polypb.NodeMetaKey) ([]*polypb.NodeMeta, error) {
res, err := c.cli.KV.Get(context.TODO(), string(key), clientv3.WithPrefix())
if err != nil {
return nil, err
}
result := make([]*polypb.NodeMeta, len(res.Kvs))
for i, kv := range res.Kvs {
meta := &polypb.NodeMeta{}
if err := proto.Unmarshal(kv.Value, meta); err != nil {
return nil, err
}
result[i] = meta
}
return result, nil
}
func (c *Client) PutNodeMeta(key polypb.NodeMetaKey, meta *polypb.NodeMeta) error {
data, err := meta.Marshal()
if err != nil {
return err
}
_, err = c.cli.KV.Put(context.TODO(), string(key), string(data))
return err
}
func (c *Client) RemoveNodeMeta(key polypb.NodeMetaKey) error {
_, err := c.cli.KV.Delete(context.TODO(), string(key), clientv3.WithPrefix())
return err
}
func (c *Client) Close() {
c.cli.Close()
c.session.Close()
}