-
Notifications
You must be signed in to change notification settings - Fork 1
/
switch.go
158 lines (134 loc) · 3.32 KB
/
switch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package switchr
import (
"errors"
"fmt"
"net/netip"
"runtime"
"github.com/mycoria/mycoria/config"
"github.com/mycoria/mycoria/frame"
"github.com/mycoria/mycoria/m"
"github.com/mycoria/mycoria/mgr"
"github.com/mycoria/mycoria/peering"
"github.com/mycoria/mycoria/state"
)
// Switch handles packets based on switch labels.
type Switch struct {
input chan frame.Frame
routerInput chan frame.Frame
instance instance
}
// instance is an interface subset of inst.Ance.
type instance interface {
Version() string
Config() *config.Config
Identity() *m.Address
State() *state.State
Peering() *peering.Peering
}
// New returns a new switch.
func New(instance instance, upstreamHandler chan frame.Frame) *Switch {
return &Switch{
input: make(chan frame.Frame),
routerInput: upstreamHandler,
instance: instance,
}
}
// Start starts the switch.
func (s *Switch) Start(mgr *mgr.Manager) error {
for i := 0; i < runtime.NumCPU(); i++ {
mgr.Go("switch", s.handler)
}
return nil
}
// Stop stops the switch.
func (s *Switch) Stop(mgr *mgr.Manager) error {
return nil
}
// Input returns the input channel for the switch.
func (s *Switch) Input() chan frame.Frame {
return s.input
}
func (s *Switch) handler(w *mgr.WorkerCtx) error {
for {
select {
case f := <-s.input:
if err := s.handleFrame(f); err != nil {
w.Debug(
"failed to handle frame",
"router", f.SrcIP(),
"err", err,
)
}
case <-w.Done():
return nil
}
}
}
func (s *Switch) handleFrame(f frame.Frame) error {
// Ignore packets coming from myself.
if f.SrcIP() == s.instance.Identity().IP {
return nil
}
// Get switch block.
switchBlock := f.SwitchBlock()
if len(switchBlock) == 0 {
return s.escalateFrame(f)
}
// Get recv link.
recvLink := f.RecvLink()
if recvLink == nil {
return errors.New("missing recv link")
}
// Rotate switch block.
nextHopLabel, err := m.NextRotateSwitchBlock(switchBlock, recvLink.SwitchLabel())
if err != nil {
return fmt.Errorf("rotate switch block: %w", err)
}
// Check if we are the destination.
if nextHopLabel == 0 {
return s.escalateFrame(f)
}
// Forward frame to next hop.
return s.ForwardByLabel(f, nextHopLabel)
}
func (s *Switch) escalateFrame(f frame.Frame) error {
select {
case s.routerInput <- f:
default:
}
return nil
}
// ForwardByLabel forwards a frame by the given switch label.
func (s *Switch) ForwardByLabel(f frame.Frame, nextHopLabel m.SwitchLabel) error {
// Get link by switch label.
link := s.instance.Peering().GetLinkByLabel(nextHopLabel)
if link == nil {
return errors.New("next hop unavailable")
}
return s.forwardToLink(f, link)
}
// ForwardByPeer forwards a frame by the given peer IP.
func (s *Switch) ForwardByPeer(f frame.Frame, peerIP netip.Addr) error {
// Get link by switch label.
link := s.instance.Peering().GetLink(peerIP)
if link == nil {
return errors.New("next hop unavailable")
}
return s.forwardToLink(f, link)
}
func (s *Switch) forwardToLink(f frame.Frame, link peering.Link) error {
// Decrease and check TTL.
f.ReduceTTL(1)
if f.TTL() == 0 {
return errors.New("TTL expired")
}
// Add flow control flag.
if recvLink := f.RecvLink(); recvLink != nil {
f.SetFlowFlag(recvLink.FlowControlIndicator())
}
// Forward message.
if f.MessageType().IsPriority() {
return link.SendPriority(f)
}
return link.Send(f)
}