-
Notifications
You must be signed in to change notification settings - Fork 1
/
graph.manager.go
123 lines (87 loc) · 2.01 KB
/
graph.manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package connection
import (
"errors"
ultipa "github.com/ultipa/ultipa-go-sdk/rpc"
"github.com/lrita/cmap"
)
type GraphManager struct {
graphs *cmap.Cmap
}
func NewGraphManager() *GraphManager {
return &GraphManager{
graphs: &cmap.Cmap{},
}
}
func (gm *GraphManager) DeleteGraph(graphName string) {
gm.graphs.Delete(graphName)
}
func (gm *GraphManager) AddGraph(graphName string) {
gm.graphs.LoadOrStore(graphName, &GraphClusterInfo{
Graph: graphName,
})
}
func (gm *GraphManager) GetGraph(graphName string) *GraphClusterInfo {
gci, ok := gm.graphs.Load(graphName)
if ok == false {
return nil
}
return gci.(*GraphClusterInfo)
}
func (gm *GraphManager) GetLeader(graphName string) *Connection {
gci := gm.GetGraph(graphName)
if gci == nil {
return nil
}
return gci.Leader
}
func (gm *GraphManager) SetLeader(graphName string, conn *Connection) {
_, ok := gm.graphs.Load(graphName)
if ok == false {
gm.AddGraph(graphName)
}
gci := gm.GetGraph(graphName)
if gci != nil {
//TODO: concurrent conflict
gci.Leader = conn
}
}
func (gm *GraphManager) ClearFollower(graphName string) {
gci := gm.GetGraph(graphName)
if gci == nil {
return
}
gci.Followers = []*Connection{}
gci.Algos = []*Connection{}
}
func (gm *GraphManager) AddFollower(graphName string, conn *Connection) {
gci := gm.GetGraph(graphName)
if gci == nil {
return
}
gci.Graph = conn.Host
if gci.HasConn(conn) == true {
return
}
gci.Followers = append(gci.Followers, conn)
if conn.HasRole(ultipa.FollowerRole_ROLE_ALGO_EXECUTABLE) {
gci.Algos = append(gci.Algos, conn)
}
}
func (gci *GraphClusterInfo) GetAnalyticConn() (*Connection, error) {
if len(gci.Algos) == 0 {
return nil, errors.New("no Algo/Task Instance Found")
}
gci.LastAlgoIndex++
return gci.Algos[gci.LastAlgoIndex%len(gci.Algos)], nil
}
func (gci *GraphClusterInfo) HasConn(_conn *Connection) bool {
if gci.Leader == _conn {
return true
}
for _, conn := range gci.Followers {
if conn == _conn {
return true
}
}
return false
}