component-helpers: Support structured and contextual logging (#120637)
bells17 committed Apr 24, 2024
1 parent 646fbe6 commit 1c917aa
Showing 17 changed files with 113 additions and 78 deletions.
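Every hunk below follows the same contextual-logging pattern (KEP-3077): callers thread a context.Context through the call chain instead of passing a klog.Logger explicitly, and callees recover the logger with klog.FromContext. A minimal sketch of the pattern, assuming only klog v2; doWork and its log message are illustrative, not from this commit:

```go
package main

import (
	"context"

	"k8s.io/klog/v2"
)

func doWork(ctx context.Context) {
	// FromContext falls back to the global klog logger when the
	// context carries none, so the call is safe either way.
	logger := klog.FromContext(ctx)
	logger.Info("processing item", "item", 42)
}

func main() {
	// Callers may attach a specific logger to the context...
	ctx := klog.NewContext(context.Background(), klog.Background())
	// ...and callees pick it up without an explicit logger parameter.
	doWork(ctx)
}
```

The context also carries cancellation, which is why the signatures below swap a logger parameter for ctx rather than adding one.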
cmd/kubelet/app/server.go (11 changes: 7 additions & 4 deletions)
@@ -871,7 +871,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
}

- if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
+ if err := RunKubelet(ctx, s, kubeDeps, s.RunOnce); err != nil {
return err
}

@@ -1202,7 +1202,7 @@ func setContentTypeForClient(cfg *restclient.Config, contentType string) {
// 3 Standalone 'kubernetes' binary
//
// Eventually, #2 will be replaced with instances of #3
- func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
+ func RunKubelet(ctx context.Context, kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
hostname, err := nodeutil.GetHostname(kubeServer.HostnameOverride)
if err != nil {
return err
@@ -1214,12 +1214,15 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie
}
hostnameOverridden := len(kubeServer.HostnameOverride) > 0
// Setup event recorder if required.
- makeEventRecorder(context.TODO(), kubeDeps, nodeName)
+ makeEventRecorder(ctx, kubeDeps, nodeName)

- nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider)
+ nodeIPs, invalidNodeIps, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider)
if err != nil {
return fmt.Errorf("bad --node-ip %q: %v", kubeServer.NodeIP, err)
}
+ if len(invalidNodeIps) != 0 {
+ klog.FromContext(ctx).Info("Could not parse some node IP(s), ignoring them", "IPs", invalidNodeIps)
+ }

capabilities.Initialize(capabilities.Capabilities{
AllowPrivileged: true,
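The second change in RunKubelet is a division-of-labor fix that goes with contextual logging: ParseNodeIPArgument now returns the unparseable IPs instead of logging them internally, and the caller logs them with its own contextual logger. A rough sketch of that shape; parseIPs is hypothetical, not the component-helpers API:

```go
package main

import (
	"context"
	"net"

	"k8s.io/klog/v2"
)

// parseIPs returns bad inputs to the caller rather than logging them,
// keeping the helper free of logging policy.
func parseIPs(raw []string) (valid []net.IP, invalid []string) {
	for _, s := range raw {
		if ip := net.ParseIP(s); ip != nil {
			valid = append(valid, ip)
		} else {
			invalid = append(invalid, s)
		}
	}
	return valid, invalid
}

func main() {
	ctx := context.Background()
	_, invalid := parseIPs([]string{"10.0.0.1", "not-an-ip"})
	if len(invalid) != 0 {
		// The caller owns both the logger and the logging decision.
		klog.FromContext(ctx).Info("Could not parse some node IP(s), ignoring them", "IPs", invalid)
	}
}
```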
cmd/kubemark/app/hollow_node.go (6 changes: 3 additions & 3 deletions)
@@ -157,7 +157,7 @@ func NewHollowNodeCommand() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
verflag.PrintAndExitIfRequested()
cliflag.PrintFlags(cmd.Flags())
- return run(s)
+ return run(cmd.Context(), s)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
@@ -176,7 +176,7 @@ func NewHollowNodeCommand() *cobra.Command {
return cmd
}

- func run(config *hollowNodeConfig) error {
+ func run(ctx context.Context, config *hollowNodeConfig) error {
// To help debugging, immediately log version and print flags.
klog.Infof("Version: %+v", version.Get())

@@ -264,7 +264,7 @@ func run(config *hollowNodeConfig) error {
runtimeService,
containerManager,
)
- hollowKubelet.Run()
+ hollowKubelet.Run(ctx)
}

if config.Morph == "proxy" {
cmd/kubemark/app/hollow_node_test.go (5 changes: 4 additions & 1 deletion)
@@ -24,6 +24,8 @@ import (
"path/filepath"
"testing"
"time"

"k8s.io/kubernetes/test/utils/ktesting"
)

const fakeKubeconfig = `
@@ -80,7 +82,8 @@ func TestHollowNode(t *testing.T) {
go func() {
data, err := os.ReadFile(kubeconfigPath)
t.Logf("read %d, err=%v\n", len(data), err)
- errCh <- run(s)
+ ctx := ktesting.Init(t)
+ errCh <- run(ctx, s)
}()

select {
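The test previously had no context to hand to run; ktesting.Init builds one whose logger writes through the testing.T, so output from the hollow node is attributed to the test. A minimal sketch, assuming the in-tree test/utils/ktesting helper; codeUnderTest is illustrative:

```go
package example

import (
	"context"
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/kubernetes/test/utils/ktesting"
)

func codeUnderTest(ctx context.Context) {
	klog.FromContext(ctx).Info("hello from the code under test")
}

func TestContextualLogging(t *testing.T) {
	tCtx := ktesting.Init(t) // the returned TContext satisfies context.Context
	codeUnderTest(tCtx)      // log lines end up in the per-test output
}
```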
hack/golangci-hints.yaml (1 change: 1 addition & 0 deletions)
@@ -131,6 +131,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/client-go/metadata/.*
contextual k8s.io/client-go/tools/events/.*
contextual k8s.io/client-go/tools/record/.*
+ contextual k8s.io/component-helpers/.*
contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-proxy/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
hack/golangci-strict.yaml (1 change: 1 addition & 0 deletions)
@@ -177,6 +177,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/client-go/metadata/.*
contextual k8s.io/client-go/tools/events/.*
contextual k8s.io/client-go/tools/record/.*
+ contextual k8s.io/component-helpers/.*
contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-proxy/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
hack/golangci.yaml (1 change: 1 addition & 0 deletions)
@@ -180,6 +180,7 @@ linters-settings: # please keep this alphabetized
contextual k8s.io/client-go/metadata/.*
contextual k8s.io/client-go/tools/events/.*
contextual k8s.io/client-go/tools/record/.*
+ contextual k8s.io/component-helpers/.*
contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-proxy/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
hack/logcheck.conf (1 change: 1 addition & 0 deletions)
@@ -28,6 +28,7 @@ contextual k8s.io/apimachinery/pkg/util/runtime/.*
contextual k8s.io/client-go/metadata/.*
contextual k8s.io/client-go/tools/events/.*
contextual k8s.io/client-go/tools/record/.*
+ contextual k8s.io/component-helpers/.*
contextual k8s.io/dynamic-resource-allocation/.*
contextual k8s.io/kubernetes/cmd/kube-proxy/.*
contextual k8s.io/kubernetes/cmd/kube-scheduler/.*
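The same line lands in all three golangci configs and in logcheck.conf: marking k8s.io/component-helpers/.* as contextual makes the logcheck linter reject direct calls to klog's global logging functions in that tree and demand the context-based style. Schematically, it pushes code from the first form to the second (a simplified illustration, not logcheck's actual diagnostics):

```go
package example

import (
	"context"

	"k8s.io/klog/v2"
)

// Flagged in a "contextual" package: global, unstructured logging.
func before(nodeName string) {
	klog.Infof("syncing node %s", nodeName)
}

// Accepted: the logger travels in the context, output is structured.
func after(ctx context.Context, nodeName string) {
	klog.FromContext(ctx).Info("Syncing node", "node", nodeName)
}
```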
pkg/controller/nodeipam/ipam/cidr_allocator.go (2 changes: 1 addition & 1 deletion)
@@ -86,7 +86,7 @@ type CIDRAllocator interface {
// AllocateOrOccupyCIDR looks at the given node, assigns it a valid
// CIDR if it doesn't currently have one or mark the CIDR as used if
// the node already have one.
- AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error
+ AllocateOrOccupyCIDR(ctx context.Context, node *v1.Node) error
// ReleaseCIDR releases the CIDR of the removed node.
ReleaseCIDR(logger klog.Logger, node *v1.Node) error
// Run starts all the working logic of the allocator.
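Swapping the klog.Logger parameter for a context.Context in the interface is the canonical contextual-logging migration: the context carries the logger plus cancellation, and each implementation unwraps it at the top of the method, as the allocators below do with klog.FromContext. ReleaseCIDR keeps its logger parameter in this commit. A generic sketch of the signature change; Allocator is illustrative, not the real interface:

```go
package example

import "context"

type Allocator interface {
	// was: Allocate(logger klog.Logger, name string) error
	Allocate(ctx context.Context, name string) error
}
```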
pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go (17 changes: 10 additions & 7 deletions)
@@ -117,18 +117,18 @@ func NewCloudCIDRAllocator(ctx context.Context, client clientset.Interface, clou
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controllerutil.CreateAddNodeHandler(
func(node *v1.Node) error {
- return ca.AllocateOrOccupyCIDR(logger, node)
+ return ca.AllocateOrOccupyCIDR(ctx, node)
}),
UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
if newNode.Spec.PodCIDR == "" {
- return ca.AllocateOrOccupyCIDR(logger, newNode)
+ return ca.AllocateOrOccupyCIDR(ctx, newNode)
}
// Even if PodCIDR is assigned, but NetworkUnavailable condition is
// set to true, we need to process the node to set the condition.
networkUnavailableTaint := &v1.Taint{Key: v1.TaintNodeNetworkUnavailable, Effect: v1.TaintEffectNoSchedule}
_, cond := controllerutil.GetNodeCondition(&newNode.Status, v1.NodeNetworkUnavailable)
if cond == nil || cond.Status != v1.ConditionFalse || utiltaints.TaintExists(newNode.Spec.Taints, networkUnavailableTaint) {
- return ca.AllocateOrOccupyCIDR(logger, newNode)
+ return ca.AllocateOrOccupyCIDR(ctx, newNode)
}
return nil
}),
@@ -173,7 +173,7 @@ func (ca *cloudCIDRAllocator) worker(ctx context.Context) {
logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed")
return
}
- if err := ca.updateCIDRAllocation(logger, workItem); err == nil {
+ if err := ca.updateCIDRAllocation(ctx, workItem); err == nil {
logger.V(3).Info("Updated CIDR", "workItem", workItem)
} else {
logger.Error(err, "Error updating CIDR", "workItem", workItem)
@@ -243,10 +243,12 @@ func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
// WARNING: If you're adding any return calls or defer any more work from this
// function you have to make sure to update nodesInProcessing properly with the
// disposition of the node when the work is done.
- func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error {
+ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(ctx context.Context, node *v1.Node) error {
if node == nil {
return nil
}

+ logger := klog.FromContext(ctx)
if !ca.insertNodeToProcessing(node.Name) {
logger.V(2).Info("Node is already in a process of CIDR assignment", "node", klog.KObj(node))
return nil
@@ -258,7 +260,8 @@ func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.
}

// updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
- func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName string) error {
+ func (ca *cloudCIDRAllocator) updateCIDRAllocation(ctx context.Context, nodeName string) error {
+ logger := klog.FromContext(ctx)
node, err := ca.nodeLister.Get(nodeName)
if err != nil {
if errors.IsNotFound(err) {
@@ -305,7 +308,7 @@ func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName
// See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248
}
for i := 0; i < cidrUpdateRetries; i++ {
- if err = nodeutil.PatchNodeCIDRs(ca.client, types.NodeName(node.Name), cidrStrings); err == nil {
+ if err = nodeutil.PatchNodeCIDRs(ctx, ca.client, types.NodeName(node.Name), cidrStrings); err == nil {
logger.Info("Set the node PodCIDRs", "node", klog.KObj(node), "cidrStrings", cidrStrings)
break
}
pkg/controller/nodeipam/ipam/cloud_cidr_allocator_test.go (8 changes: 5 additions & 3 deletions)
@@ -51,14 +51,16 @@ func TestBoundedRetries(t *testing.T) {
nodesSynced: sharedInfomer.Core().V1().Nodes().Informer().HasSynced,
nodesInProcessing: map[string]*nodeProcessingInfo{},
}
- logger, ctx := ktesting.NewTestContext(t)
+ _, ctx := ktesting.NewTestContext(t)
go ca.worker(ctx)
nodeName := "testNode"
- ca.AllocateOrOccupyCIDR(logger, &v1.Node{
+ if err := ca.AllocateOrOccupyCIDR(ctx, &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: nodeName,
},
- })
+ }); err != nil {
+ t.Errorf("unexpected error in AllocateOrOccupyCIDR: %v", err)
+ }
for hasNodeInProcessing(ca, nodeName) {
// wait for node to finish processing (should terminate and not time out)
}
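Aside from the logger-to-context swap, the test now also checks the error returned by AllocateOrOccupyCIDR instead of silently discarding it.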
pkg/controller/nodeipam/ipam/range_allocator.go (19 changes: 12 additions & 7 deletions)
@@ -61,6 +61,8 @@ type rangeAllocator struct {
queue workqueue.RateLimitingInterface
}

+ var _ CIDRAllocator = &rangeAllocator{}

// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDRs for node (one from each of clusterCIDRs)
// Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
// Caller must always pass in a list of existing nodes so the new allocator.
@@ -228,7 +230,7 @@ func (r *rangeAllocator) processNextNodeWorkItem(ctx context.Context) bool {
}
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
- if err := r.syncNode(logger, key); err != nil {
+ if err := r.syncNode(ctx, key); err != nil {
// Put the item back on the queue to handle any transient errors.
r.queue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
@@ -248,7 +250,8 @@ func (r *rangeAllocator) processNextNodeWorkItem(ctx context.Context) bool {
return true
}

- func (r *rangeAllocator) syncNode(logger klog.Logger, key string) error {
+ func (r *rangeAllocator) syncNode(ctx context.Context, key string) error {
+ logger := klog.FromContext(ctx)
startTime := time.Now()
defer func() {
logger.V(4).Info("Finished syncing Node request", "node", key, "elapsed", time.Since(startTime))
@@ -269,7 +272,7 @@ func (r *rangeAllocator) syncNode(logger klog.Logger, key string) error {
logger.V(3).Info("node is being deleted", "node", key)
return r.ReleaseCIDR(logger, node)
}
- return r.AllocateOrOccupyCIDR(logger, node)
+ return r.AllocateOrOccupyCIDR(ctx, node)
}

// marks node.PodCIDRs[...] as used in allocator's tracked cidrSet
@@ -299,7 +302,7 @@ func (r *rangeAllocator) occupyCIDRs(node *v1.Node) error {
// WARNING: If you're adding any return calls or defer any more work from this
// function you have to make sure to update nodesInProcessing properly with the
// disposition of the node when the work is done.
- func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error {
+ func (r *rangeAllocator) AllocateOrOccupyCIDR(ctx context.Context, node *v1.Node) error {
if node == nil {
return nil
}
@@ -308,6 +311,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node)
return r.occupyCIDRs(node)
}

+ logger := klog.FromContext(ctx)
allocatedCIDRs := make([]*net.IPNet, len(r.cidrSets))

for idx := range r.cidrSets {
@@ -321,7 +325,7 @@ func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node)

//queue the assignment
logger.V(4).Info("Putting node with CIDR into the work queue", "node", klog.KObj(node), "CIDRs", allocatedCIDRs)
- return r.updateCIDRsAllocation(logger, node.Name, allocatedCIDRs)
+ return r.updateCIDRsAllocation(ctx, node.Name, allocatedCIDRs)
}

// ReleaseCIDR marks node.podCIDRs[...] as unused in our tracked cidrSets
@@ -373,9 +377,10 @@ func (r *rangeAllocator) filterOutServiceRange(logger klog.Logger, serviceCIDR *
}

// updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server.
- func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, nodeName string, allocatedCIDRs []*net.IPNet) error {
+ func (r *rangeAllocator) updateCIDRsAllocation(ctx context.Context, nodeName string, allocatedCIDRs []*net.IPNet) error {
var err error
var node *v1.Node
+ logger := klog.FromContext(ctx)
cidrsString := ipnetToStringList(allocatedCIDRs)
node, err = r.nodeLister.Get(nodeName)
if err != nil {
@@ -413,7 +418,7 @@ func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, nodeName stri

// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
for i := 0; i < cidrUpdateRetries; i++ {
- if err = nodeutil.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil {
+ if err = nodeutil.PatchNodeCIDRs(ctx, r.client, types.NodeName(node.Name), cidrsString); err == nil {
logger.Info("Set node PodCIDR", "node", klog.KObj(node), "podCIDRs", cidrsString)
return nil
}
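The added var _ CIDRAllocator = &rangeAllocator{} line is a compile-time assertion: it costs nothing at runtime but fails the build if *rangeAllocator ever stops satisfying CIDRAllocator, a real risk while method signatures are being migrated as in this commit. The idiom in isolation:

```go
package example

type Doer interface {
	Do() error
}

type worker struct{}

func (w *worker) Do() error { return nil }

// Compile-time check: breaks the build if *worker drifts from Doer.
var _ Doer = &worker{}
```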
pkg/controller/nodeipam/ipam/range_allocator_test.go (12 changes: 6 additions & 6 deletions)
@@ -509,7 +509,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
}

// test function
- logger, tCtx := ktesting.NewTestContext(t)
+ _, tCtx := ktesting.NewTestContext(t)
testFunc := func(tc testCase) {
fakeNodeInformer := test.FakeNodeInformer(tc.fakeNodeHandler)
nodeList, _ := tc.fakeNodeHandler.List(tCtx, metav1.ListOptions{})
@@ -547,7 +547,7 @@ func TestAllocateOrOccupyCIDRSuccess(t *testing.T) {
if node.Spec.PodCIDRs == nil {
updateCount++
}
- if err := allocator.AllocateOrOccupyCIDR(logger, node); err != nil {
+ if err := allocator.AllocateOrOccupyCIDR(tCtx, node); err != nil {
t.Errorf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err)
}
}
@@ -610,7 +610,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
},
},
}
- logger, tCtx := ktesting.NewTestContext(t)
+ _, tCtx := ktesting.NewTestContext(t)
testFunc := func(tc testCase) {
// Initialize the range allocator.
allocator, err := NewCIDRRangeAllocator(tCtx, tc.fakeNodeHandler, test.FakeNodeInformer(tc.fakeNodeHandler), tc.allocatorParams, nil)
@@ -639,7 +639,7 @@ func TestAllocateOrOccupyCIDRFailure(t *testing.T) {
}
}
}
- if err := allocator.AllocateOrOccupyCIDR(logger, tc.fakeNodeHandler.Existing[0]); err == nil {
+ if err := allocator.AllocateOrOccupyCIDR(tCtx, tc.fakeNodeHandler.Existing[0]); err == nil {
t.Errorf("%v: unexpected success in AllocateOrOccupyCIDR: %v", tc.description, err)
}
// We don't expect any updates, so just sleep for some time
@@ -782,7 +782,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
}
}

- err := allocator.AllocateOrOccupyCIDR(logger, tc.fakeNodeHandler.Existing[0])
+ err := allocator.AllocateOrOccupyCIDR(tCtx, tc.fakeNodeHandler.Existing[0])
if len(tc.expectedAllocatedCIDRFirstRound) != 0 {
if err != nil {
t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err)
@@ -812,7 +812,7 @@ func TestReleaseCIDRSuccess(t *testing.T) {
t.Fatalf("%v: unexpected error in ReleaseCIDR: %v", tc.description, err)
}
}
- if err = allocator.AllocateOrOccupyCIDR(logger, tc.fakeNodeHandler.Existing[0]); err != nil {
+ if err = allocator.AllocateOrOccupyCIDR(tCtx, tc.fakeNodeHandler.Existing[0]); err != nil {
t.Fatalf("%v: unexpected error in AllocateOrOccupyCIDR: %v", tc.description, err)
}
if err := test.WaitForUpdatedNodeWithTimeout(tc.fakeNodeHandler, 1, wait.ForeverTestTimeout); err != nil {
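These test hunks all make the same change: the logger returned by ktesting.NewTestContext is discarded and only the context is kept, because the allocator methods now pull their logger out of the context themselves. A minimal sketch, assuming klog's ktesting package; allocate is illustrative:

```go
package example

import (
	"context"
	"testing"

	"k8s.io/klog/v2"
	"k8s.io/klog/v2/ktesting"
)

func allocate(ctx context.Context) error {
	klog.FromContext(ctx).V(4).Info("allocating")
	return nil
}

func TestAllocate(t *testing.T) {
	// The logger return value is unused; everything flows through ctx.
	_, ctx := ktesting.NewTestContext(t)
	if err := allocate(ctx); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
}
```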
pkg/kubemark/hollow_kubelet.go (5 changes: 3 additions & 2 deletions)
@@ -17,6 +17,7 @@ limitations under the License.
package kubemark

import (
"context"
"fmt"
"time"

@@ -126,8 +127,8 @@ func NewHollowKubelet(
}

// Starts this HollowKubelet and blocks.
- func (hk *HollowKubelet) Run() {
- if err := kubeletapp.RunKubelet(&options.KubeletServer{
+ func (hk *HollowKubelet) Run(ctx context.Context) {
+ if err := kubeletapp.RunKubelet(ctx, &options.KubeletServer{
KubeletFlags: *hk.KubeletFlags,
KubeletConfiguration: *hk.KubeletConfiguration,
}, hk.KubeletDeps, false); err != nil {
