Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/ip block reservation #12

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pkg/cloud_provider/file/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,27 @@ func (manager *fakeServiceManager) GetInstance(ctx context.Context, obj *Service
},
}
}

func (manager *fakeServiceManager) ListInstances(ctx context.Context, obj *ServiceInstance) ([]*ServiceInstance, error) {
instances := []*ServiceInstance{
{
Project: "test-project",
Location: "test-location",
Name: "test",
Tier: "test_tier",
Network: Network{
ReservedIpRange: "192.168.92.32/29",
},
},
{
Project: "test-project",
Location: "test-location",
Name: "test",
Tier: "test_tier",
Network: Network{
ReservedIpRange: "192.168.92.40/29",
},
},
}
return instances, nil
}
20 changes: 20 additions & 0 deletions pkg/cloud_provider/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Service interface {
CreateInstance(ctx context.Context, obj *ServiceInstance) (*ServiceInstance, error)
DeleteInstance(ctx context.Context, obj *ServiceInstance) error
GetInstance(ctx context.Context, obj *ServiceInstance) (*ServiceInstance, error)
ListInstances(ctx context.Context, obj *ServiceInstance) ([]*ServiceInstance, error)
}

type gcfsServiceManager struct {
Expand Down Expand Up @@ -231,6 +232,25 @@ func (manager *gcfsServiceManager) DeleteInstance(ctx context.Context, obj *Serv
return nil
}

// ListInstances returns a list of active instances in a project at a specific location
func (manager *gcfsServiceManager) ListInstances(ctx context.Context, obj *ServiceInstance) ([]*ServiceInstance, error) {
// Calling cloud provider service to get list of active instances. - indicates we are looking for instances in all the locations for a project
instances, err := manager.instancesService.List(locationURI(obj.Project, "-")).Context(ctx).Do()
if err != nil {
return nil, err
}

var activeInstances []*ServiceInstance
for _, activeInstance := range instances.Instances {
serviceInstance, err := cloudInstanceToServiceInstance(activeInstance)
if err != nil {
return nil, err
}
activeInstances = append(activeInstances, serviceInstance)
}
return activeInstances, nil
}

func (manager *gcfsServiceManager) waitForOp(ctx context.Context, op *beta.Operation) error {
return wait.Poll(5*time.Second, 5*time.Minute, func() (bool, error) {
pollOp, err := manager.operationsService.Get(op.Name).Context(ctx).Do()
Expand Down
61 changes: 55 additions & 6 deletions pkg/csi_driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ const (

// CreateVolume parameters
const (
paramTier = "tier"
paramLocation = "location"
paramNetwork = "network"
paramTier = "tier"
paramLocation = "location"
paramNetwork = "network"
paramReservedIPV4CIDR = "reserved-ipv4-cidr"
)

// controllerServer handles volume provisioning
Expand All @@ -62,9 +63,11 @@ type controllerServerConfig struct {
driver *GCFSDriver
fileService file.Service
metaService metadata.Service
ipAllocator *util.IPAllocator
}

func newControllerServer(config *controllerServerConfig) csi.ControllerServer {
config.ipAllocator = util.NewIPAllocator(make(map[string]bool))
return &controllerServer{config: config}
}

Expand Down Expand Up @@ -101,6 +104,23 @@ func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
return nil, status.Error(codes.AlreadyExists, err.Error())
}
} else {
// If we are creating a new instance, we need pick an unused /29 range from reserved-ipv4-cidr
// If the param was not provided, we default reservedIPRange to "" and cloud provider takes care of the allocation
if reservedIPV4CIDR, ok := req.GetParameters()[paramReservedIPV4CIDR]; ok {
reservedIPRange, err := s.reserveIPRange(ctx, newFiler, reservedIPV4CIDR)

// Possible cases are 1) CreateInstanceAborted, 2)CreateInstance running in background
// The ListInstances response will contain the reservedIPRange if the operation was started
// In case of abort, the /29 IP is released and available for reservation
defer s.config.ipAllocator.ReleaseIPRange(reservedIPRange)
if err != nil {
return nil, err
}

// Adding the reserved IP range to the instance object
newFiler.Network.ReservedIpRange = reservedIPRange
}

// Create the instance
filer, err = s.config.fileService.CreateInstance(ctx, newFiler)
if err != nil {
Expand All @@ -110,6 +130,33 @@ func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
return &csi.CreateVolumeResponse{Volume: fileInstanceToCSIVolume(filer, modeInstance)}, nil
}

// reserveIPRange returns the available IP in the cidr
func (s *controllerServer) reserveIPRange(ctx context.Context, filer *file.ServiceInstance, cidr string) (string, error) {
cloudInstancesReservedIPRanges, err := s.getCloudInstancesReservedIPRanges(ctx, filer)
if err != nil {
return "", err
}
unreservedIPBlock, err := s.config.ipAllocator.GetUnreservedIPRange(cidr, cloudInstancesReservedIPRanges)
if err != nil {
return "", err
}
return unreservedIPBlock, nil
}

// getCloudInstancesReservedIPRanges gets the list of reservedIPRanges from cloud instances
func (s *controllerServer) getCloudInstancesReservedIPRanges(ctx context.Context, filer *file.ServiceInstance) (map[string]bool, error) {
instances, err := s.config.fileService.ListInstances(ctx, filer)
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}
// Initialize an empty reserved list. It will be populated with all the reservedIPRanges obtained from the cloud instances
cloudInstancesReservedIPRanges := make(map[string]bool)
for _, instance := range instances {
cloudInstancesReservedIPRanges[instance.Network.ReservedIpRange] = true
}
return cloudInstancesReservedIPRanges, nil
}

// DeleteVolume deletes a GCFS instance
func (s *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
glog.V(4).Infof("DeleteVolume called with request %v", *req)
Expand Down Expand Up @@ -211,7 +258,6 @@ func (s *controllerServer) generateNewFileInstance(name string, capBytes int64,
tier := defaultTier
network := defaultNetwork
location := s.config.metaService.GetZone()

// Validate parameters (case-insensitive).
for k, v := range params {
switch strings.ToLower(k) {
Expand All @@ -222,7 +268,11 @@ func (s *controllerServer) generateNewFileInstance(name string, capBytes int64,
location = v
case paramNetwork:
network = v
// Unused

// Ignore the cidr flag as it is not passed to the cloud provider
// It will be used to get unreserved IP in the reserveIPV4Range function
case paramReservedIPV4CIDR:
continue
case "csiprovisionersecretname", "csiprovisionersecretnamespace":
default:
return nil, fmt.Errorf("invalid parameter %q", k)
Expand All @@ -235,7 +285,6 @@ func (s *controllerServer) generateNewFileInstance(name string, capBytes int64,
Tier: tier,
Network: file.Network{
Name: network,
// ReservedIpRange: "10.3.0.0/29", // TODO
},
Volume: file.Volume{
Name: newInstanceVolume,
Expand Down
13 changes: 7 additions & 6 deletions pkg/csi_driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (
)

const (
testProject = "test-project"
testLocation = "test-location"
testIp = "test-ip"
testCSIVolume = "test-csi"
testVolumeId = "modeInstance/test-location/test-csi/vol1"
testBytes = 1 * util.Tb
testProject = "test-project"
testLocation = "test-location"
testIp = "test-ip"
testCSIVolume = "test-csi"
testVolumeId = "modeInstance/test-location/test-csi/vol1"
testReservedIPV4CIDR = "192.168.92.0/26"
testBytes = 1 * util.Tb
)

func initTestController(t *testing.T) csi.ControllerServer {
Expand Down
202 changes: 202 additions & 0 deletions pkg/util/ip_reservation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
"fmt"
"math"
"net"
"sync"
)

const (
// Size of the network address of the IPRange we intend to reserve
ipRangeSize = 29
// Maximum value of a byte
byteMax = 255
// Total number of bits in an IPV4 address
ipV4Bits = 32
)

var (
// step size for IP range increment
incrementStep29IPRange = (byte)(math.Exp2(ipV4Bits - ipRangeSize))
// mask for IP range
ipRangeMask = net.CIDRMask(ipRangeSize, ipV4Bits)
)

// IPAllocator struct consists of shared resources that are used to keep track of the /29 IPRanges currently reserved by service instances
type IPAllocator struct {
// pendingIPRanges set maintains the set of IP ranges that have been reserved by the service instances but pending reservation in the cloud instances
// The key is a IP range currently reserved by a service instance e.g(192.168.92.0/29). Value is a bool to implement map as a set
pendingIPRanges map[string]bool

// pendingIPRangesMutex is used to synchronize access to the pendingIPRanges set to prevent data races
pendingIPRangesMutex sync.Mutex
}

// NewIPAllocator is the constructor to initialize the IPAllocator object
// Argument pendingIPRanges map[string]bool is a set of IP ranges currently reserved by service instances but pending reservation in the cloud instances
func NewIPAllocator(pendingIPRanges map[string]bool) *IPAllocator {
// Make a copy of the pending IP ranges and set it in the IPAllocator so that the caller cannot mutate this map outside the library
pendingIPRangesCopy := make(map[string]bool)
for pendingIPRange := range pendingIPRanges {
pendingIPRangesCopy[pendingIPRange] = true
}
return &IPAllocator{
pendingIPRanges: pendingIPRangesCopy,
}
}

// holdIPRange adds a particular IP range in the pendingIPRanges set
// Argument ipRange string is an IPV4 range which needs put in pendingIPRanges
func (ipAllocator *IPAllocator) holdIPRange(ipRange string) {
ipAllocator.pendingIPRanges[ipRange] = true
}

// ReleaseIPRange releases the pending IPRange
// Argument ipRange string is an IPV4 range which needs to be released
func (ipAllocator *IPAllocator) ReleaseIPRange(ipRange string) {
ipAllocator.pendingIPRangesMutex.Lock()
defer ipAllocator.pendingIPRangesMutex.Unlock()
delete(ipAllocator.pendingIPRanges, ipRange)
}

// GetUnreservedIPRange returns an unreserved /29 IP block.
// cidr: Provided cidr address in which we need to look for an unreserved /29 IP range
// cloudInstancesReservedIPRanges: All the used IP ranges in the cloud instances
// All the used IP ranges in the service instances not updated in cloud instances is extracted from the pendingIPRanges list in the IPAllocator
// Finally a final reservedIPRange list is created by merging these two lists
// Potential error cases:
// 1) No /29 IP range in the CIDR is unreserved
// 2) Parsing the CIDR resulted in an error
func (ipAllocator *IPAllocator) GetUnreservedIPRange(cidr string, cloudInstancesReservedIPRanges map[string]bool) (string, error) {
ip, ipnet, err := ipAllocator.parseCIDR(cidr)
if err != nil {
return "", err
}
var reservedIPRanges = make(map[string]bool)

// The final reserved list is obtained by combining the cloudInstancesReservedIPRanges list and the pendingIPRanges list in the ipAllocator
for cloudInstancesReservedIPRange := range cloudInstancesReservedIPRanges {
reservedIPRanges[cloudInstancesReservedIPRange] = true
}

// Lock is placed here so that the pendingIPRanges list captures all the IPs pending reservation in the cloud instances
ipAllocator.pendingIPRangesMutex.Lock()
defer ipAllocator.pendingIPRangesMutex.Unlock()
for reservedIPRange := range ipAllocator.pendingIPRanges {
reservedIPRanges[reservedIPRange] = true
}

for cidrIP := cloneIP(ip.Mask(ipnet.Mask)); ipnet.Contains(cidrIP) && err == nil; cidrIP, err = incrementIP(cidrIP, incrementStep29IPRange) {
overLap := false
for reservedIPRange := range reservedIPRanges {
_, reservedIPNet, err := net.ParseCIDR(reservedIPRange)
if err != nil {
return "", err
}
// Creating IPnet object using IP and mask
cidrIPNet := &net.IPNet{
IP: cidrIP,
Mask: ipRangeMask,
}

// Find if the current IP range in the CIDR overlaps with any of the reserved IP ranges. If not, this can be returned
overLap, err = isOverlap(cidrIPNet, reservedIPNet)

// Error while processing ipnet
if err != nil {
return "", err
}
if overLap {
break
}
}
if !overLap {
ipRange := fmt.Sprint(cidrIP.String(), "/", ipRangeSize)
ipAllocator.holdIPRange(ipRange)
return ipRange, nil
}
}

// No unreserved IP range available in the entire CIDR range since we did not return
return "", fmt.Errorf("all of the /29 IP ranges in the cidr %s are reserved", cidr)
}

// isOverlap checks if two ipnets have any overlapping IPs
func isOverlap(ipnet1 *net.IPNet, ipnet2 *net.IPNet) (bool, error) {
if ipnet1 == nil || ipnet2 == nil {
return true, fmt.Errorf("invalid ipnet object provided for cidr overlap check")
}
return ipnet1.Contains(ipnet2.IP) || ipnet2.Contains(ipnet1.IP), nil
}

// ParseCIDR function parses the CIDR and returns the ip and ipnet object if the cidr is valid
// For a CIDR to be valid it must satisfy the following properties
// 1) Network address bits must be less than 30
// 2) The IP in the CIDR must be 'aligned' i.e we must have 8 available IPs before byte overflow occurs
func (ipAllocator *IPAllocator) parseCIDR(cidr string) (net.IP, *net.IPNet, error) {
ip, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
return nil, nil, err
}
// The reserved-ipv4-cidr network size must be at least /29
cidrSize, _ := ipnet.Mask.Size()
if cidrSize > ipRangeSize {
return nil, nil, fmt.Errorf("the reserved-ipv4-cidr network size must be at least /%d", ipRangeSize)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated error message

}

// The IP specified in the reserved-ipv4-cidr must be aligned on the /29 network boundary
if ip.String() != ip.Mask(ipRangeMask).String() {
return nil, nil, fmt.Errorf("the IP specified in the reserved-ipv4-cidr must be aligned on the /29 network boundary")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated error message

}
return ip, ipnet, nil
}

// Increment the given IP value by the provided step. The step is a byte
func incrementIP(ip net.IP, step byte) (net.IP, error) {
incrementedIP := cloneIP(ip)
incrementedIP = incrementedIP.To4()

// Step can be added directly to the Least Significant Byte and we can return the result
if incrementedIP[3] <= byteMax-step {
incrementedIP[3] += step
return incrementedIP, nil
}

// Step addition in the Least Significant Byte resulted in overflow
// Propogating the carry addition to the higher order bytes and calculating value of the current byte
incrementedIP[3] = incrementedIP[3] - byteMax + step - 1

for ipByte := 2; ipByte >= 0; ipByte-- {
// Rollover occurs when value changes from maximum byte value to 0 as propagated carry is 1
if incrementedIP[ipByte] != byteMax {
incrementedIP[ipByte]++
return incrementedIP, nil
}
incrementedIP[ipByte] = 0
}
return nil, fmt.Errorf("ip range overflowed while incrementing IP %s by step %d", ip.String(), step)
}

// Clone the provided IP and return the copy
func cloneIP(ip net.IP) net.IP {
clone := make(net.IP, len(ip))
copy(clone, ip)
return clone
}
Loading