/
connd.go
129 lines (112 loc) · 2.48 KB
/
connd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package conncli
import (
"errors"
"sync"
"github.com/meitu/bifrost/commons/log"
"github.com/meitu/bifrost/commons/trace"
conn "github.com/meitu/bifrost/grpc/conn"
"github.com/meitu/bifrost/push/conf"
"go.uber.org/zap"
"go.uber.org/zap/buffer"
"golang.org/x/net/context"
)
// TraceID is the placeholder trace identifier. traceid returns it when
// trace.GetTraceID reports an error for the given context.
const TraceID = "traceid"
// connd keeps one client per address together with the configuration
// that is handed to newClient when a client is created.
type connd struct {
// clients maps an address to the client created for that address.
clients map[string]*client
// rwlock is held by the methods in this file whenever they read or
// modify clients.
rwlock sync.RWMutex
// cfg is the client configuration passed to newClient.
cfg *conf.Client
}
// NewConnd returns a Connd backed by an empty client table. Clients are
// created later, on first use of an address, from config.Client.
//
// A nil config yields an error rather than a nil-pointer panic on the
// field access.
func NewConnd(config *conf.Connd) (Connd, error) {
	if config == nil {
		return nil, errors.New("conncli: nil connd config")
	}
	return &connd{
		clients: make(map[string]*client),
		cfg:     &config.Client,
	}, nil
}
// Close calls close on every client held by c and removes each one from
// the client table. It always returns nil.
//
// The write lock is released with defer, matching getClient, so it is
// not left held if a client's close panics.
func (c *connd) Close() error {
	c.rwlock.Lock()
	defer c.rwlock.Unlock()

	for addr, cli := range c.clients {
		cli.close()
		delete(c.clients, addr)
	}
	return nil
}
// getClient returns the client stored for addr, creating one with
// newClient and storing it when the table has no entry yet.
//
// The first lookup runs under the read lock. On a miss the write lock is
// taken and the table is consulted a second time before anything is
// created, so an entry stored between the two lock acquisitions is
// reused. When newClient fails its error is returned with a nil client
// and nothing is stored.
func (c *connd) getClient(addr string) (cli *client, err error) {
	c.rwlock.RLock()
	existing, found := c.clients[addr]
	c.rwlock.RUnlock()
	if found {
		return existing, nil
	}

	if entry := log.Check(zap.DebugLevel, "create conn client"); entry != nil {
		entry.Write(zap.String("addr", addr))
	}

	c.rwlock.Lock()
	defer c.rwlock.Unlock()

	// Look again now that the write lock is held.
	if existing, found = c.clients[addr]; found {
		return existing, nil
	}

	created, err := newClient(addr, c.cfg)
	if err != nil {
		return nil, err
	}
	c.clients[addr] = created
	return created, nil
}
// Disconnect builds a DisconnectReq from service, id, connid and the
// trace id of ctx, and passes it to the disconnect method of the client
// for addr.
//
// It returns the error from getClient if no client could be obtained,
// otherwise whatever the client's disconnect call returns.
func (c *connd) Disconnect(ctx context.Context, id, service, addr string, connid int64) error {
	entry := log.Check(zap.DebugLevel, "disconnect client")
	if entry != nil {
		entry.Write(
			zap.String("addr", addr),
			zap.String("id", id),
			zap.Int64("connid", connid),
		)
	}

	request := &conn.DisconnectReq{
		Service:      service,
		ClientID:     id,
		ConnectionID: connid,
		TraceID:      traceid(ctx),
	}

	target, err := c.getClient(addr)
	if err != nil {
		return err
	}
	return target.disconnect(ctx, request)
}
// Notify builds a single NotifyReq from topic, index, noneDowngrade,
// service and the trace id of ctx, and passes it to the notify method of
// the client for every address in addrs.
//
// A failure for one address does not stop the loop. Each failure,
// whether from getClient or from notify, is recorded as "addr: message",
// and the entries are joined with "; " into one returned error. Notify
// returns nil only when no address failed.
func (c *connd) Notify(ctx context.Context, service string, addrs []string, topic string, index []byte, noneDowngrade bool) error {
	req := &conn.NotifyReq{
		Topic:         topic,
		Index:         index,
		NoneDowngrade: noneDowngrade,
		TraceID:       traceid(ctx),
		Service:       service,
	}

	var errbuf buffer.Buffer
	// record appends one failure. The address prefix keeps the entry
	// non-empty even when err.Error() is "", so such a failure is not
	// mistaken for success by the length check below.
	record := func(addr string, err error) {
		if errbuf.Len() != 0 {
			errbuf.AppendString("; ")
		}
		errbuf.AppendString(addr)
		errbuf.AppendString(": ")
		errbuf.AppendString(err.Error())
	}

	for _, addr := range addrs {
		cli, err := c.getClient(addr)
		if err != nil {
			record(addr, err)
			continue
		}
		if err := cli.notify(ctx, req); err != nil {
			record(addr, err)
		}
	}

	if errbuf.Len() == 0 {
		return nil
	}
	return errors.New(errbuf.String())
}
// traceid returns the trace id that trace.GetTraceID extracts from ctx,
// or the TraceID constant when that call returns an error.
func traceid(ctx context.Context) string {
	if id, err := trace.GetTraceID(ctx); err == nil {
		return id
	}
	return TraceID
}