Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automated cherry pick of #29101 #29576

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
50 changes: 45 additions & 5 deletions pkg/controller/node/cidr_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ import (
"errors"
"fmt"
"net"
"sync"

"k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"

"github.com/golang/glog"
Expand Down Expand Up @@ -60,6 +63,9 @@ type rangeAllocator struct {
// This increases a throughput of CIDR assignment by not blocking on long operations.
nodeCIDRUpdateChannel chan nodeAndCIDR
recorder record.EventRecorder
// Keep a set of nodes that are currectly being processed to avoid races in CIDR allocation
sync.Mutex
nodesInProcessing sets.String
}

// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node
Expand All @@ -77,6 +83,7 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
clusterCIDR: clusterCIDR,
nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
recorder: recorder,
nodesInProcessing: sets.NewString(),
}

if serviceCIDR != nil {
Expand Down Expand Up @@ -122,7 +129,24 @@ func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, s
return ra, nil
}

func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
r.Lock()
defer r.Unlock()
if r.nodesInProcessing.Has(nodeName) {
return false
}
r.nodesInProcessing.Insert(nodeName)
return true
}

func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
r.Lock()
defer r.Unlock()
r.nodesInProcessing.Delete(nodeName)
}

func (r *rangeAllocator) occupyCIDR(node *api.Node) error {
defer r.removeNodeFromProcessing(node.Name)
if node.Spec.PodCIDR == "" {
return nil
}
Expand All @@ -138,12 +162,22 @@ func (r *rangeAllocator) occupyCIDR(node *api.Node) error {

// AllocateOrOccupyCIDR looks at the given node, assigns it a valid CIDR
// if it doesn't currently have one or mark the CIDR as used if the node already have one.
// WARNING: If you're adding any return calls or defer any more work from this function
// you have to handle correctly nodesInProcessing.
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *api.Node) error {
if node == nil {
return nil
}
if !r.insertNodeToProcessing(node.Name) {
glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
return nil
}
if node.Spec.PodCIDR != "" {
return r.occupyCIDR(node)
}
podCIDR, err := r.cidrs.allocateNext()
if err != nil {
r.removeNodeFromProcessing(node.Name)
recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr: %v", err)
}
Expand Down Expand Up @@ -173,8 +207,8 @@ func (r *rangeAllocator) ReleaseCIDR(node *api.Node) error {
return err
}

// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used, so that they won't be
// assignable.
// Marks all CIDRs with subNetMaskSize that belongs to serviceCIDR as used,
// so that they won't be assignable.
func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
// Checks if service CIDR has a nonempty intersection with cluster CIDR. It is the case if either
// clusterCIDR contains serviceCIDR with clusterCIDR's Mask applied (this means that clusterCIDR contains serviceCIDR)
Expand All @@ -192,6 +226,7 @@ func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
var err error
var node *api.Node
defer r.removeNodeFromProcessing(data.nodeName)
for rep := 0; rep < podCIDRUpdateRetry; rep++ {
// TODO: change it to using PATCH instead of full Node updates.
node, err = r.client.Core().Nodes().Get(data.nodeName)
Expand All @@ -209,9 +244,14 @@ func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
}
if err != nil {
recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
// We accept the fact that we may leek CIDRs here. This is safer than releasing
// them in case when we don't know if request went through.
// NodeController restart will return all falsely allocated CIDRs to the pool.
if !apierrors.IsServerTimeout(err) {
glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
}
}
}
return err
Expand Down
44 changes: 26 additions & 18 deletions pkg/controller/node/cidr_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,13 @@ func (s *cidrSet) allocateNext() (*net.IPNet, error) {
}, nil
}

func (s *cidrSet) release(cidr *net.IPNet) error {
used, err := s.getIndexForCIDR(cidr)
if err != nil {
return err
}

s.Lock()
defer s.Unlock()
s.used.SetBit(&s.used, used, 0)

return nil
}

func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
begin, end := 0, s.maxCIDRs
func (s *cidrSet) getBeginingAndEndIndices(cidr *net.IPNet) (begin, end int, err error) {
begin, end = 0, s.maxCIDRs
cidrMask := cidr.Mask
maskSize, _ := cidrMask.Size()

if !s.clusterCIDR.Contains(cidr.IP.Mask(s.clusterCIDR.Mask)) && !cidr.Contains(s.clusterCIDR.IP.Mask(cidr.Mask)) {
return fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR)
return -1, -1, fmt.Errorf("cidr %v is out the range of cluster cidr %v", cidr, s.clusterCIDR)
}

if s.clusterMaskSize < maskSize {
Expand All @@ -107,7 +94,7 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
Mask: subNetMask,
})
if err != nil {
return err
return -1, -1, err
}

ip := make([]byte, 4)
Expand All @@ -118,9 +105,30 @@ func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
Mask: subNetMask,
})
if err != nil {
return err
return -1, -1, err
}
}
return begin, end, nil
}

func (s *cidrSet) release(cidr *net.IPNet) error {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
return err
}
s.Lock()
defer s.Unlock()
for i := begin; i <= end; i++ {
s.used.SetBit(&s.used, i, 0)
}
return nil
}

func (s *cidrSet) occupy(cidr *net.IPNet) (err error) {
begin, end, err := s.getBeginingAndEndIndices(cidr)
if err != nil {
return err
}

s.Lock()
defer s.Unlock()
Expand Down
28 changes: 28 additions & 0 deletions pkg/controller/node/nodecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,34 @@ func NewNodeController(
glog.Errorf("Error allocating CIDR: %v", err)
}
},
UpdateFunc: func(_, obj interface{}) {
node := obj.(*api.Node)
// If the PodCIDR is not empty we either:
// - already processed a Node that already had a CIDR after NC restarted
// (cidr is marked as used),
// - already processed a Node successfully and allocated a CIDR for it
// (cidr is marked as used),
// - already processed a Node but we did saw a "timeout" response and
// request eventually got through in this case we haven't released
// the allocated CIDR (cidr is still marked as used).
// There's a possible error here:
// - NC sees a new Node and assigns a CIDR X to it,
// - Update Node call fails with a timeout,
// - Node is updated by some other component, NC sees an update and
// assigns CIDR Y to the Node,
// - Both CIDR X and CIDR Y are marked as used in the local cache,
// even though Node sees only CIDR Y
// The problem here is that in in-memory cache we see CIDR X as marked,
// which prevents it from being assigned to any new node. The cluster
// state is correct.
// Restart of NC fixes the issue.
if node.Spec.PodCIDR == "" {
err := nc.cidrAllocator.AllocateOrOccupyCIDR(node)
if err != nil {
glog.Errorf("Error allocating CIDR: %v", err)
}
}
},
DeleteFunc: func(obj interface{}) {
node := obj.(*api.Node)
err := nc.cidrAllocator.ReleaseCIDR(node)
Expand Down