-
Notifications
You must be signed in to change notification settings - Fork 0
/
http.go
190 lines (155 loc) · 4.84 KB
/
http.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package forwarder
import (
"context"
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/saiya/mesh_for_home_server/config"
"github.com/saiya/mesh_for_home_server/interfaces"
"github.com/saiya/mesh_for_home_server/logger"
"github.com/saiya/mesh_for_home_server/peering/messagewindow"
"github.com/saiya/mesh_for_home_server/peering/proto/generated"
"golang.org/x/exp/maps"
)
// httpForwarder forwards HTTP messages over the router and dispatches incoming
// HTTP messages to the session registered for their (peer, request ID) pair.
type httpForwarder struct {
	// reqIDGen is the request ID counter. It is advanced with atomic.AddInt64,
	// so it MUST stay the first field of the struct: on 32-bit platforms
	// (ARM, 386, 32-bit MIPS) only the first word of an allocated struct is
	// guaranteed to be 64-bit aligned, and a misaligned 64-bit atomic panics.
	//
	// The ID must be unique within the whole httpForwarder, not merely within
	// a single round tripper.
	reqIDGen int64

	router            interfaces.Router
	closeRouterListen func() // unregisters the callback installed by router.Listen

	// listenersLock guards listeners.
	listenersLock sync.Mutex
	listeners     map[httpForwarderListnerID]*httpForwardingSession
}
// httpForwarderListnerID is the key of httpForwarder.listeners.
// It is built with unkeyed literals, so the field order (peer, reqID) must not change.
type httpForwarderListnerID struct {
// peer is the node on the other side of the session: the sender of an incoming
// message, or the session's dest when the session is registered.
peer config.NodeID
// reqID is the request ID of the session.
reqID int64
}
// httpReqBodyChunkSize equals 512 * 1024 (524288).
// NOTE(review): presumably the size in bytes of one forwarded HTTP request body
// chunk; it is not referenced in this part of the file - confirm at the use site.
const httpReqBodyChunkSize = 512 * 1024
// NewHTTPForwarder creates an HTTP forwarder bound to the given router.
//
// It registers a callback on the router that dispatches every incoming HTTP
// message to the session registered for (sender, request ID). Messages that
// are not HTTP messages, or for which no session is registered, are ignored.
// An HTTP message that carries no identity cannot be dispatched and is
// reported as an error instead of causing a nil pointer dereference.
//
// The callback stays registered until Close is called on the returned forwarder.
func NewHTTPForwarder(router interfaces.Router) interfaces.HTTPForwarder {
	fw := &httpForwarder{
		router:    router,
		listeners: make(map[httpForwarderListnerID]*httpForwardingSession),
	}

	fw.closeRouterListen = router.Listen(func(ctx context.Context, from config.NodeID, msg interfaces.Message) error {
		// Named httpMsg (not http) to avoid shadowing the imported net/http package.
		httpMsg := msg.GetHttp()
		if httpMsg == nil {
			return nil
		}
		if httpMsg.Identity == nil {
			return fmt.Errorf("HTTP message from peer %v has no identity", from)
		}

		listenerID := httpForwarderListnerID{from, httpMsg.Identity.RequestId}
		fw.listenersLock.Lock()
		listener := fw.listeners[listenerID]
		fw.listenersLock.Unlock()

		// Note: listener can be nil (e.g. this server operating both egress & ingress;
		// this ingress forwarder won't handle egress messages).
		if listener == nil {
			return nil
		}
		return listener.handle(httpMsg)
	})
	return fw
}
// Close unregisters the router callback and closes every session that is
// still registered. It always returns nil; ctx is not used.
func (fw *httpForwarder) Close(ctx context.Context) error {
	fw.closeRouterListen()

	// Take a snapshot under the lock and release it before closing: closing a
	// session calls removeListener, which acquires listenersLock itself.
	sessions := func() []*httpForwardingSession {
		fw.listenersLock.Lock()
		defer fw.listenersLock.Unlock()
		return maps.Values(fw.listeners)
	}()

	for _, session := range sessions {
		session.Close()
	}
	return nil
}
// newSession allocates a forwarding session with a fresh request ID.
//
// The session's origin is this router's node ID. Its dest field is left at
// the zero value here, and the session is not registered as a listener until
// startListener is called.
func (fw *httpForwarder) newSession(headerTimeout, bodyTimeout time.Duration) *httpForwardingSession {
	session := new(httpForwardingSession)
	session.fw = fw
	session.reqID = atomic.AddInt64(&fw.reqIDGen, 1)
	session.msgOrder = 0
	session.msgWindow = messagewindow.NewMessageWindow[int64, *generated.HttpMessage]()
	session.from = fw.router.NodeID()
	session.headerTimeout = headerTimeout
	session.bodyTimeout = bodyTimeout
	return session
}
// NewRoundTripper returns an http.RoundTripper backed by this forwarder and
// its router, using the header and body timeouts taken from cfg.ResponseTimeout.
func (fw *httpForwarder) NewRoundTripper(cfg *config.HTTPIngressConfig) http.RoundTripper {
	headerTimeout := cfg.ResponseTimeout.HeaderTimeout()
	bodyTimeout := cfg.ResponseTimeout.BodyTimeout()

	return &httpRoundTripper{fw, fw.router, headerTimeout, bodyTimeout}
}
// addListener registers the session under its (dest, reqID) key so that
// incoming HTTP messages from that peer with that request ID reach it.
func (fw *httpForwarder) addListener(fwc *httpForwardingSession) {
	key := httpForwarderListnerID{peer: fwc.dest, reqID: fwc.reqID}

	fw.listenersLock.Lock()
	fw.listeners[key] = fwc
	fw.listenersLock.Unlock()
}
// removeListener drops the registration stored under the session's
// (dest, reqID) key. It is a no-op when no such registration exists.
func (fw *httpForwarder) removeListener(fwc *httpForwardingSession) {
	key := httpForwarderListnerID{peer: fwc.dest, reqID: fwc.reqID}

	fw.listenersLock.Lock()
	delete(fw.listeners, key)
	fw.listenersLock.Unlock()
}
// httpForwardingSession holds the state of one forwarded HTTP request:
// its identity, the window that receives the peer's messages, and the
// "reaper" timer that forcibly closes the session on timeout.
type httpForwardingSession struct {
// fw is the forwarder that created this session and holds its listener registration.
fw *httpForwarder
// reqID is the request ID taken from the forwarder's counter when the session was created.
reqID int64
// msgOrder is the MsgOrder value that the next NextMsgID call will hand out.
msgOrder int64
// msgWindow receives the messages passed to handle, keyed by their MsgOrder.
msgWindow messagewindow.MessageWindow[int64, *generated.HttpMessage]
// from is the node ID of the local router (set in newSession).
from config.NodeID
// dest is the peer node; together with reqID it forms the listener key.
// It is not assigned in newSession.
dest config.NodeID
// headerTimeout is the reaper timeout armed by startListener.
headerTimeout time.Duration
// bodyTimeout is the reaper timeout re-armed after each handled message.
bodyTimeout time.Duration
// reaperLock guards reaperTimer and reaperTimerClosed.
reaperLock sync.Mutex
// reaperTimer is the currently armed timeout timer, or nil when none is armed.
reaperTimer *time.Timer
// reaperTimerClosed is closed by stopReaper to tell the goroutine waiting on
// reaperTimer to exit; nil when no timer is armed.
reaperTimerClosed chan struct{}
}
// Close tears this session down completely.
// It makes sure all resources of the session are freed: the reaper timer is
// stopped, the message window is closed, and the session is unregistered from
// the forwarder. It is also called by the reaper goroutine on timeout.
func (fwc *httpForwardingSession) Close() {
// Takes reaperLock itself (alreadyLocked == false).
fwc.stopReaper(false)
fwc.msgWindow.Close()
fwc.fw.removeListener(fwc)
}
// startListener registers the session with its forwarder so that it starts
// receiving messages, then arms the reaper with the header timeout.
func (fwc *httpForwardingSession) startListener() {
	forwarder := fwc.fw
	forwarder.addListener(fwc)

	fwc.resetReaper(fwc.headerTimeout)
}
// handle feeds one HTTP message received from the peer into the session's
// message window, keyed by the message's MsgOrder, and then re-arms the reaper
// with the body timeout.
//
// It returns an error when the message carries no identity or when the message
// window rejects it. The window's error is wrapped with %w, so callers can
// inspect it with errors.Is / errors.As. The reaper is not re-armed on error.
func (fwc *httpForwardingSession) handle(msg *generated.HttpMessage) error {
	if msg == nil || msg.Identity == nil {
		return fmt.Errorf("failed to handle HTTP message from peer: message has no identity")
	}
	if err := fwc.msgWindow.Send(msg.Identity.MsgOrder, msg); err != nil {
		return fmt.Errorf("failed to handle HTTP message from peer: %w", err)
	}
	fwc.resetReaper(fwc.bodyTimeout)
	return nil
}
// NextMsgID issues the identity for the next outgoing message of this session.
// Every call returns the session's request ID paired with the next sequential
// message order, starting from the session's initial msgOrder.
//
// The counter is advanced without any synchronization.
func (fwc *httpForwardingSession) NextMsgID() *generated.HttpMessageIdentity {
	order := fwc.msgOrder
	fwc.msgOrder = order + 1

	return &generated.HttpMessageIdentity{
		RequestId: fwc.reqID,
		MsgOrder:  order,
	}
}
// stopReaper disarms the reaper timer, if one is armed, and signals the
// goroutine waiting on it to exit by closing reaperTimerClosed.
//
// Pass alreadyLocked == true only when the caller already holds reaperLock;
// otherwise the lock is taken for the duration of the call.
func (fwc *httpForwardingSession) stopReaper(alreadyLocked bool) {
	if !alreadyLocked {
		fwc.reaperLock.Lock()
		defer fwc.reaperLock.Unlock()
	}

	armed := fwc.reaperTimer
	if armed == nil {
		// Nothing armed; reaperTimerClosed is nil as well.
		return
	}

	armed.Stop()
	close(fwc.reaperTimerClosed)
	fwc.reaperTimer, fwc.reaperTimerClosed = nil, nil
}
// resetReaper (re)arms the session timeout: any previously armed reaper is
// stopped and a new timer of the given duration is started. When the timer
// expires without being stopped or replaced, the session is closed forcibly.
func (fwc *httpForwardingSession) resetReaper(timeout time.Duration) {
	fwc.reaperLock.Lock()
	defer fwc.reaperLock.Unlock()

	fwc.stopReaper(true)

	timer := time.NewTimer(timeout)
	closed := make(chan struct{})
	fwc.reaperTimer = timer
	fwc.reaperTimerClosed = closed

	go func() {
		select {
		case <-closed:
			return
		case <-timer.C:
		}

		// The timer may have fired at the very moment it was stopped or replaced:
		// Stop does not retract an already delivered tick, and select picks at
		// random when both channels are ready. Only reap if this timer is still
		// the armed one; otherwise a session whose timeout has just been
		// refreshed (or that is already being closed) would be torn down.
		fwc.reaperLock.Lock()
		superseded := fwc.reaperTimer != timer
		fwc.reaperLock.Unlock()
		if superseded {
			return
		}

		logger.Get().Warnw("HTTP forwarder timeout, terminating session forcibly", "req-id", fwc.reqID)
		// Close locks reaperLock again; no deadlock, because this goroutine does
		// not hold the lock at this point.
		fwc.Close()
	}()
}