Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish externalIPs of Nodes running Pods in headless service #1391

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/apis/externaldns/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func (cfg *Config) ParseFlags(args []string) error {
app.Flag("compatibility", "Process annotation semantics from legacy implementations (optional, options: mate, molecule, kops-dns-controller)").Default(defaultConfig.Compatibility).EnumVar(&cfg.Compatibility, "", "mate", "molecule", "kops-dns-controller")
app.Flag("ignore-ingress-rules-spec", "Ignore rules spec section in ingresses resources, applicable only for ingress sources (optional, default: false)").BoolVar(&cfg.IgnoreIngressRulesSpec)
app.Flag("publish-internal-services", "Allow external-dns to publish DNS records for ClusterIP services (optional)").BoolVar(&cfg.PublishInternal)
app.Flag("publish-host-ip", "Allow external-dns to publish host-ip for headless services (optional)").BoolVar(&cfg.PublishHostIP)
app.Flag("publish-host-ip", "Allow external-dns to publish host-ip for headless services, optionally you could use 'external-dns.alpha.kubernetes.io/access' annotation on the Service to control whether to publish ExternalIP of the Host Node or InternalIP which is a default (optional)").BoolVar(&cfg.PublishHostIP)
app.Flag("always-publish-not-ready-addresses", "Always publish also not ready addresses for headless services (optional)").BoolVar(&cfg.AlwaysPublishNotReadyAddresses)
app.Flag("connector-source-server", "The server to connect for connector source, valid only when using connector source").Default(defaultConfig.ConnectorSourceServer).StringVar(&cfg.ConnectorSourceServer)
app.Flag("crd-source-apiversion", "API version of the CRD for crd source, e.g. `externaldns.k8s.io/v1alpha1`, valid only when using crd source").Default(defaultConfig.CRDSourceAPIVersion).StringVar(&cfg.CRDSourceAPIVersion)
Expand Down
36 changes: 27 additions & 9 deletions source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,29 @@ func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname stri
}

for _, headlessDomain := range headlessDomains {
var ep string
var ep []string
if sc.publishHostIP {
ep = pod.Status.HostIP
log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, ep)
internalIPs := []string{pod.Status.HostIP}
var externalIPs endpoint.Targets

node, err := sc.nodeInformer.Lister().Get(pod.Spec.NodeName)
if err != nil {
log.Errorf("Node %s not found", pod.Spec.NodeName)
return nil
}
for _, address := range node.Status.Addresses {
if address.Type == v1.NodeExternalIP {
externalIPs = append(externalIPs, address.Address)
}
}

access := getAccessFromAnnotations(svc.Annotations)
ep = append(ep, endpointsByAccessType(access, externalIPs, internalIPs)...)
} else {
ep = address.IP
ep = append(ep, address.IP)
log.Debugf("Generating matching endpoint %s with EndpointAddress IP %s", headlessDomain, ep)
}
targetsByHeadlessDomain[headlessDomain] = append(targetsByHeadlessDomain[headlessDomain], ep)
targetsByHeadlessDomain[headlessDomain] = append(targetsByHeadlessDomain[headlessDomain], ep...)
}
}
}
Expand Down Expand Up @@ -595,16 +609,20 @@ func (sc *serviceSource) extractNodePortTargets(svc *v1.Service) (endpoint.Targe
}

access := getAccessFromAnnotations(svc.Annotations)
return endpointsByAccessType(access, externalIPs, internalIPs), nil
}

func endpointsByAccessType(access string, externalIPs endpoint.Targets, internalIPs endpoint.Targets) endpoint.Targets {
if access == "public" {
return externalIPs, nil
return externalIPs
}
if access == "private" {
return internalIPs, nil
return internalIPs
}
if len(externalIPs) > 0 {
return externalIPs, nil
return externalIPs
}
return internalIPs, nil
return internalIPs
}

func (sc *serviceSource) extractNodePortEndpoints(svc *v1.Service, nodeTargets endpoint.Targets, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
Expand Down
303 changes: 303 additions & 0 deletions source/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2948,10 +2948,23 @@ func TestHeadlessServicesHostIP(t *testing.T) {
var addresses []v1.EndpointAddress
var notReadyAddresses []v1.EndpointAddress
for i, podname := range tc.podnames {
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-" + podname,
},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{},
},
}
_, err = kubernetes.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
require.NoError(t, err)

pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{},
Hostname: tc.hostnames[i],
NodeName: node.Name,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Expand Down Expand Up @@ -3022,6 +3035,296 @@ func TestHeadlessServicesHostIP(t *testing.T) {
}
}

// TestHeadlessServicesExternalHostIP tests that headless services generate the correct endpoints.
func TestHeadlessServicesExternalHostIP(t *testing.T) {
for _, tc := range []struct {
title string
targetNamespace string
svcNamespace string
svcName string
svcType v1.ServiceType
compatibility string
fqdnTemplate string
ignoreHostnameAnnotation bool
labels map[string]string
annotations map[string]string
clusterIP string
hostIPs []string
selector map[string]string
lbs []string
podnames []string
hostnames []string
podsReady []bool
publishNotReadyAddresses bool
expected []*endpoint.Endpoint
expectError bool
}{
{
"annotated Headless services return endpoints for each selected Pod",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
false,
map[string]string{"component": "foo"},
map[string]string{
accessAnnotationKey: "public",
hostnameAnnotationKey: "service.example.org",
},
v1.ClusterIPNone,
[]string{"1.1.1.1", "1.1.1.2"},
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]bool{true, true},
false,
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
{DNSName: "foo-1.service.example.org", Targets: endpoint.Targets{"1.1.1.2"}},
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1", "1.1.1.2"}},
},
false,
},
{
"hostname annotated Headless services are ignored",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
true,
map[string]string{"component": "foo"},
map[string]string{
accessAnnotationKey: "public",
hostnameAnnotationKey: "service.example.org",
},
v1.ClusterIPNone,
[]string{"1.1.1.1", "1.1.1.2"},
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]bool{true, true},
false,
[]*endpoint.Endpoint{},
false,
},
{
"annotated Headless services return endpoints with TTL for each selected Pod",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
false,
map[string]string{"component": "foo"},
map[string]string{
accessAnnotationKey: "public",
hostnameAnnotationKey: "service.example.org",
ttlAnnotationKey: "1",
},
v1.ClusterIPNone,
[]string{"1.1.1.1", "1.1.1.2"},
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]bool{true, true},
false,
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}, RecordTTL: endpoint.TTL(1)},
{DNSName: "foo-1.service.example.org", Targets: endpoint.Targets{"1.1.1.2"}, RecordTTL: endpoint.TTL(1)},
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1", "1.1.1.2"}, RecordTTL: endpoint.TTL(1)},
},
false,
},
{
"annotated Headless services return endpoints for each selected Pod, which are in running state",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
false,
map[string]string{"component": "foo"},
map[string]string{
accessAnnotationKey: "public",
hostnameAnnotationKey: "service.example.org",
},
v1.ClusterIPNone,
[]string{"1.1.1.1", "1.1.1.2"},
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"foo-0", "foo-1"},
[]bool{true, false},
false,
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1"}},
},
false,
},
{
"annotated Headless services return endpoints for pods missing hostname",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
false,
map[string]string{"component": "foo"},
map[string]string{
accessAnnotationKey: "public",
hostnameAnnotationKey: "service.example.org",
},
v1.ClusterIPNone,
[]string{"1.1.1.1", "1.1.1.2"},
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]string{"", ""},
[]bool{true, true},
false,
[]*endpoint.Endpoint{
{DNSName: "service.example.org", Targets: endpoint.Targets{"1.1.1.1", "1.1.1.2"}},
},
false,
},
} {
t.Run(tc.title, func(t *testing.T) {
// Create a Kubernetes testing client
kubernetes := fake.NewSimpleClientset()

service := &v1.Service{
Spec: v1.ServiceSpec{
Type: tc.svcType,
ClusterIP: tc.clusterIP,
Selector: tc.selector,
PublishNotReadyAddresses: tc.publishNotReadyAddresses,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: tc.svcName,
Labels: tc.labels,
Annotations: tc.annotations,
},
Status: v1.ServiceStatus{},
}
_, err := kubernetes.CoreV1().Services(service.Namespace).Create(context.TODO(), service, metav1.CreateOptions{})
require.NoError(t, err)

var addresses []v1.EndpointAddress
var notReadyAddresses []v1.EndpointAddress
for i, podname := range tc.podnames {
node := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-" + podname,
},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Addresses: []v1.NodeAddress{{Type: v1.NodeExternalIP, Address: tc.hostIPs[i]}},
},
}
_, err = kubernetes.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
require.NoError(t, err)

pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{},
Hostname: tc.hostnames[i],
NodeName: node.Name,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: podname,
Labels: tc.labels,
Annotations: tc.annotations,
},
Status: v1.PodStatus{
HostIP: tc.hostIPs[i],
},
}

_, err = kubernetes.CoreV1().Pods(tc.svcNamespace).Create(context.TODO(), pod, metav1.CreateOptions{})
require.NoError(t, err)

address := v1.EndpointAddress{
IP: "4.3.2.1",
TargetRef: &v1.ObjectReference{
APIVersion: "",
Kind: "Pod",
Name: podname,
},
}
if tc.podsReady[i] {
addresses = append(addresses, address)
} else {
notReadyAddresses = append(notReadyAddresses, address)
}
}
endpointsObject := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: tc.svcName,
Labels: tc.labels,
},
Subsets: []v1.EndpointSubset{
{
Addresses: addresses,
NotReadyAddresses: notReadyAddresses,
},
},
}
_, err = kubernetes.CoreV1().Endpoints(tc.svcNamespace).Create(context.TODO(), endpointsObject, metav1.CreateOptions{})
require.NoError(t, err)

// Create our object under test and get the endpoints.
client, _ := NewServiceSource(
kubernetes,
tc.targetNamespace,
"",
tc.fqdnTemplate,
false,
tc.compatibility,
true,
true,
false,
[]string{},
tc.ignoreHostnameAnnotation,
)
require.NoError(t, err)

endpoints, err := client.Endpoints(context.TODO())
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}

// Validate returned endpoints against desired endpoints.
validateEndpoints(t, endpoints, tc.expected)
})
}
}

// TestExternalServices tests that external services generate the correct endpoints.
func TestExternalServices(t *testing.T) {
t.Parallel()
Expand Down