Skip to content

Commit

Permalink
[occm] update listener on Octavia annotation changes (kubernetes#1150)
Browse files Browse the repository at this point in the history
This implements the update of the listener if one of the service
annotations
* loadbalancer.openstack.org/timeout-client-data
* loadbalancer.openstack.org/timeout-member-connect
* loadbalancer.openstack.org/timeout-member-data
* loadbalancer.openstack.org/timeout-tcp-inspect
* loadbalancer.openstack.org/x-forwarded-for
changes.

Signed-off-by: Sean Schneeweiss <sean.schneeweiss@daimler.com>
  • Loading branch information
seanschneeweiss authored and jichenjc committed Sep 7, 2020
1 parent 460e809 commit 2820e57
Showing 1 changed file with 91 additions and 61 deletions.
152 changes: 91 additions & 61 deletions pkg/cloudprovider/providers/openstack/openstack_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ const (
activeStatus = "ACTIVE"
errorStatus = "ERROR"

annotationXForwardedFor = "X-Forwarded-For"

// ServiceAnnotationLoadBalancerInternal defines whether or not to create an internal loadbalancer. Default: false.
ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/openstack-internal-load-balancer"
ServiceAnnotationLoadBalancerConnLimit = "loadbalancer.openstack.org/connection-limit"
Expand Down Expand Up @@ -108,18 +110,23 @@ type LbaasV2 struct {

// serviceConfig contains configurations for creating a Service.
type serviceConfig struct {
internal bool
configClassName string
lbNetworkID string
lbSubnetID string
lbMemberSubnetID string
lbPublicNetworkID string
lbPublicSubnetID string
keepClientIP bool
enableProxyProtocol bool
allowedCIDR []string
enableMonitor bool
flavorID string
internal bool
connLimit int
configClassName string
lbNetworkID string
lbSubnetID string
lbMemberSubnetID string
lbPublicNetworkID string
lbPublicSubnetID string
keepClientIP bool
enableProxyProtocol bool
timeoutClientData int
timeoutMemberConnect int
timeoutMemberData int
timeoutTCPInspect int
allowedCIDR []string
enableMonitor bool
flavorID string
}

type listenerKey struct {
Expand Down Expand Up @@ -595,19 +602,23 @@ func getStringFromServiceAnnotation(service *corev1.Service, annotationKey strin
return defaultSetting
}

func getIntFromServiceAnnotation(service *corev1.Service, annotationKey string) (int, bool) {
intString := getStringFromServiceAnnotation(service, annotationKey, "")
if len(intString) > 0 {
annotationValue, err := strconv.Atoi(intString)
if err == nil {
klog.V(4).Infof("Found a Service Annotation: %v = %v", annotationKey, annotationValue)
return annotationValue, true
//getIntFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's integer value or a specified defaultSetting
func getIntFromServiceAnnotation(service *corev1.Service, annotationKey string, defaultSetting int) int {
if annotationValue, ok := service.Annotations[annotationKey]; ok {
returnValue, err := strconv.Atoi(annotationValue)
if err != nil {
klog.Warningf("Could not parse int value from %q, failing back to default %s = %v, %v", annotationValue, annotationKey, defaultSetting, err)
return defaultSetting
}

klog.V(4).Infof("Found a Service Annotation: %v = %v", annotationKey, annotationValue)
return returnValue
}
return 0, false
klog.V(4).Infof("Could not find a Service Annotation; falling back to default setting: %v = %v", annotationKey, defaultSetting)
return defaultSetting
}

//getBoolFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's value or a specified defaultSetting
//getBoolFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's boolean value or a specified defaultSetting
func getBoolFromServiceAnnotation(service *corev1.Service, annotationKey string, defaultSetting bool) (bool, error) {
klog.V(4).Infof("getBoolFromServiceAnnotation(%v, %v, %v)", service, annotationKey, defaultSetting)
if annotationValue, ok := service.Annotations[annotationKey]; ok {
Expand Down Expand Up @@ -1091,15 +1102,6 @@ func (lbaas *LbaasV2) ensureOctaviaListener(lbID string, oldListeners []listener
lbListeners[key] = &l
}

climit := getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerConnLimit, "-1")
connLimit := -1
tmp, err := strconv.Atoi(climit)
if err != nil {
klog.V(4).Infof("Could not parse int value from %s, use default -1", climit)
} else {
connLimit = tmp
}

proto := toListenersProtocol(port.Protocol)
if svcConf.keepClientIP {
proto = listeners.ProtocolHTTP
Expand All @@ -1112,38 +1114,31 @@ func (lbaas *LbaasV2) ensureOctaviaListener(lbID string, oldListeners []listener
if !ok {
listenerProtocol := listeners.Protocol(port.Protocol)
listenerCreateOpt := listeners.CreateOpts{
Protocol: listenerProtocol,
ProtocolPort: int(port.Port),
ConnLimit: &connLimit,
LoadbalancerID: lbID,
Protocol: listenerProtocol,
ProtocolPort: int(port.Port),
ConnLimit: &svcConf.connLimit,
LoadbalancerID: lbID,
TimeoutClientData: &svcConf.timeoutClientData,
TimeoutMemberConnect: &svcConf.timeoutMemberConnect,
TimeoutMemberData: &svcConf.timeoutMemberData,
TimeoutTCPInspect: &svcConf.timeoutTCPInspect,
}

if svcConf.keepClientIP {
if listenerCreateOpt.Protocol != listeners.ProtocolHTTP {
klog.V(4).Infof("Forcing to use %q protocol for listener because %q annotation is set", listeners.ProtocolHTTP, ServiceAnnotationLoadBalancerXForwardedFor)
listenerCreateOpt.Protocol = listeners.ProtocolHTTP
}
listenerCreateOpt.InsertHeaders = map[string]string{"X-Forwarded-For": "true"}
listenerCreateOpt.InsertHeaders = map[string]string{annotationXForwardedFor: "true"}
}

if timeoutClientData, ok := getIntFromServiceAnnotation(service, ServiceAnnotationLoadBalancerTimeoutClientData); ok {
listenerCreateOpt.TimeoutClientData = &timeoutClientData
}
if timeoutMemberData, ok := getIntFromServiceAnnotation(service, ServiceAnnotationLoadBalancerTimeoutMemberData); ok {
listenerCreateOpt.TimeoutMemberData = &timeoutMemberData
}
if timeoutMemberConnect, ok := getIntFromServiceAnnotation(service, ServiceAnnotationLoadBalancerTimeoutMemberConnect); ok {
listenerCreateOpt.TimeoutMemberConnect = &timeoutMemberConnect
}
if timeoutTCPInspect, ok := getIntFromServiceAnnotation(service, ServiceAnnotationLoadBalancerTimeoutTCPInspect); ok {
listenerCreateOpt.TimeoutTCPInspect = &timeoutTCPInspect
}
if len(svcConf.allowedCIDR) > 0 {
listenerCreateOpt.AllowedCIDRs = svcConf.allowedCIDR
}

klog.V(2).Infof("Creating listener for port %d using protocol %s", int(port.Port), listenerProtocol)

var err error
listener, err = openstackutil.CreateListener(lbaas.lb, lbID, listenerCreateOpt)
if err != nil {
return nil, fmt.Errorf("failed to create listener for loadbalancer %s: %v", lbID, err)
Expand All @@ -1154,8 +1149,34 @@ func (lbaas *LbaasV2) ensureOctaviaListener(lbID string, oldListeners []listener
listenerChanged := false
updateOpts := listeners.UpdateOpts{}

if connLimit != listener.ConnLimit {
updateOpts.ConnLimit = &connLimit
if svcConf.connLimit != listener.ConnLimit {
updateOpts.ConnLimit = &svcConf.connLimit
listenerChanged = true
}
updateOpts.InsertHeaders = &listener.InsertHeaders
listenerKeepClientIP := listener.InsertHeaders[annotationXForwardedFor] == "true"
if svcConf.keepClientIP != listenerKeepClientIP {
if svcConf.keepClientIP {
(*updateOpts.InsertHeaders)[annotationXForwardedFor] = "true"
} else {
delete(*updateOpts.InsertHeaders, annotationXForwardedFor)
}
listenerChanged = true
}
if svcConf.timeoutClientData != listener.TimeoutClientData {
updateOpts.TimeoutClientData = &svcConf.timeoutClientData
listenerChanged = true
}
if svcConf.timeoutMemberConnect != listener.TimeoutMemberConnect {
updateOpts.TimeoutMemberConnect = &svcConf.timeoutMemberConnect
listenerChanged = true
}
if svcConf.timeoutMemberData != listener.TimeoutMemberData {
updateOpts.TimeoutMemberData = &svcConf.timeoutMemberData
listenerChanged = true
}
if svcConf.timeoutTCPInspect != listener.TimeoutTCPInspect {
updateOpts.TimeoutTCPInspect = &svcConf.timeoutTCPInspect
listenerChanged = true
}
if openstackutil.IsOctaviaFeatureSupported(lbaas.lb, openstackutil.OctaviaFeatureVIPACL) {
Expand Down Expand Up @@ -1198,6 +1219,8 @@ func (lbaas *LbaasV2) checkService(service *corev1.Service, nodes []*corev1.Node
svcConf.internal = internal
}

svcConf.connLimit = getIntFromServiceAnnotation(service, ServiceAnnotationLoadBalancerConnLimit, -1)

svcConf.lbNetworkID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerNetworkID, lbaas.opts.NetworkID)
svcConf.lbSubnetID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
svcConf.flavorID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerFlavorID, lbaas.opts.FlavorID)
Expand Down Expand Up @@ -1301,6 +1324,11 @@ func (lbaas *LbaasV2) checkService(service *corev1.Service, nodes []*corev1.Node
svcConf.keepClientIP = keepClientIP
svcConf.enableProxyProtocol = useProxyProtocol

svcConf.timeoutClientData = getIntFromServiceAnnotation(service, ServiceAnnotationLoadBalancerTimeoutClientData, 50000)
svcConf.timeoutMemberConnect = getIntFromServiceAnnotation(service, ServiceAnnotationLoadBalancerTimeoutMemberConnect, 5000)
svcConf.timeoutMemberData = getIntFromServiceAnnotation(service, ServiceAnnotationLoadBalancerTimeoutMemberData, 50000)
svcConf.timeoutTCPInspect = getIntFromServiceAnnotation(service, ServiceAnnotationLoadBalancerTimeoutTCPInspect, 0)

var listenerAllowedCIDRs []string
sourceRanges, err := GetLoadBalancerSourceRanges(service)
if err != nil {
Expand Down Expand Up @@ -1505,6 +1533,10 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(ctx context.Context, clusterName string

var keepClientIP bool
var useProxyProtocol bool
var timeoutClientData int
var timeoutMemberConnect int
var timeoutMemberData int
var timeoutTCPInspect int
if !lbaas.opts.UseOctavia {
// Check for TCP protocol on each port
for _, port := range ports {
Expand All @@ -1527,6 +1559,11 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(ctx context.Context, clusterName string
if useProxyProtocol && keepClientIP {
return nil, fmt.Errorf("annotation %s and %s cannot be used together", ServiceAnnotationLoadBalancerProxyEnabled, ServiceAnnotationLoadBalancerXForwardedFor)
}

timeoutClientData = getIntFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerTimeoutClientData, 50000)
timeoutMemberConnect = getIntFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerTimeoutMemberConnect, 5000)
timeoutMemberData = getIntFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerTimeoutMemberData, 50000)
timeoutTCPInspect = getIntFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerTimeoutTCPInspect, 0)
}

var listenerAllowedCIDRs []string
Expand Down Expand Up @@ -1615,18 +1652,11 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(ctx context.Context, clusterName string
listenerCreateOpt.InsertHeaders = map[string]string{"X-Forwarded-For": "true"}
}

if timeoutClientData, ok := getIntFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerTimeoutClientData); ok {
listenerCreateOpt.TimeoutClientData = &timeoutClientData
}
if timeoutMemberData, ok := getIntFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerTimeoutMemberData); ok {
listenerCreateOpt.TimeoutMemberData = &timeoutMemberData
}
if timeoutMemberConnect, ok := getIntFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerTimeoutMemberConnect); ok {
listenerCreateOpt.TimeoutMemberConnect = &timeoutMemberConnect
}
if timeoutTCPInspect, ok := getIntFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerTimeoutTCPInspect); ok {
listenerCreateOpt.TimeoutTCPInspect = &timeoutTCPInspect
}
listenerCreateOpt.TimeoutClientData = &timeoutClientData
listenerCreateOpt.TimeoutMemberData = &timeoutMemberData
listenerCreateOpt.TimeoutMemberConnect = &timeoutMemberConnect
listenerCreateOpt.TimeoutTCPInspect = &timeoutTCPInspect

if len(listenerAllowedCIDRs) > 0 {
listenerCreateOpt.AllowedCIDRs = listenerAllowedCIDRs
}
Expand Down

0 comments on commit 2820e57

Please sign in to comment.