Skip to content

Commit

Permalink
Add KeepAliveMaxTime and KeepAliveMaxRequests features to entrypoints
Browse files Browse the repository at this point in the history
  • Loading branch information
juliens committed Jan 2, 2024
1 parent 3dfaa3d commit 9662cdc
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 7 deletions.
6 changes: 6 additions & 0 deletions docs/content/reference/static-configuration/cli-ref.md
Expand Up @@ -171,6 +171,12 @@ Trust all. (Default: ```false```)
`--entrypoints.<name>.proxyprotocol.trustedips`:
Trust only selected IPs.

`--entrypoints.<name>.transport.keepalivemaxrequests`:
Maximum number of requests before closing a keep-alive connection. (Default: ```0```)

`--entrypoints.<name>.transport.keepalivemaxtime`:
Maximum duration before closing a keep-alive connection. (Default: ```0```)

`--entrypoints.<name>.transport.lifecycle.gracetimeout`:
Duration to give active requests a chance to finish before Traefik stops. (Default: ```10```)

Expand Down
6 changes: 6 additions & 0 deletions docs/content/reference/static-configuration/env-ref.md
Expand Up @@ -171,6 +171,12 @@ Trust all. (Default: ```false```)
`TRAEFIK_ENTRYPOINTS_<NAME>_PROXYPROTOCOL_TRUSTEDIPS`:
Trust only selected IPs.

`TRAEFIK_ENTRYPOINTS_<NAME>_TRANSPORT_KEEPALIVEMAXREQUESTS`:
Maximum number of requests before closing a keep-alive connection. (Default: ```0```)

`TRAEFIK_ENTRYPOINTS_<NAME>_TRANSPORT_KEEPALIVEMAXTIME`:
Maximum duration before closing a keep-alive connection. (Default: ```0```)

`TRAEFIK_ENTRYPOINTS_<NAME>_TRANSPORT_LIFECYCLE_GRACETIMEOUT`:
Duration to give active requests a chance to finish before Traefik stops. (Default: ```10```)

Expand Down
2 changes: 2 additions & 0 deletions docs/content/reference/static-configuration/file.toml
Expand Up @@ -15,6 +15,8 @@
[entryPoints.EntryPoint0]
address = "foobar"
[entryPoints.EntryPoint0.transport]
keepAliveMaxRequests = 42
keepAliveMaxTime = "42s"
[entryPoints.EntryPoint0.transport.lifeCycle]
requestAcceptGraceTimeout = "42s"
graceTimeOut = "42s"
Expand Down
2 changes: 2 additions & 0 deletions docs/content/reference/static-configuration/file.yaml
Expand Up @@ -15,6 +15,8 @@ entryPoints:
EntryPoint0:
address: foobar
transport:
keepAliveMaxRequests: 42
keepAliveMaxTime: 42s
lifeCycle:
requestAcceptGraceTimeout: 42s
graceTimeOut: 42s
Expand Down
70 changes: 65 additions & 5 deletions docs/content/routing/entrypoints.md
Expand Up @@ -589,17 +589,77 @@ Controls the behavior of Traefik during the shutdown phase.
--entryPoints.name.transport.lifeCycle.graceTimeOut=42
```

#### `keepAliveMaxRequests`

_Optional, Default=0_

The maximum number of requests Traefik can handle before sending a `Connection: Close` header to the client (for HTTP2, Traefik sends a GOAWAY). Zero means no limit.

```yaml tab="File (YAML)"
## Static configuration
entryPoints:
name:
address: ":8888"
transport:
keepAliveMaxRequests: 42
```

```toml tab="File (TOML)"
## Static configuration
[entryPoints]
[entryPoints.name]
address = ":8888"
[entryPoints.name.transport]
keepAliveMaxRequests = 42
```

```bash tab="CLI"
## Static configuration
--entryPoints.name.address=:8888
--entryPoints.name.transport.keepAliveRequests=42
```

#### `keepAliveMaxTime`

_Optional, Default=0s_

The maximum duration Traefik can handle requests before sending a `Connection: Close` header to the client (for HTTP2, Traefik sends a GOAWAY). Zero means no limit.

```yaml tab="File (YAML)"
## Static configuration
entryPoints:
name:
address: ":8888"
transport:
keepAliveMaxTime: 42s
```

```toml tab="File (TOML)"
## Static configuration
[entryPoints]
[entryPoints.name]
address = ":8888"
[entryPoints.name.transport]
keepAliveMaxTime = 42s
```

```bash tab="CLI"
## Static configuration
--entryPoints.name.address=:8888
--entryPoints.name.transport.keepAliveTime=42s
```

### ProxyProtocol

Traefik supports [ProxyProtocol](https://www.haproxy.org/download/2.0/doc/proxy-protocol.txt) version 1 and 2.
Traefik supports [PROXY protocol](https://www.haproxy.org/download/2.0/doc/proxy-protocol.txt) version 1 and 2.

If Proxy Protocol header parsing is enabled for the entry point, this entry point can accept connections with or without Proxy Protocol headers.
If PROXY protocol header parsing is enabled for the entry point, this entry point can accept connections with or without PROXY protocol headers.

If the Proxy Protocol header is passed, then the version is determined automatically.
If the PROXY protocol header is passed, then the version is determined automatically.

??? info "`proxyProtocol.trustedIPs`"

Enabling Proxy Protocol with Trusted IPs.
Enabling PROXY protocol with Trusted IPs.

```yaml tab="File (YAML)"
## Static configuration
Expand Down Expand Up @@ -662,7 +722,7 @@ If the Proxy Protocol header is passed, then the version is determined automatic

!!! warning "Queuing Traefik behind Another Load Balancer"

When queuing Traefik behind another load-balancer, make sure to configure Proxy Protocol on both sides.
When queuing Traefik behind another load-balancer, make sure to configure PROXY protocol on both sides.
Not doing so could introduce a security risk in your system (enabling request forgery).

## HTTP Options
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/static/entrypoints.go
Expand Up @@ -122,8 +122,10 @@ type EntryPoints map[string]*EntryPoint

// EntryPointsTransport configures communication between clients and Traefik.
type EntryPointsTransport struct {
LifeCycle *LifeCycle `description:"Timeouts influencing the server life cycle." json:"lifeCycle,omitempty" toml:"lifeCycle,omitempty" yaml:"lifeCycle,omitempty" export:"true"`
RespondingTimeouts *RespondingTimeouts `description:"Timeouts for incoming requests to the Traefik instance." json:"respondingTimeouts,omitempty" toml:"respondingTimeouts,omitempty" yaml:"respondingTimeouts,omitempty" export:"true"`
LifeCycle *LifeCycle `description:"Timeouts influencing the server life cycle." json:"lifeCycle,omitempty" toml:"lifeCycle,omitempty" yaml:"lifeCycle,omitempty" export:"true"`
RespondingTimeouts *RespondingTimeouts `description:"Timeouts for incoming requests to the Traefik instance." json:"respondingTimeouts,omitempty" toml:"respondingTimeouts,omitempty" yaml:"respondingTimeouts,omitempty" export:"true"`
KeepAliveMaxTime ptypes.Duration `description:"Maximum duration before closing a keep-alive connection." json:"keepAliveMaxTime,omitempty" toml:"keepAliveMaxTime,omitempty" yaml:"keepAliveMaxTime,omitempty" export:"true"`
KeepAliveMaxRequests int `description:"Maximum number of requests before closing a keep-alive connection." json:"keepAliveMaxRequests,omitempty" toml:"keepAliveMaxRequests,omitempty" yaml:"keepAliveMaxRequests,omitempty" export:"true"`
}

// SetDefaults sets the default values.
Expand Down
29 changes: 29 additions & 0 deletions pkg/server/keep_alive_middleware.go
@@ -0,0 +1,29 @@
package server

import (
"net/http"
"time"

ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v2/pkg/log"
)

func newKeepAliveMiddleware(next http.Handler, maxRequests int, maxTime ptypes.Duration) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
state, ok := req.Context().Value(connStateKey).(*connState)
if ok {
state.HTTPRequestCount++
if maxRequests > 0 && state.HTTPRequestCount >= maxRequests {
log.WithoutContext().Debug("Close because of too many requests")
state.KeepAliveState = "Close because of too many requests"
rw.Header().Set("Connection", "close")
}
if maxTime > 0 && time.Now().After(state.Start.Add(time.Duration(maxTime))) {
log.WithoutContext().Debug("Close because of too long connection")
state.KeepAliveState = "Close because of too long connection"
rw.Header().Set("Connection", "close")
}
}
next.ServeHTTP(rw, req)
})
}
56 changes: 56 additions & 0 deletions pkg/server/server_entrypoint_tcp.go
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"errors"
"expvar"
"fmt"
stdlog "log"
"net"
Expand Down Expand Up @@ -34,6 +35,25 @@ import (

var httpServerLogger = stdlog.New(log.WithoutContext().WriterLevel(logrus.DebugLevel), "", 0)

type key string

const (
connStateKey key = "connState"
debugConnectionEnv string = "DEBUG_CONNECTION"
)

var (
clientConnectionStates = map[string]*connState{}
clientConnectionStatesMu = sync.RWMutex{}
)

type connState struct {
State string
KeepAliveState string
Start time.Time
HTTPRequestCount int
}

type httpForwarder struct {
net.Listener
connChan chan net.Conn
Expand Down Expand Up @@ -68,6 +88,12 @@ type TCPEntryPoints map[string]*TCPEntryPoint

// NewTCPEntryPoints creates a new TCPEntryPoints.
func NewTCPEntryPoints(entryPointsConfig static.EntryPoints, hostResolverConfig *types.HostResolverConfig) (TCPEntryPoints, error) {
if os.Getenv(debugConnectionEnv) != "" {
expvar.Publish("clientConnectionStates", expvar.Func(func() any {
return clientConnectionStates
}))
}

serverEntryPointsTCP := make(TCPEntryPoints)
for entryPointName, config := range entryPointsConfig {
protocol, err := config.GetProtocol()
Expand Down Expand Up @@ -548,13 +574,39 @@ func createHTTPServer(ctx context.Context, ln net.Listener, configuration *stati
})
}

debugConnection := os.Getenv(debugConnectionEnv) != ""
if debugConnection || (configuration.Transport != nil && (configuration.Transport.KeepAliveMaxTime > 0 || configuration.Transport.KeepAliveMaxRequests > 0)) {
handler = newKeepAliveMiddleware(handler, configuration.Transport.KeepAliveMaxRequests, configuration.Transport.KeepAliveMaxTime)
}

serverHTTP := &http.Server{
Handler: handler,
ErrorLog: httpServerLogger,
ReadTimeout: time.Duration(configuration.Transport.RespondingTimeouts.ReadTimeout),
WriteTimeout: time.Duration(configuration.Transport.RespondingTimeouts.WriteTimeout),
IdleTimeout: time.Duration(configuration.Transport.RespondingTimeouts.IdleTimeout),
}
if debugConnection || (configuration.Transport != nil && (configuration.Transport.KeepAliveMaxTime > 0 || configuration.Transport.KeepAliveMaxRequests > 0)) {
serverHTTP.ConnContext = func(ctx context.Context, c net.Conn) context.Context {
cState := &connState{Start: time.Now()}
if debugConnection {
clientConnectionStatesMu.Lock()
clientConnectionStates[getConnKey(c)] = cState
clientConnectionStatesMu.Unlock()
}
return context.WithValue(ctx, connStateKey, cState)
}

if debugConnection {
serverHTTP.ConnState = func(c net.Conn, state http.ConnState) {
clientConnectionStatesMu.Lock()
if clientConnectionStates[getConnKey(c)] != nil {
clientConnectionStates[getConnKey(c)].State = state.String()
}
clientConnectionStatesMu.Unlock()
}
}
}

// ConfigureServer configures HTTP/2 with the MaxConcurrentStreams option for the given server.
// Also keeping behavior the same as
Expand Down Expand Up @@ -584,6 +636,10 @@ func createHTTPServer(ctx context.Context, ln net.Listener, configuration *stati
}, nil
}

func getConnKey(conn net.Conn) string {
return fmt.Sprintf("%s => %s", conn.RemoteAddr(), conn.LocalAddr())
}

func newTrackedConnection(conn tcp.WriteCloser, tracker *connectionTracker) *trackedConnection {
tracker.AddConnection(conn)
return &trackedConnection{
Expand Down
88 changes: 88 additions & 0 deletions pkg/server/server_entrypoint_tcp_test.go
Expand Up @@ -230,3 +230,91 @@ func TestReadTimeoutWithFirstByte(t *testing.T) {
t.Error("Timeout while read")
}
}

func TestKeepAliveMaxRequests(t *testing.T) {
epConfig := &static.EntryPointsTransport{}
epConfig.SetDefaults()
epConfig.KeepAliveMaxRequests = 3

entryPoint, err := NewTCPEntryPoint(context.Background(), &static.EntryPoint{
Address: ":0",
Transport: epConfig,
ForwardedHeaders: &static.ForwardedHeaders{},
HTTP2: &static.HTTP2Config{},
}, nil)
require.NoError(t, err)

router := &tcprouter.Router{}
router.SetHTTPHandler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusOK)
}))

conn, err := startEntrypoint(entryPoint, router)
require.NoError(t, err)

http.DefaultClient.Transport = &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return conn, nil
},
}

resp, err := http.Get("http://" + entryPoint.listener.Addr().String())
require.NoError(t, err)
require.False(t, resp.Close)
err = resp.Body.Close()
require.NoError(t, err)

resp, err = http.Get("http://" + entryPoint.listener.Addr().String())
require.NoError(t, err)
require.False(t, resp.Close)
err = resp.Body.Close()
require.NoError(t, err)

resp, err = http.Get("http://" + entryPoint.listener.Addr().String())
require.NoError(t, err)
require.True(t, resp.Close)
err = resp.Body.Close()
require.NoError(t, err)
}

func TestKeepAliveMaxTime(t *testing.T) {
epConfig := &static.EntryPointsTransport{}
epConfig.SetDefaults()
epConfig.KeepAliveMaxTime = ptypes.Duration(time.Millisecond)

entryPoint, err := NewTCPEntryPoint(context.Background(), &static.EntryPoint{
Address: ":0",
Transport: epConfig,
ForwardedHeaders: &static.ForwardedHeaders{},
HTTP2: &static.HTTP2Config{},
}, nil)
require.NoError(t, err)

router := &tcprouter.Router{}
router.SetHTTPHandler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusOK)
}))

conn, err := startEntrypoint(entryPoint, router)
require.NoError(t, err)

http.DefaultClient.Transport = &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return conn, nil
},
}

resp, err := http.Get("http://" + entryPoint.listener.Addr().String())
require.NoError(t, err)
require.False(t, resp.Close)
err = resp.Body.Close()
require.NoError(t, err)

time.Sleep(time.Millisecond)

resp, err = http.Get("http://" + entryPoint.listener.Addr().String())
require.NoError(t, err)
require.True(t, resp.Close)
err = resp.Body.Close()
require.NoError(t, err)
}

0 comments on commit 9662cdc

Please sign in to comment.