Skip to content

Commit

Permalink
apiserver by lib: add etcd store
Browse files Browse the repository at this point in the history
  • Loading branch information
phosae committed May 11, 2023
1 parent d18e7ff commit ea08ef9
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 13 deletions.
16 changes: 12 additions & 4 deletions api-aggregation-lib/pkg/apiserver/apiserver.go
Expand Up @@ -26,7 +26,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
clientrest "k8s.io/client-go/rest"

"github.com/phosae/x-kubernetes/api-aggregation-lib/pkg/registry"
fooregistry "github.com/phosae/x-kubernetes/api-aggregation-lib/pkg/registry/hello.zeng.dev/foo"
hello "github.com/phosae/x-kubernetes/api/hello.zeng.dev"
hellov1 "github.com/phosae/x-kubernetes/api/hello.zeng.dev/v1"
)
Expand Down Expand Up @@ -58,7 +58,8 @@ func init() {

// ExtraConfig holds custom apiserver config
type ExtraConfig struct {
Rest *clientrest.Config
Rest *clientrest.Config
EnableEtcdStorage bool
}

// Config defines the config for the apiserver
Expand Down Expand Up @@ -110,8 +111,15 @@ func (c completedConfig) New() (*HelloApiServer, error) {

apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(hellov1.GroupName, Scheme, metav1.ParameterCodec, Codecs)

apiGroupInfo.VersionedResourcesStorageMap["v1"] = map[string]rest.Storage{
"foos": registry.NewFooApi(),
apiGroupInfo.VersionedResourcesStorageMap["v1"] = map[string]rest.Storage{}
if c.ExtraConfig.EnableEtcdStorage {
etcdstorage, err := fooregistry.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
if err != nil {
return nil, err
}
apiGroupInfo.VersionedResourcesStorageMap["v1"]["foos"] = etcdstorage
} else {
apiGroupInfo.VersionedResourcesStorageMap["v1"]["foos"] = fooregistry.NewMemStore()
}

if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
Expand Down
62 changes: 55 additions & 7 deletions api-aggregation-lib/pkg/cmd/start.go
Expand Up @@ -7,10 +7,14 @@ import (
"os"

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
genericoptions "k8s.io/apiserver/pkg/server/options"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/storagebackend"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
Expand All @@ -20,22 +24,29 @@ import (

myapiserver "github.com/phosae/x-kubernetes/api-aggregation-lib/pkg/apiserver"
generatedopenapi "github.com/phosae/x-kubernetes/api/generated/openapi"
hellov1 "github.com/phosae/x-kubernetes/api/hello.zeng.dev/v1"
)

const defaultEtcdPathPrefix = "/registry/hello.zeng.dev"

type Options struct {
// RecommendedOptions *genericoptions.RecommendedOptions // - EtcdOptions
SecureServing *genericoptions.SecureServingOptionsWithLoopback
Kubeconfig string
Features *genericoptions.FeatureOptions

EnableEtcdStorage bool
Etcd *genericoptions.EtcdOptions
}

func (o *Options) Flags() (fs cliflag.NamedFlagSets) {
msfs := fs.FlagSet("hello.zeng.dev-server")
msfs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "The path to the kubeconfig used to connect to the Kubernetes API server and the Kubelets (defaults to in-cluster config)")
msfs.StringVar(&o.Kubeconfig, "kubeconfig", o.Kubeconfig, "The path to the kubeconfig used to connect to the Kubernetes API server (defaults to in-cluster config)")

o.SecureServing.AddFlags(fs.FlagSet("apiserver secure serving"))
o.Features.AddFlags(fs.FlagSet("features"))

msfs.BoolVar(&o.EnableEtcdStorage, "enable-etcd-storage", false, "If true, store objects in etcd")
o.Etcd.AddFlags(fs.FlagSet("Etcd"))
return fs
}

Expand All @@ -56,13 +67,40 @@ func (o Options) ServerConfig() (*myapiserver.Config, error) {
return nil, err
}

//apiservercfg.ClientConfig, err = o.restConfig()
//if err != nil {
// return nil, err
//}
if o.EnableEtcdStorage {
if err := o.Etcd.Complete(apiservercfg.Config.StorageObjectCountTracker, apiservercfg.Config.DrainedNotify(), apiservercfg.Config.AddPostStartHook); err != nil {
return nil, err
}

// set apiservercfg's RESTOptionsGetter as StorageFactoryRestOptionsFactory{..., StorageFactory: DefaultStorageFactory}
// like https://github.com/kubernetes/kubernetes/blob/e1ad9bee5bba8fbe85a6bf6201379ce8b1a611b1/cmd/kube-apiserver/app/server.go#L407-L415
// DefaultStorageFactory#NewConfig provides a way to negotiate StorageSerializer/DeSerializer by Etcd.DefaultStorageMediaType option
//
// DefaultStorageFactory's NewConfig will be called by interface genericregistry.RESTOptionsGetter#GetRESTOptions (struct StorageFactoryRestOptionsFactory)
// interface genericregistry.RESTOptionsGetter#GetRESTOptions will be called by genericregistry.Store#CompleteWithOptions
// Finally all RESTBackend Options will be passed to genericregistry.Store implementations
if o.Etcd.ApplyWithStorageFactoryTo(serverstorage.NewDefaultStorageFactory(
o.Etcd.StorageConfig,
o.Etcd.DefaultStorageMediaType,
myapiserver.Codecs,
serverstorage.NewDefaultResourceEncodingConfig(myapiserver.Scheme),
apiservercfg.MergedResourceConfig,
nil), &apiservercfg.Config); err != nil {
return nil, err
}
klog.Infof("etcd cfg: %v", o.Etcd)
o.Etcd.StorageConfig.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
// apiservercfg.ClientConfig, err = o.restConfig()
// if err != nil {
// return nil, err
// }
}

return &myapiserver.Config{
GenericConfig: apiservercfg,
ExtraConfig: myapiserver.ExtraConfig{},
ExtraConfig: myapiserver.ExtraConfig{
EnableEtcdStorage: o.EnableEtcdStorage,
},
}, nil
}

Expand Down Expand Up @@ -115,7 +153,17 @@ func (o Options) restConfig() (*rest.Config, error) {
func NewHelloServerCommand(stopCh <-chan struct{}) *cobra.Command {
opts := &Options{
SecureServing: genericoptions.NewSecureServingOptions().WithLoopback(),
// if just encode as json and store to etcd, just do this
// Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(defaultEtcdPathPrefix, myapiserver.Codecs.LegacyCodec(hellov1.SchemeGroupVersion))),
// but if we want to encode as json and pb, just assign nil to Codec here
// like the official kube-apiserver https://github.com/kubernetes/kubernetes/blob/e1ad9bee5bba8fbe85a6bf6201379ce8b1a611b1/cmd/kube-apiserver/app/options/options.go#L96
// when new/complete apiserver config, use EtcdOptions#ApplyWithStorageFactoryTo server.Config, which
// finally init server.Config.RESTOptionsGetter as StorageFactoryRestOptionsFactory
Etcd: genericoptions.NewEtcdOptions(storagebackend.NewDefaultConfig(defaultEtcdPathPrefix, nil)),
}
opts.Etcd.StorageConfig.EncodeVersioner = runtime.NewMultiGroupVersioner(hellov1.SchemeGroupVersion, schema.GroupKind{Group: hellov1.GroupName})
// opts.Etcd.DefaultStorageMediaType = "application/vnd.kubernetes.protobuf"
opts.Etcd.DefaultStorageMediaType = "application/json"
opts.SecureServing.BindPort = 6443

cmd := &cobra.Command{
Expand Down
32 changes: 32 additions & 0 deletions api-aggregation-lib/pkg/registry/hello.zeng.dev/foo/etcd.go
@@ -0,0 +1,32 @@
package foo

import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/generic"
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"

hellov1 "github.com/phosae/x-kubernetes/api/hello.zeng.dev/v1"
)

// NewREST returns a RESTStorage object that will work against API services.
func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) (*genericregistry.Store, error) {
strategy := NewStrategy(scheme)

store := &genericregistry.Store{
NewFunc: func() runtime.Object { return &hellov1.Foo{} },
NewListFunc: func() runtime.Object { return &hellov1.FooList{} },
PredicateFunc: MatchFoo,
DefaultQualifiedResource: hellov1.Resource("foos"),
SingularQualifiedResource: hellov1.Resource("foos"),

CreateStrategy: strategy,
UpdateStrategy: strategy,
DeleteStrategy: strategy,
TableConvertor: strategy,
}
options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: GetAttrs}
if err := store.CompleteWithOptions(options); err != nil {
return nil, err
}
return store, nil
}
@@ -1,4 +1,4 @@
package registry
package foo

import (
"context"
Expand All @@ -24,7 +24,7 @@ type fooApi struct {
store map[string]*hellov1.Foo
}

func NewFooApi() *fooApi {
func NewMemStore() *fooApi {
return &fooApi{
store: map[string]*hellov1.Foo{
"default/bar": {
Expand Down
113 changes: 113 additions & 0 deletions api-aggregation-lib/pkg/registry/hello.zeng.dev/foo/strategy.go
@@ -0,0 +1,113 @@
package foo

import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/names"

hellov1 "github.com/phosae/x-kubernetes/api/hello.zeng.dev/v1"
)

// NewStrategy creates and returns a fooStrategy instance
func NewStrategy(typer runtime.ObjectTyper) fooStrategy {
return fooStrategy{typer, names.SimpleNameGenerator}
}

// GetAttrs returns labels.Set, fields.Set, and error in case the given runtime.Object is not a Fischer
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
apiserver, ok := obj.(*hellov1.Foo)
if !ok {
return nil, nil, fmt.Errorf("given object is not a Fischer")
}
return labels.Set(apiserver.ObjectMeta.Labels), SelectableFields(apiserver), nil
}

// MatchFoo is the filter used by the generic etcd backend to watch events
// from etcd to clients of the apiserver only interested in specific labels/fields.
func MatchFoo(label labels.Selector, field fields.Selector) storage.SelectionPredicate {
return storage.SelectionPredicate{
Label: label,
Field: field,
GetAttrs: GetAttrs,
}
}

// SelectableFields returns a field set that represents the object.
func SelectableFields(obj *hellov1.Foo) fields.Set {
return generic.ObjectMetaFieldsSet(&obj.ObjectMeta, true)
}

type fooStrategy struct {
runtime.ObjectTyper
names.NameGenerator
}

func (fooStrategy) NamespaceScoped() bool {
return true
}

func (fooStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
}

func (fooStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
}

func (fooStrategy) Validate(ctx context.Context, obj runtime.Object) field.ErrorList {
_ = obj.(*hellov1.Foo)
return nil
}

// WarningsOnCreate returns warnings for the creation of the given object.
func (fooStrategy) WarningsOnCreate(ctx context.Context, obj runtime.Object) []string { return nil }

func (fooStrategy) AllowCreateOnUpdate() bool {
return false
}

func (fooStrategy) AllowUnconditionalUpdate() bool {
return false
}

func (fooStrategy) Canonicalize(obj runtime.Object) {
}

func (fooStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object) field.ErrorList {
return field.ErrorList{}
}

// WarningsOnUpdate returns warnings for the given update.
func (fooStrategy) WarningsOnUpdate(ctx context.Context, obj, old runtime.Object) []string {
return nil
}

func (fooStrategy) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
var table metav1.Table

table.ColumnDefinitions = []metav1.TableColumnDefinition{
{Name: "Name", Type: "string", Format: "name", Description: metav1.ObjectMeta{}.SwaggerDoc()["name"]},
{Name: "Age", Type: "string", Description: metav1.ObjectMeta{}.SwaggerDoc()["creationTimestamp"]},
{Name: "Message", Type: "string", Format: "message", Description: "foo message"},
{Name: "Message1", Type: "string", Format: "message1", Description: "foo message plus"},
}

switch t := object.(type) {
case *hellov1.Foo:
table.ResourceVersion = t.ResourceVersion
addFoosToTable(&table, *t)
case *hellov1.FooList:
table.ResourceVersion = t.ResourceVersion
table.Continue = t.Continue
addFoosToTable(&table, t.Items...)
default:
}

return &table, nil
}

0 comments on commit ea08ef9

Please sign in to comment.