/
listener.go
160 lines (134 loc) · 4.31 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Copyright 2020-2022 Matt Layher
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package corerad
import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"time"
"github.com/mdlayher/corerad/internal/system"
"github.com/mdlayher/ndp"
"golang.org/x/sync/errgroup"
)
var (
// deadlineNow causes connection deadlines to trigger immediately.
deadlineNow = time.Unix(1, 0)
// errRetriesExhausted is a sentinel which indicates that receiveRetry failed
// after exhausting its retries.
errRetriesExhausted = errors.New("exhausted receive retries")
)
// A listener instruments a system.Conn and adds retry functionality for
// receiving NDP messages.
type listener struct {
cctx *Context
iface string
c system.Conn
}
// newListener constructs a listener with optional logger and metrics.
func newListener(cctx *Context, iface string, conn system.Conn) *listener {
return &listener{
cctx: cctx,
iface: iface,
c: conn,
}
}
// A message contains information from a single NDP read.
type message struct {
Message ndp.Message
Host netip.Addr
}
// Listen receives NDP messages and invokes onMessage for each until ctx is
// canceled.
func (l *listener) Listen(ctx context.Context, onMessage func(msg message) error) error {
// Ensure the interrupt goroutine is canceled whether or not ctx itself
// is canceled.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Wait for cancelation and then force any pending reads to time out.
var eg errgroup.Group
eg.Go(func() error {
<-ctx.Done()
if err := l.c.SetReadDeadline(deadlineNow); err != nil {
return fmt.Errorf("failed to interrupt listener: %w", err)
}
return nil
})
defer func() { _ = eg.Wait() }()
for {
// Receive and pass incoming NDP messages to the caller.
m, host, err := l.receiveRetry(ctx)
if err != nil {
if ctx.Err() != nil {
// Context canceled.
return eg.Wait()
}
return fmt.Errorf("failed to read NDP messages: %w", err)
}
msg := message{
Message: m,
// Package ndp used to send net.IPs which have no zone info, but the
// conversion to net/netip always attaches a zone. CoreRAD manages
// this on its own since every advertiser or monitor is already
// bound to a single interface, so clear the zone.
Host: host.WithZone(""),
}
if err := onMessage(msg); err != nil {
return err
}
}
}
// receiveRetry will attempt to read an NDP message from conn until ctx is
// canceled or it exhausts a fixed number of retries.
func (l *listener) receiveRetry(ctx context.Context) (ndp.Message, netip.Addr, error) {
// TODO(mdlayher): consider parameterizing in the future if need be.
const retries = 5
for i := 0; i < retries; i++ {
// Enable cancelation before receiving any messages, if necessary.
if err := ctx.Err(); err != nil {
return nil, netip.Addr{}, err
}
m, cm, host, err := l.c.ReadFrom()
if err != nil {
if cerr := ctx.Err(); cerr != nil {
// Context canceled.
return nil, netip.Addr{}, cerr
}
var nerr net.Error
if errors.As(err, &nerr) && nerr.Timeout() {
// Temporary error or timeout, either back off and retry or
// return if the context is canceled.
select {
case <-ctx.Done():
return nil, netip.Addr{}, ctx.Err()
case <-time.After(time.Duration(i) * 50 * time.Millisecond):
}
continue
}
return nil, netip.Addr{}, err
}
// Ensure this message has a valid hop limit.
if cm.HopLimit != ndp.HopLimit {
l.logf("received NDP message with IPv6 hop limit %d from %s, ignoring", cm.HopLimit, host)
l.cctx.mm.MessagesReceivedInvalidTotal(1.0, l.iface, m.Type().String())
continue
}
return m, host, nil
}
return nil, netip.Addr{}, errRetriesExhausted
}
// logf prints a formatted log with the listener's interface name.
func (l *listener) logf(format string, v ...interface{}) {
l.cctx.ll.Printf(l.iface+": "+format, v...)
}