Skip to content

Commit

Permalink
REVIEW: refactoring: merge storage factory code paths
Browse files Browse the repository at this point in the history
  • Loading branch information
sttts authored and deads2k committed Apr 27, 2017
1 parent 36ea264 commit b5c285d
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 151 deletions.
5 changes: 4 additions & 1 deletion pkg/cmd/server/admin/overwrite_bootstrappolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ func (o OverwriteBootstrapPolicyOptions) OverwriteBootstrapPolicy() error {
}

// this brings in etcd server client libraries
optsGetter := originrest.StorageOptions(*masterConfig)
optsGetter, err := originrest.StorageOptions(*masterConfig)
if err != nil {
return err
}

return OverwriteBootstrapPolicy(optsGetter, o.File, o.CreateBootstrapPolicyCommand, o.Force, o.Out)
}
Expand Down
64 changes: 22 additions & 42 deletions pkg/cmd/server/kubernetes/master/master_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
openapicommon "k8s.io/apimachinery/pkg/openapi"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
Expand All @@ -41,13 +42,14 @@ import (
kapiserveroptions "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
cmapp "k8s.io/kubernetes/cmd/kube-controller-manager/app/options"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/apps"
"k8s.io/kubernetes/pkg/apis/autoscaling"
"k8s.io/kubernetes/pkg/apis/batch"
batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1"
"k8s.io/kubernetes/pkg/apis/extensions"
kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kinternalclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/kubeapiserver"
"k8s.io/kubernetes/pkg/master"
"k8s.io/kubernetes/pkg/registry/cachesize"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
Expand Down Expand Up @@ -140,6 +142,11 @@ func BuildKubeAPIserverOptions(masterConfig configapi.MasterConfig) (*kapiserver
}

server.Etcd.EnableGarbageCollection = false // disabled until we add the controller. MUST be in synced with the value in CMServer
server.Etcd.StorageConfig.Prefix = masterConfig.EtcdStorageConfig.KubernetesStoragePrefix
server.Etcd.StorageConfig.ServerList = masterConfig.EtcdClientInfo.URLs
server.Etcd.StorageConfig.KeyFile = masterConfig.EtcdClientInfo.ClientCert.KeyFile
server.Etcd.StorageConfig.CertFile = masterConfig.EtcdClientInfo.ClientCert.CertFile
server.Etcd.StorageConfig.CAFile = masterConfig.EtcdClientInfo.CA

server.GenericServerRunOptions.MaxRequestsInFlight = masterConfig.ServingInfo.MaxRequestsInFlight
server.GenericServerRunOptions.MinRequestTimeout = masterConfig.ServingInfo.RequestTimeoutSeconds
Expand All @@ -163,57 +170,30 @@ func BuildKubeAPIserverOptions(masterConfig configapi.MasterConfig) (*kapiserver
}

// BuildStorageFactory builds a storage factory based on server.Etcd.StorageConfig with overrides from masterConfig.
// This storage factory is ONLY USED FOR kubernetes registries right now. Compare pkg/util/restoptions/configgetter.go.
func BuildStorageFactory(masterConfig configapi.MasterConfig, server *kapiserveroptions.ServerRunOptions) (apiserverstorage.StorageFactory, error) {
// This storage factory is used for kubernetes and origin registries. Compare pkg/util/restoptions/configgetter.go.
func BuildStorageFactory(masterConfig configapi.MasterConfig, server *kapiserveroptions.ServerRunOptions, enforcedStorageVersions []schema.GroupVersionResource) (*apiserverstorage.DefaultStorageFactory, error) {
resourceEncodingConfig := apiserverstorage.NewDefaultResourceEncodingConfig(kapi.Registry)
resourceEncodingConfig.SetVersionEncoding(
kapi.GroupName,
schema.GroupVersion{Group: kapi.GroupName, Version: masterConfig.EtcdStorageConfig.KubernetesStorageVersion},
kapi.SchemeGroupVersion,
)

resourceEncodingConfig.SetVersionEncoding(
extensions.GroupName,
schema.GroupVersion{Group: extensions.GroupName, Version: "v1beta1"},
extensions.SchemeGroupVersion,
)

resourceEncodingConfig.SetVersionEncoding(
batch.GroupName,
schema.GroupVersion{Group: batch.GroupName, Version: "v1"},
batch.SchemeGroupVersion,
)

resourceEncodingConfig.SetVersionEncoding(
autoscaling.GroupName,
schema.GroupVersion{Group: autoscaling.GroupName, Version: "v1"},
autoscaling.SchemeGroupVersion,
)

storageGroupsToEncodingVersion, err := server.StorageSerialization.StorageGroupsToEncodingVersion()
if err != nil {
return nil, err
}
for group, storageEncodingVersion := range storageGroupsToEncodingVersion {
resourceEncodingConfig.SetVersionEncoding(group, storageEncodingVersion, schema.GroupVersion{Group: group, Version: runtime.APIVersionInternal})
}
resourceEncodingConfig.SetResourceEncoding(batch.Resource("cronjobs"), batchv2alpha1.SchemeGroupVersion, batch.SchemeGroupVersion)

// use the stock storage config based on args, but override bits from our config where appropriate
etcdConfig := server.Etcd.StorageConfig
etcdConfig.Prefix = masterConfig.EtcdStorageConfig.KubernetesStoragePrefix
etcdConfig.ServerList = masterConfig.EtcdClientInfo.URLs
etcdConfig.KeyFile = masterConfig.EtcdClientInfo.ClientCert.KeyFile
etcdConfig.CertFile = masterConfig.EtcdClientInfo.ClientCert.CertFile
etcdConfig.CAFile = masterConfig.EtcdClientInfo.CA
// use legacy group name "" for all resources that existed when apigroups were introduced
for _, gvr := range enforcedStorageVersions {
resourceEncodingConfig.SetResourceEncoding(gvr.GroupResource(), schema.GroupVersion{Version: gvr.Version}, schema.GroupVersion{Version: runtime.APIVersionInternal})
}

storageFactory, err := kubeapiserver.NewStorageFactory(
etcdConfig,
storageFactory := apiserverstorage.NewDefaultStorageFactory(
server.Etcd.StorageConfig,
server.Etcd.DefaultStorageMediaType,
kapi.Codecs,
// FIXME: is this supposed to be resourceEncodingConfig???
apiserverstorage.NewDefaultResourceEncodingConfig(kapi.Registry),
storageGroupsToEncodingVersion,
// FIXME: this GroupVersionResource override should be configurable
[]schema.GroupVersionResource{batch.Resource("cronjobs").WithVersion("v2alpha1")},
resourceEncodingConfig,
master.DefaultAPIResourceConfigSource(),
server.APIEnablement.RuntimeConfig,
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -416,7 +396,7 @@ func buildKubeApiserverConfig(
return nil, err
}

storageFactory, err := BuildStorageFactory(masterConfig, apiserverOptions)
storageFactory, err := BuildStorageFactory(masterConfig, apiserverOptions, nil)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/cmd/server/origin/master_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,10 @@ func BuildMasterConfig(options configapi.MasterConfig) (*MasterConfig, error) {
return nil, err
}

restOptsGetter := originrest.StorageOptions(options)
restOptsGetter, err := originrest.StorageOptions(options)
if err != nil {
return nil, err
}

clientCAs, err := configapi.GetClientCertCAPool(options)
if err != nil {
Expand Down Expand Up @@ -650,7 +653,7 @@ func newServiceAccountTokenGetter(options configapi.MasterConfig) (serviceaccoun
if err != nil {
return nil, err
}
kubeStorageFactory, err := kubernetes.BuildStorageFactory(options, apiserverOptions)
kubeStorageFactory, err := kubernetes.BuildStorageFactory(options, apiserverOptions, nil)
if err != nil {
return nil, err
}
Expand Down
32 changes: 31 additions & 1 deletion pkg/cmd/server/origin/rest/storage_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ import (

// StorageOptions returns the appropriate storage configuration for the origin rest APIs, including
// overiddes.
func StorageOptions(options configapi.MasterConfig) restoptions.Getter {
func StorageOptions(options configapi.MasterConfig) (restoptions.Getter, error) {
return restoptions.NewConfigGetter(
options,
&serverstorage.ResourceConfig{},
// prefixes:
map[schema.GroupResource]string{
{Resource: "clusterpolicies"}: "authorization/cluster/policies",
{Resource: "clusterpolicies", Group: "authorization.openshift.io"}: "authorization/cluster/policies",
Expand Down Expand Up @@ -45,6 +46,35 @@ func StorageOptions(options configapi.MasterConfig) restoptions.Getter {
{Resource: "netnamespaces"}: "registry/sdnnetnamespaces",
{Resource: "netnamespaces", Group: "network.openshift.io"}: "registry/sdnnetnamespaces",
},
// storage versions:
[]schema.GroupVersionResource{
{"authorization.openshift.io", "v1", "clusterpolicybindings"},
{"authorization.openshift.io", "v1", "clusterpolicies"},
{"authorization.openshift.io", "v1", "policybindings"},
{"authorization.openshift.io", "v1", "rolebindingrestrictions"},
{"authorization.openshift.io", "v1", "policies"},
{"build.openshift.io", "v1", "builds"},
{"build.openshift.io", "v1", "buildconfigs"},
{"apps.openshift.io", "v1", "deploymentconfigs"},
{"image.openshift.io", "v1", "imagestreams"},
{"image.openshift.io", "v1", "images"},
{"oauth.openshift.io", "v1", "oauthclientauthorizations"},
{"oauth.openshift.io", "v1", "oauthaccesstokens"},
{"oauth.openshift.io", "v1", "oauthauthorizetokens"},
{"oauth.openshift.io", "v1", "oauthclients"},
{"project.openshift.io", "v1", "projects"},
{"quota.openshift.io", "v1", "clusterresourcequotas"},
{"route.openshift.io", "v1", "routes"},
{"network.openshift.io", "v1", "netnamespaces"},
{"network.openshift.io", "v1", "hostsubnets"},
{"network.openshift.io", "v1", "clusternetworks"},
{"network.openshift.io", "v1", "egressnetworkpolicies"},
{"template.openshift.io", "v1", "templates"},
{"user.openshift.io", "v1", "groups"},
{"user.openshift.io", "v1", "users"},
{"user.openshift.io", "v1", "identities"},
},
// quorum resources:
map[schema.GroupResource]struct{}{
{Resource: "oauthauthorizetokens"}: {},
{Resource: "oauthauthorizetokens", Group: "oauth.openshift.io"}: {},
Expand Down
134 changes: 32 additions & 102 deletions pkg/util/restoptions/configgetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,10 @@ import (
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
apiserveroptions "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/kubeapiserver"

"github.com/golang/glog"
configapi "github.com/openshift/origin/pkg/cmd/server/api"
cmdflags "github.com/openshift/origin/pkg/cmd/util/flags"
kubernetes "github.com/openshift/origin/pkg/cmd/server/kubernetes/master"
)

// UseConfiguredCacheSize indicates that the configured cache size should be used
Expand All @@ -37,111 +34,31 @@ type configRESTOptionsGetter struct {
storageFactory serverstorage.StorageFactory
defaultResourceConfig *serverstorage.ResourceConfig

cacheEnabled bool
defaultCacheSize int
cacheSizes map[schema.GroupResource]int
quorumResources map[schema.GroupResource]struct{}
defaultResourcePrefixes map[schema.GroupResource]string
cacheEnabled bool
defaultCacheSize int
cacheSizes map[schema.GroupResource]int
quorumResources map[schema.GroupResource]struct{}
}

// NewConfigGetter returns a restoptions.Getter implemented using information from the provided master config.
// By default, the etcd watch cache is enabled with a size of 1000 per resource type.
// TODO: this class should either not need to know about configapi.MasterConfig, or not be in pkg/util
func NewConfigGetter(masterOptions configapi.MasterConfig, defaultResourceConfig *serverstorage.ResourceConfig, defaultResourcePrefixes map[schema.GroupResource]string, quorumResources map[schema.GroupResource]struct{}) Getter {
getter := &configRESTOptionsGetter{
masterOptions: masterOptions,
cacheEnabled: true,
defaultCacheSize: 1000,
cacheSizes: map[schema.GroupResource]int{},
restOptionsMap: map[schema.GroupResource]generic.RESTOptions{},
defaultResourceConfig: defaultResourceConfig,
quorumResources: quorumResources,
defaultResourcePrefixes: defaultResourcePrefixes,
}

if err := getter.loadSettings(); err != nil {
glog.Error(err)
}

return getter
}

func (g *configRESTOptionsGetter) loadSettings() error {
options := apiserveroptions.NewServerRunOptions()
if g.masterOptions.KubernetesMasterConfig != nil {
if errs := cmdflags.Resolve(g.masterOptions.KubernetesMasterConfig.APIServerArguments, options.AddFlags); len(errs) > 0 {
return kerrors.NewAggregate(errs)
}
}

storageGroupsToEncodingVersion, err := options.StorageSerialization.StorageGroupsToEncodingVersion()
func NewConfigGetter(masterOptions configapi.MasterConfig, defaultResourceConfig *serverstorage.ResourceConfig, resourcePrefixOverrides map[schema.GroupResource]string, enforcedStorageVersions []schema.GroupVersionResource, quorumResources map[schema.GroupResource]struct{}) (Getter, error) {
apiserverOptions, err := kubernetes.BuildKubeAPIserverOptions(masterOptions)
if err != nil {
return err
return nil, err
}

storageConfig := options.Etcd.StorageConfig
storageConfig.Prefix = g.masterOptions.EtcdStorageConfig.OpenShiftStoragePrefix
storageConfig.ServerList = g.masterOptions.EtcdClientInfo.URLs
storageConfig.KeyFile = g.masterOptions.EtcdClientInfo.ClientCert.KeyFile
storageConfig.CertFile = g.masterOptions.EtcdClientInfo.ClientCert.CertFile
storageConfig.CAFile = g.masterOptions.EtcdClientInfo.CA

resourceEncodingConfig := serverstorage.NewDefaultResourceEncodingConfig(kapi.Registry)

storageFactory, err := kubeapiserver.NewStorageFactory(
storageConfig,
options.Etcd.DefaultStorageMediaType,
kapi.Codecs,
resourceEncodingConfig,
storageGroupsToEncodingVersion,
nil,
g.defaultResourceConfig,
options.APIEnablement.RuntimeConfig)
storageFactory, err := kubernetes.BuildStorageFactory(masterOptions, apiserverOptions, enforcedStorageVersions)
if err != nil {
return err
}

// TODO: the following works, but better make single resource overrides possible in BuildDefaultStorageFactory
// instead of being late to the party and patching here:

// use legacy group name "" for all resources that existed when apigroups were introduced
for _, gvr := range []schema.GroupVersionResource{
{Group: "authorization.openshift.io", Version: "v1", Resource: "clusterpolicybindings"},
{Group: "authorization.openshift.io", Version: "v1", Resource: "clusterpolicies"},
{Group: "authorization.openshift.io", Version: "v1", Resource: "policybindings"},
{Group: "authorization.openshift.io", Version: "v1", Resource: "rolebindingrestrictions"},
{Group: "authorization.openshift.io", Version: "v1", Resource: "policies"},
{Group: "build.openshift.io", Version: "v1", Resource: "builds"},
{Group: "build.openshift.io", Version: "v1", Resource: "buildconfigs"},
{Group: "apps.openshift.io", Version: "v1", Resource: "deploymentconfigs"},
{Group: "image.openshift.io", Version: "v1", Resource: "imagestreams"},
{Group: "image.openshift.io", Version: "v1", Resource: "images"},
{Group: "oauth.openshift.io", Version: "v1", Resource: "oauthclientauthorizations"},
{Group: "oauth.openshift.io", Version: "v1", Resource: "oauthaccesstokens"},
{Group: "oauth.openshift.io", Version: "v1", Resource: "oauthauthorizetokens"},
{Group: "oauth.openshift.io", Version: "v1", Resource: "oauthclients"},
{Group: "project.openshift.io", Version: "v1", Resource: "projects"},
{Group: "quota.openshift.io", Version: "v1", Resource: "clusterresourcequotas"},
{Group: "route.openshift.io", Version: "v1", Resource: "routes"},
{Group: "network.openshift.io", Version: "v1", Resource: "netnamespaces"},
{Group: "network.openshift.io", Version: "v1", Resource: "hostsubnets"},
{Group: "network.openshift.io", Version: "v1", Resource: "clusternetworks"},
{Group: "network.openshift.io", Version: "v1", Resource: "egressnetworkpolicies"},
{Group: "template.openshift.io", Version: "v1", Resource: "templates"},
{Group: "user.openshift.io", Version: "v1", Resource: "groups"},
{Group: "user.openshift.io", Version: "v1", Resource: "users"},
{Group: "user.openshift.io", Version: "v1", Resource: "identities"},
} {
resourceEncodingConfig.SetResourceEncoding(gvr.GroupResource(), schema.GroupVersion{Version: gvr.Version}, schema.GroupVersion{Version: runtime.APIVersionInternal})
return nil, err
}
storageFactory.DefaultResourcePrefixes = resourcePrefixOverrides
storageFactory.StorageConfig.Prefix = masterOptions.EtcdStorageConfig.OpenShiftStoragePrefix

storageFactory.DefaultResourcePrefixes = g.defaultResourcePrefixes
g.storageFactory = storageFactory

g.cacheEnabled = options.Etcd.EnableWatchCache

// TODO: refactor vendor/k8s.io/kubernetes/pkg/registry/cachesize to remove our custom cache size code
errs := []error{}
for _, c := range options.GenericServerRunOptions.WatchCacheSizes {
cacheSizes := map[schema.GroupResource]int{}
for _, c := range apiserverOptions.GenericServerRunOptions.WatchCacheSizes {
tokens := strings.Split(c, "#")
if len(tokens) != 2 {
errs = append(errs, fmt.Errorf("invalid watch cache size value '%s', expecting <resource>#<size> format (e.g. builds#100)", c))
Expand All @@ -155,10 +72,22 @@ func (g *configRESTOptionsGetter) loadSettings() error {
errs = append(errs, fmt.Errorf("invalid watch cache size value '%s': %v", c, err))
continue
}

g.cacheSizes[resource] = size
cacheSizes[resource] = size
}
return kerrors.NewAggregate(errs)
if len(errs) > 0 {
return nil, kerrors.NewAggregate(errs)
}

return &configRESTOptionsGetter{
masterOptions: masterOptions,
cacheEnabled: apiserverOptions.Etcd.EnableWatchCache,
defaultCacheSize: 1000,
cacheSizes: cacheSizes,
restOptionsMap: map[schema.GroupResource]generic.RESTOptions{},
defaultResourceConfig: defaultResourceConfig,
quorumResources: quorumResources,
storageFactory: storageFactory,
}, nil
}

func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
Expand Down Expand Up @@ -210,7 +139,8 @@ func (g *configRESTOptionsGetter) GetRESTOptions(resource schema.GroupResource)
resourceOptions := generic.RESTOptions{
StorageConfig: config,
Decorator: decorator,
DeleteCollectionWorkers: 1,
DeleteCollectionWorkers: 1, // TODO(rebase): use upstream value here from Etcd options?
EnableGarbageCollection: false, // TODO(rebase): use upstream value here from Etcd options?
ResourcePrefix: g.storageFactory.ResourcePrefix(resource),
}
g.restOptionsMap[resource] = resourceOptions
Expand Down
5 changes: 4 additions & 1 deletion test/integration/bootstrap_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,10 @@ func TestBootstrapPolicyOverwritePolicyCommand(t *testing.T) {
t.Errorf("timeout: %v", err)
}

optsGetter := originrest.StorageOptions(*masterConfig)
optsGetter, err := originrest.StorageOptions(*masterConfig)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

if err := admin.OverwriteBootstrapPolicy(optsGetter, masterConfig.PolicyConfig.BootstrapPolicyFile, admin.CreateBootstrapPolicyFileFullCommand, true, ioutil.Discard); err != nil {
t.Errorf("unexpected error: %v", err)
Expand Down
5 changes: 4 additions & 1 deletion test/integration/oauthstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func TestOAuthStorage(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}

optsGetter := originrest.StorageOptions(*masterOptions)
optsGetter, err := originrest.StorageOptions(*masterOptions)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

clientStorage, err := clientetcd.NewREST(optsGetter)
if err != nil {
Expand Down
Loading

0 comments on commit b5c285d

Please sign in to comment.