Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Federation][kubefed] Add option to expose federation apiserver on nodeport service #40516

Merged
merged 1 commit into from Feb 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions federation/pkg/kubefed/init/BUILD
Expand Up @@ -16,6 +16,7 @@ go_library(
"//cmd/kubeadm/app/phases/kubeconfig:go_default_library",
"//federation/pkg/kubefed/util:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/rbac:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
Expand All @@ -25,7 +26,6 @@ go_library(
"//vendor:github.com/spf13/cobra",
"//vendor:k8s.io/apimachinery/pkg/api/resource",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/tools/clientcmd",
"//vendor:k8s.io/client-go/tools/clientcmd/api",
Expand Down Expand Up @@ -54,7 +54,6 @@ go_test(
"//vendor:k8s.io/apimachinery/pkg/api/resource",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/util/diff",
"//vendor:k8s.io/apimachinery/pkg/util/intstr",
"//vendor:k8s.io/client-go/dynamic",
"//vendor:k8s.io/client-go/rest/fake",
"//vendor:k8s.io/client-go/tools/clientcmd",
Expand Down
124 changes: 100 additions & 24 deletions federation/pkg/kubefed/init/init.go
Expand Up @@ -33,12 +33,13 @@ package init
import (
"fmt"
"io"
"net"
"strconv"
"strings"
"time"

"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
Expand All @@ -47,6 +48,7 @@ import (
kubeadmkubeconfigphase "k8s.io/kubernetes/cmd/kubeadm/app/phases/kubeconfig"
"k8s.io/kubernetes/federation/pkg/kubefed/util"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/apis/rbac"
client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
Expand Down Expand Up @@ -76,6 +78,9 @@ const (

lbAddrRetryInterval = 5 * time.Second
podWaitInterval = 2 * time.Second

apiserverServiceTypeFlag = "api-server-service-type"
apiserverAdvertiseAddressFlag = "api-server-advertise-address"
)

var (
Expand Down Expand Up @@ -137,6 +142,8 @@ func NewCmdInit(cmdOut io.Writer, config util.AdminConfig) *cobra.Command {
cmd.Flags().Bool("etcd-persistent-storage", true, "Use persistent volume for etcd. Defaults to 'true'.")
cmd.Flags().Bool("dry-run", false, "dry run without sending commands to server.")
cmd.Flags().String("storage-backend", "etcd2", "The storage backend for persistence. Options: 'etcd2' (default), 'etcd3'.")
cmd.Flags().String(apiserverServiceTypeFlag, string(v1.ServiceTypeLoadBalancer), "The type of service to create for federation API server. Options: 'LoadBalancer' (default), 'NodePort'.")
cmd.Flags().String(apiserverAdvertiseAddressFlag, "", "Preferred address to advertise api server nodeport service. Valid only if '"+apiserverServiceTypeFlag+"=NodePort'.")
return cmd
}

Expand All @@ -162,6 +169,21 @@ func initFederation(cmdOut io.Writer, config util.AdminConfig, cmd *cobra.Comman
etcdPersistence := cmdutil.GetFlagBool(cmd, "etcd-persistent-storage")
dryRun := cmdutil.GetDryRunFlag(cmd)
storageBackend := cmdutil.GetFlagString(cmd, "storage-backend")
apiserverServiceType := v1.ServiceType(cmdutil.GetFlagString(cmd, apiserverServiceTypeFlag))
apiserverAdvertiseAddress := cmdutil.GetFlagString(cmd, apiserverAdvertiseAddressFlag)

if apiserverServiceType != v1.ServiceTypeLoadBalancer && apiserverServiceType != v1.ServiceTypeNodePort {
return fmt.Errorf("invalid %s: %s, should be either %s or %s", apiserverServiceTypeFlag, apiserverServiceType, v1.ServiceTypeLoadBalancer, v1.ServiceTypeNodePort)
}
if apiserverAdvertiseAddress != "" {
ip := net.ParseIP(apiserverAdvertiseAddress)
if ip == nil {
return fmt.Errorf("invalid %s: %s, should be a valid ip address", apiserverAdvertiseAddressFlag, apiserverAdvertiseAddress)
}
if apiserverServiceType != v1.ServiceTypeNodePort {
return fmt.Errorf("%s should be passed only with '%s=NodePort'", apiserverAdvertiseAddressFlag, apiserverServiceTypeFlag)
}
}

hostFactory := config.HostFactory(initFlags.Host, initFlags.Kubeconfig)
hostClientset, err := hostFactory.ClientSet()
Expand All @@ -181,11 +203,7 @@ func initFederation(cmdOut io.Writer, config util.AdminConfig, cmd *cobra.Comman
}

// 2. Expose a network endpoint for the federation API server
svc, err := createService(hostClientset, initFlags.FederationSystemNamespace, serverName, dryRun)
if err != nil {
return err
}
ips, hostnames, err := waitForLoadBalancerAddress(hostClientset, svc, dryRun)
svc, ips, hostnames, err := createService(hostClientset, initFlags.FederationSystemNamespace, serverName, apiserverAdvertiseAddress, apiserverServiceType, dryRun)
if err != nil {
return err
}
Expand Down Expand Up @@ -220,16 +238,12 @@ func initFederation(cmdOut io.Writer, config util.AdminConfig, cmd *cobra.Comman

// Since only one IP address can be specified as advertise address,
// we arbitrarily pick the first available IP address
advertiseAddress := ""
if len(ips) > 0 {
// Pick user provided apiserverAdvertiseAddress over other available IP addresses.
advertiseAddress := apiserverAdvertiseAddress
if advertiseAddress == "" && len(ips) > 0 {
advertiseAddress = ips[0]
}

endpoint := advertiseAddress
if advertiseAddress == "" && len(hostnames) > 0 {
endpoint = hostnames[0]
}

// 6. Create federation API server
_, err = createAPIServer(hostClientset, initFlags.FederationSystemNamespace, serverName, image, serverCredName, advertiseAddress, storageBackend, pvc, dryRun)
if err != nil {
Expand Down Expand Up @@ -257,6 +271,19 @@ func initFederation(cmdOut io.Writer, config util.AdminConfig, cmd *cobra.Comman
return err
}

// Pick the first ip/hostname to update the api server endpoint in kubeconfig and also to give information to user
// In case of NodePort Service for api server, ips are node external ips.
endpoint := ""
if len(ips) > 0 {
endpoint = ips[0]
} else if len(hostnames) > 0 {
endpoint = hostnames[0]
}
// If the service is nodeport, need to append the port to endpoint as it is non-standard port
if apiserverServiceType == v1.ServiceTypeNodePort {
endpoint = endpoint + ":" + strconv.Itoa(int(svc.Spec.Ports[0].NodePort))
}

// 8. Write the federation API server endpoint info, credentials
// and context to kubeconfig
err = updateKubeconfig(config, initFlags.Name, endpoint, entKeyPairs, dryRun)
Expand All @@ -274,7 +301,7 @@ func initFederation(cmdOut io.Writer, config util.AdminConfig, cmd *cobra.Comman
if err != nil {
return err
}
return printSuccess(cmdOut, ips, hostnames)
return printSuccess(cmdOut, ips, hostnames, svc)
}
_, err = fmt.Fprintf(cmdOut, "Federation control plane runs (dry run)\n")
return err
Expand All @@ -294,32 +321,73 @@ func createNamespace(clientset *client.Clientset, namespace string, dryRun bool)
return clientset.Core().Namespaces().Create(ns)
}

func createService(clientset *client.Clientset, namespace, svcName string, dryRun bool) (*api.Service, error) {
func createService(clientset *client.Clientset, namespace, svcName, apiserverAdvertiseAddress string, apiserverServiceType v1.ServiceType, dryRun bool) (*api.Service, []string, []string, error) {
svc := &api.Service{
ObjectMeta: metav1.ObjectMeta{
Name: svcName,
Namespace: namespace,
Labels: componentLabel,
},
Spec: api.ServiceSpec{
Type: api.ServiceTypeLoadBalancer,
Type: api.ServiceType(apiserverServiceType),
Selector: apiserverSvcSelector,
Ports: []api.ServicePort{
{
Name: "https",
Protocol: "TCP",
Port: 443,
TargetPort: intstr.FromInt(443),
Name: "https",
Protocol: "TCP",
Port: 443,
},
},
},
}

if dryRun {
return svc, nil
return svc, nil, nil, nil
}

var err error
svc, err = clientset.Core().Services(namespace).Create(svc)

ips := []string{}
hostnames := []string{}
if apiserverServiceType == v1.ServiceTypeLoadBalancer {
ips, hostnames, err = waitForLoadBalancerAddress(clientset, svc, dryRun)
} else {
if apiserverAdvertiseAddress != "" {
ips = append(ips, apiserverAdvertiseAddress)
} else {
ips, err = getClusterNodeIPs(clientset)
}
}
if err != nil {
return svc, nil, nil, err
}

return svc, ips, hostnames, err
}

func getClusterNodeIPs(clientset *client.Clientset) ([]string, error) {
preferredAddressTypes := []api.NodeAddressType{
api.NodeExternalIP,
}
nodeList, err := clientset.Nodes().List(metav1.ListOptions{})
if err != nil {
return nil, err
}
nodeAddresses := []string{}
for _, node := range nodeList.Items {
OuterLoop:
for _, addressType := range preferredAddressTypes {
for _, address := range node.Status.Addresses {
if address.Type == addressType {
nodeAddresses = append(nodeAddresses, address.Address)
break OuterLoop
}
}
}
}

return clientset.Core().Services(namespace).Create(svc)
return nodeAddresses, nil
}

func waitForLoadBalancerAddress(clientset *client.Clientset, svc *api.Service, dryRun bool) ([]string, []string, error) {
Expand Down Expand Up @@ -720,9 +788,17 @@ func waitSrvHealthy(config util.AdminConfig, context, kubeconfig string) error {
return err
}

func printSuccess(cmdOut io.Writer, ips, hostnames []string) error {
func printSuccess(cmdOut io.Writer, ips, hostnames []string, svc *api.Service) error {
svcEndpoints := append(ips, hostnames...)
_, err := fmt.Fprintf(cmdOut, "Federation API server is running at: %s\n", strings.Join(svcEndpoints, ", "))
endpoints := strings.Join(svcEndpoints, ", ")
if svc.Spec.Type == api.ServiceTypeNodePort {
endpoints = ips[0] + ":" + strconv.Itoa(int(svc.Spec.Ports[0].NodePort))
if len(ips) > 1 {
endpoints = endpoints + ", ..."
}
}

_, err := fmt.Fprintf(cmdOut, "Federation API server is running at: %s\n", endpoints)
return err
}

Expand Down