-
Notifications
You must be signed in to change notification settings - Fork 38
/
calls.go
137 lines (113 loc) · 3.25 KB
/
calls.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package router
import (
"encoding/hex"
"sync"
"github.com/nspcc-dev/neofs-node/pkg/services/reputation"
"github.com/nspcc-dev/neofs-node/pkg/services/reputation/common"
"go.uber.org/zap"
)
// routeContext wraps context with additional passed
// route data. It is only used inside Router and is
// not passed in any external methods.
type routeContext struct {
common.Context
passedRoute []common.ServerInfo
}
// NewRouteContext wraps the main context of value passing with its traversal route and epoch.
func NewRouteContext(ctx common.Context, passed []common.ServerInfo) common.Context {
return &routeContext{
Context: ctx,
passedRoute: passed,
}
}
type trustWriter struct {
router *Router
routeCtx *routeContext
routeMtx sync.RWMutex
mServers map[string]common.Writer
}
// InitWriter initializes and returns Writer that sends each value to its next route point.
//
// If ctx was created by NewRouteContext, then the traversed route is taken into account,
// and the value will be sent to its continuation. Otherwise, the route will be laid
// from scratch and the value will be sent to its primary point.
//
// After building a list of remote points of the next leg of the route, the value is sent
// sequentially to all of them. If any transmissions (even all) fail, an error will not
// be returned.
//
// Close of the composed Writer calls Close method on each internal Writer generated in
// runtime and never returns an error.
//
// Always returns nil error.
func (r *Router) InitWriter(ctx common.Context) (common.Writer, error) {
var (
routeCtx *routeContext
ok bool
)
if routeCtx, ok = ctx.(*routeContext); !ok {
routeCtx = &routeContext{
Context: ctx,
passedRoute: []common.ServerInfo{r.localSrvInfo},
}
}
return &trustWriter{
router: r,
routeCtx: routeCtx,
mServers: make(map[string]common.Writer),
}, nil
}
func (w *trustWriter) Write(t reputation.Trust) error {
w.routeMtx.Lock()
defer w.routeMtx.Unlock()
route, err := w.router.routeBuilder.NextStage(w.routeCtx.Epoch(), t, w.routeCtx.passedRoute)
if err != nil {
return err
} else if len(route) == 0 {
route = []common.ServerInfo{nil}
}
for _, remoteInfo := range route {
var key string
if remoteInfo != nil {
key = hex.EncodeToString(remoteInfo.PublicKey())
}
remoteWriter, ok := w.mServers[key]
if !ok {
provider, err := w.router.remoteProvider.InitRemote(remoteInfo)
if err != nil {
w.router.log.Debug("could not initialize writer provider",
zap.String("error", err.Error()),
)
continue
}
// init writer with original context wrapped in routeContext
remoteWriter, err = provider.InitWriter(w.routeCtx.Context)
if err != nil {
w.router.log.Debug("could not initialize writer",
zap.String("error", err.Error()),
)
continue
}
w.mServers[key] = remoteWriter
}
err := remoteWriter.Write(t)
if err != nil {
w.router.log.Debug("could not write the value",
zap.String("error", err.Error()),
)
}
}
return nil
}
func (w *trustWriter) Close() error {
for key, wRemote := range w.mServers {
err := wRemote.Close()
if err != nil {
w.router.log.Debug("could not close remote server writer",
zap.String("key", key),
zap.String("error", err.Error()),
)
}
}
return nil
}