Skip to content

Commit

Permalink
IP Reservation functionality
Browse files Browse the repository at this point in the history
Addressing PR comments

Fixing build

Fixing nits

Fixing commenting issues
  • Loading branch information
krunaljain committed Aug 1, 2018
1 parent ba769f8 commit 8035839
Show file tree
Hide file tree
Showing 6 changed files with 343 additions and 132 deletions.
6 changes: 3 additions & 3 deletions pkg/cloud_provider/file/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ func (manager *fakeServiceManager) GetInstance(ctx context.Context, obj *Service
}
}

func (manager *fakeServiceManager) ListInstances(ctx context.Context, parent string) ([]*ServiceInstance, error) {
func (manager *fakeServiceManager) ListInstances(ctx context.Context, obj *ServiceInstance) ([]*ServiceInstance, error) {
instances := []*ServiceInstance{
&ServiceInstance{
{
Project: "test-project",
Location: "test-location",
Name: "test",
Expand All @@ -84,7 +84,7 @@ func (manager *fakeServiceManager) ListInstances(ctx context.Context, parent str
ReservedIpRange: "192.168.92.32/29",
},
},
&ServiceInstance{
{
Project: "test-project",
Location: "test-location",
Name: "test",
Expand Down
9 changes: 6 additions & 3 deletions pkg/cloud_provider/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Service interface {
CreateInstance(ctx context.Context, obj *ServiceInstance) (*ServiceInstance, error)
DeleteInstance(ctx context.Context, obj *ServiceInstance) error
GetInstance(ctx context.Context, obj *ServiceInstance) (*ServiceInstance, error)
ListInstances(ctx context.Context, parent string) ([]*ServiceInstance, error)
ListInstances(ctx context.Context, obj *ServiceInstance) ([]*ServiceInstance, error)
}

type gcfsServiceManager struct {
Expand Down Expand Up @@ -232,11 +232,14 @@ func (manager *gcfsServiceManager) DeleteInstance(ctx context.Context, obj *Serv
return nil
}

func (manager *gcfsServiceManager) ListInstances(ctx context.Context, parent string) ([]*ServiceInstance, error) {
instances, err := manager.instancesService.List(parent).Context(ctx).Do()
// ListInstances returns a list of active instances in a project at a specific location
func (manager *gcfsServiceManager) ListInstances(ctx context.Context, obj *ServiceInstance) ([]*ServiceInstance, error) {
// Calling cloud provider service to get list of active instances. - indicates we are looking for instances in all the locations for a project
instances, err := manager.instancesService.List(locationURI(obj.Project, "-")).Context(ctx).Do()
if err != nil {
return nil, err
}

var activeInstances []*ServiceInstance
for _, activeInstance := range instances.Instances {
serviceInstance, err := cloudInstanceToServiceInstance(activeInstance)
Expand Down
80 changes: 52 additions & 28 deletions pkg/csi_driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package driver
import (
"fmt"
"strings"
"sync"

csi "github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/golang/glog"
Expand Down Expand Up @@ -52,7 +51,7 @@ const (
paramTier = "tier"
paramLocation = "location"
paramNetwork = "network"
paramReservedIPV4CIDR = "cidr"
paramReservedIPV4CIDR = "reserved-ipv4-cidr"
)

// controllerServer handles volume provisioning
Expand All @@ -61,14 +60,14 @@ type controllerServer struct {
}

type controllerServerConfig struct {
driver *GCFSDriver
fileService file.Service
metaService metadata.Service
reservedIPRanges map[string]bool
mutex sync.Mutex
driver *GCFSDriver
fileService file.Service
metaService metadata.Service
ipAllocator *util.IPAllocator
}

func newControllerServer(config *controllerServerConfig) csi.ControllerServer {
config.ipAllocator = util.NewIPAllocator(make(map[string]bool))
return &controllerServer{config: config}
}

Expand All @@ -85,16 +84,7 @@ func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
if err := s.config.driver.validateVolumeCapabilities(req.GetVolumeCapabilities()); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
s.config.mutex.Lock()
instances, err := s.config.fileService.ListInstances(ctx, "-")
if err != nil {
return nil, status.Error(codes.Aborted, err.Error())
}

s.config.reservedIPRanges = make(map[string]bool)
for _, instance := range instances {
s.config.reservedIPRanges[instance.Network.ReservedIpRange] = true
}
capBytes := getRequestCapacity(req.GetCapacityRange())
glog.V(5).Infof("Using capacity bytes %q for volume %q", capBytes, name)

Expand All @@ -114,16 +104,56 @@ func (s *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
return nil, status.Error(codes.AlreadyExists, err.Error())
}
} else {
// If we are creating a new instance, we need pick an unused /29 range from reserved-ipv4-cidr
// If the param was not provided, we default reservedIPRange to "" and cloud provider takes care of the allocation
if reservedIPV4CIDR, ok := req.GetParameters()[paramReservedIPV4CIDR]; ok {
validCIDR := s.config.ipAllocator.ValidateCIDR(reservedIPV4CIDR)
if !validCIDR {
return nil, fmt.Errorf("invalid reserved-ipv4-cidr %s", reservedIPV4CIDR)
}

reservedIPRange, err := s.reserveIPV4Range(ctx, newFiler, reservedIPV4CIDR)

// Possible cases are 1) CreateInstanceAborted, 2)CreateInstance running in background
// In either cases, the ListInstances responses would consist of the updated result
defer s.config.ipAllocator.ReleaseIPV4Range(reservedIPRange)
if err != nil {
return nil, err
}

// Adding the reserved IP range to the instance object
newFiler.Network.ReservedIpRange = reservedIPRange
}

// Create the instance
filer, err = s.config.fileService.CreateInstance(ctx, newFiler)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
s.config.mutex.Unlock()
return &csi.CreateVolumeResponse{Volume: fileInstanceToCSIVolume(filer, modeInstance)}, nil
}

// getUnreservedIPBlock returns the available IP in the cidr
func (s *controllerServer) reserveIPV4Range(ctx context.Context, filer *file.ServiceInstance, cidr string) (string, error) {
instances, err := s.config.fileService.ListInstances(ctx, filer)
if err != nil {
return "", status.Error(codes.Aborted, err.Error())
}

// Initialize an empty reserved list. It will be populated with all the reservedIPRanges obtained from the active instances
reservedIPRangesActiveInstances := make(map[string]bool)
for _, instance := range instances {
reservedIPRangesActiveInstances[instance.Network.ReservedIpRange] = true
}

unreservedIPBlock, err := s.config.ipAllocator.GetUnreservedIPBlock(cidr, reservedIPRangesActiveInstances)
if err != nil {
return "", err
}
return unreservedIPBlock, nil
}

// DeleteVolume deletes a GCFS instance
func (s *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
glog.V(4).Infof("DeleteVolume called with request %v", *req)
Expand Down Expand Up @@ -225,7 +255,6 @@ func (s *controllerServer) generateNewFileInstance(name string, capBytes int64,
tier := defaultTier
network := defaultNetwork
location := s.config.metaService.GetZone()
reservedIPV4CIDR := ""
// Validate parameters (case-insensitive).
for k, v := range params {
switch strings.ToLower(k) {
Expand All @@ -236,15 +265,11 @@ func (s *controllerServer) generateNewFileInstance(name string, capBytes int64,
location = v
case paramNetwork:
network = v
case paramReservedIPV4CIDR:
reservedIPV4CIDR, err := util.GetUnreservedIPBlock(s.config.reservedIPRanges, v)
if err != nil {
return nil, err
}
if reservedIPV4CIDR == "" {
return nil, fmt.Errorf("Invalid unreserved IP block received for cidr %s", v)
}

// Ignore the cidr flag as it is not passed to the cloud provider
// It will be used to get unreserved IP in the reserveIPV4Range function
case paramReservedIPV4CIDR:
continue
case "csiprovisionersecretname", "csiprovisionersecretnamespace":
default:
return nil, fmt.Errorf("invalid parameter %q", k)
Expand All @@ -256,8 +281,7 @@ func (s *controllerServer) generateNewFileInstance(name string, capBytes int64,
Location: location,
Tier: tier,
Network: file.Network{
Name: network,
ReservedIpRange: reservedIPV4CIDR,
Name: network,
},
Volume: file.Volume{
Name: newInstanceVolume,
Expand Down
13 changes: 7 additions & 6 deletions pkg/csi_driver/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ import (
)

const (
testProject = "test-project"
testLocation = "test-location"
testIp = "test-ip"
testCSIVolume = "test-csi"
testVolumeId = "modeInstance/test-location/test-csi/vol1"
testBytes = 1 * util.Tb
testProject = "test-project"
testLocation = "test-location"
testIp = "test-ip"
testCSIVolume = "test-csi"
testVolumeId = "modeInstance/test-location/test-csi/vol1"
testReservedIPV4CIDR = "192.168.92.0/26"
testBytes = 1 * util.Tb
)

func initTestController(t *testing.T) csi.ControllerServer {
Expand Down
142 changes: 110 additions & 32 deletions pkg/util/ip_reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,71 +17,149 @@ limitations under the License.
package util

import (
"bytes"
"fmt"
"net"
"sync"
)

// GetUnreservedIPBlock returns an unreserved /29 IP block. It accepts the list of currently reserved
// IPs and the requested CIDR as arguments and returns the /29 IP available in that CIDR
const (
incrementStep29IPBlock = 8
reservationBlockSize = "/29"
byteMax = 255
)

// IPAllocator struct consists of shared resources that are used to keep track of the /29 IPRanges currently reserved by service instances
type IPAllocator struct {
// hold set maintains the set of IP ranges that have been reserved by the service instances but pending reservation in the cloud instances
// The key is a IP range currently reserved by a service instance e.g(192.168.92.0/29). Value is a bool to implement map as a set
hold map[string]bool

// holdMutex is used to synchronize access to the hold set to prevent data races
holdMutex sync.Mutex
}

// NewIPAllocator is the constructor to initialize the IPAllocator object
// Argument hold map[string]bool is a set of IP ranges currently reserved by service instances but pending reservation in the cloud instances
func NewIPAllocator(hold map[string]bool) *IPAllocator {
return &IPAllocator{
hold: hold,
}
}

// holdIPV4Range adds a particular IP range in the hold set
// Argument ipBlock string is an IPV4 range which needs put on hold
func (ipAllocator *IPAllocator) holdIPV4Range(ipV4Range string) {
ipAllocator.hold[ipV4Range] = true
}

// ReleaseIPV4Range releases the hold on a particular IP range
// Argument ipBlock string is an IPV4 range which needs to be released
func (ipAllocator *IPAllocator) ReleaseIPV4Range(ipV4Range string) {
ipAllocator.holdMutex.Lock()
defer ipAllocator.holdMutex.Unlock()
delete(ipAllocator.hold, ipV4Range)
}

// ValidateCIDR function validates whether a particular cidr is a valid IP range
// Argument cidr string is is a CIDR range that needs to be validated
func (ipAllocator *IPAllocator) ValidateCIDR(cidr string) bool {
_, _, err := net.ParseCIDR(cidr)
return err == nil
}

// GetUnreservedIPBlock returns an unreserved /29 IP block.
// cidr: Provided cidr address in which we need to look for an unreserved /29 IP Block
// activeInstancesReservedIPRanges: All the reserved IP ranges present in the cloud instances
// All the reserved IP ranges in the service instances not updated in cloud instances extracted from the hold list in the IPAllocator
// Finally a final reserved list is created by merging these two lists
// Potential error cases: 1) No /29 Block in the CIDR is unreserved
// Parsing the CIDR resulted in an error
func GetUnreservedIPBlock(reservedIPRanges map[string]bool, cidr string) (string, error) {
// 2) Parsing the CIDR resulted in an error
func (ipAllocator *IPAllocator) GetUnreservedIPBlock(cidr string, activeInstancesReservedIPRanges map[string]bool) (string, error) {

ip, ipnet, err := net.ParseCIDR(cidr)
if err != nil {
return "", fmt.Errorf("Unable to parse CIDR %s", cidr)
}
buffer := bytes.NewBufferString("")
for cidrIPBlock := cloneIP(ip.Mask(ipnet.Mask)); ipnet.Contains(cidrIPBlock) && err == nil; err = incrementIP(cidrIPBlock, 8) {

var reserved = make(map[string]bool)

// The final reserved list is obtained by combining the activeInstancesReservedIPRanges list and the hold list in the ipAllocator
for activeInstanceReservedIPRange := range activeInstancesReservedIPRanges {
reserved[activeInstanceReservedIPRange] = true
}

// Lock is placed here so that the hold list captures all the IPs on hold while polling
ipAllocator.holdMutex.Lock()
defer ipAllocator.holdMutex.Unlock()
for reservedIPRange := range ipAllocator.hold {
reserved[reservedIPRange] = true
}

for cidrIPBlock := cloneIP(ip.Mask(ipnet.Mask)); ipnet.Contains(cidrIPBlock) && err == nil; cidrIPBlock, err = incrementIP(cidrIPBlock, incrementStep29IPBlock) {
overLap := false
buffer.WriteString(cidrIPBlock.String())
buffer.WriteString("/29")
for reservedIPRange := range reservedIPRanges {
err = validateCIDROverlap(buffer.String(), reservedIPRange)
ipRange := fmt.Sprint(cidrIPBlock.String(), reservationBlockSize)
for reservedIPRange := range reserved {
// Find if the current IP range in the CIDR overlaps with any of the reserved IP ranges. If not, this can be returned
overLap, err = isOverlap(ipRange, reservedIPRange)
// Error while processing cidr
if err != nil {
overLap = true
return "", fmt.Errorf("Error while parsing cidr to determine overlap between %s and %s", cidrIPBlock.String(), reservedIPRange)
}
if overLap {
break
}
}
if !overLap {
return buffer.String(), nil
ipAllocator.holdIPV4Range(ipRange)
return ipRange, nil
}
buffer.Reset()
}

// No unreserved IP range available in the entire CIDR range since we did not return
return "", fmt.Errorf("All of the /29 IP ranges in the cidr %s are reserved", cidr)
}

// validateCIDROverlap checks if two cidrs have any overlapping IPs
func validateCIDROverlap(cidr1 string, cidr2 string) error {
// isOverlap checks if two cidrs have any overlapping IPs
func isOverlap(cidr1 string, cidr2 string) (bool, error) {
_, ipnet1, err := net.ParseCIDR(cidr1)
// Invalid CIDRs are considered as overlapping
if err != nil {
return err
return true, fmt.Errorf("Invalid cidr %s provided", cidr1)
}

_, ipnet2, err := net.ParseCIDR(cidr2)
if err != nil {
return err
}

if ipnet1.Contains(ipnet2.IP) || ipnet2.Contains(ipnet1.IP) {
return fmt.Errorf("The cidr ranges %s and %s overlap", cidr1, cidr2)
return true, fmt.Errorf("Invalid cidr %s provided", cidr2)
}

return nil
return ipnet1.Contains(ipnet2.IP) || ipnet2.Contains(ipnet1.IP), nil
}

// Increment the given IP value by the provided step
func incrementIP(ip net.IP, step byte) error {
if uint8(255)-uint8(ip[len(ip)-1]) < uint8(step) {
return fmt.Errorf("IP overflow occured when incrementing ip %s with step %d", ip.String(), step)
// Increment the given IP value by the provided step. The step is a byte with maximum value maximum byte value
func incrementIP(ip net.IP, step byte) (net.IP, error) {
incrementedIP := cloneIP(ip)
incrementedIP = incrementedIP.To4()

// Step can be added directly to the Least Significant Byte and we can return the result
if incrementedIP[3] < byteMax-step {
incrementedIP[3] += step
return incrementedIP, nil
}
for j := len(ip) - 1; j >= 0; j-- {
ip[j] += step
if ip[j] > 0 {
break

// Step addition in the Least Significant Byte resulted in overflow
// Propogating the carry addition to the higher order bytes and calculating value of the current byte
incrementedIP[3] = incrementedIP[3] - byteMax + step - 1

for ipByte := 2; ipByte >= 0; ipByte-- {
// Rollover occurs when value changes from maximum byte value to 0 as maximum propagated carry is 1
if incrementedIP[ipByte] != byteMax {
incrementedIP[ipByte]++
return incrementedIP, nil
}
incrementedIP[ipByte] = 0
}
return nil
return nil, fmt.Errorf("IP range overflowed while incrementing IP %s by step %d", ip.String(), step)

}

// Clone the provided IP and return the copy
Expand Down
Loading

0 comments on commit 8035839

Please sign in to comment.