/
listener.go
186 lines (155 loc) · 5.51 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package xlistener
import (
"crypto/tls"
"net"
"strconv"
"sync"
"syscall"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/go-kit/kit/metrics/discard"
"github.com/jithin-kg/webpa-common/logging"
"github.com/jithin-kg/webpa-common/xmetrics"
)
var (
// netListen is the factory function for creating a net.Listener. Defaults to net.Listen. Only tests would change this variable.
netListen = net.Listen
// tlsListen is the factory function for creating a tls.Listener. Defaults to tls.Listen. Only tests would change this variable.
tlsListen = tls.Listen
)
// Options defines the available options for configuring a listener
type Options struct {
// Logger is the go-kit logger to use for output. If unset, logging.DefaultLogger() is used.
Logger log.Logger
// MaxConnections is the maximum number of active connections the listener will permit. If this
// value is not positive, there is no limit to the number of connections.
MaxConnections int
// Rejected is is incremented each time the listener rejects a connection. If unset, a go-kit discard Counter is used.
Rejected xmetrics.Adder
// Active is updated to reflect the current number of active connections. If unset, a go-kit discard Gauge is used.
Active xmetrics.Adder
// Network is the network to listen on. This value is only used if Next is unset. Defaults to "tcp" if unset.
Network string
// Address is the address to listen on. This value is only used if Next is unset. Defaults to ":http" if unset.
Address string
// Next is the net.Listener to decorate. If this field is set, Network and Address are ignored.
Next net.Listener
Config *tls.Config
}
// New constructs a new net.Listener using a set of options.
//
// If Next is set, that listener is decorated with connection limiting and other options specfied in Options.
// Otherwise, a new net.Listener is created, and that new listener is decorated. Note that in the case
// where this function creates a new net.Listener, that listener will be occupying a port and should be cleaned
// up via Close() if higher level errors occur.
func New(o Options) (net.Listener, error) {
if o.Logger == nil {
o.Logger = logging.DefaultLogger()
}
var semaphore chan struct{}
if o.MaxConnections > 0 {
semaphore = make(chan struct{}, o.MaxConnections)
}
if o.Rejected == nil {
o.Rejected = discard.NewCounter()
}
if o.Active == nil {
o.Active = discard.NewGauge()
}
next := o.Next
if next == nil {
if len(o.Network) == 0 {
o.Network = "tcp"
}
if len(o.Address) == 0 {
o.Address = ":http"
}
var err error
if o.Config != nil {
next, err = tlsListen(o.Network, o.Address, o.Config)
} else {
next, err = netListen(o.Network, o.Address)
}
if err != nil {
return nil, err
}
}
return &listener{
Listener: next,
logger: log.With(o.Logger, "listenNetwork", next.Addr().Network(), "listenAddress", next.Addr().String()),
semaphore: semaphore,
rejected: xmetrics.NewIncrementer(o.Rejected),
active: o.Active,
}, nil
}
// listener decorates a net.Listener with metrics and optional maximum connection enforcement
type listener struct {
net.Listener
logger log.Logger
semaphore chan struct{}
rejected xmetrics.Incrementer
active xmetrics.Adder
}
// acquire attempts to obtain a semaphore resource. If the semaphore has not been set (i.e. no maximum connections),
// this method immediately returns true. Otherwise, the semaphore must be immediately acquired or this method returns false.
// In all cases, the active connections gauge is updated if appropriate.
func (l *listener) acquire() bool {
if l.semaphore == nil {
l.active.Add(1.0)
return true
}
select {
case l.semaphore <- struct{}{}:
l.active.Add(1.0)
return true
default:
return false
}
}
// release returns a semaphore resource to the pool, if set. This method also decrements the active connection gauge.
func (l *listener) release() {
l.active.Add(-1.0)
if l.semaphore != nil {
<-l.semaphore
}
}
// Accept invokes the delegate net.Listener's Accept method, then attempts to acquire the semaphore.
// If the semaphore was set and could not be acquired, the accepted connection is immediately closed.
func (l *listener) Accept() (net.Conn, error) {
for {
c, err := l.Listener.Accept()
if err != nil {
sysValue := ""
if errno, ok := err.(syscall.Errno); ok {
sysValue = "0x" + strconv.FormatInt(int64(errno), 16)
}
l.logger.Log(level.Key(), level.ErrorValue(), logging.MessageKey(), "failed to accept connection", logging.ErrorKey(), err, "sysValue", sysValue)
if err == syscall.ENFILE {
l.logger.Log(level.Key(), level.ErrorValue(), logging.MessageKey(), "ENFILE received. translating to EMFILE")
return nil, syscall.EMFILE
}
return nil, err
}
if !l.acquire() {
l.logger.Log(level.Key(), level.ErrorValue(), logging.MessageKey(), "rejected connection", "remoteAddress", c.RemoteAddr().String())
l.rejected.Inc()
c.Close()
continue
}
l.logger.Log(level.Key(), level.DebugValue(), logging.MessageKey(), "accepted connection", "remoteAddress", c.RemoteAddr().String())
return &conn{Conn: c, release: l.release}, nil
}
}
// conn is a decorated net.Conn that supplies feedback to a listener when the connection is closed.
type conn struct {
net.Conn
releaseOnce sync.Once
release func()
}
// Close closes the decorated connection and invokes release on the listener that created it. The release
// operation is idempotent.
func (c *conn) Close() error {
err := c.Conn.Close()
c.releaseOnce.Do(c.release)
return err
}