Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrated server.go, ipvs/proxier.go(partial) to structured logging #105769

Merged
9 changes: 5 additions & 4 deletions cmd/kube-proxy/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func NewOptions() *Options {
// Complete completes all the required options.
func (o *Options) Complete() error {
if len(o.ConfigFile) == 0 && len(o.WriteConfigTo) == 0 {
klog.InfoS("WARNING: all flags other than --config, --write-config-to, and --cleanup are deprecated. Please begin using a config file ASAP")
klog.InfoS("Warning, all flags other than --config, --write-config-to, and --cleanup are deprecated, please begin using a config file ASAP")
o.config.HealthzBindAddress = addressFromDeprecatedFlags(o.config.HealthzBindAddress, o.healthzPort)
o.config.MetricsBindAddress = addressFromDeprecatedFlags(o.config.MetricsBindAddress, o.metricsPort)
}
Expand Down Expand Up @@ -498,7 +498,8 @@ with the apiserver API to configure the proxy.`,
}

if err := opts.Run(); err != nil {
klog.Exit(err)
klog.ErrorS(err, "Error running ProxyServer")
os.Exit(1)
}
},
Args: func(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -560,7 +561,7 @@ func createClients(config componentbaseconfig.ClientConnectionConfiguration, mas
var err error

if len(config.Kubeconfig) == 0 && len(masterOverride) == 0 {
klog.InfoS("Neither kubeconfig file nor master URL was specified. Falling back to in-cluster config")
klog.InfoS("Neither kubeconfig file nor master URL was specified, falling back to in-cluster config")
kubeConfig, err = rest.InClusterConfig()
} else {
// This creates a client, first loading any specified kubeconfig
Expand Down Expand Up @@ -843,7 +844,7 @@ func detectNodeIP(client clientset.Interface, hostname, bindAddress string) net.
nodeIP = utilnode.GetNodeIP(client, hostname)
}
if nodeIP == nil {
klog.V(0).Infof("can't determine this node's IP, assuming 127.0.0.1; if this is incorrect, please set the --bind-address flag")
klog.V(0).InfoS("Can't determine this node's IP, assuming 127.0.0.1; if this is incorrect, please set the --bind-address flag")
nodeIP = netutils.ParseIPSloppy("127.0.0.1")
}
return nodeIP
Expand Down
67 changes: 33 additions & 34 deletions pkg/proxy/ipvs/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func NewProxier(ipt utiliptables.Interface,
// are connected to a Linux bridge (but not SDN bridges). Until most
// plugins handle this, log when config is missing
if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
klog.InfoS("Missing br-netfilter module or unset sysctl br-nf-call-iptables, proxy may not work as intended")
}

// Set the conntrack sysctl we need for
Expand All @@ -378,7 +378,7 @@ func NewProxier(ipt utiliptables.Interface,
return nil, fmt.Errorf("error parsing kernel version %q: %v", kernelVersionStr, err)
}
if kernelVersion.LessThan(version.MustParseGeneric(connReuseMinSupportedKernelVersion)) {
klog.ErrorS(nil, fmt.Sprintf("can't set sysctl %s, kernel version must be at least %s", sysctlConnReuse, connReuseMinSupportedKernelVersion))
klog.ErrorS(nil, "Can't set sysctl, kernel version doesn't satisfy minimum version requirements", "sysctl", sysctlConnReuse, "minimumKernelVersion", connReuseMinSupportedKernelVersion)
} else if kernelVersion.AtLeast(version.MustParseGeneric(connReuseFixedKernelVersion)) {
// https://github.com/kubernetes/kubernetes/issues/93297
klog.V(2).InfoS("Left as-is", "sysctl", sysctlConnReuse)
Expand Down Expand Up @@ -421,7 +421,7 @@ func NewProxier(ipt utiliptables.Interface,
// current system timeout should be preserved
if tcpTimeout > 0 || tcpFinTimeout > 0 || udpTimeout > 0 {
if err := ipvs.ConfigureTimeouts(tcpTimeout, tcpFinTimeout, udpTimeout); err != nil {
klog.ErrorS(err, "failed to configure IPVS timeouts")
klog.ErrorS(err, "Failed to configure IPVS timeouts")
}
}

Expand All @@ -434,7 +434,7 @@ func NewProxier(ipt utiliptables.Interface,
ipFamily = v1.IPv6Protocol
}

klog.V(2).InfoS("record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily)
klog.V(2).InfoS("Record nodeIP and family", "nodeIP", nodeIP, "family", ipFamily)

if len(scheduler) == 0 {
klog.InfoS("IPVS scheduler not specified, use rr by default")
Expand All @@ -447,7 +447,7 @@ func NewProxier(ipt utiliptables.Interface,
nodePortAddresses = ipFamilyMap[ipFamily]
// Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(ipFamily)]; ok && len(ips) > 0 {
klog.InfoS("found node IPs of the wrong family", "ipFamily", ipFamily, "ips", strings.Join(ips, ","))
klog.InfoS("Found node IPs of the wrong family", "ipFamily", ipFamily, "IPs", ips)
}

// excludeCIDRs has been validated before, here we just parse it to IPNet list
Expand Down Expand Up @@ -620,7 +620,7 @@ func (handle *LinuxKernelHandler) GetModules() ([]string, error) {
// Find out loaded kernel modules. If this is a full static kernel it will try to verify if the module is compiled using /boot/config-KERNELVERSION
modulesFile, err := os.Open("/proc/modules")
if err == os.ErrNotExist {
klog.ErrorS(err, "Failed to read file /proc/modules. Assuming this is a kernel without loadable modules support enabled")
klog.ErrorS(err, "Failed to read file /proc/modules, assuming this is a kernel without loadable modules support enabled")
kernelConfigFile := fmt.Sprintf("/boot/config-%s", kernelVersionStr)
kConfig, err := ioutil.ReadFile(kernelConfigFile)
if err != nil {
Expand All @@ -646,7 +646,7 @@ func (handle *LinuxKernelHandler) GetModules() ([]string, error) {
builtinModsFilePath := fmt.Sprintf("/lib/modules/%s/modules.builtin", kernelVersionStr)
b, err := ioutil.ReadFile(builtinModsFilePath)
if err != nil {
klog.ErrorS(err, "Failed to read builtin modules file. You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "filePath", builtinModsFilePath)
klog.ErrorS(err, "Failed to read builtin modules file, you can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "filePath", builtinModsFilePath)
}

for _, module := range ipvsModules {
Expand All @@ -656,8 +656,8 @@ func (handle *LinuxKernelHandler) GetModules() ([]string, error) {
// Try to load the required IPVS kernel modules if not built in
err := handle.executor.Command("modprobe", "--", module).Run()
if err != nil {
klog.InfoS("Failed to load kernel module with modprobe. "+
"You can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "moduleName", module)
klog.InfoS("Failed to load kernel module with modprobe, "+
"you can ignore this message when kube-proxy is running inside container without mounting /lib/modules", "moduleName", module)
} else {
lmods = append(lmods, module)
}
Expand Down Expand Up @@ -804,15 +804,15 @@ func CleanupLeftovers(ipvs utilipvs.Interface, ipt utiliptables.Interface, ipset
if ipvs != nil {
err := ipvs.Flush()
if err != nil {
klog.ErrorS(err, "Error flushing IPVS rules")
klog.ErrorS(err, "Error flushing ipvs rules")
encounteredError = true
}
}
// Delete dummy interface created by ipvs Proxier.
nl := NewNetLinkHandle(false)
err := nl.DeleteDummyDevice(DefaultDummyDevice)
if err != nil {
klog.ErrorS(err, "Error deleting dummy device created by IPVS proxier", "device", DefaultDummyDevice)
klog.ErrorS(err, "Error deleting dummy device created by ipvs proxier", "device", DefaultDummyDevice)
encounteredError = true
}
// Clear iptables created by ipvs Proxier.
Expand Down Expand Up @@ -1020,7 +1020,7 @@ func (proxier *Proxier) syncProxyRules() {
// merge stale services gathered from updateEndpointsMap
for _, svcPortName := range endpointUpdateResult.StaleServiceNames {
if svcInfo, ok := proxier.serviceMap[svcPortName]; ok && svcInfo != nil && conntrack.IsClearConntrackNeeded(svcInfo.Protocol()) {
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "svcPortName", svcPortName.String(), "clusterIP", svcInfo.ClusterIP().String())
klog.V(2).InfoS("Stale service", "protocol", strings.ToLower(string(svcInfo.Protocol())), "servicePortName", svcPortName, "clusterIP", svcInfo.ClusterIP())
staleServices.Insert(svcInfo.ClusterIP().String())
for _, extIP := range svcInfo.ExternalIPStrings() {
staleServices.Insert(extIP)
Expand All @@ -1031,7 +1031,7 @@ func (proxier *Proxier) syncProxyRules() {
}
}

klog.V(3).InfoS("Syncing ipvs Proxier rules")
klog.V(3).InfoS("Syncing ipvs proxier rules")

// Begin install iptables

Expand Down Expand Up @@ -1074,7 +1074,7 @@ func (proxier *Proxier) syncProxyRules() {

bindedAddresses, err := proxier.ipGetter.BindedIPs()
if err != nil {
klog.ErrorS(err, "error listing addresses binded to dummy interface")
klog.ErrorS(err, "Error listing addresses binded to dummy interface")
}

hasNodePort := false
Expand All @@ -1098,7 +1098,7 @@ func (proxier *Proxier) syncProxyRules() {
if hasNodePort {
nodeAddrSet, err := utilproxy.GetNodeAddresses(proxier.nodePortAddresses, proxier.networkInterfacer)
if err != nil {
klog.ErrorS(err, "Failed to get node ip address matching nodeport cidr")
klog.ErrorS(err, "Failed to get node IP address matching nodeport cidr")
} else {
nodeAddresses = nodeAddrSet.List()
for _, address := range nodeAddresses {
Expand Down Expand Up @@ -1135,7 +1135,7 @@ func (proxier *Proxier) syncProxyRules() {
for svcName, svc := range proxier.serviceMap {
svcInfo, ok := svc.(*serviceInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast serviceInfo", "svcName", svcName.String())
klog.ErrorS(nil, "Failed to cast serviceInfo", "serviceName", svcName)
continue
}
isIPv6 := netutils.IsIPv6(svcInfo.ClusterIP())
Expand All @@ -1152,7 +1152,7 @@ func (proxier *Proxier) syncProxyRules() {
for _, e := range proxier.endpointsMap[svcName] {
ep, ok := e.(*proxy.BaseEndpointInfo)
if !ok {
klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e.String())
klog.ErrorS(nil, "Failed to cast BaseEndpointInfo", "endpoint", e)
continue
}
if !ep.IsLocal {
Expand All @@ -1172,7 +1172,7 @@ func (proxier *Proxier) syncProxyRules() {
SetType: utilipset.HashIPPortIP,
}
if valid := proxier.ipsetList[kubeLoopBackIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoopBackIPSet].Name)
continue
}
proxier.ipsetList[kubeLoopBackIPSet].activeEntries.Insert(entry.String())
Expand All @@ -1189,7 +1189,7 @@ func (proxier *Proxier) syncProxyRules() {
// add service Cluster IP:Port to kubeServiceAccess ip set for the purpose of solving hairpin.
// proxier.kubeServiceAccessSet.activeEntries.Insert(entry.String())
if valid := proxier.ipsetList[kubeClusterIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeClusterIPSet].Name)
continue
}
proxier.ipsetList[kubeClusterIPSet].activeEntries.Insert(entry.String())
Expand All @@ -1216,10 +1216,10 @@ func (proxier *Proxier) syncProxyRules() {
internalNodeLocal = true
}
if err := proxier.syncEndpoint(svcName, internalNodeLocal, serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
klog.ErrorS(err, "Failed to sync endpoint for service", "serviceName", svcName, "virtualServer", serv)
}
} else {
klog.ErrorS(err, "Failed to sync service", "service", serv.String())
klog.ErrorS(err, "Failed to sync service", "serviceName", svcName, "virtualServer", serv)
}

// Capture externalIPs.
Expand All @@ -1237,7 +1237,7 @@ func (proxier *Proxier) syncProxyRules() {
Protocol: netutils.Protocol(svcInfo.Protocol()),
}
if proxier.portsMap[lp] != nil {
klog.V(4).InfoS("Port was open before and is still needed", "port", lp.String())
klog.V(4).InfoS("Port was open before and is still needed", "port", lp)
replacementPortsMap[lp] = proxier.portsMap[lp]
} else {
socket, err := proxier.portMapper.OpenLocalPort(&lp)
Expand All @@ -1251,10 +1251,10 @@ func (proxier *Proxier) syncProxyRules() {
UID: types.UID(proxier.hostname),
Namespace: "",
}, nil, v1.EventTypeWarning, err.Error(), "SyncProxyRules", msg)
klog.ErrorS(err, "can't open port, skipping it", "port", lp.String())
klog.ErrorS(err, "Can't open port, skipping it", "port", lp)
continue
}
klog.V(2).InfoS("Opened local port", "port", lp.String())
klog.V(2).InfoS("Opened local port", "port", lp)
replacementPortsMap[lp] = socket
}
} // We're holding the port, so it's OK to install IPVS rules.
Expand All @@ -1269,14 +1269,14 @@ func (proxier *Proxier) syncProxyRules() {

if svcInfo.NodeLocalExternal() {
if valid := proxier.ipsetList[kubeExternalIPLocalSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPLocalSet].Name)
continue
}
proxier.ipsetList[kubeExternalIPLocalSet].activeEntries.Insert(entry.String())
} else {
// We have to SNAT packets to external IPs.
if valid := proxier.ipsetList[kubeExternalIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeExternalIPSet].Name)
continue
}
proxier.ipsetList[kubeExternalIPSet].activeEntries.Insert(entry.String())
Expand All @@ -1298,10 +1298,10 @@ func (proxier *Proxier) syncProxyRules() {
activeBindAddrs[serv.Address.String()] = true

if err := proxier.syncEndpoint(svcName, svcInfo.NodeLocalExternal(), serv); err != nil {
klog.ErrorS(err, "Failed to sync endpoint for service", "service", serv.String())
klog.ErrorS(err, "Failed to sync endpoint for service", "serviceName", svcName, "virtualServer", serv)
}
} else {
klog.ErrorS(err, "Failed to sync service", "service", serv.String())
klog.ErrorS(err, "Failed to sync service", "service", svcName, "virtualServer", serv)
}
}

Expand All @@ -1320,14 +1320,14 @@ func (proxier *Proxier) syncProxyRules() {
// If we are proxying globally, we need to masquerade in case we cross nodes.
// If we are proxying only locally, we can retain the source IP.
if valid := proxier.ipsetList[kubeLoadBalancerSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerSet].activeEntries.Insert(entry.String())
// insert loadbalancer entry to lbIngressLocalSet if service externaltrafficpolicy=local
if svcInfo.NodeLocalExternal() {
if valid := proxier.ipsetList[kubeLoadBalancerLocalSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerLocalSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerLocalSet].activeEntries.Insert(entry.String())
Expand All @@ -1337,7 +1337,7 @@ func (proxier *Proxier) syncProxyRules() {
// This currently works for loadbalancers that preserves source ips.
// For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply.
if valid := proxier.ipsetList[kubeLoadbalancerFWSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadbalancerFWSet].Name)
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadbalancerFWSet].Name)
continue
}
proxier.ipsetList[kubeLoadbalancerFWSet].activeEntries.Insert(entry.String())
Expand All @@ -1353,7 +1353,7 @@ func (proxier *Proxier) syncProxyRules() {
}
// enumerate all white list source cidr
if valid := proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerSourceCIDRSet].activeEntries.Insert(entry.String())
Expand All @@ -1377,13 +1377,12 @@ func (proxier *Proxier) syncProxyRules() {
}
// enumerate all white list source ip
if valid := proxier.ipsetList[kubeLoadBalancerSourceIPSet].validateEntry(entry); !valid {
klog.ErrorS(nil, "error adding entry to ipset", "entry", entry.String(), "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
klog.ErrorS(nil, "Error adding entry to ipset", "entry", entry, "ipset", proxier.ipsetList[kubeLoadBalancerSourceIPSet].Name)
continue
}
proxier.ipsetList[kubeLoadBalancerSourceIPSet].activeEntries.Insert(entry.String())
}
}

// ipvs call
serv := &utilipvs.VirtualServer{
Address: netutils.ParseIPSloppy(ingress),
Expand Down