Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gateway): add listener connection limits #4755

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 135 additions & 56 deletions api/mesh/v1alpha1/gateway.pb.go

Large diffs are not rendered by default.

5 changes: 5 additions & 0 deletions api/mesh/v1alpha1/gateway.proto
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ message MeshGateway {
}

message Listener {
message Resources { uint32 connection_limit = 1; }

enum Protocol {
NONE = 0;
TCP = 1;
Expand Down Expand Up @@ -121,6 +123,9 @@ message MeshGateway {
// CrossMesh enables traffic to flow to this listener only from other
// meshes.
bool crossMesh = 6;

// Resources is used to specify listener-specific resource settings.
Resources resources = 7;
michaelbeaumont marked this conversation as resolved.
Show resolved Hide resolved
}

// Conf defines the desired state of MeshGateway.
Expand Down
8 changes: 7 additions & 1 deletion docs/generated/resources/policy_meshgateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,5 +103,11 @@
- `crossMesh` (optional)

CrossMesh enables traffic to flow to this listener only from other
meshes.
meshes.

- `resources` (optional)

Resources is used to specify listener-specific resource settings.

- `connectionLimit` (optional)

46 changes: 37 additions & 9 deletions pkg/core/resources/apis/mesh/gateway_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,33 +49,52 @@ func (g *MeshGatewayResource) Validate() error {
return err.OrNil()
}

func validateListenerCompatibility(path validators.PathBuilder, listeners []*mesh_proto.MeshGateway_Listener) validators.ValidationError {
type resourceLimits struct {
connectionLimits map[uint32]struct{}
listeners []int
}

func validateListenerCollapsibility(path validators.PathBuilder, listeners []*mesh_proto.MeshGateway_Listener) validators.ValidationError {
protocolsForPort := map[uint32]map[string][]int{}
hostnamesForPort := map[uint32]map[string][]int{}
limitedListenersForPort := map[uint32]resourceLimits{}

for i, ep := range listeners {
protocols, ok := protocolsForPort[ep.GetPort()]
for i, listener := range listeners {
protocols, ok := protocolsForPort[listener.GetPort()]
if !ok {
protocols = map[string][]int{}
}

hostnames, ok := hostnamesForPort[ep.GetPort()]
hostnames, ok := hostnamesForPort[listener.GetPort()]
if !ok {
hostnames = map[string][]int{}
}

protocols[ep.GetProtocol().String()] = append(protocols[ep.GetProtocol().String()], i)
limitedListeners, ok := limitedListenersForPort[listener.GetPort()]
if !ok {
limitedListeners = resourceLimits{
connectionLimits: map[uint32]struct{}{},
}
}

protocols[listener.GetProtocol().String()] = append(protocols[listener.GetProtocol().String()], i)

// An empty hostname is the same as "*", i.e. matches all hosts.
hostname := ep.GetHostname()
hostname := listener.GetHostname()
if hostname == "" {
hostname = mesh_proto.WildcardHostname
}

hostnames[hostname] = append(hostnames[hostname], i)

hostnamesForPort[ep.GetPort()] = hostnames
protocolsForPort[ep.GetPort()] = protocols
if l := listener.GetResources().GetConnectionLimit(); l != 0 {
limitedListeners.listeners = append(limitedListeners.listeners, i)
limitedListeners.connectionLimits[l] = struct{}{}
}

hostnamesForPort[listener.GetPort()] = hostnames
protocolsForPort[listener.GetPort()] = protocols
limitedListenersForPort[listener.GetPort()] = limitedListeners
}

err := validators.ValidationError{}
Expand Down Expand Up @@ -104,6 +123,15 @@ func validateListenerCompatibility(path validators.PathBuilder, listeners []*mes
}
}

for _, listeners := range limitedListenersForPort {
if len(listeners.connectionLimits) <= 1 {
continue
}
for _, index := range listeners.listeners {
err.AddViolationAt(path.Index(index).Field("resources").Field("connectionLimit"), "conflicting values for this port")
}
}

return err
}

Expand Down Expand Up @@ -198,7 +226,7 @@ func validateMeshGatewayConf(path validators.PathBuilder, conf *mesh_proto.MeshG
}))
}

err.Add(validateListenerCompatibility(path, conf.GetListeners()))
err.Add(validateListenerCollapsibility(path, conf.GetListeners()))

return err
}
35 changes: 34 additions & 1 deletion pkg/core/resources/apis/mesh/gateway_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,29 @@ conf:
tags:
name: http`,
),
Entry("listeners with connectionLimits", `
type: MeshGateway
name: gateway
mesh: default
selectors:
- match:
kuma.io/service: gateway
tags:
product: edge
conf:
listeners:
- protocol: HTTP
hostname: one.com
port: 99
resources:
connectionLimit: 2
- protocol: HTTP
hostname: two.com
port: 99
resources:
connectionLimit: 2
`,
),
)

DescribeErrorCases(
Expand Down Expand Up @@ -398,7 +421,7 @@ conf:
protocol: TCP
`),

ErrorCases("hostname and protocol conflict",
ErrorCases("hostname, protocol and resource conflict",
[]validators.Violation{{
Field: "conf.listeners[0]",
Message: "protocol conflicts with other listeners on this port",
Expand All @@ -411,6 +434,12 @@ conf:
}, {
Field: "conf.listeners[1]",
Message: "multiple listeners for hostname on this port",
}, {
Field: "conf.listeners[0].resources.connectionLimit",
Message: "conflicting values for this port",
}, {
Field: "conf.listeners[1].resources.connectionLimit",
Message: "conflicting values for this port",
}}, `
type: MeshGateway
name: gateway
Expand All @@ -423,11 +452,15 @@ conf:
- hostname: www-1.example.com
port: 443
protocol: TCP
resources:
connectionLimit: 2
- hostname: www-1.example.com
port: 443
protocol: HTTPS
tls:
mode: PASSTHROUGH
resources:
connectionLimit: 1
`),
)
})
43 changes: 37 additions & 6 deletions pkg/plugins/runtime/gateway/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sort"
"strings"

envoy_service_runtime_v3 "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"
"github.com/pkg/errors"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
Expand All @@ -15,6 +16,7 @@ import (
"github.com/kumahq/kuma/pkg/plugins/runtime/gateway/match"
"github.com/kumahq/kuma/pkg/plugins/runtime/gateway/merge"
"github.com/kumahq/kuma/pkg/plugins/runtime/gateway/route"
util_proto "github.com/kumahq/kuma/pkg/util/proto"
xds_context "github.com/kumahq/kuma/pkg/xds/context"
envoy_listeners "github.com/kumahq/kuma/pkg/xds/envoy/listeners"
envoy_names "github.com/kumahq/kuma/pkg/xds/envoy/names"
Expand Down Expand Up @@ -60,6 +62,7 @@ type GatewayListener struct {
// CrossMesh is important because for generation we need to treat such a
// listener as if we have HTTPS with the Mesh cert for this Dataplane
CrossMesh bool
Resources *mesh_proto.MeshGateway_Listener_Resources // TODO verify these don't conflict when merging
michaelbeaumont marked this conversation as resolved.
Show resolved Hide resolved
}

// GatewayListenerInfo holds everything needed to generate resources for a
Expand Down Expand Up @@ -197,6 +200,8 @@ func (g Generator) Generate(ctx xds_context.Context, proxy *core_xds.Proxy) (*co
return nil, errors.Wrap(err, "error generating listener info from Proxy")
}

var limits []RuntimeResoureLimitListener

for _, info := range listenerInfos {
// This is checked by the gateway validator
if !SupportsProtocol(info.Listener.Protocol) {
Expand All @@ -209,25 +214,50 @@ func (g Generator) Generate(ctx xds_context.Context, proxy *core_xds.Proxy) (*co
}
resources.AddSet(cdsResources)

ldsResources, err := g.generateLDS(ctx, info, info.HostInfos)
ldsResources, limit, err := g.generateLDS(ctx, info, info.HostInfos)
if err != nil {
return nil, err
}
resources.AddSet(ldsResources)

if limit != nil {
limits = append(limits, *limit)
}

rdsResources, err := g.generateRDS(ctx, info, info.HostInfos)
if err != nil {
return nil, err
}
resources.AddSet(rdsResources)
}

resources.Add(g.generateRTDS(limits))

return resources, nil
}

func (g Generator) generateLDS(ctx xds_context.Context, info GatewayListenerInfo, hostInfos []GatewayHostInfo) (*core_xds.ResourceSet, error) {
func (g Generator) generateRTDS(limits []RuntimeResoureLimitListener) *core_xds.Resource {
layer := map[string]interface{}{}
for _, limit := range limits {
layer[fmt.Sprintf("envoy.resource_limits.listener.%s.connection_limit", limit.Name)] = limit.ConnectionLimit
}

res := &core_xds.Resource{
Name: "gateway.listeners",
Origin: OriginGateway,
Resource: &envoy_service_runtime_v3.Runtime{
Name: "gateway.listeners",
Layer: util_proto.MustStruct(layer),
},
}

return res
}

func (g Generator) generateLDS(ctx xds_context.Context, info GatewayListenerInfo, hostInfos []GatewayHostInfo) (*core_xds.ResourceSet, *RuntimeResoureLimitListener, error) {
resources := core_xds.NewResourceSet()
listenerBuilder := GenerateListener(info)

listenerBuilder, limit := GenerateListener(info)

var gatewayHosts []GatewayHost
for _, hostInfo := range hostInfos {
Expand All @@ -240,7 +270,7 @@ func (g Generator) generateLDS(ctx xds_context.Context, info GatewayListenerInfo
}
res, filterChainBuilders, err := g.FilterChainGenerators.FilterChainGenerators[protocol].Generate(ctx, info, gatewayHosts)
if err != nil {
return nil, err
return nil, limit, err
}
resources.AddSet(res)

Expand All @@ -250,11 +280,11 @@ func (g Generator) generateLDS(ctx xds_context.Context, info GatewayListenerInfo

res, err = BuildResourceSet(listenerBuilder)
if err != nil {
return nil, errors.Wrapf(err, "failed to build listener resource")
return nil, limit, errors.Wrapf(err, "failed to build listener resource")
}
resources.AddSet(res)

return resources, nil
return resources, limit, nil
}

func (g Generator) generateCDS(ctx xds_context.Context, info GatewayListenerInfo, hostInfos []GatewayHostInfo) (*core_xds.ResourceSet, error) {
Expand Down Expand Up @@ -323,6 +353,7 @@ func MakeGatewayListener(
listeners[0].GetPort(),
),
CrossMesh: listeners[0].CrossMesh,
Resources: listeners[0].GetResources(),
}

// Hostnames must be unique to a listener to remove ambiguity
Expand Down
24 changes: 20 additions & 4 deletions pkg/plugins/runtime/gateway/listener_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ func SupportsProtocol(p mesh_proto.MeshGateway_Listener_Protocol) bool {
}
}

func GenerateListener(info GatewayListenerInfo) *envoy_listeners.ListenerBuilder {
type RuntimeResoureLimitListener struct {
Name string
ConnectionLimit uint32
}

func GenerateListener(info GatewayListenerInfo) (*envoy_listeners.ListenerBuilder, *RuntimeResoureLimitListener) {
// TODO(jpeach) what we really need to do here is to
// generate a HTTP filter chain for each
// host on the same HTTPConnectionManager. Each HTTP filter
Expand All @@ -48,18 +53,29 @@ func GenerateListener(info GatewayListenerInfo) *envoy_listeners.ListenerBuilder
"protocol", protocol,
)

// TODO(jpeach) if proxy protocol is enabled, add the proxy protocol listener filter.
name := envoy_names.GetGatewayListenerName(info.Gateway.Meta.GetName(), protocol.String(), port)

var limits *RuntimeResoureLimitListener
if resources := info.Listener.Resources; resources != nil {
if resources.ConnectionLimit > 0 {
limits = &RuntimeResoureLimitListener{
Name: name,
ConnectionLimit: resources.ConnectionLimit,
}
}
}

// TODO(jpeach) if proxy protocol is enabled, add the proxy protocol listener filter.
return envoy_listeners.NewListenerBuilder(info.Proxy.APIVersion).
Configure(
envoy_listeners.InboundListener(
envoy_names.GetGatewayListenerName(info.Gateway.Meta.GetName(), protocol.String(), port),
name,
address, port, core_xds.SocketAddressProtocolTCP),
// Limit default buffering for edge connections.
envoy_listeners.ConnectionBufferLimit(DefaultConnectionBuffer),
// Roughly balance incoming connections.
envoy_listeners.EnableReusePort(true),
// Always sniff for TLS.
envoy_listeners.TLSInspector(),
)
), limits
}
20 changes: 18 additions & 2 deletions pkg/plugins/runtime/gateway/listener_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package gateway_test
import (
"path"

envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/ghodss/yaml"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -84,7 +83,7 @@ data: LS0tLS1CRUdJTiBSU0EgUFJJVkFURSBLRVktLS0tLQpNSUlFb3dJQkFBS0NBUUVBM3ZWM1cvNX
snap, err := Do(gateway)
Expect(err).To(Succeed())

out, err := yaml.Marshal(MakeProtoResource(snap.Resources[envoy_types.Listener]))
out, err := yaml.Marshal(MakeProtoSnapshot(snap))
Expect(err).To(Succeed())

Expect(out).To(matchers.MatchGoldenYAML(path.Join("testdata", golden)))
Expand Down Expand Up @@ -218,5 +217,22 @@ conf:
tags:
name: example.com
`),

Entry("should add connection limits",
"connection-limited-listener.yaml", `
type: MeshGateway
mesh: default
name: default-gateway
selectors:
- match:
kuma.io/service: gateway-default
conf:
listeners:
- port: 443
protocol: TCP
hostname: bar.example.com
resources:
connectionLimit: 10000
`),
)
})
Loading