/
delegate.go
100 lines (85 loc) · 2.7 KB
/
delegate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package memberlist
import (
"bytes"
"fmt"
"github.com/hashicorp/go-msgpack/codec"
"github.com/unionj-cloud/go-doudou/v2/framework/registry/constants"
"github.com/unionj-cloud/go-doudou/v2/toolkit/memberlist"
logger "github.com/unionj-cloud/go-doudou/v2/toolkit/zlogger"
"sync"
"time"
)
type Service struct {
Name string `json:"name"`
Host string `json:"host"`
Port int `json:"port"`
RouteRootPath string `json:"routeRootPath"`
Type constants.ServiceType `json:"type"`
Data map[string]interface{} `json:"data,omitempty"`
}
func (receiver *Service) BaseUrl() string {
if receiver == nil {
return ""
}
switch receiver.Type {
case constants.REST_TYPE:
return fmt.Sprintf("http://%s:%d%s", receiver.Host, receiver.Port, receiver.RouteRootPath)
case constants.GRPC_TYPE:
return fmt.Sprintf("%s:%d", receiver.Host, receiver.Port)
}
return ""
}
type NodeMeta struct {
Services []Service `json:"serviceInfo"`
RegisterAt *time.Time `json:"registerAt"`
GoVer string `json:"goVer"`
GddVer string `json:"gddVer"`
BuildUser string `json:"buildUser"`
BuildTime string `json:"buildTime"`
Weight int `json:"weight"`
}
type delegate struct {
meta NodeMeta
lock sync.Mutex
queue *memberlist.TransmitLimitedQueue
}
func (d *delegate) AddService(service Service) {
d.lock.Lock()
defer d.lock.Unlock()
d.meta.Services = append(d.meta.Services, service)
}
// NodeMeta return user custom node meta data
func (d *delegate) NodeMeta(limit int) []byte {
d.lock.Lock()
defer d.lock.Unlock()
var buf bytes.Buffer
enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
if err := enc.Encode(d.meta); err != nil {
logger.Panic().Err(err).Msg("[go-doudou] Failed to encode node meta data")
}
raw := buf.Bytes()
if len(raw) > limit {
logger.Panic().Msgf("[go-doudou] Node meta data '%v' exceeds length limit of %d bytes", d.meta, limit)
}
return raw
}
// NotifyMsg callback function when received user data message from remote node
func (d *delegate) NotifyMsg(msg []byte) {
d.lock.Lock()
defer d.lock.Unlock()
// TODO
}
// GetBroadcasts get a number of user data broadcasts
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
d.lock.Lock()
defer d.lock.Unlock()
msgs := d.queue.GetBroadcasts(overhead, limit)
return msgs
}
// LocalState also sends user data, but by tcp connection when pushPull-ing state with other node
func (d *delegate) LocalState(join bool) []byte {
return nil
}
// MergeRemoteState gets user data from remote node by tcp connection when pushPull-ing state with other node
func (d *delegate) MergeRemoteState(s []byte, join bool) {
}