Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add udp timeout configuration #6982

Merged
3 changes: 3 additions & 0 deletions docs/content/reference/static-configuration/cli-ref.md
Expand Up @@ -171,6 +171,9 @@ ReadTimeout is the maximum duration for reading the entire request, including th
`--entrypoints.<name>.transport.respondingtimeouts.writetimeout`:
WriteTimeout is the maximum duration before timing out writes of the response. If zero, no timeout is set. (Default: ```0```)

`--entrypoints.<name>.udp.timeout`:
Timeout defines how long to wait on an idle session before releasing the related resources. (Default: ```3```)

`--experimental.devplugin.gopath`:
plugin's GOPATH.

Expand Down
3 changes: 3 additions & 0 deletions docs/content/reference/static-configuration/env-ref.md
Expand Up @@ -171,6 +171,9 @@ ReadTimeout is the maximum duration for reading the entire request, including th
`TRAEFIK_ENTRYPOINTS_<NAME>_TRANSPORT_RESPONDINGTIMEOUTS_WRITETIMEOUT`:
WriteTimeout is the maximum duration before timing out writes of the response. If zero, no timeout is set. (Default: ```0```)

`TRAEFIK_ENTRYPOINTS_<NAME>_UDP_TIMEOUT`:
Timeout defines how long to wait on an idle session before releasing the related resources. (Default: ```3```)

`TRAEFIK_EXPERIMENTAL_DEVPLUGIN_GOPATH`:
plugin's GOPATH.

Expand Down
2 changes: 2 additions & 0 deletions docs/content/reference/static-configuration/file.toml
Expand Up @@ -29,6 +29,8 @@
[entryPoints.EntryPoint0.forwardedHeaders]
insecure = true
trustedIPs = ["foobar", "foobar"]
[entryPoints.EntryPoint0.udp]
timeout = 42
[entryPoints.EntryPoint0.http]
middlewares = ["foobar", "foobar"]
[entryPoints.EntryPoint0.http.redirections]
Expand Down
2 changes: 2 additions & 0 deletions docs/content/reference/static-configuration/file.yaml
Expand Up @@ -33,6 +33,8 @@ entryPoints:
- foobar
- foobar
enableHTTP3: true
udp:
timeout: 42
http:
redirections:
entryPoint:
Expand Down
32 changes: 32 additions & 0 deletions docs/content/routing/entrypoints.md
Expand Up @@ -864,3 +864,35 @@ entryPoints:
--entrypoints.websecure.address=:443
--entrypoints.websecure.http.tls.certResolver=leresolver
```

## UDP Options

This whole section is dedicated to options, keyed by entry point, that will apply only to UDP routing.

### Timeout

_Optional, Default=3s_

Timeout defines how long to wait on an idle session before releasing the related resources.
The Timeout value must be greater than zero.

```toml tab="File (TOML)"
[entryPoints.foo]
address = ":8000/udp"

[entryPoints.foo.udp]
timeout = "10s"
```

```yaml tab="File (YAML)"
entryPoints:
foo:
address: ':8000/udp'
udp:
timeout: 10s
```

```bash tab="CLI"
entrypoints.foo.address=:8000/udp
entrypoints.foo.udp.timeout=10s
```
5 changes: 3 additions & 2 deletions docs/content/routing/routers/index.md
Expand Up @@ -982,8 +982,9 @@ So UDP "routers" at this time are pretty much only load-balancers in one form or
It basically means that some state is kept about an ongoing communication between a client and a backend,
notably so that the proxy knows where to forward a response packet from a backend.
As expected, a `timeout` is associated to each of these sessions,
so that they get cleaned out if they go through a period of inactivity longer than a given duration (that is hardcoded to 3 seconds for now).
Making this timeout configurable will be considered later if we get more usage feedback on this matter.
so that they get cleaned out if they go through a period of inactivity longer than a given duration.
Timeout can be configured using the `entryPoints.name.udp.timeout` option as described
under [entry points](../entrypoints/#udp-options).

### EntryPoints

Expand Down
14 changes: 14 additions & 0 deletions pkg/config/static/entrypoints.go
Expand Up @@ -5,6 +5,7 @@ import (
"math"
"strings"

ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v2/pkg/types"
)

Expand All @@ -16,6 +17,7 @@ type EntryPoint struct {
ForwardedHeaders *ForwardedHeaders `description:"Trust client forwarding headers." json:"forwardedHeaders,omitempty" toml:"forwardedHeaders,omitempty" yaml:"forwardedHeaders,omitempty" export:"true"`
HTTP HTTPConfig `description:"HTTP configuration." json:"http,omitempty" toml:"http,omitempty" yaml:"http,omitempty" export:"true"`
EnableHTTP3 bool `description:"Enable HTTP3." json:"enableHTTP3,omitempty" toml:"enableHTTP3,omitempty" yaml:"enableHTTP3,omitempty" export:"true"`
UDP *UDPConfig `description:"UDP configuration." json:"udp,omitempty" toml:"udp,omitempty" yaml:"udp,omitempty"`
}

// GetAddress strips any potential protocol part of the address field of the
Expand Down Expand Up @@ -46,6 +48,8 @@ func (ep *EntryPoint) SetDefaults() {
ep.Transport = &EntryPointsTransport{}
ep.Transport.SetDefaults()
ep.ForwardedHeaders = &ForwardedHeaders{}
ep.UDP = &UDPConfig{}
ep.UDP.SetDefaults()
}

// HTTPConfig is the HTTP configuration of an entry point.
Expand Down Expand Up @@ -110,3 +114,13 @@ func (t *EntryPointsTransport) SetDefaults() {
t.RespondingTimeouts = &RespondingTimeouts{}
t.RespondingTimeouts.SetDefaults()
}

// UDPConfig is the UDP configuration of an entry point.
type UDPConfig struct {
Timeout ptypes.Duration `description:"Timeout defines how long to wait on an idle session before releasing the related resources." json:"timeout,omitempty" toml:"timeout,omitempty" yaml:"timeout,omitempty"`
}

// SetDefaults sets the default values.
func (u *UDPConfig) SetDefaults() {
u.Timeout = ptypes.Duration(DefaultUDPTimeout)
}
4 changes: 4 additions & 0 deletions pkg/config/static/static_config.go
Expand Up @@ -51,6 +51,10 @@ const (

// DefaultAcmeCAServer is the default ACME API endpoint.
DefaultAcmeCAServer = "https://acme-v02.api.letsencrypt.org/directory"

// DefaultUDPTimeout defines how long to wait by default on an idle session,
// before releasing all resources related to that session.
DefaultUDPTimeout = 3 * time.Second
)

// Configuration is the static configuration.
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/server_entrypoint_udp.go
Expand Up @@ -89,7 +89,8 @@ func NewUDPEntryPoint(cfg *static.EntryPoint) (*UDPEntryPoint, error) {
if err != nil {
return nil, err
}
listener, err := udp.Listen("udp", addr)

listener, err := udp.Listen("udp", addr, time.Duration(cfg.UDP.Timeout))
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/server/server_entrypoint_udp_test.go
Expand Up @@ -14,14 +14,17 @@ import (
)

func TestShutdownUDPConn(t *testing.T) {
entryPoint, err := NewUDPEntryPoint(&static.EntryPoint{
ep := static.EntryPoint{
Address: ":0",
Transport: &static.EntryPointsTransport{
LifeCycle: &static.LifeCycle{
GraceTimeOut: ptypes.Duration(5 * time.Second),
},
},
})
}
ep.SetDefaults()

entryPoint, err := NewUDPEntryPoint(&ep)
require.NoError(t, err)

go entryPoint.Start(context.Background())
Expand Down
25 changes: 14 additions & 11 deletions pkg/udp/conn.go
Expand Up @@ -12,12 +12,6 @@ const receiveMTU = 8192

const closeRetryInterval = 500 * time.Millisecond

// connTimeout determines how long to wait on an idle session,
// before releasing all resources related to that session.
const connTimeout = 3 * time.Second

var timeoutTicker = connTimeout / 10

var errClosedListener = errors.New("udp: listener closed")

// Listener augments a session-oriented Listener over a UDP PacketConn.
Expand All @@ -31,10 +25,18 @@ type Listener struct {
accepting bool

acceptCh chan *Conn // no need for a Once, already indirectly guarded by accepting.

// timeout defines how long to wait on an idle session,
// before releasing its related resources.
timeout time.Duration
}

// Listen creates a new listener.
func Listen(network string, laddr *net.UDPAddr) (*Listener, error) {
func Listen(network string, laddr *net.UDPAddr, timeout time.Duration) (*Listener, error) {
if timeout <= 0 {
return nil, errors.New("timeout should be greater than zero")
}

conn, err := net.ListenUDP(network, laddr)
if err != nil {
return nil, err
Expand All @@ -45,6 +47,7 @@ func Listen(network string, laddr *net.UDPAddr) (*Listener, error) {
acceptCh: make(chan *Conn),
conns: make(map[string]*Conn),
accepting: true,
timeout: timeout,
}

go l.readLoop()
Expand Down Expand Up @@ -179,7 +182,7 @@ func (l *Listener) newConn(rAddr net.Addr) *Conn {
readCh: make(chan []byte),
sizeCh: make(chan int),
doneCh: make(chan struct{}),
timeout: timeoutTicker,
timeout: l.timeout,
}
}

Expand All @@ -206,7 +209,7 @@ type Conn struct {
// that is to say it waits on readCh to receive the slice of bytes that the Read operation wants to read onto.
// The Read operation receives the signal that the data has been written to the slice of bytes through the sizeCh.
func (c *Conn) readLoop() {
ticker := time.NewTicker(c.timeout)
ticker := time.NewTicker(c.timeout / 10)
defer ticker.Stop()

for {
Expand All @@ -216,7 +219,7 @@ func (c *Conn) readLoop() {
c.msgs = append(c.msgs, msg)
case <-ticker.C:
c.muActivity.RLock()
deadline := c.lastActivity.Add(connTimeout)
deadline := c.lastActivity.Add(c.timeout)
c.muActivity.RUnlock()
if time.Now().After(deadline) {
c.Close()
Expand All @@ -236,7 +239,7 @@ func (c *Conn) readLoop() {
c.msgs = append(c.msgs, msg)
case <-ticker.C:
c.muActivity.RLock()
deadline := c.lastActivity.Add(connTimeout)
deadline := c.lastActivity.Add(c.timeout)
c.muActivity.RUnlock()
if time.Now().After(deadline) {
c.Close()
Expand Down
18 changes: 13 additions & 5 deletions pkg/udp/conn_test.go
Expand Up @@ -15,7 +15,7 @@ func TestConsecutiveWrites(t *testing.T) {
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)

ln, err := Listen("udp", addr)
ln, err := Listen("udp", addr, 3*time.Second)
require.NoError(t, err)
defer func() {
err := ln.Close()
Expand Down Expand Up @@ -77,7 +77,7 @@ func TestListenNotBlocking(t *testing.T) {

require.NoError(t, err)

ln, err := Listen("udp", addr)
ln, err := Listen("udp", addr, 3*time.Second)
require.NoError(t, err)
defer func() {
err := ln.Close()
Expand Down Expand Up @@ -162,6 +162,14 @@ func TestListenNotBlocking(t *testing.T) {
}
}

func TestListenWithZeroTimeout(t *testing.T) {
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)

_, err = Listen("udp", addr, 0)
assert.Error(t, err)
}

func TestTimeoutWithRead(t *testing.T) {
testTimeout(t, true)
}
Expand All @@ -176,7 +184,7 @@ func testTimeout(t *testing.T, withRead bool) {
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)

ln, err := Listen("udp", addr)
ln, err := Listen("udp", addr, 3*time.Second)
require.NoError(t, err)
defer func() {
err := ln.Close()
Expand Down Expand Up @@ -212,15 +220,15 @@ func testTimeout(t *testing.T, withRead bool) {

assert.Equal(t, 10, len(ln.conns))

time.Sleep(4 * time.Second)
time.Sleep(ln.timeout + time.Second)
assert.Equal(t, 0, len(ln.conns))
}

func TestShutdown(t *testing.T) {
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)

l, err := Listen("udp", addr)
l, err := Listen("udp", addr, 3*time.Second)
require.NoError(t, err)

go func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/udp/proxy_test.go
Expand Up @@ -46,7 +46,7 @@ func newServer(t *testing.T, addr string, handler Handler) {
addrL, err := net.ResolveUDPAddr("udp", addr)
require.NoError(t, err)

listener, err := Listen("udp", addrL)
listener, err := Listen("udp", addrL, 3*time.Second)
require.NoError(t, err)

for {
Expand Down