Skip to content
This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Commit

Permalink
Merge pull request #532 from mneverov/fix-epslices
Browse files Browse the repository at this point in the history
Fix epslices
  • Loading branch information
k8s-ci-robot committed Sep 30, 2023
2 parents 307a4dd + 6ec42e7 commit 6ccd640
Show file tree
Hide file tree
Showing 11 changed files with 92 additions and 36 deletions.
2 changes: 2 additions & 0 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ require (
github.com/golang/protobuf v1.5.3
google.golang.org/grpc v1.50.0
google.golang.org/protobuf v1.28.1
k8s.io/klog/v2 v2.80.1
)

require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/sys v0.11.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions api/go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
Expand All @@ -15,3 +16,4 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w=
google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
k8s.io/klog/v2 v2.80.1 h1:atnLQ121W371wYYFawwYx1aEY2eUfs4l3J72wtgAwV4=
42 changes: 33 additions & 9 deletions api/localv1/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package localv1

import (
"fmt"
"k8s.io/klog/v2"
"net"
)

Expand All @@ -29,30 +31,52 @@ func (ep *Endpoint) AddAddress(s string) (ip net.IP) {
return ep.IPs.Add(s)
}

func (ep *Endpoint) PortMapping(port *PortMapping) (target int32) {
target = port.TargetPort
if port.TargetPortName != "" {
func (ep *Endpoint) PortMapping(port *PortMapping) (int32, error) {
nameToFind := ""
if port.Name != "" {
nameToFind = port.Name
} else if port.TargetPortName != "" {
nameToFind = port.TargetPortName
}

if nameToFind != "" {
for _, override := range ep.PortOverrides {
if override.Name == port.Name {
target = override.Port
break
if override.Name == nameToFind {
return override.Port, nil
}
}
return 0, fmt.Errorf("not found %s in port overrides", nameToFind)
}
return

if port.TargetPort > 0 {
return port.TargetPort, nil
}

return 0, fmt.Errorf("port mapping is undefined")
}

func (ep *Endpoint) PortMappings(ports []*PortMapping) (mapping map[int32]int32) {
mapping = make(map[int32]int32, len(ports))
for _, port := range ports {
mapping[port.Port] = ep.PortMapping(port)
p, err := ep.PortMapping(port)
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
mapping[port.Port] = p
}
return
}

func (ep *Endpoint) PortNameMappings(ports []*PortMapping) (mapping map[string]int32) {
mapping = make(map[string]int32, len(ports))
for _, port := range ports {
mapping[port.Name] = ep.PortMapping(port)
p, err := ep.PortMapping(port)
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
mapping[port.Name] = p
}
return
}
29 changes: 24 additions & 5 deletions api/localv1/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,22 @@ import "fmt"

func ExampleEndpointPortMapping() {
ports := []*PortMapping{
// name doesn't match -> ignore the rest
{Name: "http", TargetPortName: "t-http", TargetPort: 8080},
// name matches -> ignore the rest
{Name: "http2", TargetPortName: "t-http2", TargetPort: 800},
{Name: "metrics", TargetPortName: "t-metrics"},
// name matches -> ignore the rest
{Name: "metrics", TargetPortName: "http2"},
// name matches -> ignore the rest
{Name: "metrics", TargetPort: 80},
// name matches
{Name: "metrics"},
// targetPortName matches, no name -> ignore TargetPort
{TargetPortName: "metrics", TargetPort: 8080},
// targetPortName doesn't match, no name -> ignore targetPort
{TargetPortName: "t-metrics", TargetPort: 8080},
// nothing to match -> err expected
{},
}

ep := &Endpoint{
Expand All @@ -33,11 +46,17 @@ func ExampleEndpointPortMapping() {
}

for _, port := range ports {
fmt.Println(port.Name, ep.PortMapping(port))
p, err := ep.PortMapping(port)
fmt.Println(port.Name, p, err)
}

// Output:
// http 8080
// http2 888
// metrics 1011
// http 0 not found http in port overrides
// http2 888 <nil>
// metrics 1011 <nil>
// metrics 1011 <nil>
// metrics 1011 <nil>
// 1011 <nil>
// 0 not found t-metrics in port overrides
// 0 port mapping is undefined
}
17 changes: 8 additions & 9 deletions backends/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,10 +647,15 @@ func (t *iptables) createEndpointsChain(svcInfo *serviceInfo, allEndpoints *endp
ep = epInfo.IPs.V4[0]
}

targetPort := epInfo.PortMapping(&localv1.PortMapping{
targetPort, err := epInfo.PortMapping(&localv1.PortMapping{
Name: svcInfo.portName,
TargetPortName: svcInfo.targetPortName,
TargetPort: int32(svcInfo.targetPort),
})
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
endpointPortMap[ep] = targetPort
endpoints = append(endpoints, &ep)

Expand Down Expand Up @@ -748,21 +753,15 @@ func (t *iptables) writeDNATRules(svcInfo *serviceInfo, svcName types.Namespaced

targetPort := t.getTargetPort(svcInfo, endpointPortMap, *epIP)

// this seems very sly to me. Doing this because there were 2 entries being added
// one with the right target port and one with zero
// write better logic or verify how baseServiceInfo & endpointPortMap are populated
if targetPort == 0 {
continue
}
// DNAT to final destination.
args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", net.JoinHostPort(*epIP, strconv.Itoa(targetPort)))
t.natRules.Write(args)
}
}

// if the targetPort is string, fetch the value from endpointPortMap
// if the targetPort is string or portName exists, fetch the value from endpointPortMap
func (t *iptables) getTargetPort(svcInfo *serviceInfo, endpointPortMap map[string]int32, endpoint string) int {
if svcInfo.TargetPortName() != "" {
if svcInfo.PortName() != "" || svcInfo.TargetPortName() != "" {
return int(endpointPortMap[endpoint])
}
return svcInfo.TargetPort()
Expand Down
6 changes: 4 additions & 2 deletions backends/nft/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nft
import (
"encoding/hex"
"fmt"
"k8s.io/klog/v2"
"net/netip"
"strconv"

Expand Down Expand Up @@ -63,8 +64,9 @@ func (ctx *renderContext) addEndpointChain(svc *localv1.Service, epIP EpIP, svcC
continue
}

targetPort := epIP.Endpoint.PortMapping(port)
if targetPort == 0 {
targetPort, err := epIP.Endpoint.PortMapping(port)
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}

Expand Down
4 changes: 3 additions & 1 deletion backends/nft/svc-chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package nft

import (
"k8s.io/klog/v2"
"strconv"

localv1 "sigs.k8s.io/kpng/api/localv1"
Expand Down Expand Up @@ -55,7 +56,8 @@ func (ctx *renderContext) addSvcChain(svc *localv1.Service, epIPs []EpIP) {
// filter endpoint based on port availability
subset := make([]EpIP, 0, len(epIPs))
for _, epIP := range epIPs {
if epIP.Endpoint.PortMapping(port) == 0 {
if _, err := epIP.Endpoint.PortMapping(port); err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
subset = append(subset, epIP)
Expand Down
11 changes: 6 additions & 5 deletions client/plugins/conntrack/conntrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,15 @@ func (ct Conntrack) Callback(ch <-chan *client.ServiceEndpoints) {

for _, ep := range seps.Endpoints {
for _, epIP := range ep.IPs.All() {
p, err := ep.PortMapping(svcPort)
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
flow := Flow{
IPPort: ipp,
EndpointIP: epIP,
TargetPort: ep.PortMapping(svcPort),
}

if flow.TargetPort == 0 {
continue // no target port found
TargetPort: p,
}

ct.flows.Get(flow.Key()).Set(flow)
Expand Down
8 changes: 7 additions & 1 deletion client/plugins/conntrack/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package conntrack

import (
"k8s.io/klog/v2"
"sigs.k8s.io/kpng/api/localv1"
"sigs.k8s.io/kpng/client/localsink"
)
Expand Down Expand Up @@ -89,7 +90,12 @@ func (ps *Sink) DeleteEndpoint(namespace, serviceName, key string) {
for _, epIP := range ep.IPs.All() {
targetPort = port.Port
if port.Port == 0 {
targetPort = int32(ep.PortMapping(port))
p, err := ep.PortMapping(port)
if err != nil {
klog.V(1).InfoS("failed to map port", "err", err)
continue
}
targetPort = p
}
flow := Flow{
IPPort: IPPort{Protocol: port.Protocol, DnatIP: svcIP, Port: targetPort},
Expand Down
5 changes: 2 additions & 3 deletions server/jobs/kube2store/slice-event-handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func (h sliceEventHandler) OnAdd(obj interface{}) {
ServiceName: serviceName,
SourceName: eps.Name,
Endpoint: &localv1.Endpoint{},
Conditions: &globalv1.EndpointConditions{},
Topology: &globalv1.TopologyInfo{},
}

Expand Down Expand Up @@ -85,8 +84,8 @@ func (h sliceEventHandler) OnAdd(obj interface{}) {
sort.Strings(info.Hints.Zones) // stable zone order
}

if r := sliceEndpoint.Conditions.Ready; r != nil && *r {
info.Conditions.Ready = true
if r := sliceEndpoint.Conditions.Ready; r != nil {
info.Conditions = &globalv1.EndpointConditions{Ready: *r}
}

for _, addr := range sliceEndpoint.Addresses {
Expand Down
2 changes: 1 addition & 1 deletion server/pkg/endpoints/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func ForNode(tx *proxystore.Tx, si *globalv1.ServiceInfo, nodeName string) (endp

info.Endpoint.Local = info.Topology.Node == nodeName

if !info.Conditions.Ready {
if info.Conditions != nil && !info.Conditions.Ready {
return
}

Expand Down

0 comments on commit 6ccd640

Please sign in to comment.