Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove non-reuseable bits of MasterServer #35795

Merged
merged 1 commit into from
Nov 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/genericapiserver/reststorage_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ import (
type RESTOptionsGetter func(resource unversioned.GroupResource) generic.RESTOptions

type RESTStorageProvider interface {
GroupName() string
NewRESTStorage(apiResourceConfigSource APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) (APIGroupInfo, bool)
}
12 changes: 0 additions & 12 deletions pkg/master/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -32,38 +32,28 @@ go_library(
"//pkg/apis/apps/v1alpha1:go_default_library",
"//pkg/apis/authentication/install:go_default_library",
"//pkg/apis/authentication/v1beta1:go_default_library",
"//pkg/apis/authorization:go_default_library",
"//pkg/apis/authorization/install:go_default_library",
"//pkg/apis/authorization/v1beta1:go_default_library",
"//pkg/apis/autoscaling:go_default_library",
"//pkg/apis/autoscaling/install:go_default_library",
"//pkg/apis/autoscaling/v1:go_default_library",
"//pkg/apis/batch:go_default_library",
"//pkg/apis/batch/install:go_default_library",
"//pkg/apis/batch/v1:go_default_library",
"//pkg/apis/certificates:go_default_library",
"//pkg/apis/certificates/install:go_default_library",
"//pkg/apis/certificates/v1alpha1:go_default_library",
"//pkg/apis/componentconfig/install:go_default_library",
"//pkg/apis/extensions:go_default_library",
"//pkg/apis/extensions/install:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
"//pkg/apis/imagepolicy/install:go_default_library",
"//pkg/apis/policy:go_default_library",
"//pkg/apis/policy/install:go_default_library",
"//pkg/apis/policy/v1alpha1:go_default_library",
"//pkg/apis/rbac:go_default_library",
"//pkg/apis/rbac/install:go_default_library",
"//pkg/apis/rbac/v1alpha1:go_default_library",
"//pkg/apis/storage:go_default_library",
"//pkg/apis/storage/install:go_default_library",
"//pkg/apis/storage/v1beta1:go_default_library",
"//pkg/apiserver:go_default_library",
"//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
"//pkg/genericapiserver:go_default_library",
"//pkg/healthz:go_default_library",
"//pkg/kubelet/client:go_default_library",
"//pkg/master/ports:go_default_library",
"//pkg/master/thirdparty:go_default_library",
"//pkg/registry/apps/rest:go_default_library",
"//pkg/registry/authentication/rest:go_default_library",
Expand All @@ -84,12 +74,10 @@ go_library(
"//pkg/registry/rbac/rest:go_default_library",
"//pkg/registry/storage/rest:go_default_library",
"//pkg/routes:go_default_library",
"//pkg/storage/etcd/util:go_default_library",
"//pkg/util/async:go_default_library",
"//pkg/util/intstr:go_default_library",
"//pkg/util/net:go_default_library",
"//pkg/util/runtime:go_default_library",
"//pkg/util/sets:go_default_library",
"//pkg/util/wait:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/prometheus/client_golang/prometheus",
Expand Down
184 changes: 53 additions & 131 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,48 +18,32 @@ package master

import (
"fmt"
"net"
"net/http"
"net/url"
"reflect"
"strconv"
"strings"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
appsapi "k8s.io/kubernetes/pkg/apis/apps/v1alpha1"
authenticationv1beta1 "k8s.io/kubernetes/pkg/apis/authentication/v1beta1"
"k8s.io/kubernetes/pkg/apis/authorization"
authorizationapiv1beta1 "k8s.io/kubernetes/pkg/apis/authorization/v1beta1"
"k8s.io/kubernetes/pkg/apis/autoscaling"
autoscalingapiv1 "k8s.io/kubernetes/pkg/apis/autoscaling/v1"
"k8s.io/kubernetes/pkg/apis/batch"
batchapiv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
"k8s.io/kubernetes/pkg/apis/certificates"
certificatesapiv1alpha1 "k8s.io/kubernetes/pkg/apis/certificates/v1alpha1"
"k8s.io/kubernetes/pkg/apis/extensions"
extensionsapiv1beta1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/apis/policy"
policyapiv1alpha1 "k8s.io/kubernetes/pkg/apis/policy/v1alpha1"
"k8s.io/kubernetes/pkg/apis/rbac"
rbacapi "k8s.io/kubernetes/pkg/apis/rbac/v1alpha1"
"k8s.io/kubernetes/pkg/apis/storage"
storageapiv1beta1 "k8s.io/kubernetes/pkg/apis/storage/v1beta1"
"k8s.io/kubernetes/pkg/apiserver"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/healthz"
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/master/ports"
"k8s.io/kubernetes/pkg/master/thirdparty"

"k8s.io/kubernetes/pkg/registry/generic"
"k8s.io/kubernetes/pkg/registry/generic/registry"
"k8s.io/kubernetes/pkg/routes"
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
"k8s.io/kubernetes/pkg/util/sets"

"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -94,8 +78,7 @@ type Config struct {
DeleteCollectionWorkers int
EventTTL time.Duration
KubeletClientConfig kubeletclient.KubeletClientConfig
// genericapiserver.RESTStorageProviders provides RESTStorage building methods keyed by groupName
RESTStorageProviders map[string]genericapiserver.RESTStorageProvider

// Used to start and monitor tunneling
Tunneler genericapiserver.Tunneler
EnableUISupport bool
Expand All @@ -113,17 +96,6 @@ type EndpointReconcilerConfig struct {
// Master contains state for a Kubernetes cluster master/api server.
type Master struct {
GenericAPIServer *genericapiserver.GenericAPIServer

thirdPartyResourceServer *thirdparty.ThirdPartyResourceServer

// nodeClient is used to back the tunneler
nodeClient coreclient.NodeInterface
}

type RESTOptionsGetter func(resource unversioned.GroupResource) generic.RESTOptions

type RESTStorageProvider interface {
NewRESTStorage(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter RESTOptionsGetter) (groupInfo genericapiserver.APIGroupInfo, enabled bool)
}

type completedConfig struct {
Expand Down Expand Up @@ -178,9 +150,6 @@ func (c completedConfig) New() (*Master, error) {

m := &Master{
GenericAPIServer: s,
nodeClient: coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes(),

thirdPartyResourceServer: thirdparty.NewThirdPartyResourceServer(s),
}

restOptionsFactory := restOptionsFactory{
Expand All @@ -198,33 +167,30 @@ func (c completedConfig) New() (*Master, error) {
// install legacy rest storage
if c.GenericConfig.APIResourceConfigSource.AnyResourcesForVersionEnabled(apiv1.SchemeGroupVersion) {
legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
StorageFactory: c.StorageFactory,
ProxyTransport: c.ProxyTransport,
KubeletClientConfig: c.KubeletClientConfig,
EventTTL: c.EventTTL,
ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange,
ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange,
ComponentStatusServerFunc: func() map[string]apiserver.Server { return getServersToValidate(c.StorageFactory) },
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
StorageFactory: c.StorageFactory,
ProxyTransport: c.ProxyTransport,
KubeletClientConfig: c.KubeletClientConfig,
EventTTL: c.EventTTL,
ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange,
ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange,
LoopbackClientConfig: c.GenericConfig.LoopbackClientConfig,
}
m.InstallLegacyAPI(c.Config, restOptionsFactory.NewFor, legacyRESTStorageProvider)
}

// Add some hardcoded storage for now. Append to the map.
if c.RESTStorageProviders == nil {
c.RESTStorageProviders = map[string]genericapiserver.RESTStorageProvider{}
restStorageProviders := []genericapiserver.RESTStorageProvider{
appsrest.RESTStorageProvider{},
authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator},
authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer},
autoscalingrest.RESTStorageProvider{},
batchrest.RESTStorageProvider{},
certificatesrest.RESTStorageProvider{},
extensionsrest.RESTStorageProvider{ResourceInterface: thirdparty.NewThirdPartyResourceServer(s, c.StorageFactory)},
policyrest.RESTStorageProvider{},
rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser},
storagerest.RESTStorageProvider{},
}
c.RESTStorageProviders[appsapi.GroupName] = appsrest.RESTStorageProvider{}
c.RESTStorageProviders[authenticationv1beta1.GroupName] = authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authenticator}
c.RESTStorageProviders[authorization.GroupName] = authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorizer}
c.RESTStorageProviders[autoscaling.GroupName] = autoscalingrest.RESTStorageProvider{}
c.RESTStorageProviders[batch.GroupName] = batchrest.RESTStorageProvider{}
c.RESTStorageProviders[certificates.GroupName] = certificatesrest.RESTStorageProvider{}
c.RESTStorageProviders[extensions.GroupName] = extensionsrest.RESTStorageProvider{ResourceInterface: m.thirdPartyResourceServer}
c.RESTStorageProviders[policy.GroupName] = policyrest.RESTStorageProvider{}
c.RESTStorageProviders[rbac.GroupName] = &rbacrest.RESTStorageProvider{AuthorizerRBACSuperUser: c.GenericConfig.AuthorizerRBACSuperUser}
c.RESTStorageProviders[storage.GroupName] = storagerest.RESTStorageProvider{}
m.InstallAPIs(c.Config, restOptionsFactory.NewFor)
m.InstallAPIs(c.Config.GenericConfig.APIResourceConfigSource, restOptionsFactory.NewFor, restStorageProviders...)

m.InstallGeneralEndpoints(c.Config)

Expand Down Expand Up @@ -255,7 +221,9 @@ func (m *Master) InstallGeneralEndpoints(c *Config) {
// Run the tunneler.
healthzChecks := []healthz.HealthzChecker{}
if c.Tunneler != nil {
c.Tunneler.Run(m.getNodeAddresses)
nodeClient := coreclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig).Nodes()
c.Tunneler.Run(nodeAddressProvider{nodeClient}.externalAddresses)

healthzChecks = append(healthzChecks, healthz.NamedCheck("SSH Tunnel Check", genericapiserver.TunnelSyncHealthChecker(c.Tunneler)))
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Name: "apiserver_proxy_tunnel_sync_latency_secs",
Expand All @@ -272,34 +240,22 @@ func (m *Master) InstallGeneralEndpoints(c *Config) {

}

func (m *Master) InstallAPIs(c *Config, restOptionsGetter genericapiserver.RESTOptionsGetter) {
// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *Master) InstallAPIs(apiResourceConfigSource genericapiserver.APIResourceConfigSource, restOptionsGetter genericapiserver.RESTOptionsGetter, restStorageProviders ...genericapiserver.RESTStorageProvider) {
apiGroupsInfo := []genericapiserver.APIGroupInfo{}

// Install third party resource support if requested
// TODO seems like this bit ought to be unconditional and the REST API is controlled by the config
if c.GenericConfig.APIResourceConfigSource.ResourceEnabled(extensionsapiv1beta1.SchemeGroupVersion.WithResource("thirdpartyresources")) {
var err error
// TODO figure out why this isn't a loopback client
m.thirdPartyResourceServer.ThirdPartyStorageConfig, err = c.StorageFactory.NewConfig(extensions.Resource("thirdpartyresources"))
if err != nil {
glog.Fatalf("Error getting third party storage: %v", err)
}
}

// stabilize order.
// TODO find a better way to configure priority of groups
for _, group := range sets.StringKeySet(c.RESTStorageProviders).List() {
if !c.GenericConfig.APIResourceConfigSource.AnyResourcesForGroupEnabled(group) {
glog.V(1).Infof("Skipping disabled API group %q.", group)
for _, restStorageBuilder := range restStorageProviders {
groupName := restStorageBuilder.GroupName()
if !apiResourceConfigSource.AnyResourcesForGroupEnabled(groupName) {
glog.V(1).Infof("Skipping disabled API group %q.", groupName)
continue
}
restStorageBuilder := c.RESTStorageProviders[group]
apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(c.GenericConfig.APIResourceConfigSource, restOptionsGetter)
apiGroupInfo, enabled := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
if !enabled {
glog.Warningf("Problem initializing API group %q, skipping.", group)
glog.Warningf("Problem initializing API group %q, skipping.", groupName)
continue
}
glog.V(1).Infof("Enabling API group %q.", group)
glog.V(1).Infof("Enabling API group %q.", groupName)

if postHookProvider, ok := restStorageBuilder.(genericapiserver.PostStartHookProvider); ok {
name, hook, err := postHookProvider.PostStartHook()
Expand All @@ -321,44 +277,6 @@ func (m *Master) InstallAPIs(c *Config, restOptionsGetter genericapiserver.RESTO
}
}

func getServersToValidate(storageFactory genericapiserver.StorageFactory) map[string]apiserver.Server {
serversToValidate := map[string]apiserver.Server{
"controller-manager": {Addr: "127.0.0.1", Port: ports.ControllerManagerPort, Path: "/healthz"},
"scheduler": {Addr: "127.0.0.1", Port: ports.SchedulerPort, Path: "/healthz"},
}

for ix, machine := range storageFactory.Backends() {
etcdUrl, err := url.Parse(machine)
if err != nil {
glog.Errorf("Failed to parse etcd url for validation: %v", err)
continue
}
var port int
var addr string
if strings.Contains(etcdUrl.Host, ":") {
var portString string
addr, portString, err = net.SplitHostPort(etcdUrl.Host)
if err != nil {
glog.Errorf("Failed to split host/port: %s (%v)", etcdUrl.Host, err)
continue
}
port, _ = strconv.Atoi(portString)
} else {
addr = etcdUrl.Host
port = 2379
}
// TODO: etcd health checking should be abstracted in the storage tier
serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{
Addr: addr,
EnableHTTPS: etcdUrl.Scheme == "https",
Port: port,
Path: "/health",
Validate: etcdutil.EtcdHealthCheck,
}
}
return serversToValidate
}

type restOptionsFactory struct {
deleteCollectionWorkers int
enableGarbageCollection bool
Expand All @@ -381,6 +299,27 @@ func (f restOptionsFactory) NewFor(resource unversioned.GroupResource) generic.R
}
}

type nodeAddressProvider struct {
nodeClient coreclient.NodeInterface
}

func (n nodeAddressProvider) externalAddresses() (addresses []string, err error) {
nodes, err := n.nodeClient.List(api.ListOptions{})
if err != nil {
return nil, err
}
addrs := []string{}
for ix := range nodes.Items {
node := &nodes.Items[ix]
addr, err := findExternalAddress(node)
if err != nil {
return nil, err
}
addrs = append(addrs, addr)
}
return addrs, nil
}

// findExternalAddress returns ExternalIP of provided node with fallback to LegacyHostIP.
func findExternalAddress(node *api.Node) (string, error) {
var fallback string
Expand All @@ -399,23 +338,6 @@ func findExternalAddress(node *api.Node) (string, error) {
return "", fmt.Errorf("Couldn't find external address: %v", node)
}

func (m *Master) getNodeAddresses() ([]string, error) {
nodes, err := m.nodeClient.List(api.ListOptions{})
if err != nil {
return nil, err
}
addrs := []string{}
for ix := range nodes.Items {
node := &nodes.Items[ix]
addr, err := findExternalAddress(node)
if err != nil {
return nil, err
}
addrs = append(addrs, addr)
}
return addrs, nil
}

func DefaultAPIResourceConfigSource() *genericapiserver.ResourceConfig {
ret := genericapiserver.NewResourceConfig()
ret.EnableVersions(
Expand Down