diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index d8d15144d85a6..1b098a20c7544 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -871,7 +871,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err) } - if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil { + if err := RunKubelet(ctx, s, kubeDeps, s.RunOnce); err != nil { return err } @@ -1202,7 +1202,7 @@ func setContentTypeForClient(cfg *restclient.Config, contentType string) { // 3 Standalone 'kubernetes' binary // // Eventually, #2 will be replaced with instances of #3 -func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error { +func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error { hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride) if err != nil { return err @@ -1214,12 +1214,15 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie } hostnameOverridden := len(kubeServer.HostnameOverride) > 0 // Setup event recorder if required. - makeEventRecorder(context.TODO(), kubeDeps, nodeName) + makeEventRecorder(ctx, kubeDeps, nodeName) - nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider) + nodeIPs, invalidNodeIps, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider) if err != nil { return fmt.Errorf("bad --node-ip %q: %v", kubeServer.NodeIP, err) } + if len(invalidNodeIps) != 0 { + klog.FromContext(ctx).Info("Could not parse some node IP(s), ignoring them", "IPs", invalidNodeIps) + } capabilities.Initialize(capabilities.Capabilities{ AllowPrivileged: true, diff --git a/cmd/kubemark/app/hollow_node.go b/cmd/kubemark/app/hollow_node.go index 62de5839bc67a..26cf4c4ca9e87 100644 --- a/cmd/kubemark/app/hollow_node.go +++ b/cmd/kubemark/app/hollow_node.go @@ -157,7 +157,7 @@ func NewHollowNodeCommand() *cobra.Command { RunE: func(cmd *cobra.Command, args []string) error { verflag.PrintAndExitIfRequested() cliflag.PrintFlags(cmd.Flags()) - return run(s) + return run(cmd.Context(), s) }, Args: func(cmd *cobra.Command, args []string) error { for _, arg := range args { @@ -176,7 +176,7 @@ func NewHollowNodeCommand() *cobra.Command { return cmd } -func run(config *hollowNodeConfig) error { +func run(ctx context.Context, config *hollowNodeConfig) error { // To help debugging, immediately log version and print flags. klog.Infof("Version: %+v", version.Get()) @@ -264,7 +264,7 @@ func run(config *hollowNodeConfig) error { runtimeService, containerManager, ) - hollowKubelet.Run() + hollowKubelet.Run(ctx) } if config.Morph == "proxy" { diff --git a/cmd/kubemark/app/hollow_node_test.go b/cmd/kubemark/app/hollow_node_test.go index ba6c38b683330..c399433c4abbd 100644 --- a/cmd/kubemark/app/hollow_node_test.go +++ b/cmd/kubemark/app/hollow_node_test.go @@ -24,6 +24,8 @@ import ( "path/filepath" "testing" "time" + + "k8s.io/kubernetes/test/utils/ktesting" ) const fakeKubeconfig = ` @@ -80,7 +82,8 @@ func TestHollowNode(t *testing.T) { go func() { data, err := os.ReadFile(kubeconfigPath) t.Logf("read %d, err=%v\n", len(data), err) - errCh <- run(s) + ctx := ktesting.Init(t) + errCh <- run(ctx, s) }() select { diff --git a/hack/golangci-hints.yaml b/hack/golangci-hints.yaml index 3b30ada8e029f..792c9b6024504 100644 --- a/hack/golangci-hints.yaml +++ b/hack/golangci-hints.yaml @@ -131,6 +131,7 @@ linters-settings: # please keep this alphabetized contextual k8s.io/client-go/metadata/.* contextual k8s.io/client-go/tools/events/.* contextual k8s.io/client-go/tools/record/.* + contextual k8s.io/component-helpers/.* contextual k8s.io/dynamic-resource-allocation/.* contextual k8s.io/kubernetes/cmd/kube-proxy/.* contextual k8s.io/kubernetes/cmd/kube-scheduler/.* diff --git a/hack/golangci-strict.yaml b/hack/golangci-strict.yaml index 7d9615a1473ff..b72ea31259359 100644 --- a/hack/golangci-strict.yaml +++ b/hack/golangci-strict.yaml @@ -177,6 +177,7 @@ linters-settings: # please keep this alphabetized contextual k8s.io/client-go/metadata/.* contextual k8s.io/client-go/tools/events/.* contextual k8s.io/client-go/tools/record/.* + contextual k8s.io/component-helpers/.* contextual k8s.io/dynamic-resource-allocation/.* contextual k8s.io/kubernetes/cmd/kube-proxy/.* contextual k8s.io/kubernetes/cmd/kube-scheduler/.* diff --git a/hack/golangci.yaml b/hack/golangci.yaml index c099e20a6c86d..5f96b17f2e4e6 100644 --- a/hack/golangci.yaml +++ b/hack/golangci.yaml @@ -180,6 +180,7 @@ linters-settings: # please keep this alphabetized contextual k8s.io/client-go/metadata/.* contextual k8s.io/client-go/tools/events/.* contextual k8s.io/client-go/tools/record/.* + contextual k8s.io/component-helpers/.* contextual k8s.io/dynamic-resource-allocation/.* contextual k8s.io/kubernetes/cmd/kube-proxy/.* contextual k8s.io/kubernetes/cmd/kube-scheduler/.* diff --git a/hack/logcheck.conf b/hack/logcheck.conf index bee1fdc1026f7..d8e835dea0e9a 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -28,6 +28,7 @@ contextual k8s.io/apimachinery/pkg/util/runtime/.* contextual k8s.io/client-go/metadata/.* contextual k8s.io/client-go/tools/events/.* contextual k8s.io/client-go/tools/record/.* +contextual k8s.io/component-helpers/.* contextual k8s.io/dynamic-resource-allocation/.* contextual k8s.io/kubernetes/cmd/kube-proxy/.* contextual k8s.io/kubernetes/cmd/kube-scheduler/.* diff --git a/pkg/controller/nodeipam/ipam/cidr_allocator.go b/pkg/controller/nodeipam/ipam/cidr_allocator.go index 0a69f0ded2811..2143c19b201eb 100644 --- a/pkg/controller/nodeipam/ipam/cidr_allocator.go +++ b/pkg/controller/nodeipam/ipam/cidr_allocator.go @@ -86,7 +86,7 @@ type CIDRAllocator interface { // AllocateOrOccupyCIDR looks at the given node, assigns it a valid // CIDR if it doesn't currently have one or mark the CIDR as used if // the node already have one. - AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error + AllocateOrOccupyCIDR(ctx context.Context, node *v1.Node) error // ReleaseCIDR releases the CIDR of the removed node. ReleaseCIDR(logger klog.Logger, node *v1.Node) error // Run starts all the working logic of the allocator. diff --git a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go index c9be736d4d963..687cfe2afe2d9 100644 --- a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go +++ b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go @@ -117,18 +117,18 @@ func NewCloudCIDRAllocator(ctx context.Context, client clientset.Interface, clou nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controllerutil.CreateAddNodeHandler( func(node *v1.Node) error { - return ca.AllocateOrOccupyCIDR(logger, node) + return ca.AllocateOrOccupyCIDR(ctx, node) }), UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error { if newNode.Spec.PodCIDR == "" { - return ca.AllocateOrOccupyCIDR(logger, newNode) + return ca.AllocateOrOccupyCIDR(ctx, newNode) } // Even if PodCIDR is assigned, but NetworkUnavailable condition is // set to true, we need to process the node to set the condition. networkUnavailableTaint := &v1.Taint{Key: v1.TaintNodeNetworkUnavailable, Effect: v1.TaintEffectNoSchedule} _, cond := controllerutil.GetNodeCondition(&newNode.Status, v1.NodeNetworkUnavailable) if cond == nil || cond.Status != v1.ConditionFalse || utiltaints.TaintExists(newNode.Spec.Taints, networkUnavailableTaint) { - return ca.AllocateOrOccupyCIDR(logger, newNode) + return ca.AllocateOrOccupyCIDR(ctx, newNode) } return nil }), @@ -173,7 +173,7 @@ func (ca *cloudCIDRAllocator) worker(ctx context.Context) { logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed") return } - if err := ca.updateCIDRAllocation(logger, workItem); err == nil { + if err := ca.updateCIDRAllocation(ctx, workItem); err == nil { logger.V(3).Info("Updated CIDR", "workItem", workItem) } else { logger.Error(err, "Error updating CIDR", "workItem", workItem) @@ -243,10 +243,12 @@ func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) { // WARNING: If you're adding any return calls or defer any more work from this // function you have to make sure to update nodesInProcessing properly with the // disposition of the node when the work is done. -func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error { +func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(ctx context.Context, node *v1.Node) error { if node == nil { return nil } + + logger := klog.FromContext(ctx) if !ca.insertNodeToProcessing(node.Name) { logger.V(2).Info("Node is already in a process of CIDR assignment", "node", klog.KObj(node)) return nil @@ -258,7 +260,8 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1. } // updateCIDRAllocation assigns CIDR to Node and sends an update to the API server. -func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName string) error { +func (ca *cloudCIDRAllocator) updateCIDRAllocation(ctx context.Context, nodeName string) error { + logger := klog.FromContext(ctx) node, err := ca.nodeLister.Get(nodeName) if err != nil { if errors.IsNotFound(err) { @@ -305,7 +308,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName // See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248 } for i := 0; i < cidrUpdateRetries; i++ { - if err = nodeutil.PatchNodeCIDRs(ca.client, types.NodeName(node.Name), cidrStrings); err == nil { + if err = nodeutil.PatchNodeCIDRs(ctx, ca.client, types.NodeName(node.Name), cidrStrings); err == nil { logger.Info("Set the node PodCIDRs", "node", klog.KObj(node), "cidrStrings", cidrStrings) break } diff --git a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go index 35a1b30a55930..566aa8e153043 100644 --- a/pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go +++ b/pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go @@ -51,14 +51,16 @@ func TestBoundedRetries(t *testing.T) { nodesSynced: sharedInfomer.Core().V1().Nodes().Informer().HasSynced, nodesInProcessing: map[string]*nodeProcessingInfo{}, } - logger, ctx := ktesting.NewTestContext(t) + _, ctx := ktesting.NewTestContext(t) go ca.worker(ctx) nodeName := "testNode" - ca.AllocateOrOccupyCIDR(logger, &v1.Node{ + if err := ca.AllocateOrOccupyCIDR(ctx, &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: nodeName, }, - }) + }); err != nil { + t.Errorf("unexpected error in AllocateOrOccupyCIDR: %v", err) + } for hasNodeInProcessing(ca, nodeName) { // wait for node to finish processing (should terminate and not time out) } diff --git a/pkg/controller/nodeipam/ipam/range_allocator.go b/pkg/controller/nodeipam/ipam/range_allocator.go index f3a8b36db2cf3..a79982a6db2d5 100644 --- a/pkg/controller/nodeipam/ipam/range_allocator.go +++ b/pkg/controller/nodeipam/ipam/range_allocator.go @@ -61,6 +61,8 @@ type rangeAllocator struct { queue workqueue.RateLimitingInterface } +var _ CIDRAllocator = &rangeAllocator{} + // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDRs for node (one from each of clusterCIDRs) // Caller must ensure subNetMaskSize is not less than cluster CIDR mask size. // Caller must always pass in a list of existing nodes so the new allocator. @@ -228,7 +230,7 @@ func (r *rangeAllocator) processNextNodeWorkItem(ctx context.Context) bool { } // Run the syncHandler, passing it the namespace/name string of the // Foo resource to be synced. - if err := r.syncNode(logger, key); err != nil { + if err := r.syncNode(ctx, key); err != nil { // Put the item back on the queue to handle any transient errors. r.queue.AddRateLimited(key) return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) @@ -248,7 +250,8 @@ func (r *rangeAllocator) processNextNodeWorkItem(ctx context.Context) bool { return true } -func (r *rangeAllocator) syncNode(logger klog.Logger, key string) error { +func (r *rangeAllocator) syncNode(ctx context.Context, key string) error { + logger := klog.FromContext(ctx) startTime := time.Now() defer func() { logger.V(4).Info("Finished syncing Node request", "node", key, "elapsed", time.Since(startTime)) @@ -269,7 +272,7 @@ func (r *rangeAllocator) syncNode(logger klog.Logger, key string) error { logger.V(3).Info("node is being deleted", "node", key) return r.ReleaseCIDR(logger, node) } - return r.AllocateOrOccupyCIDR(logger, node) + return r.AllocateOrOccupyCIDR(ctx, node) } // marks node.PodCIDRs[...] as used in allocator's tracked cidrSet @@ -299,7 +302,7 @@ func (r *rangeAllocator) occupyCIDRs(node *v1.Node) error { // WARNING: If you're adding any return calls or defer any more work from this // function you have to make sure to update nodesInProcessing properly with the // disposition of the node when the work is done. -func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error { +func (r *rangeAllocator) AllocateOrOccupyCIDR(ctx context.Context, node *v1.Node) error { if node == nil { return nil } @@ -308,6 +311,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) return r.occupyCIDRs(node) } + logger := klog.FromContext(ctx) allocatedCIDRs := make([]*net.IPNet, len(r.cidrSets)) for idx := range r.cidrSets { @@ -321,7 +325,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) //queue the assignment logger.V(4).Info("Putting node with CIDR into the work queue", "node", klog.KObj(node), "CIDRs", allocatedCIDRs) - return r.updateCIDRsAllocation(logger, node.Name, allocatedCIDRs) + return r.updateCIDRsAllocation(ctx, node.Name, allocatedCIDRs) } // ReleaseCIDR marks node.podCIDRs[...] as unused in our tracked cidrSets @@ -373,9 +377,10 @@ func (r *rangeAllocator) filterOutServiceRange(logger klog.Logger, serviceCIDR * } // updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server. -func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, nodeName string, allocatedCIDRs []*net.IPNet) error { +func (r *rangeAllocator) updateCIDRsAllocation(ctx context.Context, nodeName string, allocatedCIDRs []*net.IPNet) error { var err error var node *v1.Node + logger := klog.FromContext(ctx) cidrsString := ipnetToStringList(allocatedCIDRs) node, err = r.nodeLister.Get(nodeName) if err != nil { @@ -413,7 +418,7 @@ func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, nodeName stri // If we reached here, it means that the node has no CIDR currently assigned. So we set it. for i := 0; i < cidrUpdateRetries; i++ { - if err = nodeutil.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil { + if err = nodeutil.PatchNodeCIDRs(ctx, r.client, types.NodeName(node.Name), cidrsString); err == nil { logger.Info("Set node PodCIDR", "node", klog.KObj(node), "podCIDRs", cidrsString) return nil } diff --git a/pkg/controller/nodeipam/ipam/range_allocator_test.go b/pkg/controller/nodeipam/ipam/range_allocator_test.go index a5d594838759d..109da26ea2c6c 100644 --- a/pkg/controller/nodeipam/ipam/range_allocator_test.go +++ b/pkg/controller/nodeipam/ipam/range_allocator_test.go @@ -509,7 +509,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) { } // test function - logger, tCtx := ktesting.NewTestContext(t) + _, tCtx := ktesting.NewTestContext(t) testFunc := func(tc testCase) { fakeNodeInformer := test.FakeNodeInformer(tc.fakeNodeHandler) nodeList, _ := tc.fakeNodeHandler.List(tCtx, metav1.ListOptions{}) @@ -547,7 +547,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) { if node.Spec.PodCIDRs == nil { updateCount++ } - if err := allocator.AllocateOrOccupyCIDR(logger, node); err != nil { + if err := allocator.AllocateOrOccupyCIDR(tCtx, node); err != nil { t.Errorf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) } } @@ -610,7 +610,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) { }, }, } - logger, tCtx := ktesting.NewTestContext(t) + _, tCtx := ktesting.NewTestContext(t) testFunc := func(tc testCase) { // Initialize the range allocator. allocator, err := NewCIDRRangeAllocator(tCtx, tc.fakeNodeHandler, test.FakeNodeInformer(tc.fakeNodeHandler), tc.allocatorParams, nil) @@ -639,7 +639,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) { } } } - if err := allocator.AllocateOrOccupyCIDR(logger, tc.fakeNodeHandler.Existing[0]); err == nil { + if err := allocator.AllocateOrOccupyCIDR(tCtx, tc.fakeNodeHandler.Existing[0]); err == nil { t.Errorf("%v: unexpected success in AllocateOrOccupyCIDR: %v", tc.description, err) } // We don't expect any updates, so just sleep for some time @@ -782,7 +782,7 @@ func TestReleaseCIDRSuccess(t *testing.T) { } } - err := allocator.AllocateOrOccupyCIDR(logger, tc.fakeNodeHandler.Existing[0]) + err := allocator.AllocateOrOccupyCIDR(tCtx, tc.fakeNodeHandler.Existing[0]) if len(tc.expectedAllocatedCIDRFirstRound) != 0 { if err != nil { t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) @@ -812,7 +812,7 @@ func TestReleaseCIDRSuccess(t *testing.T) { t.Fatalf("%v: unexpected error in ReleaseCIDR: %v", tc.description, err) } } - if err = allocator.AllocateOrOccupyCIDR(logger, tc.fakeNodeHandler.Existing[0]); err != nil { + if err = allocator.AllocateOrOccupyCIDR(tCtx, tc.fakeNodeHandler.Existing[0]); err != nil { t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err) } if err := test.WaitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil { diff --git a/pkg/kubemark/hollow_kubelet.go b/pkg/kubemark/hollow_kubelet.go index 91e47a01bb517..14c551c79f4a1 100644 --- a/pkg/kubemark/hollow_kubelet.go +++ b/pkg/kubemark/hollow_kubelet.go @@ -17,6 +17,7 @@ limitations under the License. package kubemark import ( + "context" "fmt" "time" @@ -126,8 +127,8 @@ func NewHollowKubelet( } // Starts this HollowKubelet and blocks. -func (hk *HollowKubelet) Run() { - if err := kubeletapp.RunKubelet(&options.KubeletServer{ +func (hk *HollowKubelet) Run(ctx context.Context) { + if err := kubeletapp.RunKubelet(ctx, &options.KubeletServer{ KubeletFlags: *hk.KubeletFlags, KubeletConfiguration: *hk.KubeletConfiguration, }, hk.KubeletDeps, false); err != nil { diff --git a/staging/src/k8s.io/component-helpers/apimachinery/lease/controller_test.go b/staging/src/k8s.io/component-helpers/apimachinery/lease/controller_test.go index 2729ba8edfa28..fc68cf146588d 100644 --- a/staging/src/k8s.io/component-helpers/apimachinery/lease/controller_test.go +++ b/staging/src/k8s.io/component-helpers/apimachinery/lease/controller_test.go @@ -198,7 +198,8 @@ func TestNewNodeLease(t *testing.T) { for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { - tc.controller.newLeasePostProcessFunc = setNodeOwnerFunc(tc.controller.client, node.Name) + logger, _ := ktesting.NewTestContext(t) + tc.controller.newLeasePostProcessFunc = setNodeOwnerFunc(logger, tc.controller.client, node.Name) tc.controller.leaseNamespace = corev1.NamespaceNodeLease newLease, _ := tc.controller.newLease(tc.base) if newLease == tc.base { @@ -286,7 +287,7 @@ func TestRetryUpdateNodeLease(t *testing.T) { } for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) cl := tc.client if tc.updateReactor != nil { cl.PrependReactor("update", "leases", tc.updateReactor) @@ -302,7 +303,7 @@ func TestRetryUpdateNodeLease(t *testing.T) { leaseNamespace: corev1.NamespaceNodeLease, leaseDurationSeconds: 10, onRepeatedHeartbeatFailure: tc.onRepeatedHeartbeatFailure, - newLeasePostProcessFunc: setNodeOwnerFunc(cl, node.Name), + newLeasePostProcessFunc: setNodeOwnerFunc(logger, cl, node.Name), } if err := c.retryUpdateLease(ctx, nil); tc.expectErr != (err != nil) { t.Fatalf("got %v, expected %v", err != nil, tc.expectErr) @@ -422,7 +423,7 @@ func TestUpdateUsingLatestLease(t *testing.T) { } for _, tc := range cases { t.Run(tc.desc, func(t *testing.T) { - _, ctx := ktesting.NewTestContext(t) + logger, ctx := ktesting.NewTestContext(t) cl := fake.NewSimpleClientset(tc.existingObjs...) if tc.updateReactor != nil { cl.PrependReactor("update", "leases", tc.updateReactor) @@ -441,7 +442,7 @@ func TestUpdateUsingLatestLease(t *testing.T) { leaseNamespace: corev1.NamespaceNodeLease, leaseDurationSeconds: 10, latestLease: tc.latestLease, - newLeasePostProcessFunc: setNodeOwnerFunc(cl, node.Name), + newLeasePostProcessFunc: setNodeOwnerFunc(logger, cl, node.Name), } c.sync(ctx) @@ -461,7 +462,7 @@ func TestUpdateUsingLatestLease(t *testing.T) { // setNodeOwnerFunc helps construct a newLeasePostProcessFunc which sets // a node OwnerReference to the given lease object -func setNodeOwnerFunc(c clientset.Interface, nodeName string) func(lease *coordinationv1.Lease) error { +func setNodeOwnerFunc(logger klog.Logger, c clientset.Interface, nodeName string) func(lease *coordinationv1.Lease) error { return func(lease *coordinationv1.Lease) error { // Setting owner reference needs node's UID. Note that it is different from // kubelet.nodeRef.UID. When lease is initially created, it is possible that @@ -478,7 +479,7 @@ func setNodeOwnerFunc(c clientset.Interface, nodeName string) func(lease *coordi }, } } else { - klog.Errorf("failed to get node %q when trying to set owner ref to the node lease: %v", nodeName, err) + logger.Error(err, "failed to get node when trying to set owner ref to the node lease", "node", nodeName) return err } } diff --git a/staging/src/k8s.io/component-helpers/node/util/cidr.go b/staging/src/k8s.io/component-helpers/node/util/cidr.go index 4d626ee0041e3..4aa1c2bfe70c0 100644 --- a/staging/src/k8s.io/component-helpers/node/util/cidr.go +++ b/staging/src/k8s.io/component-helpers/node/util/cidr.go @@ -37,7 +37,7 @@ type nodeSpecForMergePatch struct { } // PatchNodeCIDRs patches the specified node.CIDR=cidrs[0] and node.CIDRs to the given value. -func PatchNodeCIDRs(c clientset.Interface, node types.NodeName, cidrs []string) error { +func PatchNodeCIDRs(ctx context.Context, c clientset.Interface, node types.NodeName, cidrs []string) error { // set the pod cidrs list and set the old pod cidr field patch := nodeForCIDRMergePatch{ Spec: nodeSpecForMergePatch{ @@ -50,8 +50,8 @@ func PatchNodeCIDRs(c clientset.Interface, node types.NodeName, cidrs []string) if err != nil { return fmt.Errorf("failed to json.Marshal CIDR: %v", err) } - klog.V(4).Infof("cidrs patch bytes are:%s", string(patchBytes)) - if _, err := c.CoreV1().Nodes().Patch(context.TODO(), string(node), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil { + klog.FromContext(ctx).V(4).Info("cidrs patch bytes", "patchBytes", string(patchBytes)) + if _, err := c.CoreV1().Nodes().Patch(ctx, string(node), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil { return fmt.Errorf("failed to patch node CIDR: %v", err) } return nil diff --git a/staging/src/k8s.io/component-helpers/node/util/ips.go b/staging/src/k8s.io/component-helpers/node/util/ips.go index cf63cc3829e10..9c3d4a240363c 100644 --- a/staging/src/k8s.io/component-helpers/node/util/ips.go +++ b/staging/src/k8s.io/component-helpers/node/util/ips.go @@ -21,7 +21,6 @@ import ( "net" "strings" - "k8s.io/klog/v2" netutils "k8s.io/utils/net" ) @@ -31,8 +30,9 @@ const ( ) // parseNodeIP implements ParseNodeIPArgument and ParseNodeIPAnnotation -func parseNodeIP(nodeIP string, allowDual, sloppy bool) ([]net.IP, error) { +func parseNodeIP(nodeIP string, allowDual, sloppy bool) ([]net.IP, []string, error) { var nodeIPs []net.IP + var invalidIPs []string if nodeIP != "" || !sloppy { for _, ip := range strings.Split(nodeIP, ",") { if sloppy { @@ -40,10 +40,9 @@ func parseNodeIP(nodeIP string, allowDual, sloppy bool) ([]net.IP, error) { } parsedNodeIP := netutils.ParseIPSloppy(ip) if parsedNodeIP == nil { - if sloppy { - klog.InfoS("Could not parse node IP. Ignoring", "IP", ip) - } else { - return nil, fmt.Errorf("could not parse %q", ip) + invalidIPs = append(invalidIPs, ip) + if !sloppy { + return nil, invalidIPs, fmt.Errorf("could not parse %q", ip) } } else { nodeIPs = append(nodeIPs, parsedNodeIP) @@ -52,20 +51,22 @@ func parseNodeIP(nodeIP string, allowDual, sloppy bool) ([]net.IP, error) { } if len(nodeIPs) > 2 || (len(nodeIPs) == 2 && netutils.IsIPv6(nodeIPs[0]) == netutils.IsIPv6(nodeIPs[1])) { - return nil, fmt.Errorf("must contain either a single IP or a dual-stack pair of IPs") + return nil, invalidIPs, fmt.Errorf("must contain either a single IP or a dual-stack pair of IPs") } else if len(nodeIPs) == 2 && !allowDual { - return nil, fmt.Errorf("dual-stack not supported in this configuration") + return nil, invalidIPs, fmt.Errorf("dual-stack not supported in this configuration") } else if len(nodeIPs) == 2 && (nodeIPs[0].IsUnspecified() || nodeIPs[1].IsUnspecified()) { - return nil, fmt.Errorf("dual-stack node IP cannot include '0.0.0.0' or '::'") + return nil, invalidIPs, fmt.Errorf("dual-stack node IP cannot include '0.0.0.0' or '::'") } - return nodeIPs, nil + return nodeIPs, invalidIPs, nil } -// ParseNodeIPArgument parses kubelet's --node-ip argument. If nodeIP contains invalid -// values, they will be logged and ignored. Dual-stack node IPs are allowed if -// cloudProvider is unset or `"external"`. -func ParseNodeIPArgument(nodeIP, cloudProvider string) ([]net.IP, error) { +// ParseNodeIPArgument parses kubelet's --node-ip argument. +// If nodeIP contains invalid values, they will be returned as strings. +// This is done also when an error is returned. +// The caller then can decide what to do with the invalid values. +// Dual-stack node IPs are allowed if cloudProvider is unset or `"external"`. +func ParseNodeIPArgument(nodeIP, cloudProvider string) ([]net.IP, []string, error) { var allowDualStack bool if cloudProvider == cloudProviderNone || cloudProvider == cloudProviderExternal { allowDualStack = true @@ -77,5 +78,6 @@ func ParseNodeIPArgument(nodeIP, cloudProvider string) ([]net.IP, error) { // which can be either a single IP address or a comma-separated pair of IP addresses. // Unlike with ParseNodeIPArgument, invalid values are considered an error. func ParseNodeIPAnnotation(nodeIP string) ([]net.IP, error) { - return parseNodeIP(nodeIP, true, false) + nodeIps, _, err := parseNodeIP(nodeIP, true, false) + return nodeIps, err } diff --git a/staging/src/k8s.io/component-helpers/node/util/ips_test.go b/staging/src/k8s.io/component-helpers/node/util/ips_test.go index 4449bac84c047..e4ca397bac376 100644 --- a/staging/src/k8s.io/component-helpers/node/util/ips_test.go +++ b/staging/src/k8s.io/component-helpers/node/util/ips_test.go @@ -28,11 +28,12 @@ import ( func TestParseNodeIPArgument(t *testing.T) { testCases := []struct { - desc string - in string - out []net.IP - err string - ssErr string + desc string + in string + out []net.IP + invalids []string + err string + ssErr string }{ { desc: "empty --node-ip", @@ -40,14 +41,16 @@ func TestParseNodeIPArgument(t *testing.T) { out: nil, }, { - desc: "just whitespace (ignored)", - in: " ", - out: nil, + desc: "just whitespace (ignored)", + in: " ", + out: nil, + invalids: []string{""}, }, { - desc: "garbage (ignored)", - in: "blah", - out: nil, + desc: "garbage (ignored)", + in: "blah", + out: nil, + invalids: []string{"blah"}, }, { desc: "single IPv4", @@ -71,14 +74,16 @@ func TestParseNodeIPArgument(t *testing.T) { }, }, { - desc: "single IPv4 invalid (ignored)", - in: "1.2.3", - out: nil, + desc: "single IPv4 invalid (ignored)", + in: "1.2.3", + out: nil, + invalids: []string{"1.2.3"}, }, { - desc: "single IPv4 CIDR (ignored)", - in: "1.2.3.0/24", - out: nil, + desc: "single IPv4 CIDR (ignored)", + in: "1.2.3.0/24", + out: nil, + invalids: []string{"1.2.3.0/24"}, }, { desc: "single IPv4 unspecified", @@ -93,6 +98,7 @@ func TestParseNodeIPArgument(t *testing.T) { out: []net.IP{ netutils.ParseIPSloppy("1.2.3.4"), }, + invalids: []string{"not-an-IPv6-address"}, }, { desc: "single IPv6", @@ -155,7 +161,8 @@ func TestParseNodeIPArgument(t *testing.T) { netutils.ParseIPSloppy("abcd::ef01"), netutils.ParseIPSloppy("1.2.3.4"), }, - ssErr: "not supported in this configuration", + invalids: []string{"something else"}, + ssErr: "not supported in this configuration", }, { desc: "triple stack!", @@ -177,9 +184,10 @@ func TestParseNodeIPArgument(t *testing.T) { for _, conf := range configurations { desc := fmt.Sprintf("%s, cloudProvider=%q", tc.desc, conf.cloudProvider) t.Run(desc, func(t *testing.T) { - parsed, err := ParseNodeIPArgument(tc.in, conf.cloudProvider) + parsed, invalidIPs, err := ParseNodeIPArgument(tc.in, conf.cloudProvider) expectedOut := tc.out + expectedInvalids := tc.invalids expectedErr := tc.err if !conf.dualStackSupported { @@ -194,6 +202,9 @@ func TestParseNodeIPArgument(t *testing.T) { if !reflect.DeepEqual(parsed, expectedOut) { t.Errorf("expected %#v, got %#v", expectedOut, parsed) } + if !reflect.DeepEqual(invalidIPs, expectedInvalids) { + t.Errorf("[invalidIps] expected %#v, got %#v", expectedInvalids, invalidIPs) + } if err != nil { if expectedErr == "" { t.Errorf("unexpected error %v", err)