/
consul.go
142 lines (124 loc) · 2.88 KB
/
consul.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package storage
import (
"Envoy-Pilot/cmd/server/constant"
"fmt"
"log"
"os"
"strconv"
"sync"
consul "github.com/hashicorp/consul/api"
"github.com/joho/godotenv"
)
var singletonConsulWrapper ConsulWrapper
var CONSUL_PATH string
var mux sync.Mutex
const (
envoySubscriberSequenceKey = "envoySubscriberSequence"
)
type ConsulWrapper struct {
client *consul.Client
}
func GetConsulWrapper() ConsulWrapper {
mux.Lock()
defer mux.Unlock()
if singletonConsulWrapper.client == nil {
err := godotenv.Load(constant.ENV_PATH)
if err != nil {
log.Print(err)
log.Fatal("Error loading .env file")
}
consulPath := os.Getenv("CONSUL_PATH")
log.Printf("Consul Path: %s\n", consulPath)
config := &consul.Config{Address: consulPath}
singletonConsulWrapper = ConsulWrapper{}
client, err := consul.NewClient(config)
if err != nil {
panic(err)
}
singletonConsulWrapper.client = client
}
return singletonConsulWrapper
}
// TODO add retry
func (c *ConsulWrapper) GetUniqId() int {
for i := 0; i < 100; i++ {
res, id, err := c.checkAndSetUniqId()
if err != nil {
log.Println("Error updating uniq CAS")
panic(err)
}
if res {
return id
}
log.Println("Re generating uniq id")
}
panic("Unable to generate new id")
}
func GetSequenceKey() string {
return fmt.Sprintf("%s/%s", constant.CONSUL_PREFIX, envoySubscriberSequenceKey)
}
func (c *ConsulWrapper) checkAndSetUniqId() (bool, int, error) {
pair, _, err := c.client.KV().Get(GetSequenceKey(), nil)
if err != nil {
panic(err)
}
if pair == nil {
log.Println("nil value...")
c.Set(GetSequenceKey(), "1")
return true, 1, nil
// pair = c.Get(envoySubscriberSequenceKey)
}
id, err := strconv.Atoi(string(pair.Value))
if err != nil {
log.Printf("Err getting uniq id: %s\n", pair.Value)
panic(err)
}
log.Printf("Last id value is %d\n", id)
newId := id + 1
pair.Value = []byte(strconv.Itoa(newId))
res, _, err := c.client.KV().CAS(pair, nil)
return res, newId, err
}
func (c *ConsulWrapper) Set(key string, value string) {
p := &consul.KVPair{Key: key, Value: []byte(value)}
_, err := c.client.KV().Put(p, nil)
if err != nil {
log.Println(err)
panic(err)
}
}
func (c *ConsulWrapper) Get(key string) *consul.KVPair {
pair, _, err := c.client.KV().Get(key, nil)
if err != nil {
panic(err)
}
// if pair == nil {
// log.Printf("Nil value for key %s\n", key)
// }
return pair
}
func (c *ConsulWrapper) GetString(key string) string {
pair := c.Get(key)
if pair != nil {
return string(pair.Value)
} else {
return ""
}
}
func (c *ConsulWrapper) GetInt(key string) int {
pair := c.Get(key)
id, err := strconv.Atoi(string(pair.Value))
if err != nil {
log.Println("Err getting uniq id")
panic(err)
}
return id
}
func (c *ConsulWrapper) Delete(key string) error {
_, err := c.client.KV().Delete(key, nil)
if err != nil {
log.Printf("Error deleting key %s\n", key)
return err
}
return nil
}