Skip to content

Commit

Permalink
Merge pull request #95 from dulek/mover-interface
Browse files Browse the repository at this point in the history
Add MovePrivateIP and its OpenStack implementation
  • Loading branch information
openshift-merge-robot committed Feb 28, 2023
2 parents 28c6926 + 8376880 commit f4aeb70
Show file tree
Hide file tree
Showing 4 changed files with 315 additions and 83 deletions.
14 changes: 14 additions & 0 deletions pkg/cloudprovider/cloudprovider.go
Expand Up @@ -59,6 +59,20 @@ type CloudProviderIntf interface {
GetNodeEgressIPConfiguration(node *corev1.Node, cloudPrivateIPConfigs []*v1.CloudPrivateIPConfig) ([]*NodeEgressIPConfiguration, error)
}

// CloudProviderWithMoveIntf is an optional interface for cloud plugins that
// can handle an IP address failover as one dedicated operation rather than as
// ReleasePrivateIP followed by AssignPrivateIP.
type CloudProviderWithMoveIntf interface {
	// MovePrivateIP is invoked in place of the ReleasePrivateIP +
	// AssignPrivateIP sequence when the plugin implements
	// CloudProviderWithMoveIntf. It should effectively move the IP address
	// from nodeToDel to nodeToAdd, but it does not have to remove resources
	// from the cloud while doing so. On OpenStack, for example, the
	// reservation Neutron port is kept and only the allowedAddressPairs of
	// the nodeToDel and nodeToAdd ports are manipulated to move the IP from
	// one node to the other.
	MovePrivateIP(ip net.IP, nodeToAdd, nodeToDel *corev1.Node) error
}

// CloudProviderConfig is all the command-line options needed to initialize
// a cloud provider client.
type CloudProviderConfig struct {
Expand Down
240 changes: 181 additions & 59 deletions pkg/cloudprovider/openstack.go
Expand Up @@ -57,6 +57,7 @@ const (
// to the OpenStack API
type OpenStack struct {
CloudProvider
CloudProviderWithMoveIntf
novaClient *gophercloud.ServiceClient
neutronClient *gophercloud.ServiceClient
portLockMapMutex sync.Mutex
Expand Down Expand Up @@ -162,35 +163,19 @@ func (o *OpenStack) initCredentials() error {
return nil
}

// AssignPrivateIP attempts to assigning the IP address provided to the VM
// instance corresponding to the corev1.Node provided on the cloud the
// cluster is deployed on.
// NOTE: This operation is performed against all interfaces that are attached
// to the server. In case that an instance has 2 interfaces with the same CIDR
// that this IP address could fit in, the first interface that is found will be used.
// No guarantees about the correct interface ordering are given in such a case.
// Throw an AlreadyExistingIPError if the IP provided is already associated with the
// node, it's up to the caller to decide what to do with that.
// NOTE: For OpenStack, this is a 2 step operation which is not atomic:
//
// a) Reserve a neutron port.
// b) Add the IP address to the allowed_address_pairs field.
//
// If step b) fails, then we will try to undo step a). However, if this undo fails,
// then we will be in a situation where the user or an upper layer will have to call
// ReleasePrivateIP to get out of this situation.
func (o *OpenStack) AssignPrivateIP(ip net.IP, node *corev1.Node) error {
// findAssignSubnetAndPort attempts to identify a subnet and server port for an AssignPrivateIP operation on a given node.
func (o *OpenStack) findAssignSubnetAndPort(ip net.IP, node *corev1.Node) (*neutronsubnets.Subnet, *neutronports.Port, error) {
if node == nil {
return fmt.Errorf("invalid nil pointer provided for node when trying to assign private IP %s", ip.String())
return nil, nil, fmt.Errorf("invalid nil pointer provided for node when trying to assign private IP %s", ip.String())
}
// List all ports that are attached to this server.
serverID, err := getNovaServerIDFromProviderID(node.Spec.ProviderID)
if err != nil {
return err
return nil, nil, err
}
serverPorts, err := o.listNovaServerPorts(serverID)
if err != nil {
return err
return nil, nil, err
}

// Loop over all ports that are attached to this nova instance and find the subnets
Expand All @@ -202,7 +187,7 @@ func (o *OpenStack) AssignPrivateIP(ip net.IP, node *corev1.Node) error {
// This is part of normal operation.
// Callers will likely ignore this and go on with their business logic and
// report success to the user.
return AlreadyExistingIPError
return nil, nil, AlreadyExistingIPError
}

// Get all subnets that are attached to this port.
Expand All @@ -212,12 +197,9 @@ func (o *OpenStack) AssignPrivateIP(ip net.IP, node *corev1.Node) error {
continue
}
// 1) Loop over all subnets of the port and check if the IP address fits inside the subnet CIDR.
// If the IP address is inside the subnet:
// 2) Reserve the IP address on the subnet by creating a new unattached neutron port.
// Set variable unboundPort, and exit out of the subnet loop.
// 3) Then, add the IP address to the port's allowed_address_pairs.
// 4) Return nil to indicate success if steps 2 and 3 passed.
// 5) Throw an error if the IP address does not fit in any of the attached network's subnets.
// If the IP address is inside the subnet:
// 2) Return it as this is the subnet and port we were looking for.
// 3) Throw an error if the IP address does not fit in any of the attached network's subnets.
var matchingSubnet *neutronsubnets.Subnet
for _, s := range subnets {
// Because we're dealing with a pointer here for matchingSubnet:
Expand All @@ -237,45 +219,132 @@ func (o *OpenStack) AssignPrivateIP(ip net.IP, node *corev1.Node) error {
continue
}
if matchingSubnet != nil {
return fmt.Errorf("requested IP address %s for node %s and port %s matches 2 different subnets, %s and %s",
return nil, nil, fmt.Errorf("requested IP address %s for node %s and port %s matches 2 different subnets, %s and %s",
ip, node.Name, serverPort.ID, matchingSubnet.ID, s.ID)
}
matchingSubnet = &s
}
if matchingSubnet != nil {
// 2) Reserve the IP address on the subnet by creating a new unattached neutron port.
unboundPort, err := o.reserveNeutronIPAddress(*matchingSubnet, ip, serverID)
if err != nil {
return err
// 2) Return the matching subnet and port.
return matchingSubnet, &serverPort, nil
}
}

// 3) The IP address does not fit in any of the attached networks' subnets.
return nil, nil, fmt.Errorf("could not assign IP address %s to node %s", ip, node.Name)
}

// AssignPrivateIP attempts to assign the IP address provided to the VM
// instance corresponding to the corev1.Node provided on the cloud the
// cluster is deployed on.
// NOTE: This operation is performed against all interfaces that are attached
// to the server. In case that an instance has 2 interfaces with the same CIDR
// that this IP address could fit in, the first interface that is found will be used.
// No guarantees about the correct interface ordering are given in such a case.
// Throw an AlreadyExistingIPError if the IP provided is already associated with the
// node, it's up to the caller to decide what to do with that.
// NOTE: For OpenStack, this is a 2 step operation which is not atomic:
//
// a) Reserve a neutron port.
// b) Add the IP address to the allowed_address_pairs field.
//
// If step b) fails, then we will try to undo step a). However, if this undo fails,
// then we will be in a situation where the user or an upper layer will have to call
// ReleasePrivateIP to get out of this situation.
func (o *OpenStack) AssignPrivateIP(ip net.IP, node *corev1.Node) error {
if node == nil {
return fmt.Errorf("invalid nil pointer provided for node when trying to assign private IP %s", ip.String())
}
serverID, err := getNovaServerIDFromProviderID(node.Spec.ProviderID)
if err != nil {
return err
}

matchingSubnet, matchingPort, err := o.findAssignSubnetAndPort(ip, node)
if err != nil {
return err
}

// Reserve the IP address on the subnet by creating a new unattached neutron port.
unboundPort, err := o.reserveNeutronIPAddress(*matchingSubnet, ip, serverID)
if err != nil {
return err
}
// Then, add the IP address to the port's allowed_address_pairs.
// TODO: use a more elegant retry mechanism.
if err = o.allowIPAddressOnNeutronPort(matchingPort.ID, ip); err != nil && !errors.Is(err, AlreadyExistingIPError) {
// Try to clean up the allocated port if adding the IP to allowed_address_pairs failed.
// Try this 10 times, but if this operation fails more than that, then user intervention is needed or
// the upper layer must call ReleasePrivateIP (because if the neutron port exists and holds
// a reservation, then the assign step will not continue after step 2).
var errRelease error
var releaseStatus string
for i := 0; i < 10; i++ {
errRelease = o.releaseNeutronIPAddress(*unboundPort, serverID)
// If the release operation was successful, then we are done.
if errRelease == nil {
releaseStatus = "Released neutron port reservation."
break
}
// 3) Then, add the IP address to the port's allowed_address_pairs.
// TODO: use a more elegant retry mechanism.
if err = o.allowIPAddressOnNeutronPort(serverPort.ID, ip); err != nil && !errors.Is(err, AlreadyExistingIPError) {
// Try to clean up the allocated port if adding the IP to allowed_address_pairs failed.
// Try this 10 times, but if this operation fails more than that, then user intervention is needed or
// the upper layer must call ReleasePrivateIP (because if the neutron port exists and holds
// a reservation, then the assign step will not continue after step 2).
var errRelease error
var releaseStatus string
for i := 0; i < 10; i++ {
errRelease = o.releaseNeutronIPAddress(*unboundPort, serverID)
// If the release operation was successful, then we are done.
if errRelease == nil {
releaseStatus = "Released neutron port reservation."
break
}
// Otherwise store the error message and retry.
releaseStatus = fmt.Sprintf("Could not release neutron port reservation after %d tries, err: %q", i+1, errRelease)
}
return fmt.Errorf("could not allow IP address %s on port %s, err: %q. %s", ip.String(), serverPort.ID, err, releaseStatus)
// Otherwise store the error message and retry.
releaseStatus = fmt.Sprintf("Could not release neutron port reservation after %d tries, err: %q", i+1, errRelease)
}
return fmt.Errorf("could not allow IP address %s on port %s, err: %q. %s", ip.String(), matchingPort.ID, err, releaseStatus)
}
// Return nil to indicate success if steps 2 and 3 passed.
return nil
}

// MovePrivateIP implements moving the IP from one node to another to serve cases like a failover.
// What's different from calling ReleasePrivateIP followed by AssignPrivateIP is that the reservation
// Neutron port will not get deleted - MovePrivateIP will only change the allowed_address_pairs on the nodes'
// ports to remove the IP from nodeToDel and add it to nodeToAdd, and update the existing reservation port with the
// DeviceID of nodeToAdd. Additionally, if the reservation port is missing, MovePrivateIP will attempt to recreate it
// (this is a corner case and should not happen in normal operation).
func (o *OpenStack) MovePrivateIP(ip net.IP, nodeToAdd, nodeToDel *corev1.Node) error {
if nodeToAdd == nil || nodeToDel == nil {
return fmt.Errorf("invalid nil pointer provided for node when trying to move IP %s", ip.String())
}

// List all ports that are attached to this server.
serverID, err := getNovaServerIDFromProviderID(nodeToDel.Spec.ProviderID)
if err != nil {
return err
}
serverPorts, err := o.listNovaServerPorts(serverID)
if err != nil {
return err
}

// Loop over all ports that are attached to this nova instance.
for _, serverPort := range serverPorts {
if isIPAddressAllowedOnNeutronPort(serverPort, ip) {
if err = o.unallowIPAddressOnNeutronPort(serverPort.ID, ip); err != nil {
return err
}
// 4) Return nil to indicate success if steps 2 and 3 passed.
return nil
}
}

// 5) The IP address does not fit in any of the attached networks' subnets.
return fmt.Errorf("could not assign IP address %s to node %s", ip, node.Name)
subnet, port, err := o.findAssignSubnetAndPort(ip, nodeToAdd)
if err != nil {
return err
}

// This call is to double-check if the reservation port exists and update its DeviceID. If reservation port is
// missing it will be recreated.
serverID, err = getNovaServerIDFromProviderID(nodeToAdd.Spec.ProviderID) // got to use new node's ProviderID now
if err != nil {
return err
}
_, err = o.reserveNeutronIPAddress(*subnet, ip, serverID)
if err != nil {
return err
}

if err = o.allowIPAddressOnNeutronPort(port.ID, ip); err != nil && !errors.Is(err, AlreadyExistingIPError) {
return fmt.Errorf("could not allow IP address %s on port %s, err: %q", ip.String(), port.ID, err)
}
return nil
}

// ReleasePrivateIP attempts to release the IP address provided from the
Expand Down Expand Up @@ -532,6 +601,10 @@ func (o *OpenStack) getNeutronPortNodeEgressIPConfiguration(p neutronports.Port,
// the given subnet. This will serve as our IPAM as it is impossible to create 2 ports
// with the same IP on the same subnet. The created port will be identified with a custom
// DeviceID and DeviceOwner.
// If the create call returns an error, we'll check if it's a 409 Conflict. If so, we'll try to fetch
// the list of ports with a matching IP and DeviceOwner on that subnet. In case of success, we'll
// assume this is an existing reservation port for that EgressIP, check if its DeviceID needs to
// be updated, update it if so and return that already existing port.
// NOTE: We are not using tags. According to the neutron API, it's possible to add a tag when creating
// a port. But gophercloud does not allow us to do that and we must use a 2 step process (create port, then
// add tag).
Expand All @@ -540,7 +613,8 @@ func (o *OpenStack) reserveNeutronIPAddress(s neutronsubnets.Subnet, ip net.IP,
return nil, fmt.Errorf("cannot assign IP address %s on subnet %s with an invalid serverID '%s'", ip.String(), s.ID, serverID)
}

// Now, create the port.
// Create the port.
expectedDeviceID := generateDeviceID(serverID)
opts := neutronports.CreateOpts{
NetworkID: s.NetworkID,
FixedIPs: []neutronports.IP{
Expand All @@ -550,11 +624,59 @@ func (o *OpenStack) reserveNeutronIPAddress(s neutronsubnets.Subnet, ip net.IP,
},
},
DeviceOwner: egressIPTag,
DeviceID: generateDeviceID(serverID),
DeviceID: expectedDeviceID,
Name: fmt.Sprintf("egressip-%s", ip.String()),
}
p, err := neutronports.Create(o.neutronClient, opts).Extract()

if err != nil {
// Let's check if error suggests that port with that IP already exists in that subnet.
if _, ok := err.(gophercloud.ErrDefault409); ok {
klog.Infof("Got conflict when trying to create reservation port with IP %s. Checking for an existing "+
"reservation port", ip.String())
// If so, let's get the port and check if it's our reservation port. It's possible we've created it earlier.
// It's also possible user configured a taken range, we should error out in such case.
opts := neutronports.ListOpts{
// {SubnetID: <id>, IPAddress: <ip>} would generate a query like
// fixed_ips=subnet_id%3D<id>,ip_address%3D<ip>. Instead we need
// fixed_ips=subnet_id%3D<id>&fixed_ips=ip_address%3D<ip>, hence we use
// {SubnetID: s.ID}, {IPAddress: ip.String()}
FixedIPs: []neutronports.FixedIPOpts{{SubnetID: s.ID}, {IPAddress: ip.String()}},
DeviceOwner: egressIPTag,
}
page, err := neutronports.List(o.neutronClient, opts).AllPages()
if err != nil {
return nil, err
}
ports, err := neutronports.ExtractPorts(page)
if err != nil {
return nil, err
}
if len(ports) > 1 {
// This is unexpected, Neutron should not allow multiple ports with the same IP
// in one subnet
return p, fmt.Errorf(
"bogus result from Neutron, multiple ports with IP %s in subnet %s", ip.String(), s.ID)
} else if len(ports) == 1 {
// We've got our port, we'll return it, but first let's check if DeviceID is set correctly.
p = &ports[0]
klog.Infof("Found reservation port %s for IP %s. Reusing it", p.ID, ip.String())
if p.DeviceID != expectedDeviceID {
// If not, we got to update it. We intend to replace the result with updated representation of the port.
p, err = neutronports.Update(o.neutronClient, p.ID, neutronports.UpdateOpts{DeviceID: &expectedDeviceID}).Extract()
if err != nil {
return nil, err
}
klog.Infof("Port %s updated with DeviceID %s", p.ID, expectedDeviceID)
}
return p, nil
}
// The only case left is 0 results, which indicates that the port that is using that IP is not ours
// (DeviceOwner doesn't match). So let's log an error and return the 409 error as a legitimate indicator
// of something else using the IP we intended to use. Most likely it's a configuration issue.
klog.Errorf("Conflict when creating a reservation port with IP %s on subnet %s. Most likely the IP is "+
"already in use on the subnet", ip.String(), s.ID)
}
return nil, err
}

Expand Down
28 changes: 23 additions & 5 deletions pkg/cloudprovider/openstack_test.go
Expand Up @@ -408,6 +408,14 @@ func portListHandler(t *testing.T, w http.ResponseWriter, r *http.Request) {
deviceID := r.URL.Query().Get("device_id")
deviceOwner := r.URL.Query().Get("device_owner")
networkID := r.URL.Query().Get("network_id")
IPs := r.URL.Query()["fixed_ips"]
IP := ""
for _, val := range IPs {
if strings.HasPrefix(val, "ip_address=") {
IP = strings.TrimPrefix(val, "ip_address=")
break
}
}

var portList []neutronports.Port
for _, p := range portMap {
Expand All @@ -420,6 +428,17 @@ func portListHandler(t *testing.T, w http.ResponseWriter, r *http.Request) {
if networkID != "" && networkID != p.NetworkID {
continue
}
if IP != "" {
matched := false
for _, fixedIP := range p.FixedIPs {
if fixedIP.IPAddress == IP {
matched = true
}
}
if !matched {
continue
}
}
portList = append(portList, p)
}

Expand Down Expand Up @@ -1196,11 +1215,10 @@ func TestReserveAndReleaseNeutronIPAddress(t *testing.T) {
},
// ... and try to create a duplicate of it.
{
subnet: subnetMap["de0cda14-6ac6-4439-bc94-da0a27938b7b"],
ip: net.ParseIP("2000::9"), // reserving the same IP address 2x shall fail
reserve: true,
nodeName: "node1",
errString: "but got 409 instead",
subnet: subnetMap["de0cda14-6ac6-4439-bc94-da0a27938b7b"],
ip: net.ParseIP("2000::9"), // reserving the same IP address 2x should find the existing port
reserve: true,
nodeName: "node1",
},
// Release the first IPv6 port.
{
Expand Down

0 comments on commit f4aeb70

Please sign in to comment.