Permalink
Find file Copy path
1683 lines (1475 sloc) 58.9 KB
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package openstack
import (
"context"
"fmt"
"net"
"reflect"
"strconv"
"strings"
"time"
"github.com/gophercloud/gophercloud"
octavialb "github.com/gophercloud/gophercloud/openstack/loadbalancer/v2/loadbalancers"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/external"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/floatingips"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/listeners"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/loadbalancers"
v2monitors "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/monitors"
v2pools "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/pools"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/groups"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/rules"
"github.com/gophercloud/gophercloud/openstack/networking/v2/networks"
neutronports "github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
"github.com/gophercloud/gophercloud/pagination"
"k8s.io/klog"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
v1service "k8s.io/cloud-provider-openstack/pkg/api/v1/service"
)
// Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
// this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state.
const (
// loadbalancerActive* is configuration of exponential backoff for
// going into ACTIVE loadbalancer provisioning status. Starting with 1
// seconds, multiplying by 1.2 with each step and taking 19 steps at maximum
// it will time out after 128s, which roughly corresponds to 120s
loadbalancerActiveInitDelay = 1 * time.Second
loadbalancerActiveFactor = 1.2
loadbalancerActiveSteps = 19
// loadbalancerDelete* is configuration of exponential backoff for
// waiting for delete operation to complete. Starting with 1
// seconds, multiplying by 1.2 with each step and taking 13 steps at maximum
// it will time out after 32s, which roughly corresponds to 30s
loadbalancerDeleteInitDelay = 1 * time.Second
loadbalancerDeleteFactor = 1.2
loadbalancerDeleteSteps = 13
activeStatus = "ACTIVE"
errorStatus = "ERROR"
ServiceAnnotationLoadBalancerFloatingNetworkID = "loadbalancer.openstack.org/floating-network-id"
ServiceAnnotationLoadBalancerSubnetID = "loadbalancer.openstack.org/subnet-id"
ServiceAnnotationLoadBalancerConnLimit = "loadbalancer.openstack.org/connection-limit"
ServiceAnnotationLoadBalancerKeepFloatingIP = "loadbalancer.openstack.org/keep-floatingip"
// ServiceAnnotationLoadBalancerInternal is the annotation used on the service
// to indicate that we want an internal loadbalancer service.
// If the value of ServiceAnnotationLoadBalancerInternal is false, it indicates that we want an external loadbalancer service. Default to false.
ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/openstack-internal-load-balancer"
)
// LbaasV2 is a LoadBalancer implementation for Neutron LBaaS v2 API
type LbaasV2 struct {
LoadBalancer
}
type empty struct{}
func networkExtensions(client *gophercloud.ServiceClient) (map[string]bool, error) {
seen := make(map[string]bool)
pager := extensions.List(client)
err := pager.EachPage(func(page pagination.Page) (bool, error) {
exts, err := extensions.ExtractExtensions(page)
if err != nil {
return false, err
}
for _, ext := range exts {
seen[ext.Alias] = true
}
return true, nil
})
return seen, err
}
func getFloatingIPByFloatingIP(client *gophercloud.ServiceClient, floatingIP string) (*floatingips.FloatingIP, error) {
opts := floatingips.ListOpts{
FloatingIP: floatingIP,
}
return getFloatingIP(client, opts)
}
func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*floatingips.FloatingIP, error) {
opts := floatingips.ListOpts{
PortID: portID,
}
return getFloatingIP(client, opts)
}
func getFloatingIP(client *gophercloud.ServiceClient, opts floatingips.ListOpts) (*floatingips.FloatingIP, error) {
pager := floatingips.List(client, opts)
floatingIPList := make([]floatingips.FloatingIP, 0, 1)
err := pager.EachPage(func(page pagination.Page) (bool, error) {
f, err := floatingips.ExtractFloatingIPs(page)
if err != nil {
return false, err
}
floatingIPList = append(floatingIPList, f...)
if len(floatingIPList) > 1 {
return false, ErrMultipleResults
}
return true, nil
})
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, err
}
if len(floatingIPList) == 0 {
return nil, ErrNotFound
} else if len(floatingIPList) > 1 {
return nil, ErrMultipleResults
}
return &floatingIPList[0], nil
}
func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loadbalancers.LoadBalancer, error) {
opts := loadbalancers.ListOpts{
Name: name,
}
pager := loadbalancers.List(client, opts)
loadbalancerList := make([]loadbalancers.LoadBalancer, 0, 1)
err := pager.EachPage(func(page pagination.Page) (bool, error) {
v, err := loadbalancers.ExtractLoadBalancers(page)
if err != nil {
return false, err
}
loadbalancerList = append(loadbalancerList, v...)
if len(loadbalancerList) > 1 {
return false, ErrMultipleResults
}
return true, nil
})
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, err
}
if len(loadbalancerList) == 0 {
return nil, ErrNotFound
} else if len(loadbalancerList) > 1 {
return nil, ErrMultipleResults
}
return &loadbalancerList[0], nil
}
func getListenersByLoadBalancerID(client *gophercloud.ServiceClient, id string) ([]listeners.Listener, error) {
var existingListeners []listeners.Listener
err := listeners.List(client, listeners.ListOpts{LoadbalancerID: id}).EachPage(func(page pagination.Page) (bool, error) {
listenerList, err := listeners.ExtractListeners(page)
if err != nil {
return false, err
}
for _, l := range listenerList {
for _, lb := range l.Loadbalancers {
if lb.ID == id {
existingListeners = append(existingListeners, l)
break
}
}
}
return true, nil
})
if err != nil {
return nil, err
}
return existingListeners, nil
}
// get listener for a port or nil if does not exist
func getListenerForPort(existingListeners []listeners.Listener, port v1.ServicePort) *listeners.Listener {
for _, l := range existingListeners {
if listeners.Protocol(l.Protocol) == toListenersProtocol(port.Protocol) && l.ProtocolPort == int(port.Port) {
return &l
}
}
return nil
}
// Get pool for a listener. A listener always has exactly one pool.
func getPoolByListenerID(client *gophercloud.ServiceClient, loadbalancerID string, listenerID string) (*v2pools.Pool, error) {
listenerPools := make([]v2pools.Pool, 0, 1)
err := v2pools.List(client, v2pools.ListOpts{LoadbalancerID: loadbalancerID}).EachPage(func(page pagination.Page) (bool, error) {
poolsList, err := v2pools.ExtractPools(page)
if err != nil {
return false, err
}
for _, p := range poolsList {
for _, l := range p.Listeners {
if l.ID == listenerID {
listenerPools = append(listenerPools, p)
}
}
}
if len(listenerPools) > 1 {
return false, ErrMultipleResults
}
return true, nil
})
if err != nil {
if isNotFound(err) {
return nil, ErrNotFound
}
return nil, err
}
if len(listenerPools) == 0 {
return nil, ErrNotFound
} else if len(listenerPools) > 1 {
return nil, ErrMultipleResults
}
return &listenerPools[0], nil
}
func getMembersByPoolID(client *gophercloud.ServiceClient, id string) ([]v2pools.Member, error) {
var members []v2pools.Member
err := v2pools.ListMembers(client, id, v2pools.ListMembersOpts{}).EachPage(func(page pagination.Page) (bool, error) {
membersList, err := v2pools.ExtractMembers(page)
if err != nil {
return false, err
}
members = append(members, membersList...)
return true, nil
})
if err != nil {
return nil, err
}
return members, nil
}
// Check if a member exists for node
func memberExists(members []v2pools.Member, addr string, port int) bool {
for _, member := range members {
if member.Address == addr && member.ProtocolPort == port {
return true
}
}
return false
}
func popListener(existingListeners []listeners.Listener, id string) []listeners.Listener {
for i, existingListener := range existingListeners {
if existingListener.ID == id {
existingListeners[i] = existingListeners[len(existingListeners)-1]
existingListeners = existingListeners[:len(existingListeners)-1]
break
}
}
return existingListeners
}
func popMember(members []v2pools.Member, addr string, port int) []v2pools.Member {
for i, member := range members {
if member.Address == addr && member.ProtocolPort == port {
members[i] = members[len(members)-1]
members = members[:len(members)-1]
}
}
return members
}
func getSecurityGroupName(service *v1.Service) string {
securityGroupName := fmt.Sprintf("lb-sg-%s-%s-%s", service.UID, service.Namespace, service.Name)
//OpenStack requires that the name of a security group is shorter than 255 bytes.
if len(securityGroupName) > 255 {
securityGroupName = securityGroupName[:255]
}
return securityGroupName
}
func getSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpts) ([]rules.SecGroupRule, error) {
pager := rules.List(client, opts)
var securityRules []rules.SecGroupRule
err := pager.EachPage(func(page pagination.Page) (bool, error) {
ruleList, err := rules.ExtractRules(page)
if err != nil {
return false, err
}
securityRules = append(securityRules, ruleList...)
return true, nil
})
if err != nil {
return nil, err
}
return securityRules, nil
}
func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) {
backoff := wait.Backoff{
Duration: loadbalancerActiveInitDelay,
Factor: loadbalancerActiveFactor,
Steps: loadbalancerActiveSteps,
}
var provisioningStatus string
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
if err != nil {
return false, err
}
provisioningStatus = loadbalancer.ProvisioningStatus
if loadbalancer.ProvisioningStatus == activeStatus {
return true, nil
} else if loadbalancer.ProvisioningStatus == errorStatus {
return true, fmt.Errorf("loadbalancer has gone into ERROR state")
} else {
return false, nil
}
})
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("loadbalancer failed to go into ACTIVE provisioning status within allotted time")
}
return provisioningStatus, err
}
func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID string) error {
backoff := wait.Backoff{
Duration: loadbalancerDeleteInitDelay,
Factor: loadbalancerDeleteFactor,
Steps: loadbalancerDeleteSteps,
}
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
_, err := loadbalancers.Get(client, loadbalancerID).Extract()
if err != nil {
if isNotFound(err) {
return true, nil
}
return false, err
}
return false, nil
})
if err == wait.ErrWaitTimeout {
err = fmt.Errorf("loadbalancer failed to delete within the allotted time")
}
return err
}
func toRuleProtocol(protocol v1.Protocol) rules.RuleProtocol {
switch protocol {
case v1.ProtocolTCP:
return rules.ProtocolTCP
case v1.ProtocolUDP:
return rules.ProtocolUDP
default:
return rules.RuleProtocol(strings.ToLower(string(protocol)))
}
}
func toListenersProtocol(protocol v1.Protocol) listeners.Protocol {
switch protocol {
case v1.ProtocolTCP:
return listeners.ProtocolTCP
default:
return listeners.Protocol(string(protocol))
}
}
func createNodeSecurityGroup(client *gophercloud.ServiceClient, nodeSecurityGroupID string, port int, protocol v1.Protocol, lbSecGroup string) error {
v4NodeSecGroupRuleCreateOpts := rules.CreateOpts{
Direction: rules.DirIngress,
PortRangeMax: port,
PortRangeMin: port,
Protocol: toRuleProtocol(protocol),
RemoteGroupID: lbSecGroup,
SecGroupID: nodeSecurityGroupID,
EtherType: rules.EtherType4,
}
v6NodeSecGroupRuleCreateOpts := rules.CreateOpts{
Direction: rules.DirIngress,
PortRangeMax: port,
PortRangeMin: port,
Protocol: toRuleProtocol(protocol),
RemoteGroupID: lbSecGroup,
SecGroupID: nodeSecurityGroupID,
EtherType: rules.EtherType6,
}
_, err := rules.Create(client, v4NodeSecGroupRuleCreateOpts).Extract()
if err != nil {
return err
}
_, err = rules.Create(client, v6NodeSecGroupRuleCreateOpts).Extract()
if err != nil {
return err
}
return nil
}
func (lbaas *LbaasV2) createLoadBalancer(service *v1.Service, name, clusterName string, internalAnnotation bool) (*loadbalancers.LoadBalancer, error) {
createOpts := loadbalancers.CreateOpts{
Name: name,
Description: fmt.Sprintf("Kubernetes external service %s from cluster %s", name, clusterName),
VipSubnetID: lbaas.opts.SubnetID,
Provider: lbaas.opts.LBProvider,
}
loadBalancerIP := service.Spec.LoadBalancerIP
if loadBalancerIP != "" && internalAnnotation {
createOpts.VipAddress = loadBalancerIP
}
loadbalancer, err := loadbalancers.Create(lbaas.lb, createOpts).Extract()
if err != nil {
return nil, fmt.Errorf("error creating loadbalancer %v: %v", createOpts, err)
}
return loadbalancer, nil
}
// GetLoadBalancer returns whether the specified load balancer exists and its status
func (lbaas *LbaasV2) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
if err == ErrNotFound {
return nil, false, nil
}
if loadbalancer == nil {
return nil, false, err
}
status := &v1.LoadBalancerStatus{}
portID := loadbalancer.VipPortID
if portID != "" {
floatIP, err := getFloatingIPByPortID(lbaas.network, portID)
if err != nil {
return nil, false, fmt.Errorf("error getting floating ip for port %s: %v", portID, err)
}
status.Ingress = []v1.LoadBalancerIngress{{IP: floatIP.FloatingIP}}
} else {
status.Ingress = []v1.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
}
return status, true, err
}
// GetLoadBalancerName is an implementation of LoadBalancer.GetLoadBalancerName.
func (lbaas *LbaasV2) GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string {
// TODO: replace DefaultLoadBalancerName to generate more meaningful loadbalancer names.
return cloudprovider.DefaultLoadBalancerName(service)
}
// The LB needs to be configured with instance addresses on the same
// subnet as the LB (aka opts.SubnetID). Currently we're just
// guessing that the node's InternalIP is the right address.
// In case no InternalIP can be found, ExternalIP is tried.
// If neither InternalIP nor ExternalIP can be found an error is
// returned.
func nodeAddressForLB(node *v1.Node) (string, error) {
addrs := node.Status.Addresses
if len(addrs) == 0 {
return "", ErrNoAddressFound
}
allowedAddrTypes := []v1.NodeAddressType{v1.NodeInternalIP, v1.NodeExternalIP}
for _, allowedAddrType := range allowedAddrTypes {
for _, addr := range addrs {
if addr.Type == allowedAddrType {
return addr.Address, nil
}
}
}
return "", ErrNoAddressFound
}
//getStringFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's value or a specified defaultSetting
func getStringFromServiceAnnotation(service *v1.Service, annotationKey string, defaultSetting string) string {
klog.V(4).Infof("getStringFromServiceAnnotation(%v, %v, %v)", service, annotationKey, defaultSetting)
if annotationValue, ok := service.Annotations[annotationKey]; ok {
//if there is an annotation for this setting, set the "setting" var to it
// annotationValue can be empty, it is working as designed
// it makes possible for instance provisioning loadbalancer without floatingip
klog.V(4).Infof("Found a Service Annotation: %v = %v", annotationKey, annotationValue)
return annotationValue
}
//if there is no annotation, set "settings" var to the value from cloud config
klog.V(4).Infof("Could not find a Service Annotation; falling back on cloud-config setting: %v = %v", annotationKey, defaultSetting)
return defaultSetting
}
//getBoolFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's value or a specified defaultSetting
func getBoolFromServiceAnnotation(service *v1.Service, annotationKey string, defaultSetting bool) (bool, error) {
klog.V(4).Infof("getBoolFromServiceAnnotation(%v, %v, %v)", service, annotationKey, defaultSetting)
if annotationValue, ok := service.Annotations[annotationKey]; ok {
returnValue := false
switch annotationValue {
case "true":
returnValue = true
case "false":
returnValue = false
default:
return returnValue, fmt.Errorf("unknown %s annotation: %v, specify \"true\" or \"false\" ", annotationKey, annotationValue)
}
klog.V(4).Infof("Found a Service Annotation: %v = %v", annotationKey, returnValue)
return returnValue, nil
}
klog.V(4).Infof("Could not find a Service Annotation; falling back to default setting: %v = %v", annotationKey, defaultSetting)
return defaultSetting, nil
}
// getSubnetIDForLB returns subnet-id for a specific node
func getSubnetIDForLB(compute *gophercloud.ServiceClient, node v1.Node) (string, error) {
ipAddress, err := nodeAddressForLB(&node)
if err != nil {
return "", err
}
instanceID := node.Spec.ProviderID
if ind := strings.LastIndex(instanceID, "/"); ind >= 0 {
instanceID = instanceID[(ind + 1):]
}
interfaces, err := getAttachedInterfacesByID(compute, instanceID)
if err != nil {
return "", err
}
for _, intf := range interfaces {
for _, fixedIP := range intf.FixedIPs {
if fixedIP.IPAddress == ipAddress {
return fixedIP.SubnetID, nil
}
}
}
return "", ErrNotFound
}
// getNodeSecurityGroupIDForLB lists node-security-groups for specific nodes
func getNodeSecurityGroupIDForLB(compute *gophercloud.ServiceClient, network *gophercloud.ServiceClient, nodes []*v1.Node) ([]string, error) {
secGroupNames := sets.NewString()
for _, node := range nodes {
nodeName := types.NodeName(node.Name)
srv, err := getServerByName(compute, nodeName)
if err != nil {
return []string{}, err
}
// use the first node-security-groups
// case 0: node1:SG1 node2:SG1 return SG1
// case 1: node1:SG1 node2:SG2 return SG1,SG2
// case 2: node1:SG1,SG2 node2:SG3,SG4 return SG1,SG3
// case 3: node1:SG1,SG2 node2:SG2,SG3 return SG1,SG2
secGroupNames.Insert(srv.SecurityGroups[0]["name"].(string))
}
secGroupIDs := make([]string, secGroupNames.Len())
for i, name := range secGroupNames.List() {
secGroupID, err := groups.IDFromName(network, name)
if err != nil {
return []string{}, err
}
secGroupIDs[i] = secGroupID
}
return secGroupIDs, nil
}
// isSecurityGroupNotFound return true while 'err' is object of gophercloud.ErrResourceNotFound
func isSecurityGroupNotFound(err error) bool {
errType := reflect.TypeOf(err).String()
errTypeSlice := strings.Split(errType, ".")
errTypeValue := ""
if len(errTypeSlice) != 0 {
errTypeValue = errTypeSlice[len(errTypeSlice)-1]
}
if errTypeValue == "ErrResourceNotFound" {
return true
}
return false
}
// getFloatingNetworkIDForLB returns a floating-network-id for cluster.
func getFloatingNetworkIDForLB(client *gophercloud.ServiceClient) (string, error) {
var floatingNetworkIds []string
type NetworkWithExternalExt struct {
networks.Network
external.NetworkExternalExt
}
err := networks.List(client, networks.ListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
var externalNetwork []NetworkWithExternalExt
err := networks.ExtractNetworksInto(page, &externalNetwork)
if err != nil {
return false, err
}
for _, externalNet := range externalNetwork {
if externalNet.External {
floatingNetworkIds = append(floatingNetworkIds, externalNet.ID)
}
}
if len(floatingNetworkIds) > 1 {
return false, ErrMultipleResults
}
return true, nil
})
if err != nil {
if isNotFound(err) {
return "", ErrNotFound
}
if err == ErrMultipleResults {
klog.V(4).Infof("find multiple external networks, pick the first one when there are no explicit configuration.")
return floatingNetworkIds[0], nil
}
return "", err
}
if len(floatingNetworkIds) == 0 {
return "", ErrNotFound
}
return floatingNetworkIds[0], nil
}
// TODO: This code currently ignores 'region' and always creates a
// loadbalancer in only the current OpenStack region. We should take
// a list of regions (from config) and query/create loadbalancers in
// each region.
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one.
func (lbaas *LbaasV2) EnsureLoadBalancer(ctx context.Context, clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, nodes, apiService.Annotations)
if len(nodes) == 0 {
return nil, fmt.Errorf("there are no available nodes for LoadBalancer service %s/%s", apiService.Namespace, apiService.Name)
}
lbaas.opts.SubnetID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
if len(lbaas.opts.SubnetID) == 0 {
// Get SubnetID automatically.
// The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
subnetID, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
if err != nil {
klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
return nil, fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
"and failed to find subnet-id from OpenStack: %v", apiService.Namespace, apiService.Name, err)
}
lbaas.opts.SubnetID = subnetID
}
ports := apiService.Spec.Ports
if len(ports) == 0 {
return nil, fmt.Errorf("no ports provided to openstack load balancer")
}
floatingPool := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerFloatingNetworkID, lbaas.opts.FloatingNetworkID)
if len(floatingPool) == 0 {
var err error
floatingPool, err = getFloatingNetworkIDForLB(lbaas.network)
if err != nil {
klog.Warningf("Failed to find floating-network-id for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
}
}
var internalAnnotation bool
internal := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerInternal, "false")
switch internal {
case "true":
klog.V(4).Infof("Ensure an internal loadbalancer service.")
internalAnnotation = true
case "false":
if len(floatingPool) != 0 {
klog.V(4).Infof("Ensure an external loadbalancer service, using floatingPool: %v", floatingPool)
internalAnnotation = false
} else {
return nil, fmt.Errorf("floating-network-id or loadbalancer.openstack.org/floating-network-id should be specified when ensuring an external loadbalancer service")
}
default:
return nil, fmt.Errorf("unknown service.beta.kubernetes.io/openstack-internal-load-balancer annotation: %v, specify \"true\" or \"false\" ",
internal)
}
// Check for TCP protocol on each port
// TODO: Convert all error messages to use an event recorder
for _, port := range ports {
if port.Protocol != v1.ProtocolTCP {
return nil, fmt.Errorf("only TCP LoadBalancer is supported for openstack load balancers")
}
}
sourceRanges, err := v1service.GetLoadBalancerSourceRanges(apiService)
if err != nil {
return nil, fmt.Errorf("failed to get source ranges for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
}
if !v1service.IsAllowAll(sourceRanges) && !lbaas.opts.ManageSecurityGroups {
return nil, fmt.Errorf("source range restrictions are not supported for openstack load balancers without managing security groups")
}
affinity := apiService.Spec.SessionAffinity
var persistence *v2pools.SessionPersistence
switch affinity {
case v1.ServiceAffinityNone:
persistence = nil
case v1.ServiceAffinityClientIP:
persistence = &v2pools.SessionPersistence{Type: "SOURCE_IP"}
default:
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
}
name := lbaas.GetLoadBalancerName(ctx, clusterName, apiService)
loadbalancer, err := getLoadbalancerByName(lbaas.lb, name)
if err != nil {
if err != ErrNotFound {
return nil, fmt.Errorf("error getting loadbalancer %s: %v", name, err)
}
klog.V(2).Infof("Creating loadbalancer %s", name)
loadbalancer, err = lbaas.createLoadBalancer(apiService, name, clusterName, internalAnnotation)
if err != nil {
// Unknown error, retry later
return nil, fmt.Errorf("error creating loadbalancer %s: %v", name, err)
}
} else {
klog.V(2).Infof("LoadBalancer %s already exists", name)
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
lbmethod := v2pools.LBMethod(lbaas.opts.LBMethod)
if lbmethod == "" {
lbmethod = v2pools.LBMethodRoundRobin
}
oldListeners, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("error getting LB %s listeners: %v", name, err)
}
for portIndex, port := range ports {
listener := getListenerForPort(oldListeners, port)
climit := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerConnLimit, "-1")
connLimit := -1
tmp, err := strconv.Atoi(climit)
if err != nil {
klog.V(4).Infof("Could not parse int value from \"%s\" error \"%v\" failing back to default", climit, err)
} else {
connLimit = tmp
}
if listener == nil {
klog.V(4).Infof("Creating listener for port %d", int(port.Port))
listener, err = listeners.Create(lbaas.lb, listeners.CreateOpts{
Name: fmt.Sprintf("listener_%s_%d", name, portIndex),
Protocol: listeners.Protocol(port.Protocol),
ProtocolPort: int(port.Port),
ConnLimit: &connLimit,
LoadbalancerID: loadbalancer.ID,
}).Extract()
if err != nil {
// Unknown error, retry later
return nil, fmt.Errorf("error creating LB listener: %v", err)
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
} else {
if connLimit != listener.ConnLimit {
klog.V(4).Infof("Updating listener connection limit from %d to %d", listener.ConnLimit, connLimit)
updateOpts := listeners.UpdateOpts{
ConnLimit: &connLimit,
}
_, err := listeners.Update(lbaas.lb, listener.ID, updateOpts).Extract()
if err != nil {
return nil, fmt.Errorf("Error updating LB listener: %v", err)
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
}
klog.V(4).Infof("Listener for %s port %d: %s", string(port.Protocol), int(port.Port), listener.ID)
// After all ports have been processed, remaining listeners are removed as obsolete.
// Pop valid listeners.
oldListeners = popListener(oldListeners, listener.ID)
pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
if err != nil && err != ErrNotFound {
// Unknown error, retry later
return nil, fmt.Errorf("error getting pool for listener %s: %v", listener.ID, err)
}
if pool == nil {
klog.V(4).Infof("Creating pool for listener %s", listener.ID)
pool, err = v2pools.Create(lbaas.lb, v2pools.CreateOpts{
Name: fmt.Sprintf("pool_%s_%d", name, portIndex),
Protocol: v2pools.Protocol(port.Protocol),
LBMethod: lbmethod,
ListenerID: listener.ID,
Persistence: persistence,
}).Extract()
if err != nil {
// Unknown error, retry later
return nil, fmt.Errorf("error creating pool for listener %s: %v", listener.ID, err)
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
klog.V(4).Infof("Pool for listener %s: %s", listener.ID, pool.ID)
members, err := getMembersByPoolID(lbaas.lb, pool.ID)
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
}
for _, node := range nodes {
addr, err := nodeAddressForLB(node)
if err != nil {
if err == ErrNotFound {
// Node failure, do not create member
klog.Warningf("Failed to create LB pool member for node %s: %v", node.Name, err)
continue
} else {
return nil, fmt.Errorf("error getting address for node %s: %v", node.Name, err)
}
}
if !memberExists(members, addr, int(port.NodePort)) {
klog.V(4).Infof("Creating member for pool %s", pool.ID)
_, err := v2pools.CreateMember(lbaas.lb, pool.ID, v2pools.CreateMemberOpts{
Name: fmt.Sprintf("member_%s_%d_%s", name, portIndex, node.Name),
ProtocolPort: int(port.NodePort),
Address: addr,
SubnetID: lbaas.opts.SubnetID,
}).Extract()
if err != nil {
return nil, fmt.Errorf("error creating LB pool member for node: %s, %v", node.Name, err)
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
} else {
// After all members have been processed, remaining members are deleted as obsolete.
members = popMember(members, addr, int(port.NodePort))
}
klog.V(4).Infof("Ensured pool %s has member for %s at %s", pool.ID, node.Name, addr)
}
// Delete obsolete members for this pool
for _, member := range members {
klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
monitorID := pool.MonitorID
if monitorID == "" && lbaas.opts.CreateMonitor {
klog.V(4).Infof("Creating monitor for pool %s", pool.ID)
monitor, err := v2monitors.Create(lbaas.lb, v2monitors.CreateOpts{
Name: fmt.Sprintf("monitor_%s_%d", name, portIndex),
PoolID: pool.ID,
Type: string(port.Protocol),
Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
MaxRetries: int(lbaas.opts.MonitorMaxRetries),
}).Extract()
if err != nil {
return nil, fmt.Errorf("error creating LB pool healthmonitor: %v", err)
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
monitorID = monitor.ID
} else if lbaas.opts.CreateMonitor == false {
klog.V(4).Infof("Do not create monitor for pool %s when create-monitor is false", pool.ID)
}
if monitorID != "" {
klog.V(4).Infof("Monitor for pool %s: %s", pool.ID, monitorID)
}
}
// All remaining listeners are obsolete, delete
for _, listener := range oldListeners {
klog.V(4).Infof("Deleting obsolete listener %s:", listener.ID)
// get pool for listener
pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
if err != nil && err != ErrNotFound {
return nil, fmt.Errorf("error getting pool for obsolete listener %s: %v", listener.ID, err)
}
if pool != nil {
// get and delete monitor
monitorID := pool.MonitorID
if monitorID != "" {
klog.V(4).Infof("Deleting obsolete monitor %s for pool %s", monitorID, pool.ID)
err = v2monitors.Delete(lbaas.lb, monitorID).ExtractErr()
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("error deleting obsolete monitor %s for pool %s: %v", monitorID, pool.ID, err)
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
// get and delete pool members
members, err := getMembersByPoolID(lbaas.lb, pool.ID)
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("error getting members for pool %s: %v", pool.ID, err)
}
if members != nil {
for _, member := range members {
klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
}
klog.V(4).Infof("Deleting obsolete pool %s for listener %s", pool.ID, listener.ID)
// delete pool
err = v2pools.Delete(lbaas.lb, pool.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("error deleting obsolete pool %s for listener %s: %v", pool.ID, listener.ID, err)
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
// delete listener
err = listeners.Delete(lbaas.lb, listener.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return nil, fmt.Errorf("error deleteting obsolete listener: %v", err)
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
klog.V(2).Infof("Deleted obsolete listener: %s", listener.ID)
}
portID := loadbalancer.VipPortID
floatIP, err := getFloatingIPByPortID(lbaas.network, portID)
if err != nil && err != ErrNotFound {
return nil, fmt.Errorf("error getting floating ip for port %s: %v", portID, err)
}
if floatIP == nil && floatingPool != "" && !internalAnnotation {
loadBalancerIP := apiService.Spec.LoadBalancerIP
needCreate := true
// check first does floatingip exist already in project, otherwise try to create new one
if loadBalancerIP != "" {
floatingip, err := getFloatingIPByFloatingIP(lbaas.network, loadBalancerIP)
if err != nil {
klog.V(4).Infof("could not find floating ip %s from project: %v", floatingip, err)
} else {
if len(floatingip.PortID) == 0 {
floatUpdateOpts := floatingips.UpdateOpts{
PortID: &portID,
}
floatIP, err = floatingips.Update(lbaas.network, floatingip.ID, floatUpdateOpts).Extract()
if err != nil {
return nil, fmt.Errorf("error updating LB floatingip %+v: %v", floatUpdateOpts, err)
} else {
needCreate = false
}
} else {
return nil, fmt.Errorf("floatingip is attached already to another port")
}
}
}
if needCreate {
klog.V(4).Infof("Creating floating ip for loadbalancer %s port %s", loadbalancer.ID, portID)
floatIPOpts := floatingips.CreateOpts{
FloatingNetworkID: floatingPool,
PortID: portID,
Description: fmt.Sprintf("Floating IP for Kubernetes external service %s from cluster %s", name, clusterName),
}
if loadBalancerIP != "" {
klog.V(4).Infof("creating a new floating ip %s", loadBalancerIP)
floatIPOpts.FloatingIP = loadBalancerIP
}
floatIP, err = floatingips.Create(lbaas.network, floatIPOpts).Extract()
if err != nil {
return nil, fmt.Errorf("error creating LB floatingip %+v: %v", floatIPOpts, err)
}
}
}
status := &v1.LoadBalancerStatus{}
if floatIP != nil {
status.Ingress = []v1.LoadBalancerIngress{{IP: floatIP.FloatingIP}}
} else {
status.Ingress = []v1.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
}
if lbaas.opts.ManageSecurityGroups {
err := lbaas.ensureSecurityGroup(clusterName, apiService, nodes, loadbalancer)
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(ctx, clusterName, apiService)
return status, err
}
}
return status, nil
}
// ensureSecurityGroup ensures security group exist for specific loadbalancer service.
// Creating security group for specific loadbalancer service when it does not exist.
func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Service, nodes []*v1.Node, loadbalancer *loadbalancers.LoadBalancer) error {
// find node-security-group for service
var err error
if len(lbaas.opts.NodeSecurityGroupIDs) == 0 {
lbaas.opts.NodeSecurityGroupIDs, err = getNodeSecurityGroupIDForLB(lbaas.compute, lbaas.network, nodes)
if err != nil {
return fmt.Errorf("failed to find node-security-group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
}
}
klog.V(4).Infof("find node-security-group %v for loadbalancer service %s/%s", lbaas.opts.NodeSecurityGroupIDs, apiService.Namespace, apiService.Name)
// get service ports
ports := apiService.Spec.Ports
if len(ports) == 0 {
return fmt.Errorf("no ports provided to openstack load balancer")
}
// get service source ranges
sourceRanges, err := v1service.GetLoadBalancerSourceRanges(apiService)
if err != nil {
return fmt.Errorf("failed to get source ranges for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
}
// ensure security group for LB
lbSecGroupName := getSecurityGroupName(apiService)
lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
if err != nil {
// If the security group of LB not exist, create it later
if isSecurityGroupNotFound(err) {
lbSecGroupID = ""
} else {
return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err)
}
}
if len(lbSecGroupID) == 0 {
// create security group
lbSecGroupCreateOpts := groups.CreateOpts{
Name: getSecurityGroupName(apiService),
Description: fmt.Sprintf("Security Group for %s/%s Service LoadBalancer in cluster %s", apiService.Namespace, apiService.Name, clusterName),
}
lbSecGroup, err := groups.Create(lbaas.network, lbSecGroupCreateOpts).Extract()
if err != nil {
return fmt.Errorf("failed to create Security Group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
}
lbSecGroupID = lbSecGroup.ID
//add rule in security group
for _, port := range ports {
for _, sourceRange := range sourceRanges.StringSlice() {
ethertype := rules.EtherType4
network, _, err := net.ParseCIDR(sourceRange)
if err != nil {
return fmt.Errorf("error parsing source range %s as a CIDR: %v", sourceRange, err)
}
if network.To4() == nil {
ethertype = rules.EtherType6
}
lbSecGroupRuleCreateOpts := rules.CreateOpts{
Direction: rules.DirIngress,
PortRangeMax: int(port.Port),
PortRangeMin: int(port.Port),
Protocol: toRuleProtocol(port.Protocol),
RemoteIPPrefix: sourceRange,
SecGroupID: lbSecGroup.ID,
EtherType: ethertype,
}
_, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
if err != nil {
return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
}
}
}
lbSecGroupRuleCreateOpts := rules.CreateOpts{
Direction: rules.DirIngress,
PortRangeMax: 4, // ICMP: Code - Values for ICMP "Destination Unreachable: Fragmentation Needed and Don't Fragment was Set"
PortRangeMin: 3, // ICMP: Type
Protocol: rules.ProtocolICMP,
RemoteIPPrefix: "0.0.0.0/0", // The Fragmentation packet can come from anywhere along the path back to the sourceRange - we need to all this from all
SecGroupID: lbSecGroup.ID,
EtherType: rules.EtherType4,
}
_, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
if err != nil {
return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
}
lbSecGroupRuleCreateOpts = rules.CreateOpts{
Direction: rules.DirIngress,
PortRangeMax: 0, // ICMP: Code - Values for ICMP "Packet Too Big"
PortRangeMin: 2, // ICMP: Type
Protocol: rules.ProtocolICMP,
RemoteIPPrefix: "::/0", // The Fragmentation packet can come from anywhere along the path back to the sourceRange - we need to all this from all
SecGroupID: lbSecGroup.ID,
EtherType: rules.EtherType6,
}
_, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
if err != nil {
return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
}
// get security groups of port
portID := loadbalancer.VipPortID
port, err := getPortByID(lbaas.network, portID)
if err != nil {
return err
}
// ensure the vip port has the security groups
found := false
for _, portSecurityGroups := range port.SecurityGroups {
if portSecurityGroups == lbSecGroup.ID {
found = true
break
}
}
// update loadbalancer vip port
if !found {
port.SecurityGroups = append(port.SecurityGroups, lbSecGroup.ID)
updateOpts := neutronports.UpdateOpts{SecurityGroups: &port.SecurityGroups}
res := neutronports.Update(lbaas.network, portID, updateOpts)
if res.Err != nil {
msg := fmt.Sprintf("Error occurred updating port %s for loadbalancer service %s/%s: %v", portID, apiService.Namespace, apiService.Name, res.Err)
return fmt.Errorf(msg)
}
}
}
// ensure rules for every node security group
for _, port := range ports {
for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
opts := rules.ListOpts{
Direction: string(rules.DirIngress),
SecGroupID: nodeSecurityGroupID,
RemoteGroupID: lbSecGroupID,
PortRangeMax: int(port.NodePort),
PortRangeMin: int(port.NodePort),
Protocol: string(port.Protocol),
}
secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
if err != nil && !isNotFound(err) {
msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
return fmt.Errorf(msg)
}
if len(secGroupRules) != 0 {
// Do not add rule when find rules for remote group in the Node Security Group
continue
}
// Add the rules in the Node Security Group
err = createNodeSecurityGroup(lbaas.network, nodeSecurityGroupID, int(port.NodePort), port.Protocol, lbSecGroupID)
if err != nil {
return fmt.Errorf("error occurred creating security group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
}
}
}
return nil
}
// UpdateLoadBalancer updates hosts under the specified load balancer.
func (lbaas *LbaasV2) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
klog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, nodes)
lbaas.opts.SubnetID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
if len(lbaas.opts.SubnetID) == 0 && len(nodes) > 0 {
// Get SubnetID automatically.
// The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
subnetID, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
if err != nil {
klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
return fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
"and failed to find subnet-id from OpenStack: %v", service.Namespace, service.Name, err)
}
lbaas.opts.SubnetID = subnetID
}
ports := service.Spec.Ports
if len(ports) == 0 {
return fmt.Errorf("no ports provided to openstack load balancer")
}
loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
if err != nil {
return err
}
if loadbalancer == nil {
return fmt.Errorf("loadbalancer %s does not exist", loadBalancerName)
}
// Get all listeners for this loadbalancer, by "port key".
type portKey struct {
Protocol listeners.Protocol
Port int
}
var listenerIDs []string
lbListeners := make(map[portKey]listeners.Listener)
allListeners, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("error getting listeners for LB %s: %v", loadBalancerName, err)
}
for _, l := range allListeners {
key := portKey{Protocol: listeners.Protocol(l.Protocol), Port: l.ProtocolPort}
lbListeners[key] = l
listenerIDs = append(listenerIDs, l.ID)
}
// Get all pools for this loadbalancer, by listener ID.
lbPools := make(map[string]v2pools.Pool)
for _, listenerID := range listenerIDs {
pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listenerID)
if err != nil {
return fmt.Errorf("error getting pool for listener %s: %v", listenerID, err)
}
lbPools[listenerID] = *pool
}
// Compose Set of member (addresses) that _should_ exist
addrs := make(map[string]*v1.Node)
for _, node := range nodes {
addr, err := nodeAddressForLB(node)
if err != nil {
return err
}
addrs[addr] = node
}
// Check for adding/removing members associated with each port
for portIndex, port := range ports {
// Get listener associated with this port
listener, ok := lbListeners[portKey{
Protocol: toListenersProtocol(port.Protocol),
Port: int(port.Port),
}]
if !ok {
return fmt.Errorf("loadbalancer %s does not contain required listener for port %d and protocol %s", loadBalancerName, port.Port, port.Protocol)
}
// Get pool associated with this listener
pool, ok := lbPools[listener.ID]
if !ok {
return fmt.Errorf("loadbalancer %s does not contain required pool for listener %s", loadBalancerName, listener.ID)
}
// Find existing pool members (by address) for this port
getMembers, err := getMembersByPoolID(lbaas.lb, pool.ID)
if err != nil {
return fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
}
members := make(map[string]v2pools.Member)
for _, member := range getMembers {
members[member.Address] = member
}
// Add any new members for this port
for addr, node := range addrs {
if _, ok := members[addr]; ok && members[addr].ProtocolPort == int(port.NodePort) {
// Already exists, do not create member
continue
}
_, err := v2pools.CreateMember(lbaas.lb, pool.ID, v2pools.CreateMemberOpts{
Name: fmt.Sprintf("member_%s_%d_%s", loadbalancer.Name, portIndex, node.Name),
Address: addr,
ProtocolPort: int(port.NodePort),
SubnetID: lbaas.opts.SubnetID,
}).Extract()
if err != nil {
return err
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
// Remove any old members for this port
for _, member := range members {
if _, ok := addrs[member.Address]; ok && member.ProtocolPort == int(port.NodePort) {
// Still present, do not delete member
continue
}
err = v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
}
if lbaas.opts.ManageSecurityGroups {
err := lbaas.updateSecurityGroup(clusterName, service, nodes, loadbalancer)
if err != nil {
return fmt.Errorf("failed to update Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
}
}
return nil
}
// updateSecurityGroup updating security group for specific loadbalancer service.
func (lbaas *LbaasV2) updateSecurityGroup(clusterName string, apiService *v1.Service, nodes []*v1.Node, loadbalancer *loadbalancers.LoadBalancer) error {
originalNodeSecurityGroupIDs := lbaas.opts.NodeSecurityGroupIDs
var err error
lbaas.opts.NodeSecurityGroupIDs, err = getNodeSecurityGroupIDForLB(lbaas.compute, lbaas.network, nodes)
if err != nil {
return fmt.Errorf("failed to find node-security-group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
}
klog.V(4).Infof("find node-security-group %v for loadbalancer service %s/%s", lbaas.opts.NodeSecurityGroupIDs, apiService.Namespace, apiService.Name)
original := sets.NewString(originalNodeSecurityGroupIDs...)
current := sets.NewString(lbaas.opts.NodeSecurityGroupIDs...)
removals := original.Difference(current)
// Generate Name
lbSecGroupName := getSecurityGroupName(apiService)
lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
if err != nil {
return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err)
}
ports := apiService.Spec.Ports
if len(ports) == 0 {
return fmt.Errorf("no ports provided to openstack load balancer")
}
for _, port := range ports {
for removal := range removals {
// Delete the rules in the Node Security Group
opts := rules.ListOpts{
Direction: string(rules.DirIngress),
SecGroupID: removal,
RemoteGroupID: lbSecGroupID,
PortRangeMax: int(port.NodePort),
PortRangeMin: int(port.NodePort),
Protocol: string(port.Protocol),
}
secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
if err != nil && !isNotFound(err) {
return fmt.Errorf("error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, removal, err)
}
for _, rule := range secGroupRules {
res := rules.Delete(lbaas.network, rule.ID)
if res.Err != nil && !isNotFound(res.Err) {
return fmt.Errorf("error occurred deleting security group rule: %s: %v", rule.ID, res.Err)
}
}
}
for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
opts := rules.ListOpts{
Direction: string(rules.DirIngress),
SecGroupID: nodeSecurityGroupID,
RemoteGroupID: lbSecGroupID,
PortRangeMax: int(port.NodePort),
PortRangeMin: int(port.NodePort),
Protocol: string(port.Protocol),
}
secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
if err != nil && !isNotFound(err) {
return fmt.Errorf("error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
}
if len(secGroupRules) != 0 {
// Do not add rule when find rules for remote group in the Node Security Group
continue
}
// Add the rules in the Node Security Group
err = createNodeSecurityGroup(lbaas.network, nodeSecurityGroupID, int(port.NodePort), port.Protocol, lbSecGroupID)
if err != nil {
return fmt.Errorf("error occurred creating security group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
}
}
}
return nil
}
// EnsureLoadBalancerDeleted deletes the specified load balancer
func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
klog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", clusterName, loadBalancerName)
loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
if err != nil && err != ErrNotFound {
return err
}
if loadbalancer == nil {
return nil
}
keepFloatingAnnotation, err := getBoolFromServiceAnnotation(service, ServiceAnnotationLoadBalancerKeepFloatingIP, false)
if err != nil {
return err
}
if !keepFloatingAnnotation {
if loadbalancer.VipPortID != "" {
portID := loadbalancer.VipPortID
floatingIP, err := getFloatingIPByPortID(lbaas.network, portID)
if err != nil && err != ErrNotFound {
return err
}
if floatingIP != nil {
err = floatingips.Delete(lbaas.network, floatingIP.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
}
}
}
// delete the loadbalancer and all its sub-resources.
if lbaas.opts.UseOctavia {
deleteOpts := octavialb.DeleteOpts{Cascade: true}
if err := octavialb.Delete(lbaas.lb, loadbalancer.ID, deleteOpts).ExtractErr(); err != nil {
return fmt.Errorf("failed to delete loadbalancer %s: %v", loadbalancer.ID, err)
}
} else {
// get all listeners associated with this loadbalancer
listenerList, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("error getting LB %s listeners: %v", loadbalancer.ID, err)
}
// get all pools (and health monitors) associated with this loadbalancer
var poolIDs []string
var monitorIDs []string
for _, listener := range listenerList {
pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
if err != nil && err != ErrNotFound {
return fmt.Errorf("error getting pool for listener %s: %v", listener.ID, err)
}
if pool != nil {
poolIDs = append(poolIDs, pool.ID)
// If create-monitor of cloud-config is false, pool has not monitor.
if pool.MonitorID != "" {
monitorIDs = append(monitorIDs, pool.MonitorID)
}
}
}
// delete all monitors
for _, monitorID := range monitorIDs {
err := v2monitors.Delete(lbaas.lb, monitorID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
// delete all members and pools
for _, poolID := range poolIDs {
// get members for current pool
membersList, err := getMembersByPoolID(lbaas.lb, poolID)
if err != nil && !isNotFound(err) {
return fmt.Errorf("error getting pool members %s: %v", poolID, err)
}
// delete all members for this pool
for _, member := range membersList {
err := v2pools.DeleteMember(lbaas.lb, poolID, member.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
// delete pool
err = v2pools.Delete(lbaas.lb, poolID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
// delete all listeners
for _, listener := range listenerList {
err := listeners.Delete(lbaas.lb, listener.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
// delete loadbalancer
err = loadbalancers.Delete(lbaas.lb, loadbalancer.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
err = waitLoadbalancerDeleted(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("failed to delete loadbalancer: %v", err)
}
}
// Delete the Security Group
if lbaas.opts.ManageSecurityGroups {
err := lbaas.EnsureSecurityGroupDeleted(clusterName, service)
if err != nil {
return fmt.Errorf("failed to delete Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
}
}
return nil
}
// EnsureSecurityGroupDeleted deleting security group for specific loadbalancer service.
func (lbaas *LbaasV2) EnsureSecurityGroupDeleted(clusterName string, service *v1.Service) error {
// Generate Name
lbSecGroupName := getSecurityGroupName(service)
lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
if err != nil {
if isSecurityGroupNotFound(err) {
// It is OK when the security group has been deleted by others.
return nil
}
return fmt.Errorf("Error occurred finding security group: %s: %v", lbSecGroupName, err)
}
lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID)
if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) {
return lbSecGroup.Err
}
if len(lbaas.opts.NodeSecurityGroupIDs) == 0 {
// Just happen when nodes have not Security Group, or should not happen
// UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty
// And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted.
klog.Warningf("Can not find node-security-group from all the nodes of this cluster when delete loadbalancer service %s/%s",
service.Namespace, service.Name)
} else {
// Delete the rules in the Node Security Group
for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
opts := rules.ListOpts{
SecGroupID: nodeSecurityGroupID,
RemoteGroupID: lbSecGroupID,
}
secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
if err != nil && !isNotFound(err) {
msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
return fmt.Errorf(msg)
}
for _, rule := range secGroupRules {
res := rules.Delete(lbaas.network, rule.ID)
if res.Err != nil && !isNotFound(res.Err) {
return fmt.Errorf("Error occurred deleting security group rule: %s: %v", rule.ID, res.Err)
}
}
}
}
return nil
}