Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert cmd/kubelet/app/server.go to structured logging #98334

Merged
merged 1 commit into from Mar 17, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
108 changes: 62 additions & 46 deletions cmd/kubelet/app/server.go
Expand Up @@ -25,6 +25,7 @@ import (
"math"
"net"
"net/http"
"os"
"path"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -116,7 +117,8 @@ func NewKubeletCommand() *cobra.Command {
kubeletConfig, err := options.NewKubeletConfiguration()
// programmer error
if err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Failed to create a new kubelet configuration")
os.Exit(1)
}

cmd := &cobra.Command{
Expand Down Expand Up @@ -151,21 +153,24 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API
Run: func(cmd *cobra.Command, args []string) {
// initial flag parse, since we disable cobra's flag parsing
if err := cleanFlagSet.Parse(args); err != nil {
klog.ErrorS(err, "Failed to parse kubelet flag")
cmd.Usage()
klog.Fatal(err)
os.Exit(1)
}

// check if there are non-flag arguments in the command line
cmds := cleanFlagSet.Args()
if len(cmds) > 0 {
klog.ErrorS(nil, "Unknown command", "command", cmds[0])

This comment was marked as resolved.

cmd.Usage()
klog.Fatalf("unknown command: %s", cmds[0])
os.Exit(1)
}

// short-circuit on help
help, err := cleanFlagSet.GetBool("help")
if err != nil {
klog.Fatal(`"help" flag is non-bool, programmer error, please correct`)
klog.InfoS(`"help" flag is non-bool, programmer error, please correct`)
os.Exit(1)
}
if help {
cmd.Help()
Expand All @@ -178,44 +183,50 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API

// set feature gates from initial flags-based config
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Failed to set feature gates from initial flags-based config")
os.Exit(1)
}

// validate the initial KubeletFlags
if err := options.ValidateKubeletFlags(kubeletFlags); err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Failed to validate kubelet flags")
os.Exit(1)
}

if kubeletFlags.ContainerRuntime == "remote" && cleanFlagSet.Changed("pod-infra-container-image") {
klog.Warning("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
klog.InfoS("Warning: For remote container runtime, --pod-infra-container-image is ignored in kubelet, which should be set in that remote runtime instead")
}

// load kubelet config file, if provided
if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
kubeletConfig, err = loadConfigFile(configFile)
if err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Failed to load kubelet config file", "path", configFile)
os.Exit(1)
}
// We must enforce flag precedence by re-parsing the command line into the new object.
// This is necessary to preserve backwards-compatibility across binary upgrades.
// See issue #56171 for more details.
if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Failed to precedence kubeletConfigFlag")
os.Exit(1)
}
// update feature gates based on new config
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Failed to set feature gates from initial flags-based config")
os.Exit(1)
}
}

// We always validate the local configuration (command line + config file).
// This is the default "last-known-good" config for dynamic config, and must always remain valid.
if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Failed to validate kubelet configuration", "path", kubeletConfig)
os.Exit(1)
}

if (kubeletConfig.KubeletCgroups != "" && kubeletConfig.KubeReservedCgroup != "") && (0 != strings.Index(kubeletConfig.KubeletCgroups, kubeletConfig.KubeReservedCgroup)) {
klog.Warning("unsupported configuration:KubeletCgroups is not within KubeReservedCgroup")
klog.InfoS("unsupported configuration:KubeletCgroups is not within KubeReservedCgroup")
}

// use dynamic kubelet config, if enabled
Expand All @@ -231,15 +242,17 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API
return kubeletConfigFlagPrecedence(kc, args)
})
if err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Failed to bootstrap a configuration controller", "dynamicConfigDir", dynamicConfigDir)
os.Exit(1)
}
// If we should just use our existing, local config, the controller will return a nil config
if dynamicKubeletConfig != nil {
kubeletConfig = dynamicKubeletConfig
// Note: flag precedence was already enforced in the controller, prior to validation,
// by our above transform function. Now we simply update feature gates from the new config.
if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Failed to set feature gates from initial flags-based config")
os.Exit(1)
}
}
}
Expand All @@ -253,7 +266,8 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API
// use kubeletServer to construct the default KubeletDeps
kubeletDeps, err := UnsecuredDependencies(kubeletServer, utilfeature.DefaultFeatureGate)
if err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Failed to construct kubelet dependencies")
os.Exit(1)
}

// add the kubelet config controller to kubeletDeps
Expand All @@ -274,8 +288,10 @@ HTTP server: The kubelet can also listen for HTTP and respond to a simple API
klog.V(5).Infof("KubeletConfiguration: %#v", config)

// run the kubelet
klog.V(5).InfoS("KubeletConfiguration", "configuration", kubeletServer.KubeletConfiguration)
if err := Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate); err != nil {
klog.Fatal(err)
klog.ErrorS(err, "Failed to run kubelet")
os.Exit(1)
}
},
}
Expand Down Expand Up @@ -422,7 +438,7 @@ func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
logOption.LogSanitization = s.Logging.Sanitization
logOption.Apply()
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
klog.InfoS("Kubelet version", "kubeletVersion", version.Get())
if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
return fmt.Errorf("failed OS init: %v", err)
}
Expand All @@ -448,11 +464,11 @@ func setConfigz(cz *configz.Config, kc *kubeletconfiginternal.KubeletConfigurati
func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
cz, err := configz.New("kubeletconfig")
if err != nil {
klog.Errorf("unable to register configz: %s", err)
klog.ErrorS(err, "Failed to register configz")
return err
}
if err := setConfigz(cz, kc); err != nil {
klog.Errorf("unable to register config: %s", err)
klog.ErrorS(err, "Failed to register config")
return err
}
return nil
Expand All @@ -467,10 +483,10 @@ func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName)
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
eventBroadcaster.StartStructuredLogging(3)
if kubeDeps.EventClient != nil {
klog.V(4).Infof("Sending events to api server.")
klog.V(4).InfoS("Sending events to api server")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
} else {
klog.Warning("No api server defined - no events will be sent to API server.")
klog.InfoS("No api server defined - no events will be sent to API server")
}
}

Expand All @@ -491,12 +507,12 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
}
done := make(chan struct{})
if s.LockFilePath != "" {
klog.Infof("acquiring file lock on %q", s.LockFilePath)
klog.InfoS("Acquiring file lock", "path", s.LockFilePath)
if err := flock.Acquire(s.LockFilePath); err != nil {
return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
}
if s.ExitOnLockContention {
klog.Infof("watching for inotify events for: %v", s.LockFilePath)
klog.InfoS("Watching for inotify events", "path", s.LockFilePath)
if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
return err
}
Expand All @@ -506,7 +522,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
// Register current configuration with /configz endpoint
err = initConfigz(&s.KubeletConfiguration)
if err != nil {
klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
klog.ErrorS(err, "Failed to register kubelet configuration with configz")
}

if len(s.ShowHiddenMetricsForVersion) > 0 {
Expand Down Expand Up @@ -534,7 +550,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
return err
}
if cloud != nil {
klog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
klog.V(2).InfoS("Successfully initialized cloud provider", "cloudProvider", s.CloudProvider, "cloudConfigFile", s.CloudConfigFile)
}
kubeDeps.Cloud = cloud
}
Expand All @@ -555,7 +571,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
kubeDeps.KubeClient = nil
kubeDeps.EventClient = nil
kubeDeps.HeartbeatClient = nil
klog.Warningf("standalone mode, no API client")
klog.InfoS("Standalone mode, no API client")

case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
clientConfig, closeAllConns, err := buildKubeletClientConfig(ctx, s, nodeName)
Expand Down Expand Up @@ -611,14 +627,14 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
cgroupRoots = append(cgroupRoots, nodeAllocatableRoot)
kubeletCgroup, err := cm.GetKubeletContainer(s.KubeletCgroups)
if err != nil {
klog.Warningf("failed to get the kubelet's cgroup: %v. Kubelet system container metrics may be missing.", err)
klog.InfoS("Failed to get the kubelet's cgroup. Kubelet system container metrics may be missing.", "err", err)
} else if kubeletCgroup != "" {
cgroupRoots = append(cgroupRoots, kubeletCgroup)
}

runtimeCgroup, err := cm.GetRuntimeContainer(s.ContainerRuntime, s.RuntimeCgroups)
if err != nil {
klog.Warningf("failed to get the container runtime's cgroup: %v. Runtime system container metrics may be missing.", err)
klog.InfoS("Failed to get the container runtime's cgroup. Runtime system container metrics may be missing.", "err", err)
} else if runtimeCgroup != "" {
// RuntimeCgroups is optional, so ignore if it isn't specified
cgroupRoots = append(cgroupRoots, runtimeCgroup)
Expand All @@ -642,7 +658,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend

if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" {
klog.Info("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
klog.InfoS("--cgroups-per-qos enabled, but --cgroup-root was not specified. defaulting to /")
s.CgroupRoot = "/"
}

Expand All @@ -652,22 +668,22 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
machineInfo, err := kubeDeps.CAdvisorInterface.MachineInfo()
if err != nil {
// if can't use CAdvisor here, fall back to non-explicit cpu list behavor
klog.Warning("Failed to get MachineInfo, set reservedSystemCPUs to empty")
klog.InfoS("Failed to get MachineInfo, set reservedSystemCPUs to empty")
reservedSystemCPUs = cpuset.NewCPUSet()
} else {
var errParse error
reservedSystemCPUs, errParse = cpuset.Parse(s.ReservedSystemCPUs)
if errParse != nil {
// invalid cpu list is provided, set reservedSystemCPUs to empty, so it won't overwrite kubeReserved/systemReserved
klog.Infof("Invalid ReservedSystemCPUs \"%s\"", s.ReservedSystemCPUs)
klog.InfoS("Invalid ReservedSystemCPUs", "systemReservedCPUs", s.ReservedSystemCPUs)
return errParse
}
reservedList := reservedSystemCPUs.ToSlice()
first := reservedList[0]
last := reservedList[len(reservedList)-1]
if first < 0 || last >= machineInfo.NumCores {
// the specified cpuset is outside of the range of what the machine has
klog.Infof("Invalid cpuset specified by --reserved-cpus")
klog.InfoS("Invalid cpuset specified by --reserved-cpus")
return fmt.Errorf("Invalid cpuset %q specified by --reserved-cpus", s.ReservedSystemCPUs)
}
}
Expand All @@ -677,15 +693,15 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend

if reservedSystemCPUs.Size() > 0 {
// at cmd option valication phase it is tested either --system-reserved-cgroup or --kube-reserved-cgroup is specified, so overwrite should be ok
klog.Infof("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved=\"%v\", SystemReserved=\"%v\".", s.KubeReserved, s.SystemReserved)
klog.InfoS("Option --reserved-cpus is specified, it will overwrite the cpu setting in KubeReserved and SystemReserved", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
if s.KubeReserved != nil {
delete(s.KubeReserved, "cpu")
}
if s.SystemReserved == nil {
s.SystemReserved = make(map[string]string)
}
s.SystemReserved["cpu"] = strconv.Itoa(reservedSystemCPUs.Size())
klog.Infof("After cpu setting is overwritten, KubeReserved=\"%v\", SystemReserved=\"%v\"", s.KubeReserved, s.SystemReserved)
klog.InfoS("After cpu setting is overwritten", "kubeReservedCPUs", s.KubeReserved, "systemReservedCPUs", s.SystemReserved)
}

kubeReserved, err := parseResourceList(s.KubeReserved)
Expand Down Expand Up @@ -758,7 +774,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
// TODO(vmarmol): Do this through container config.
oomAdjuster := kubeDeps.OOMAdjuster
if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
klog.Warning(err)
klog.InfoS("Failed to ApplyOOMScoreAdj", "err", err)
}

err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration,
Expand Down Expand Up @@ -790,7 +806,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
go wait.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
if err != nil {
klog.Errorf("Starting healthz server failed: %v", err)
klog.ErrorS(err, "Failed to start healthz server")
}
}, 5*time.Second, wait.NeverStop)
}
Expand Down Expand Up @@ -833,7 +849,7 @@ func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nod
// which provides a high powered kubeconfig on the master with cert/key data, we must
// bootstrap the cert manager with the contents of the initial client config.

klog.Infof("Client rotation is on, will bootstrap in background")
klog.InfoS("Client rotation is on, will bootstrap in background")
certConfig, clientConfig, err := bootstrap.LoadClientConfig(s.KubeConfig, s.BootstrapKubeconfig, s.CertDirectory)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -877,7 +893,7 @@ func buildKubeletClientConfig(ctx context.Context, s *options.KubeletServer, nod
return nil, nil, err
}

klog.V(2).Info("Starting client certificate rotation.")
klog.V(2).InfoS("Starting client certificate rotation")
clientCertificateManager.Start()

return transportConfig, closeAllConns, nil
Expand Down Expand Up @@ -971,7 +987,7 @@ func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName
return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err)
}

klog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
klog.V(2).InfoS("Cloud provider determined current node", "nodeName", klog.KRef("", string(nodeName)))

return nodeName, nil
}
Expand Down Expand Up @@ -1005,7 +1021,7 @@ func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletCo
return nil, err
}

klog.V(4).Infof("Using self-signed cert (%s, %s)", kc.TLSCertFile, kc.TLSPrivateKeyFile)
klog.V(4).InfoS("Using self-signed cert", "TLSCertFile", kc.TLSCertFile, "TLSPrivateKeyFile", kc.TLSPrivateKeyFile)
}
}

Expand All @@ -1019,7 +1035,7 @@ func InitializeTLS(kf *options.KubeletFlags, kc *kubeletconfiginternal.KubeletCo
for i := 0; i < len(tlsCipherSuites); i++ {
for cipherName, cipherID := range insecureCiphers {
if tlsCipherSuites[i] == cipherID {
klog.Warningf("Use of insecure cipher '%s' detected.", cipherName)
klog.InfoS("Use of insecure cipher detected.", "cipher", cipherName)
}
}
}
Expand Down Expand Up @@ -1092,7 +1108,7 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie
for _, ip := range strings.Split(kubeServer.NodeIP, ",") {
parsedNodeIP := net.ParseIP(strings.TrimSpace(ip))
if parsedNodeIP == nil {
klog.Warningf("Could not parse --node-ip value %q; ignoring", ip)
klog.InfoS("Could not parse --node-ip ignoring", "IP", ip)
} else {
nodeIPs = append(nodeIPs, parsedNodeIP)
}
Expand All @@ -1113,7 +1129,7 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie
})

credentialprovider.SetPreferredDockercfgPath(kubeServer.RootDirectory)
klog.V(2).Infof("Using root directory: %v", kubeServer.RootDirectory)
klog.V(2).InfoS("Using root directory", "path", kubeServer.RootDirectory)

if kubeDeps.OSInterface == nil {
kubeDeps.OSInterface = kubecontainer.RealOS{}
Expand Down Expand Up @@ -1161,18 +1177,18 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie
podCfg := kubeDeps.PodConfig

if err := rlimit.SetNumFiles(uint64(kubeServer.MaxOpenFiles)); err != nil {
klog.Errorf("Failed to set rlimit on max file handles: %v", err)
klog.ErrorS(err, "Failed to set rlimit on max file handles")
}

// process pods and exit.
if runOnce {
if _, err := k.RunOnce(podCfg.Updates()); err != nil {
return fmt.Errorf("runonce failed: %v", err)
}
klog.Info("Started kubelet as runonce")
klog.InfoS("Started kubelet as runonce")
} else {
startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
klog.Info("Started kubelet")
klog.InfoS("Started kubelet")
}
return nil
}
Expand Down