Skip to content

Commit

Permalink
Merge pull request #4582 from k8s-infra-cherrypick-robot/cherry-pick-…
Browse files Browse the repository at this point in the history
…4568-to-release-1.28

[release-1.28] fix: fix duplicate podCIDR allocation when node patch request fails
  • Loading branch information
k8s-ci-robot committed Sep 8, 2023
2 parents 76f1d69 + 4abfe40 commit b88f513
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 27 deletions.
80 changes: 56 additions & 24 deletions pkg/nodeipam/ipam/range_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (r *rangeAllocator) worker(stopChan <-chan struct{}) {
klog.Warning("Channel nodeCIDRUpdateChannel was unexpectedly closed")
return
}
if err := r.updateCIDRsAllocation(workItem); err != nil {
if workItem, err := r.updateCIDRsAllocation(workItem); err != nil {
// Requeue the failed node for update again.
r.nodeCIDRUpdateChannel <- workItem
}
Expand Down Expand Up @@ -257,23 +257,20 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
if len(node.Spec.PodCIDRs) > 0 {
return r.occupyCIDRs(node)
}
// allocate and queue the assignment

// allocate pod cidrs
allocatedCIDRs, err := r.allocatePodCIDRs()
if err != nil {
r.removeNodeFromProcessing(node.Name)
nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr for node %s: %w", node.Name, err)
}
allocated := nodeReservedCIDRs{
nodeName: node.Name,
allocatedCIDRs: make([]*net.IPNet, len(r.cidrSets)),
}

for idx := range r.cidrSets {
podCIDR, err := r.cidrSets[idx].AllocateNext()
if err != nil {
r.removeNodeFromProcessing(node.Name)
nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %w", idx, err)
}
allocated.allocatedCIDRs[idx] = podCIDR
allocatedCIDRs: allocatedCIDRs,
}

//queue the assignment
// queue the assignment
klog.V(4).Infof("Putting node %s with CIDR %v into the work queue", node.Name, allocated.allocatedCIDRs)
r.nodeCIDRUpdateChannel <- allocated
return nil
Expand Down Expand Up @@ -327,21 +324,55 @@ func filterOutServiceRange(clusterCIDRs []*net.IPNet, cidrSets []*cidrset.CidrSe
}
}

func (r *rangeAllocator) allocatePodCIDRs() ([]*net.IPNet, error) {
allocatedCIDRs := make([]*net.IPNet, len(r.cidrSets))
for idx := range r.cidrSets {
podCIDR, err := r.cidrSets[idx].AllocateNext()
if err != nil {
for i := 0; i < idx; i++ {
if releaseErr := r.cidrSets[idx].Release(allocatedCIDRs[i]); releaseErr != nil {
// continue releasing the rest
klog.Errorf("Error releasing allocated CIDR at index %d for node: %v", i, releaseErr)
}
}
return nil, fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %w", idx, err)
}
allocatedCIDRs[idx] = podCIDR
}
return allocatedCIDRs, nil
}

// updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server.
func (r *rangeAllocator) updateCIDRsAllocation(data nodeReservedCIDRs) error {
var err error
func (r *rangeAllocator) updateCIDRsAllocation(data nodeReservedCIDRs) (dataToRetry nodeReservedCIDRs, err error) {
var node *v1.Node
defer r.removeNodeFromProcessing(data.nodeName)
cidrsString := cidrsAsString(data.allocatedCIDRs)

defer func() {
if err == nil {
// only remove node when node patch succeeds
r.removeNodeFromProcessing(data.nodeName)
}
}()

node, err = r.nodeLister.Get(data.nodeName)
if err != nil {
if apierrors.IsNotFound(err) {
klog.Warningf("Failed to get node %s: not found", data.nodeName)
return nil
return data, nil
}
klog.Errorf("Failed while getting node %v for updating Node.Spec.PodCIDRs: %v", data.nodeName, err)
return err
return data, err
}

// this happens when node patch fails, we release the CIDRs allocated and retry
if data.allocatedCIDRs == nil {
allocatedCIDRs, err := r.allocatePodCIDRs()
if err != nil {
nodeutil.RecordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
return data, fmt.Errorf("failed to allocate cidr for node %s: %w", data.nodeName, err)
}
data.allocatedCIDRs = allocatedCIDRs
}
cidrsString := cidrsAsString(data.allocatedCIDRs)

// if cidr list matches the proposed.
// then we possibly updated this node
Expand All @@ -356,7 +387,7 @@ func (r *rangeAllocator) updateCIDRsAllocation(data nodeReservedCIDRs) error {
}
if match {
klog.V(4).Infof("Node %v already has allocated CIDR %v. It matches the proposed one.", node.Name, data.allocatedCIDRs)
return nil
return data, nil
}
}

Expand All @@ -368,13 +399,13 @@ func (r *rangeAllocator) updateCIDRsAllocation(data nodeReservedCIDRs) error {
klog.Errorf("Error when releasing CIDR idx:%v value: %v err:%v", idx, cidr, releaseErr)
}
}
return nil
return data, nil
}

// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
for i := 0; i < cidrUpdateRetries; i++ {
if err = utilnode.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil {
return nil
return data, nil
}
}
// failed release back to the pool
Expand All @@ -390,8 +421,9 @@ func (r *rangeAllocator) updateCIDRsAllocation(data nodeReservedCIDRs) error {
klog.Errorf("Error releasing allocated CIDR for node %v: %v", node.Name, releaseErr)
}
}
data.allocatedCIDRs = nil
}
return err
return data, err
}

// converts a slice of cidrs into <c-1>,<c-2>,<c-n>
Expand Down
88 changes: 88 additions & 0 deletions pkg/nodeipam/ipam/range_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,3 +854,91 @@ func TestReleaseCIDRSuccess(t *testing.T) {
testFunc(tc)
}
}

func TestRetryAllocatingPodCidr(t *testing.T) {
testCases := []testCase{
{
description: "It should release allocated CIDR and retry with new CIDR when node patch fails",
fakeNodeHandler: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
},
},
},
Clientset: fake.NewSimpleClientset(),
PatchErrorCount: 9, // need 9 because we want first 3 ranges to fail, and each range is retried for 3 times
},
allocatorParams: CIDRAllocatorParams{
ClusterCIDRs: func() []*net.IPNet {
_, clusterCIDR, _ := net.ParseCIDR("127.123.234.0/28")
return []*net.IPNet{clusterCIDR}
}(),
ServiceCIDR: nil,
SecondaryServiceCIDR: nil,
NodeCIDRMaskSizes: []int{30},
},
allocatedCIDRs: map[int][]string{
0: {"127.123.234.0/30", "127.123.234.4/30", "127.123.234.8/30"},
},
expectedAllocatedCIDR: map[int]string{
0: "127.123.234.12/30",
},
},
}

testFunc := func(tc testCase) {
// Initialize the range allocator.
allocator, err := NewCIDRRangeAllocator(tc.fakeNodeHandler, getFakeNodeInformer(tc.fakeNodeHandler), tc.allocatorParams, nil)
if err != nil {
t.Logf("%v: failed to create CIDRRangeAllocator with error %v", tc.description, err)
}
rangeAllocator, ok := allocator.(*rangeAllocator)
if !ok {
t.Logf("%v: found non-default implementation of CIDRAllocator, skipping white-box test...", tc.description)
return
}
rangeAllocator.nodesSynced = alwaysReady
rangeAllocator.recorder = testutil.NewFakeRecorder()
go allocator.Run(wait.NeverStop)

if err := allocator.AllocateOrOccupyCIDR(tc.fakeNodeHandler.Existing[0]); err != nil {
t.Errorf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err)
}

if err := waitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil {
t.Fatalf("%v: timeout while waiting for Node update: %v", tc.description, err)
}

for _, updatedNode := range tc.fakeNodeHandler.GetUpdatedNodesCopy() {
if len(updatedNode.Spec.PodCIDRs) == 0 {
continue // not assigned yet
}
//match
for podCIDRIdx, expectedPodCIDR := range tc.expectedAllocatedCIDR {
if updatedNode.Spec.PodCIDRs[podCIDRIdx] != expectedPodCIDR {
t.Errorf("%v: Unable to find allocated CIDR %v, found updated Nodes with CIDRs: %v", tc.description, expectedPodCIDR, updatedNode.Spec.PodCIDRs)
break
}
}
}

// Make sure previously tried CIDRs are all released
for idx, allocatedList := range tc.allocatedCIDRs {
for _, allocated := range allocatedList {
_, cidr, err := net.ParseCIDR(allocated)
if err != nil {
t.Fatalf("%v: unexpected error when parsing CIDR %v: %v", tc.description, allocated, err)
}
if err = rangeAllocator.cidrSets[idx].Occupy(cidr); err != nil {
t.Fatalf("%v: unexpected error when occupying CIDR %v: %v", tc.description, allocated, err)
}
}
}
}

for _, tc := range testCases {
testFunc(tc)
}
}
10 changes: 8 additions & 2 deletions pkg/util/controller/testutil/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ type FakeNodeHandler struct {
*fake.Clientset

// Input: Hooks determine if request is valid or not
CreateHook func(*FakeNodeHandler, *v1.Node) bool
Existing []*v1.Node
CreateHook func(*FakeNodeHandler, *v1.Node) bool
Existing []*v1.Node
PatchErrorCount int

// Output
CreatedNodes []*v1.Node
Expand Down Expand Up @@ -274,6 +275,11 @@ func (m *FakeNodeHandler) Patch(_ context.Context, name string, pt types.PatchTy
}
m.lock.Unlock()
}()

if m.RequestCount < m.PatchErrorCount {
return nil, fmt.Errorf("patch failed count is %d, which is smaller than %d", m.RequestCount, m.PatchErrorCount)
}

var nodeCopy v1.Node
for i := range m.Existing {
if m.Existing[i].Name == name {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func PatchNodeCIDRs(c clientset.Interface, node types.NodeName, cidrs []string)
if err != nil {
return fmt.Errorf("failed to json.Marshal CIDR: %w", err)
}
klog.V(4).Infof("cidrs patch bytes are:%s", string(patchBytes))
klog.V(4).Infof("cidrs patch bytes for node %s are:%s", string(node), string(patchBytes))
if _, err := c.CoreV1().Nodes().Patch(context.TODO(), string(node), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
return fmt.Errorf("failed to patch node CIDR: %w", err)
}
Expand Down

0 comments on commit b88f513

Please sign in to comment.