diff --git a/pkg/openstack/routes.go b/pkg/openstack/routes.go index 59cb5ba8b9..265299f850 100644 --- a/pkg/openstack/routes.go +++ b/pkg/openstack/routes.go @@ -18,16 +18,21 @@ package openstack import ( "context" - openstackutil "k8s.io/cloud-provider-openstack/pkg/util/openstack" + "fmt" "net" "sync" + v1 "k8s.io/api/core/v1" + openstackutil "k8s.io/cloud-provider-openstack/pkg/util/openstack" + "github.com/gophercloud/gophercloud" "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/extraroutes" "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/routers" + "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/groups" + "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/rules" "github.com/gophercloud/gophercloud/openstack/networking/v2/ports" + secgroups "github.com/gophercloud/utils/openstack/networking/v2/extensions/security/groups" - "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" cloudprovider "k8s.io/cloud-provider" @@ -40,6 +45,9 @@ import ( type Routes struct { network *gophercloud.ServiceClient os *OpenStack + // Bind to every nodes' port, there are a bulk of rules in the SecurityGroup, + // these rules' RemoteIPPrefix are all of the nodes' PodCidr and NodeAddress. + nodeSecurityGroupId string // router's private network IDs networkIDs []string // whether Neutron supports "extraroute-atomic" extension @@ -59,12 +67,93 @@ func NewRoutes(os *OpenStack, network *gophercloud.ServiceClient, atomicRoutes b return nil, errors.ErrNoRouterID } - return &Routes{ + routes := Routes{ network: network, os: os, atomicRoutes: atomicRoutes, allowedAddressPairs: allowedAddressPairs, - }, nil + } + + err := routes.getOrCreateNodeSecurityGroup(os.routeOpts.RouterID) + if err != nil { + return nil, err + } + + return &routes, nil +} + +func (r *Routes) getOrCreateNodeSecurityGroup(router_id string) error { + // TODO(jeffyjf) Maybe one router can be used to support multiple + // clusters. Whether it is appropriate that use router id as the security + // group name's identity. Maybe we should introduce other mechanism to identify + // the security group. + sgName := fmt.Sprintf("k8s-node-sg-%s", router_id) + sgId, err := secgroups.IDFromName(r.network, sgName) + if err != nil { + if errors.IsNotFound(err) { + mc := metrics.NewMetricContext("security_group", "create") + group, err := groups.Create(r.network, groups.CreateOpts{Name: sgName}).Extract() + if mc.ObserveRequest(err) != nil { + return err + } + sgId = group.ID + } else { + return err + } + } + r.nodeSecurityGroupId = sgId + return nil +} + +// checkSecurityGroupRules check whether the ingress security group rules that related to the route is existing. +// these security group rules use to ensure other nodes permit the traffic from this node (nodeName) pass through. +func (r *Routes) checkSecurityGroupRules(route routers.Route, existingRules []rules.SecGroupRule, nodeName types.NodeName, nodes []*v1.Node) bool { + sgRuleForPodCidrFound := false + sgRuleForNodeAddressFound := false + ip, _, _ := net.ParseCIDR(route.DestinationCIDR) + isCIDRv6 := ip.To4() == nil + nodeAddr := getAddrByNodeName(nodeName, isCIDRv6, nodes) + nodeAddrCidr := fmt.Sprintf("%s/32", nodeAddr) + + for _, rule := range existingRules { + // route.DestinationCIDR is equivalent with the node (nodeName)'s PodCidr + if route.DestinationCIDR == rule.RemoteIPPrefix { + sgRuleForPodCidrFound = true + } + if rule.RemoteGroupID == r.nodeSecurityGroupId || rule.RemoteIPPrefix == nodeAddrCidr { + sgRuleForNodeAddressFound = true + } + if sgRuleForPodCidrFound && sgRuleForNodeAddressFound { + return true + } + } + return false +} + +// checkPortSecurityRules check the port's security rules (SecurityGroups and AllowAddressPairs) whether is valid +func (r *Routes) checkPortSecurityRules(port *PortWithPortSecurity, route routers.Route) bool { + if !port.PortSecurityEnabled { + return true + } + // check if the node security group bind to the port + nodePortBindSecurityGroup := false + for _, sg := range port.SecurityGroups { + if sg == r.nodeSecurityGroupId { + nodePortBindSecurityGroup = true + break + } + } + if !nodePortBindSecurityGroup { + return false + } + + // check whether the related AllowAddressPair is existing + for _, addrPair := range port.AllowedAddressPairs { + if addrPair.IPAddress == route.DestinationCIDR { + return true + } + } + return false } // ListRoutes lists all managed routes that belong to the specified clusterName @@ -87,8 +176,35 @@ func (r *Routes) ListRoutes(ctx context.Context, clusterName string) ([]*cloudpr } routes := make([]*cloudprovider.Route, 0, len(router.Routes)) + + // detect router's private network ID for further VM ports filtering + r.networkIDs, err = getRouterNetworkIDs(r.network, r.os.routeOpts.RouterID) + if err != nil { + return nil, err + } + + rules, err := openstackutil.GetSecurityGroupRules(r.network, rules.ListOpts{SecGroupID: r.nodeSecurityGroupId, Direction: string(rules.DirIngress)}) + if err != nil { + return nil, err + } + for _, item := range router.Routes { nodeName, foundNode := getNodeNameByAddr(item.NextHop, nodes) + if foundNode { + if !r.checkSecurityGroupRules(item, rules, nodeName, nodes) { + continue + } + + // get the node port that the route next hop addr belong to + port, err := getPortByIP(r.network, item.NextHop, r.networkIDs) + if err != nil { + return nil, err + } + + if !r.checkPortSecurityRules(port, item) { + continue + } + } route := cloudprovider.Route{ Name: item.DestinationCIDR, TargetNode: nodeName, //contains the nexthop address if node name was not found @@ -98,12 +214,6 @@ func (r *Routes) ListRoutes(ctx context.Context, clusterName string) ([]*cloudpr routes = append(routes, &route) } - // detect router's private network ID for further VM ports filtering - r.networkIDs, err = getRouterNetworkIDs(r.network, r.os.routeOpts.RouterID) - if err != nil { - return nil, err - } - return routes, nil } @@ -268,6 +378,65 @@ func updateAllowedAddressPairs(network *gophercloud.ServiceClient, port *PortWit return unwinder, nil } +// updateSecurityGroup update the port's security groups +func updateSecurityGroup(network *gophercloud.ServiceClient, port *PortWithPortSecurity, sgs []string) (func(), error) { + origSgs := port.SecurityGroups + mc := metrics.NewMetricContext("port", "update") + _, err := ports.Update(network, port.ID, ports.UpdateOpts{ + SecurityGroups: &sgs, + }).Extract() + if mc.ObserveRequest(err) != nil { + return nil, err + } + + unwinder := func() { + klog.V(4).Infof("Reverting security-groups change to port %v", port.ID) + mc := metrics.NewMetricContext("port", "update") + _, err := ports.Update(network, port.ID, ports.UpdateOpts{ + SecurityGroups: &origSgs, + }).Extract() + if mc.ObserveRequest(err) != nil { + klog.Warningf("Unable to reset port's security-groups during error unwind: %v", err) + } + } + + return unwinder, nil +} + +func createSecurityGroupRule(network *gophercloud.ServiceClient, rule rules.CreateOpts) (func(), error) { + mc := metrics.NewMetricContext("security_group_rule", "create") + newRule, err := rules.Create(network, rule).Extract() + if mc.ObserveRequest(err) != nil { + return nil, err + } + unwinder := func() { + klog.V(4).Infof("Reverting security-group-rule creation %v", newRule.ID) + mc := metrics.NewMetricContext("security_group_rule", "delete") + err := rules.Delete(network, newRule.ID).ExtractErr() + if mc.ObserveRequest(err) != nil { + klog.Warningf("Unable to revert security-group-rule creation during error unwind: %v", err) + } + } + return unwinder, nil +} + +func deleteSecurityGroupRule(network *gophercloud.ServiceClient, rule *rules.SecGroupRule) (func(), error) { + mc := metrics.NewMetricContext("security-group-rule", "delete") + err := rules.Delete(network, rule.ID).ExtractErr() + if mc.ObserveRequest(err) != nil { + return nil, err + } + unwinder := func() { + klog.V(4).Infof("Reverting security_group_rule deletion %v", rule) + mc := metrics.NewMetricContext("security-group-rule", "create") + _, err := rules.Create(network, rules.CreateOpts{SecGroupID: rule.ID, RemoteIPPrefix: rule.RemoteIPPrefix}).Extract() + if mc.ObserveRequest(err) != nil { + klog.Warningf("Unable to revert security_group_rule deletion error unwind: %v", err) + } + } + return unwinder, nil +} + // CreateRoute creates the described managed route func (r *Routes) CreateRoute(ctx context.Context, clusterName string, nameHint string, route *cloudprovider.Route) error { ip, _, _ := net.ParseCIDR(route.DestinationCIDR) @@ -288,82 +457,161 @@ func (r *Routes) CreateRoute(ctx context.Context, clusterName string, nameHint s klog.V(4).Infof("Using nexthop %v for node %v", addr, route.TargetNode) - if !r.atomicRoutes { - // classical logic - r.Lock() - defer r.Unlock() + mc := metrics.NewMetricContext("router", "get") + router, err := routers.Get(r.network, r.os.routeOpts.RouterID).Extract() + if mc.ObserveRequest(err) != nil { + return err + } - mc := metrics.NewMetricContext("router", "get") - router, err := routers.Get(r.network, r.os.routeOpts.RouterID).Extract() - if mc.ObserveRequest(err) != nil { - return err + routeFound := false + for _, item := range router.Routes { + if item.DestinationCIDR == route.DestinationCIDR && item.NextHop == addr { + routeFound = true + break } + } - routes := router.Routes + if !routeFound { + if !r.atomicRoutes { + // classical logic + r.Lock() + defer r.Unlock() + + routes := append(router.Routes, routers.Route{ + DestinationCIDR: route.DestinationCIDR, + NextHop: addr, + }) - for _, item := range routes { - if item.DestinationCIDR == route.DestinationCIDR && item.NextHop == addr { - klog.V(4).Infof("Skipping existing route: %v", route) - return nil + unwind, err := updateRoutes(r.network, router, routes) + if err != nil { + return err } - } - routes = append(routes, routers.Route{ - DestinationCIDR: route.DestinationCIDR, - NextHop: addr, - }) + defer onFailure.call(unwind) + } else { + // atomic route update + route := []routers.Route{{ + DestinationCIDR: route.DestinationCIDR, + NextHop: addr, + }} + unwind, err := addRoute(r.network, r.os.routeOpts.RouterID, route) + if err != nil { + return err + } - unwind, err := updateRoutes(r.network, router, routes) + defer onFailure.call(unwind) + } + } + + // update node security group rules, so that other nodes (already bound this node security group) permit the traffic from this + // node (route.TargetNode) or this node's pods pass through + origRules, err := openstackutil.GetSecurityGroupRules(r.network, rules.ListOpts{SecGroupID: r.nodeSecurityGroupId, Direction: string(rules.DirIngress)}) + if err != nil { + return err + } + sgRuleForPodCidrFound := false + sgRuleForNodeAddressFound := false + nodeAddrCidr := fmt.Sprintf("%s/32", addr) + for _, rule := range origRules { + if rule.RemoteIPPrefix == route.DestinationCIDR { + sgRuleForPodCidrFound = true + } + if rule.RemoteGroupID == r.nodeSecurityGroupId || rule.RemoteIPPrefix == nodeAddrCidr { + sgRuleForNodeAddressFound = true + } + if sgRuleForPodCidrFound && sgRuleForNodeAddressFound { + break + } + } + if !sgRuleForPodCidrFound { + // add security group rule for the pods of this node (route.TargetNode) + etherType := rules.EtherType4 + if isCIDRv6 { + etherType = rules.EtherType6 + } + unwind, err := createSecurityGroupRule( + r.network, + rules.CreateOpts{ + SecGroupID: r.nodeSecurityGroupId, + RemoteIPPrefix: route.DestinationCIDR, + Direction: rules.DirIngress, + EtherType: etherType}) if err != nil { return err } - defer onFailure.call(unwind) - } else { - // atomic route update - route := []routers.Route{{ - DestinationCIDR: route.DestinationCIDR, - NextHop: addr, - }} - unwind, err := addRoute(r.network, r.os.routeOpts.RouterID, route) + } + if !sgRuleForNodeAddressFound { + // add security group rule for this node (route.TargetNode) + etherType := rules.EtherType4 + if isCIDRv6 { + etherType = rules.EtherType6 + } + unwind, err := createSecurityGroupRule( + r.network, + rules.CreateOpts{ + SecGroupID: r.nodeSecurityGroupId, + RemoteIPPrefix: nodeAddrCidr, + Direction: rules.DirIngress, + EtherType: etherType}) if err != nil { return err } - defer onFailure.call(unwind) } - if !r.allowedAddressPairs { - klog.V(4).Infof("Route created (skipping the allowed_address_pairs update): %v", route) - onFailure.disarm() - return nil - } - // get the port of addr on target node. port, err := getPortByIP(r.network, addr, r.networkIDs) if err != nil { return err } + if !port.PortSecurityEnabled { - klog.Warningf("Skipping allowed_address_pair for port: %s", port.ID) + klog.Warningf("Skipping updation of the port : %s (allowed_address_pairs and node security_group)", port.ID) onFailure.disarm() return nil } - found := false - for _, item := range port.AllowedAddressPairs { - if item.IPAddress == route.DestinationCIDR { - klog.V(4).Infof("Found existing allowed-address-pair: %v", item) - found = true - break + if !r.allowedAddressPairs { + klog.V(4).Infof("Route created (skipping the allowed_address_pairs update): %v", route) + } else { + // update node port's AllowedAddressPairs, so that the traffic access the pods/services of this + // node (route.TargetNode) can enter into this node. in other words, permitting the packets the + // destination addresses are this nodes' pods/services enter into this node. + allowAddressPairFound := false + for _, item := range port.AllowedAddressPairs { + if item.IPAddress == route.DestinationCIDR { + klog.V(4).Infof("Found existing allowed-address-pair: %v", item) + allowAddressPairFound = true + break + } + } + var newPairs []ports.AddressPair + if !allowAddressPairFound { + newPairs = append(port.AllowedAddressPairs, ports.AddressPair{ + IPAddress: route.DestinationCIDR, + }) + unwind, err := updateAllowedAddressPairs(r.network, port, newPairs) + if err != nil { + return err + } + defer onFailure.call(unwind) } } - if !found { - newPairs := append(port.AllowedAddressPairs, ports.AddressPair{ - IPAddress: route.DestinationCIDR, - }) - unwind, err := updateAllowedAddressPairs(r.network, port, newPairs) + // node port bind node security group, so that other nodes' traffic can enter + // into this node (route.TargetNode). in other words, permitting the packets the + // source addresses are other nodes or their pods enter ino this node (route.TargetNode). + nodePortBindSecurityGroup := false + for _, sg := range port.SecurityGroups { + if sg == r.nodeSecurityGroupId { + nodePortBindSecurityGroup = true + break + } + } + if !nodePortBindSecurityGroup { + newSgs := append(port.SecurityGroups, r.nodeSecurityGroupId) + unwind, err := updateSecurityGroup(r.network, port, newSgs) if err != nil { return err } @@ -452,10 +700,22 @@ func (r *Routes) DeleteRoute(ctx context.Context, clusterName string, route *clo defer onFailure.call(unwind) } - if !r.allowedAddressPairs { - klog.V(4).Infof("Route deleted (skipping the allowed_address_pairs update): %v", route) - onFailure.disarm() - return nil + origRules, err := openstackutil.GetSecurityGroupRules(r.network, rules.ListOpts{SecGroupID: r.nodeSecurityGroupId, Direction: string(rules.DirIngress)}) + if err != nil { + return err + } + var staleRule *rules.SecGroupRule + for _, rule := range origRules { + if rule.RemoteIPPrefix == route.DestinationCIDR { + staleRule = &rule + } + } + if staleRule != nil { + unwind, err := deleteSecurityGroupRule(r.network, staleRule) + if err != nil { + return err + } + defer onFailure.call(unwind) } // get the port of addr on target node. @@ -464,26 +724,47 @@ func (r *Routes) DeleteRoute(ctx context.Context, clusterName string, route *clo return err } if !port.PortSecurityEnabled { - klog.Warningf("Skipping allowed_address_pair for port: %s", port.ID) + klog.Warningf("Skipping updation of port : %s (allowed_address_pairs and node security_group)", port.ID) onFailure.disarm() return nil } - addrPairs := port.AllowedAddressPairs - index := -1 - for i, item := range addrPairs { - if item.IPAddress == route.DestinationCIDR { - index = i - break + if !r.allowedAddressPairs { + klog.V(4).Infof("Route deleted (skipping the allowed_address_pairs update): %v", route) + } else { + addrPairs := port.AllowedAddressPairs + addrPairIndex := -1 + for i, item := range addrPairs { + if item.IPAddress == route.DestinationCIDR { + addrPairIndex = i + break + } + } + if addrPairIndex != -1 { + // Delete element `index` + addrPairs[addrPairIndex] = addrPairs[len(addrPairs)-1] + addrPairs = addrPairs[:len(addrPairs)-1] + + unwind, err := updateAllowedAddressPairs(r.network, port, addrPairs) + if err != nil { + return err + } + defer onFailure.call(unwind) } } - if index != -1 { + sgs := port.SecurityGroups + sgIndex := -1 + for i, item := range sgs { + if item == r.nodeSecurityGroupId { + sgIndex = i + } + } + if sgIndex != -1 { // Delete element `index` - addrPairs[index] = addrPairs[len(addrPairs)-1] - addrPairs = addrPairs[:len(addrPairs)-1] - - unwind, err := updateAllowedAddressPairs(r.network, port, addrPairs) + sgs[sgIndex] = sgs[len(sgs)-1] + sgs = sgs[:len(sgs)-1] + unwind, err := updateSecurityGroup(r.network, port, sgs) if err != nil { return err }