-
Notifications
You must be signed in to change notification settings - Fork 0
/
manager_packets.go
132 lines (108 loc) · 3.5 KB
/
manager_packets.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package cluster
import (
"encoding/json"
"fmt"
"reflect"
"time"
)
func (m *Manager) handlePackets() {
for {
select {
case pm := <-m.ToNode: // incomming from client application
if LogTraffic {
m.log("%s traffic to cluster node: %+v", m.name, pm)
}
err := m.writeClusterNode(pm.Node, pm.Message)
if err != nil {
m.log("%s Failed to write message to remote node. error: %s", m.name, err)
}
case message := <-m.ToCluster: // incomming from client application
if LogTraffic {
m.log("%s traffic to cluster: %+v", m.name, message)
}
err := m.writeCluster(message)
if err != nil {
m.log("%s Failed to write message to remote node. error: %s", m.name, err)
}
case message := <-m.apiRequest: // incomming messages from API
if LogTraffic {
m.log("%s traffic from cluster api: %+v", m.name, message)
}
switch message.Action {
case "reconnect":
case "admin":
}
m.log("%s Cluster API request: %s (%s)", m.name, message.Action, message.Node)
select {
case m.FromClusterAPI <- message:
default:
m.log("%s Unable to write API message to FromClusterAPI. Channel full!", m.name)
}
case message := <-m.internalMessage: // incomming intenal messages (do not leave this library)
switch message.Type {
case "nodeadd":
m.updateQuorum()
case "noderemove":
m.updateQuorum()
case "nodejoin":
m.log("%s Cluster node joined: %s", m.name, message.Node)
select {
case m.NodeJoin <- message.Node: // send node join to client application
default:
}
m.updateQuorum()
case "nodeleave":
m.log("%s Cluster node left: %s (%s)", m.name, message.Node, message.Error)
select {
case m.NodeLeave <- message.Node: // send node join to client application
default:
}
m.updateQuorum()
default:
m.log("%s Unknown internal message %+v", m.name, message)
}
case packet := <-m.incommingPackets: // incomming packets from other cluster nodes
if LogTraffic {
m.log("%s traffic received incomming packet: %+v", m.name, packet)
}
m.connectedNodes.incPackets(packet.Name)
switch packet.DataType {
case "cluster.Auth": // internal use
m.connectedNodes.setStatus(packet.Name, StatusAuthenticating)
case "cluster.packetNodeShutdown": // internal use
m.connectedNodes.setStatus(packet.Name, StatusShutdown)
m.log("%s Got exit notice from node %s (shutdown)", m.name, packet.Name)
m.connectedNodes.close(packet.Name)
case "cluster.packetPing": // internal use
m.log("%s Got ping from node %s (%v)", m.name, packet.Name, time.Now().Sub(packet.Time))
m.connectedNodes.setLag(packet.Name, time.Now().Sub(packet.Time))
default:
m.log("%s Recieved non-cluster packet: %s", m.name, packet.DataType)
select {
case m.FromCluster <- packet: // outgoing to client application
default:
m.log("%s unable to send data to FromCluster channel, channel full!", m.name)
}
}
}
}
}
func (m *Manager) newPacket(dataMessage interface{}) ([]byte, error) {
val := reflect.Indirect(reflect.ValueOf(dataMessage))
packet := &Packet{
Name: m.name,
DataType: fmt.Sprintf("%s", val.Type()),
Time: time.Now(),
}
data, err := json.Marshal(dataMessage)
if err != nil {
m.log("%s Unable to jsonfy data: %s", m.name, err)
}
packet.DataMessage = string(data)
packetData, err := json.Marshal(packet)
if err != nil {
m.log("%s Unable to create json packet: %s", m.name, err)
}
packetData = append(packetData, 10) // 10 = newline
return packetData, err
}