diff --git a/pkg/proxy/userspace/proxysocket.go b/pkg/proxy/userspace/proxysocket.go index 1fadfc1398c0..e8a89b8e34f6 100644 --- a/pkg/proxy/userspace/proxysocket.go +++ b/pkg/proxy/userspace/proxysocket.go @@ -95,10 +95,10 @@ func TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protoc for _, dialTimeout := range EndpointDialTimeouts { endpoint, err := loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset) if err != nil { - klog.Errorf("Couldn't find an endpoint for %s: %v", service, err) + klog.ErrorS(err, "Couldn't find an endpoint for service", "service", service) return nil, err } - klog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) + klog.V(3).InfoS("Mapped service to endpoint", "service", service, "endpoint", endpoint) // TODO: This could spin up a new goroutine to make the outbound connection, // and keep accepting inbound traffic. outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout) @@ -106,7 +106,7 @@ func TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protoc if isTooManyFDsError(err) { panic("Dial failed: " + err.Error()) } - klog.Errorf("Dial failed: %v", err) + klog.ErrorS(err, "Dial failed") sessionAffinityReset = true continue } @@ -135,13 +135,13 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *Serv // Then the service port was just closed so the accept failure is to be expected. return } - klog.Errorf("Accept failed: %v", err) + klog.ErrorS(err, "Accept failed") continue } - klog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) + klog.V(3).InfoS("Accepted TCP connection from remote", "remoteAddress", inConn.RemoteAddr(), "localAddress", inConn.LocalAddr()) outConn, err := TryConnectEndpoints(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer) if err != nil { - klog.Errorf("Failed to connect to balancer: %v", err) + klog.ErrorS(err, "Failed to connect to balancer") inConn.Close() continue } @@ -154,8 +154,8 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *Serv func ProxyTCP(in, out *net.TCPConn) { var wg sync.WaitGroup wg.Add(2) - klog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v", - in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) + klog.V(4).InfoS("Creating proxy between remote and local addresses", + "inRemoteAddress", in.RemoteAddr(), "inLocalAddress", in.LocalAddr(), "outLocalAddress", out.LocalAddr(), "outRemoteAddress", out.RemoteAddr()) go copyBytes("from backend", in, out, &wg) go copyBytes("to backend", out, in, &wg) wg.Wait() @@ -163,14 +163,14 @@ func ProxyTCP(in, out *net.TCPConn) { func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) { defer wg.Done() - klog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr()) + klog.V(4).InfoS("Copying remote address bytes", "direction", direction, "sourceRemoteAddress", src.RemoteAddr(), "destinationRemoteAddress", dest.RemoteAddr()) n, err := io.Copy(dest, src) if err != nil { if !isClosedError(err) { - klog.Errorf("I/O error: %v", err) + klog.ErrorS(err, "I/O error occurred") } } - klog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr()) + klog.V(4).InfoS("Copied remote address bytes", "bytes", n, "direction", direction, "sourceRemoteAddress", src.RemoteAddr(), "destinationRemoteAddress", dest.RemoteAddr()) dest.Close() src.Close() } @@ -215,11 +215,11 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *Serv if err != nil { if e, ok := err.(net.Error); ok { if e.Temporary() { - klog.V(1).Infof("ReadFrom had a temporary failure: %v", err) + klog.V(1).ErrorS(err, "ReadFrom had a temporary failure") continue } } - klog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err) + klog.ErrorS(err, "ReadFrom failed, exiting ProxyLoop") break } // If this is a client we know already, reuse the connection and goroutine. @@ -232,14 +232,14 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *Serv _, err = svrConn.Write(buffer[0:n]) if err != nil { if !logTimeout(err) { - klog.Errorf("Write failed: %v", err) + klog.ErrorS(err, "Write failed") // TODO: Maybe tear down the goroutine for this client/server pair? } continue } err = svrConn.SetDeadline(time.Now().Add(myInfo.Timeout)) if err != nil { - klog.Errorf("SetDeadline failed: %v", err) + klog.ErrorS(err, "SetDeadline failed") continue } } @@ -253,14 +253,14 @@ func (udp *udpProxySocket) getBackendConn(activeClients *ClientCache, cliAddr ne if !found { // TODO: This could spin up a new goroutine to make the outbound connection, // and keep accepting inbound traffic. - klog.V(3).Infof("New UDP connection from %s", cliAddr) + klog.V(3).InfoS("New UDP connection from client", "address", cliAddr) var err error svrConn, err = TryConnectEndpoints(service, cliAddr, "udp", loadBalancer) if err != nil { return nil, err } if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil { - klog.Errorf("SetDeadline failed: %v", err) + klog.ErrorS(err, "SetDeadline failed") return nil, err } activeClients.Clients[cliAddr.String()] = svrConn @@ -281,19 +281,19 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ n, err := svrConn.Read(buffer[0:]) if err != nil { if !logTimeout(err) { - klog.Errorf("Read failed: %v", err) + klog.ErrorS(err, "Read failed") } break } err = svrConn.SetDeadline(time.Now().Add(timeout)) if err != nil { - klog.Errorf("SetDeadline failed: %v", err) + klog.ErrorS(err, "SetDeadline failed") break } _, err = udp.WriteTo(buffer[0:n], cliAddr) if err != nil { if !logTimeout(err) { - klog.Errorf("WriteTo failed: %v", err) + klog.ErrorS(err, "WriteTo failed") } break } diff --git a/pkg/proxy/userspace/roundrobin.go b/pkg/proxy/userspace/roundrobin.go index f9a12102d7f5..473d6f1d32ac 100644 --- a/pkg/proxy/userspace/roundrobin.go +++ b/pkg/proxy/userspace/roundrobin.go @@ -83,7 +83,7 @@ func NewLoadBalancerRR() *LoadBalancerRR { } func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType v1.ServiceAffinity, ttlSeconds int) error { - klog.V(4).Infof("LoadBalancerRR NewService %q", svcPort) + klog.V(4).InfoS("LoadBalancerRR NewService", "servicePortName", svcPort) lb.lock.Lock() defer lb.lock.Unlock() lb.newServiceInternal(svcPort, affinityType, ttlSeconds) @@ -98,7 +98,7 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi if _, exists := lb.services[svcPort]; !exists { lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlSeconds)} - klog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort) + klog.V(4).InfoS("LoadBalancerRR service does not exist, created", "servicePortName", svcPort) } else if affinityType != "" { lb.services[svcPort].affinity.affinityType = affinityType } @@ -106,7 +106,7 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi } func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) { - klog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort) + klog.V(4).InfoS("LoadBalancerRR DeleteService", "servicePortName", svcPort) lb.lock.Lock() defer lb.lock.Unlock() delete(lb.services, svcPort) @@ -146,7 +146,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne if len(state.endpoints) == 0 { return "", ErrMissingEndpoints } - klog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", svcPort, srcAddr, state.endpoints) + klog.V(4).InfoS("NextEndpoint for service", "servicePortName", svcPort, "address", srcAddr, "endpoints", state.endpoints) sessionAffinityEnabled := isSessionAffinity(&state.affinity) @@ -164,7 +164,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne // Affinity wins. endpoint := sessionAffinity.endpoint sessionAffinity.lastUsed = time.Now() - klog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %#v: %s", svcPort, ipaddr, sessionAffinity, endpoint) + klog.V(4).InfoS("NextEndpoint for service from IP with sessionAffinity", "servicePortName", svcPort, "IP", ipaddr, "sessionAffinity", sessionAffinity, "endpoint", endpoint) return endpoint, nil } } @@ -183,7 +183,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne affinity.lastUsed = time.Now() affinity.endpoint = endpoint affinity.clientIP = ipaddr - klog.V(4).Infof("Updated affinity key %s: %#v", ipaddr, state.affinity.affinityMap[ipaddr]) + klog.V(4).InfoS("Updated affinity key", "IP", ipaddr, "affinityState", state.affinity.affinityMap[ipaddr]) } return endpoint, nil @@ -193,7 +193,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) { for _, affinity := range state.affinity.affinityMap { if affinity.endpoint == endpoint { - klog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, svcPort) + klog.V(4).InfoS("Removing client from affinityMap for service", "endpoint", affinity.endpoint, "servicePortName", svcPort) delete(state.affinity.affinityMap, affinity.clientIP) } } @@ -214,7 +214,7 @@ func (lb *LoadBalancerRR) removeStaleAffinity(svcPort proxy.ServicePortName, new } for _, existingEndpoint := range state.endpoints { if !newEndpointsSet.Has(existingEndpoint) { - klog.V(2).Infof("Delete endpoint %s for service %q", existingEndpoint, svcPort) + klog.V(2).InfoS("Delete endpoint for service", "endpoint", existingEndpoint, "servicePortName", svcPort) removeSessionAffinityByEndpoint(state, svcPort, existingEndpoint) } } @@ -232,7 +232,7 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) { state, exists := lb.services[svcPort] if !exists || state == nil || len(newEndpoints) > 0 { - klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) + klog.V(1).InfoS("LoadBalancerRR: Setting endpoints service", "servicePortName", svcPort, "endpoints", newEndpoints) // OnEndpointsAdd can be called without NewService being called externally. // To be safe we will call it here. A new service will only be created // if one does not already exist. @@ -264,7 +264,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint } if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(stringslices.Clone(curEndpoints), newEndpoints) { - klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) + klog.V(1).InfoS("LoadBalancerRR: Setting endpoints for service", "servicePortName", svcPort, "endpoints", newEndpoints) lb.removeStaleAffinity(svcPort, newEndpoints) // OnEndpointsUpdate can be called without NewService being called externally. // To be safe we will call it here. A new service will only be created @@ -292,7 +292,7 @@ func (lb *LoadBalancerRR) resetService(svcPort proxy.ServicePortName) { // If the service is still around, reset but don't delete. if state, ok := lb.services[svcPort]; ok { if len(state.endpoints) > 0 { - klog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort) + klog.V(2).InfoS("LoadBalancerRR: Removing endpoints service", "servicePortName", svcPort) state.endpoints = []string{} } state.index = 0 @@ -335,7 +335,7 @@ func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort proxy.ServicePortNa } for ip, affinity := range state.affinity.affinityMap { if int(time.Since(affinity.lastUsed).Seconds()) >= state.affinity.ttlSeconds { - klog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, svcPort) + klog.V(4).InfoS("Removing client from affinityMap for service", "IP", affinity.clientIP, "servicePortName", svcPort) delete(state.affinity.affinityMap, ip) } }