forked from the-anna-project/annad
/
service.go
218 lines (184 loc) · 6.99 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package forwarder
import (
"encoding/json"
"fmt"
"github.com/the-anna-project/annad/object/networkpayload"
objectspec "github.com/the-anna-project/spec/object"
servicespec "github.com/the-anna-project/spec/service"
storagecollection "github.com/the-anna-project/storage/collection"
)
// New returns an empty forwarder service. Its service collection has to be
// provided through SetServiceCollection and its settings are initialized by
// Boot before the service is used.
func New() servicespec.ForwarderService {
	newService := new(service)

	return newService
}
// service is the forwarder service implementation returned by New.
type service struct {
// Dependencies.
// serviceCollection is the collection assigned by SetServiceCollection and
// exposed through Service.
serviceCollection servicespec.ServiceCollection
// Settings.
// metadata holds the "id", "name" and "type" entries assigned in Boot.
metadata map[string]string
// maxSignals represents the maximum number of signals being forwarded by one
// CLG. When a requested CLG needs to decide where to forward signals to, it
// may forward up to maxSignals signals to other CLGs, if any.
maxSignals int
}
// Boot initializes the settings of the service. It requests a new ID from the
// ID service of the configured service collection, fills the metadata and sets
// the maximum number of forwarded signals. Boot panics when no ID can be
// created.
func (s *service) Boot() {
	newID, err := s.Service().ID().New()
	if err != nil {
		panic(err)
	}

	meta := make(map[string]string, 3)
	meta["id"] = newID
	meta["name"] = "forwarder"
	meta["type"] = "service"
	s.metadata = meta

	// TODO constant for magic number
	// TODO make configurable
	s.maxSignals = 5
}
// Forward determines the network payloads that follow the given network
// payload and pushes each of them, JSON encoded, to the
// "events:network-payload" set of the general storage, so other processes can
// fetch them.
//
// The lookups GetNetworkPayloads and NewNetworkpayloads are executed in this
// order. The first lookup that does not report missing network payloads
// provides the result. When every lookup reports missing network payloads,
// nothing is stored and nil is returned. Any other lookup error, as well as
// encoding and storage errors, are returned masked.
func (s *service) Forward(CLG servicespec.CLGService, networkPayload objectspec.NetworkPayload) error {
	s.Service().Log().Line("func", "Forward")

	// The storage key does not depend on the payload, so it is defined once
	// instead of being formatted on every iteration.
	const networkPayloadKey = "events:network-payload"

	// This is the list of lookup functions which is executed sequentially.
	lookups := []func(CLG servicespec.CLGService, networkPayload objectspec.NetworkPayload) ([]objectspec.NetworkPayload, error){
		s.GetNetworkPayloads,
		s.NewNetworkpayloads,
	}

	// Execute one lookup after another. As soon as a lookup succeeds, we use
	// its network payloads and skip the remaining lookups.
	var newNetworkPayloads []objectspec.NetworkPayload
	for _, lookup := range lookups {
		found, err := lookup(CLG, networkPayload)
		if IsNetworkPayloadsNotFound(err) {
			// No network payloads could be found by this lookup. Go on and try
			// the next one. The result of a failed lookup is not used.
			continue
		}
		if err != nil {
			return maskAny(err)
		}

		newNetworkPayloads = found
		break
	}

	// TODO create event collection with network service to handle network events.
	//
	// Forward the found network payloads to other CLGs by adding them to the
	// queue so other processes can fetch them.
	for _, np := range newNetworkPayloads {
		b, err := json.Marshal(np)
		if err != nil {
			return maskAny(err)
		}

		// TODO store asynchronously
		err = s.Service().Storage().General().PushToSet(networkPayloadKey, string(b))
		if err != nil {
			return maskAny(err)
		}
	}

	return nil
}
// MaxSignals returns the configured maximum number of signals being forwarded
// by one CLG. The value is assigned in Boot.
func (s *service) MaxSignals() int {
return s.maxSignals
}
// GetNetworkPayloads is the lookup that relies on already stored forwarding
// configuration. It reads the behaviour ID from the context of the given
// network payload and fetches all behaviour IDs stored for it in the general
// storage. For each stored behaviour ID a new network payload is created which
// carries the arguments of the given network payload, a cloned context
// pointing to the stored behaviour ID, the stored behaviour ID as destination
// and the destination of the given network payload as only source.
//
// An invalid behaviour ID error is returned when the context has no behaviour
// ID. A network payloads not found error is returned when the storage reports
// that nothing is stored. The CLG argument is not used by this lookup.
func (s *service) GetNetworkPayloads(CLG servicespec.CLGService, networkPayload objectspec.NetworkPayload) ([]objectspec.NetworkPayload, error) {
	ctx := networkPayload.GetContext()

	// Without a behaviour ID there is no key to look up.
	currentID, ok := ctx.GetBehaviourID()
	if !ok {
		return nil, maskAnyf(invalidBehaviourIDError, "must not be empty")
	}

	key := fmt.Sprintf("forward:configuration:behaviour-id:%s:behaviour-ids", currentID)
	storedIDs, err := s.Service().Storage().General().GetAllFromSet(key)
	switch {
	case storagecollection.IsNotFound(err):
		// Nothing is configured for this behaviour ID. Signal that so another
		// lookup gets the chance to provide network payloads.
		return nil, maskAny(networkPayloadsNotFoundError)
	case err != nil:
		return nil, maskAny(err)
	}

	// Build one network payload per stored behaviour ID.
	var result []objectspec.NetworkPayload
	for _, targetID := range storedIDs {
		// Each payload gets its own context addressing the target.
		targetCtx := ctx.Clone()
		targetCtx.SetBehaviourID(targetID)

		cfg := networkpayload.DefaultConfig()
		cfg.Args = networkPayload.GetArgs()
		cfg.Context = targetCtx
		cfg.Destination = string(targetID)
		cfg.Sources = []string{networkPayload.GetDestination()}

		created, err := networkpayload.New(cfg)
		if err != nil {
			return nil, maskAny(err)
		}

		result = append(result, created)
	}

	return result, nil
}
// Metadata returns the metadata of the service as assigned in Boot. The map
// itself is returned, not a copy.
func (s *service) Metadata() map[string]string {
return s.metadata
}
// NewNetworkpayloads is the lookup that creates new forwarding configuration.
// It asks the random service for the number of signals to forward, creates
// that many new behaviour IDs, stores them in the general storage under the
// behaviour ID found in the context of the given network payload, and returns
// one new network payload per created behaviour ID.
//
// Each returned network payload carries the arguments of the given network
// payload, a cloned context pointing to the new behaviour ID, the new
// behaviour ID as destination and the destination of the given network payload
// as only source. An invalid behaviour ID error is returned when the context
// has no behaviour ID. The CLG argument is not used by this lookup.
func (s *service) NewNetworkpayloads(CLG servicespec.CLGService, networkPayload objectspec.NetworkPayload) ([]objectspec.NetworkPayload, error) {
	ctx := networkPayload.GetContext()

	// Make a pseudo random decision about how many signals are forwarded to
	// other CLGs. As noted by the original author, the max parameter of
	// CreateMax is exclusive, hence the configured maximum is incremented by
	// one to make it reachable.
	count, err := s.Service().Random().CreateMax(s.MaxSignals() + 1)
	if err != nil {
		return nil, maskAny(err)
	}

	// Create as many behaviour IDs as decided above.
	var createdIDs []string
	for len(createdIDs) < count {
		id, err := s.Service().ID().New()
		if err != nil {
			return nil, maskAny(err)
		}

		createdIDs = append(createdIDs, string(id))
	}

	// TODO find a CLG name that can be connected to the current CLG for each new
	// behaviour ID and pair these combinations (network event tracker)

	// Persist the created behaviour IDs below the current behaviour ID.
	currentID, ok := ctx.GetBehaviourID()
	if !ok {
		return nil, maskAnyf(invalidBehaviourIDError, "must not be empty")
	}

	key := fmt.Sprintf("forward:configuration:behaviour-id:%s:behaviour-ids", currentID)
	for _, id := range createdIDs {
		// TODO store asynchronuously
		if err := s.Service().Storage().General().PushToSet(key, id); err != nil {
			return nil, maskAny(err)
		}
	}

	// Build one network payload per created behaviour ID.
	var result []objectspec.NetworkPayload
	for _, id := range createdIDs {
		// Each payload gets its own context addressing the target.
		targetCtx := ctx.Clone()
		targetCtx.SetBehaviourID(id)

		// TODO set the paired CLG name to the new context

		cfg := networkpayload.DefaultConfig()
		cfg.Args = networkPayload.GetArgs()
		cfg.Context = targetCtx
		cfg.Destination = string(id)
		cfg.Sources = []string{networkPayload.GetDestination()}

		created, err := networkpayload.New(cfg)
		if err != nil {
			return nil, maskAny(err)
		}

		result = append(result, created)
	}

	return result, nil
}
// Service returns the service collection assigned by SetServiceCollection. It
// is nil as long as no collection has been set.
func (s *service) Service() servicespec.ServiceCollection {
return s.serviceCollection
}
// SetServiceCollection assigns the service collection that is returned by
// Service and used by the other methods to reach their dependencies.
func (s *service) SetServiceCollection(sc servicespec.ServiceCollection) {
s.serviceCollection = sc
}