New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
aggregator: unify resolver implementation and tests #46623
Changes from all commits
35335c0
d567594
b1f7087
8b409e8
c7d9a39
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,61 +24,66 @@ import ( | |
"strconv" | ||
|
||
"k8s.io/apimachinery/pkg/api/errors" | ||
utilnet "k8s.io/apimachinery/pkg/util/net" | ||
listersv1 "k8s.io/client-go/listers/core/v1" | ||
"k8s.io/client-go/pkg/api/v1" | ||
|
||
"k8s.io/apimachinery/pkg/util/intstr" | ||
) | ||
|
||
// findServicePort finds the service port by name or numerically. | ||
func findServicePort(svc *v1.Service, port intstr.IntOrString) (*v1.ServicePort, error) { | ||
for _, svcPort := range svc.Spec.Ports { | ||
if (port.Type == intstr.Int && int32(svcPort.Port) == port.IntVal) || (port.Type == intstr.String && svcPort.Name == port.StrVal) { | ||
return &svcPort, nil | ||
} | ||
} | ||
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %q found for service %q", port.String(), svc.Name)) | ||
} | ||
|
||
// ResourceLocation returns a URL to which one can send traffic for the specified service. | ||
func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.EndpointsLister, namespace, id string) (*url.URL, error) { | ||
// Allow ID as "svcname", "svcname:port", or "scheme:svcname:port". | ||
svcScheme, svcName, portStr, valid := utilnet.SplitSchemeNamePort(id) | ||
if !valid { | ||
return nil, errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id)) | ||
svc, err := services.Services(namespace).Get(id) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// If a port *number* was specified, find the corresponding service port name | ||
if portNum, err := strconv.ParseInt(portStr, 10, 64); err == nil { | ||
svc, err := services.Services(namespace).Get(svcName) | ||
if err != nil { | ||
return nil, err | ||
} | ||
found := false | ||
for _, svcPort := range svc.Spec.Ports { | ||
if int64(svcPort.Port) == portNum { | ||
// use the declared port's name | ||
portStr = svcPort.Name | ||
found = true | ||
break | ||
} | ||
} | ||
if !found { | ||
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no service port %d found for service %q", portNum, svcName)) | ||
} | ||
port := intstr.FromInt(443) | ||
svcPort, err := findServicePort(svc, port) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
eps, err := endpoints.Endpoints(namespace).Get(svcName) | ||
switch { | ||
case svc.Spec.Type == v1.ServiceTypeClusterIP, svc.Spec.Type == v1.ServiceTypeLoadBalancer, svc.Spec.Type == v1.ServiceTypeNodePort: | ||
// these are fine | ||
default: | ||
return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type) | ||
} | ||
|
||
eps, err := endpoints.Endpoints(namespace).Get(svc.Name) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if len(eps.Subsets) == 0 { | ||
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svcName)) | ||
return nil, errors.NewServiceUnavailable(fmt.Sprintf("no endpoints available for service %q", svc.Name)) | ||
} | ||
|
||
// Pick a random Subset to start searching from. | ||
ssSeed := rand.Intn(len(eps.Subsets)) | ||
|
||
// Find a Subset that has the port. | ||
for ssi := 0; ssi < len(eps.Subsets); ssi++ { | ||
ss := &eps.Subsets[(ssSeed+ssi)%len(eps.Subsets)] | ||
if len(ss.Addresses) == 0 { | ||
continue | ||
} | ||
for i := range ss.Ports { | ||
if ss.Ports[i].Name == portStr { | ||
if ss.Ports[i].Name == svcPort.Name { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not for now but there is something odd about the case where we are asked for a port number match, find the first matching port number and the pick all the routes which match that routes port name. If ports are consistently names there won't be an issue but if they aren't the behavior is going to be very hard to debug. |
||
// Pick a random address. | ||
ip := ss.Addresses[rand.Intn(len(ss.Addresses))].IP | ||
port := int(ss.Ports[i].Port) | ||
return &url.URL{ | ||
Scheme: svcScheme, | ||
Scheme: "https", | ||
Host: net.JoinHostPort(ip, strconv.Itoa(port)), | ||
}, nil | ||
} | ||
|
@@ -88,30 +93,35 @@ func ResolveEndpoint(services listersv1.ServiceLister, endpoints listersv1.Endpo | |
} | ||
|
||
func ResolveCluster(services listersv1.ServiceLister, namespace, id string) (*url.URL, error) { | ||
if len(id) == 0 { | ||
return &url.URL{Scheme: "https"}, nil | ||
svc, err := services.Services(namespace).Get(id) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
destinationHost := id + "." + namespace + ".svc" | ||
service, err := services.Services(namespace).Get(id) | ||
if err != nil { | ||
port := intstr.FromInt(443) | ||
|
||
switch { | ||
case svc.Spec.Type == v1.ServiceTypeClusterIP && svc.Spec.ClusterIP == v1.ClusterIPNone: | ||
return nil, fmt.Errorf(`cannot route to service with ClusterIP "None"`) | ||
// use IP from a clusterIP for these service types | ||
case svc.Spec.Type == v1.ServiceTypeClusterIP, svc.Spec.Type == v1.ServiceTypeLoadBalancer, svc.Spec.Type == v1.ServiceTypeNodePort: | ||
svcPort, err := findServicePort(svc, port) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &url.URL{ | ||
Scheme: "https", | ||
Host: destinationHost, | ||
Host: net.JoinHostPort(svc.Spec.ClusterIP, fmt.Sprintf("%d", svcPort.Port)), | ||
}, nil | ||
} | ||
switch { | ||
// use IP from a clusterIP for these service types | ||
case service.Spec.Type == v1.ServiceTypeClusterIP, | ||
service.Spec.Type == v1.ServiceTypeNodePort, | ||
service.Spec.Type == v1.ServiceTypeLoadBalancer: | ||
case svc.Spec.Type == v1.ServiceTypeExternalName: | ||
if port.Type != intstr.Int { | ||
return nil, fmt.Errorf("named ports not supported") | ||
} | ||
return &url.URL{ | ||
Scheme: "https", | ||
Host: service.Spec.ClusterIP, | ||
Host: net.JoinHostPort(svc.Spec.ExternalName, port.String()), | ||
}, nil | ||
default: | ||
return nil, fmt.Errorf("unsupported service type %q", svc.Spec.Type) | ||
} | ||
return &url.URL{ | ||
Scheme: "https", | ||
Host: destinationHost, | ||
}, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You have change the meaning of the id field (it used to be scheme : service name : port) it is now just the service name. I believe the caller at either https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go#L153-L155 or https://github.com/kubernetes/kubernetes/blob/master/staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go#L116 will need to be fixed to reflect this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did. But I am struggling to find out where we decide/document that a service name (which comes from APIService as far as I see) can be in this colon syntax.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ran your fix on my GCE cluster. Despite my worries looks like it works just fine.