Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow StorageFactory to wrap encoders and decoders #40624

Merged
merged 1 commit into from Feb 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
155 changes: 94 additions & 61 deletions pkg/genericapiserver/server/storage_factory.go
Expand Up @@ -74,7 +74,19 @@ type DefaultStorageFactory struct {
APIResourceConfigSource APIResourceConfigSource

// newStorageCodecFn exists to be overwritten for unit testing.
newStorageCodecFn func(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion schema.GroupVersion, config storagebackend.Config) (codec runtime.Codec, err error)
newStorageCodecFn func(opts StorageCodecConfig) (codec runtime.Codec, err error)
}

// StorageCodecConfig are the arguments passed to newStorageCodecFn
type StorageCodecConfig struct {
StorageMediaType string
StorageSerializer runtime.StorageSerializer
StorageVersion schema.GroupVersion
MemoryVersion schema.GroupVersion
Config storagebackend.Config

EncoderDecoratorFn func(runtime.Encoder) runtime.Encoder
DecoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder
}

type groupResourceOverrides struct {
Expand All @@ -95,6 +107,34 @@ type groupResourceOverrides struct {
// of exposing one set of concepts. autoscaling.HPA and extensions.HPA as a for instance
// The order of the slice matters! It is the priority order of lookup for finding a storage location
cohabitatingResources []schema.GroupResource
// encoderDecoratorFn is optional and may wrap the provided encoder prior to being serialized.
encoderDecoratorFn func(runtime.Encoder) runtime.Encoder
// decoderDecoratorFn is optional and may wrap the provided decoders (can add new decoders). The order of
// returned decoders will be priority for attempt to decode.
decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder
}

// Apply overrides the provided config and options if the override has a value in that position
func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *StorageCodecConfig) {
if len(o.etcdLocation) > 0 {
config.ServerList = o.etcdLocation
}
if len(o.etcdPrefix) > 0 {
config.Prefix = o.etcdPrefix
}

if len(o.mediaType) > 0 {
options.StorageMediaType = o.mediaType
}
if o.serializer != nil {
options.StorageSerializer = o.serializer
}
if o.encoderDecoratorFn != nil {
options.EncoderDecoratorFn = o.encoderDecoratorFn
}
if o.decoderDecoratorFn != nil {
options.DecoderDecoratorFn = o.decoderDecoratorFn
}
}

var _ StorageFactory = &DefaultStorageFactory{}
Expand Down Expand Up @@ -165,6 +205,15 @@ func (s *DefaultStorageFactory) AddCohabitatingResources(groupResources ...schem
}
}

func (s *DefaultStorageFactory) AddSerializationChains(encoderDecoratorFn func(runtime.Encoder) runtime.Encoder, decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder, groupResources ...schema.GroupResource) {
for _, groupResource := range groupResources {
overrides := s.Overrides[groupResource]
overrides.encoderDecoratorFn = encoderDecoratorFn
overrides.decoderDecoratorFn = decoderDecoratorFn
s.Overrides[groupResource] = overrides
}
}

func getAllResourcesAlias(resource schema.GroupResource) schema.GroupResource {
return schema.GroupResource{Group: resource.Group, Resource: AllResources}
}
Expand All @@ -184,64 +233,38 @@ func (s *DefaultStorageFactory) getStorageGroupResource(groupResource schema.Gro
func (s *DefaultStorageFactory) NewConfig(groupResource schema.GroupResource) (*storagebackend.Config, error) {
chosenStorageResource := s.getStorageGroupResource(groupResource)

groupOverride := s.Overrides[getAllResourcesAlias(chosenStorageResource)]
exactResourceOverride := s.Overrides[chosenStorageResource]

overriddenEtcdLocations := []string{}
if len(groupOverride.etcdLocation) > 0 {
overriddenEtcdLocations = groupOverride.etcdLocation
}
if len(exactResourceOverride.etcdLocation) > 0 {
overriddenEtcdLocations = exactResourceOverride.etcdLocation
}

etcdPrefix := s.StorageConfig.Prefix
if len(groupOverride.etcdPrefix) > 0 {
etcdPrefix = groupOverride.etcdPrefix
}
if len(exactResourceOverride.etcdPrefix) > 0 {
etcdPrefix = exactResourceOverride.etcdPrefix
}

etcdMediaType := s.DefaultMediaType
if len(groupOverride.mediaType) != 0 {
etcdMediaType = groupOverride.mediaType
}
if len(exactResourceOverride.mediaType) != 0 {
etcdMediaType = exactResourceOverride.mediaType
// operate on copy
storageConfig := s.StorageConfig
codecConfig := StorageCodecConfig{
StorageMediaType: s.DefaultMediaType,
StorageSerializer: s.DefaultSerializer,
}

etcdSerializer := s.DefaultSerializer
if groupOverride.serializer != nil {
etcdSerializer = groupOverride.serializer
if override, ok := s.Overrides[getAllResourcesAlias(chosenStorageResource)]; ok {
override.Apply(&storageConfig, &codecConfig)
}
if exactResourceOverride.serializer != nil {
etcdSerializer = exactResourceOverride.serializer
}
// operate on copy
config := s.StorageConfig
config.Prefix = etcdPrefix
if len(overriddenEtcdLocations) > 0 {
config.ServerList = overriddenEtcdLocations
if override, ok := s.Overrides[chosenStorageResource]; ok {
override.Apply(&storageConfig, &codecConfig)
}

storageEncodingVersion, err := s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
var err error
codecConfig.StorageVersion, err = s.ResourceEncodingConfig.StorageEncodingFor(chosenStorageResource)
if err != nil {
return nil, err
}
internalVersion, err := s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
codecConfig.MemoryVersion, err = s.ResourceEncodingConfig.InMemoryEncodingFor(groupResource)
if err != nil {
return nil, err
}
codecConfig.Config = storageConfig

codec, err := s.newStorageCodecFn(etcdMediaType, etcdSerializer, storageEncodingVersion, internalVersion, config)
storageConfig.Codec, err = s.newStorageCodecFn(codecConfig)
if err != nil {
return nil, err
}
glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, codecConfig.StorageVersion, codecConfig.MemoryVersion, codecConfig.Config)

glog.V(3).Infof("storing %v in %v, reading as %v from %v", groupResource, storageEncodingVersion, internalVersion, config)
config.Codec = codec
return &config, nil
return &storageConfig, nil
}

// Get all backends for all registered storage destinations.
Expand All @@ -257,41 +280,51 @@ func (s *DefaultStorageFactory) Backends() []string {

// NewStorageCodec assembles a storage codec for the provided storage media type, the provided serializer, and the requested
// storage and memory versions.
func NewStorageCodec(storageMediaType string, ns runtime.StorageSerializer, storageVersion, memoryVersion schema.GroupVersion, config storagebackend.Config) (runtime.Codec, error) {
mediaType, _, err := mime.ParseMediaType(storageMediaType)
func NewStorageCodec(opts StorageCodecConfig) (runtime.Codec, error) {
mediaType, _, err := mime.ParseMediaType(opts.StorageMediaType)
if err != nil {
return nil, fmt.Errorf("%q is not a valid mime-type", storageMediaType)
return nil, fmt.Errorf("%q is not a valid mime-type", opts.StorageMediaType)
}
serializer, ok := runtime.SerializerInfoForMediaType(ns.SupportedMediaTypes(), mediaType)
serializer, ok := runtime.SerializerInfoForMediaType(opts.StorageSerializer.SupportedMediaTypes(), mediaType)
if !ok {
return nil, fmt.Errorf("unable to find serializer for %q", storageMediaType)
return nil, fmt.Errorf("unable to find serializer for %q", opts.StorageMediaType)
}

s := serializer.Serializer

// etcd2 only supports string data - we must wrap any result before returning
// TODO: storagebackend should return a boolean indicating whether it supports binary data
if !serializer.EncodesAsText && (config.Type == storagebackend.StorageTypeUnset || config.Type == storagebackend.StorageTypeETCD2) {
if !serializer.EncodesAsText && (opts.Config.Type == storagebackend.StorageTypeUnset || opts.Config.Type == storagebackend.StorageTypeETCD2) {
glog.V(4).Infof("Wrapping the underlying binary storage serializer with a base64 encoding for etcd2")
s = runtime.NewBase64Serializer(s)
}

encoder := ns.EncoderForVersion(
s,
// Give callers the opportunity to wrap encoders and decoders. For decoders, each returned decoder will
// be passed to the recognizer so that multiple decoders are available.
var encoder runtime.Encoder = s
if opts.EncoderDecoratorFn != nil {
encoder = opts.EncoderDecoratorFn(encoder)
}
decoders := []runtime.Decoder{s, opts.StorageSerializer.UniversalDeserializer()}
if opts.DecoderDecoratorFn != nil {
decoders = opts.DecoderDecoratorFn(decoders)
}

// Ensure the storage receives the correct version.
encoder = opts.StorageSerializer.EncoderForVersion(
encoder,
runtime.NewMultiGroupVersioner(
storageVersion,
schema.GroupKind{Group: storageVersion.Group},
schema.GroupKind{Group: memoryVersion.Group},
opts.StorageVersion,
schema.GroupKind{Group: opts.StorageVersion.Group},
schema.GroupKind{Group: opts.MemoryVersion.Group},
),
)

ds := recognizer.NewDecoder(s, ns.UniversalDeserializer())
decoder := ns.DecoderToVersion(
ds,
decoder := opts.StorageSerializer.DecoderToVersion(
recognizer.NewDecoder(decoders...),
runtime.NewMultiGroupVersioner(
memoryVersion,
schema.GroupKind{Group: memoryVersion.Group},
schema.GroupKind{Group: storageVersion.Group},
opts.MemoryVersion,
schema.GroupKind{Group: opts.MemoryVersion.Group},
schema.GroupKind{Group: opts.StorageVersion.Group},
),
)

Expand Down
62 changes: 62 additions & 0 deletions pkg/genericapiserver/server/storage_factory_test.go
Expand Up @@ -20,13 +20,75 @@ import (
"reflect"
"testing"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/genericapiserver/server/options"
)

type fakeNegotiater struct {
serializer, streamSerializer runtime.Serializer
framer runtime.Framer
types, streamTypes []string
}

func (n *fakeNegotiater) SupportedMediaTypes() []runtime.SerializerInfo {
var out []runtime.SerializerInfo
for _, s := range n.types {
info := runtime.SerializerInfo{Serializer: n.serializer, MediaType: s, EncodesAsText: true}
for _, t := range n.streamTypes {
if t == s {
info.StreamSerializer = &runtime.StreamSerializerInfo{
EncodesAsText: true,
Framer: n.framer,
Serializer: n.streamSerializer,
}
}
}
out = append(out, info)
}
return out
}

func (n *fakeNegotiater) UniversalDeserializer() runtime.Decoder {
return n.serializer
}

func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
return n.serializer
}

func (n *fakeNegotiater) DecoderToVersion(serializer runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
return n.serializer
}

func TestDefaultStorageFactory(t *testing.T) {
ns := &fakeNegotiater{types: []string{"test/test"}}
f := NewDefaultStorageFactory(storagebackend.Config{}, "test/test", ns, NewDefaultResourceEncodingConfig(), NewResourceConfig())
f.AddCohabitatingResources(schema.GroupResource{Resource: "test"}, schema.GroupResource{Resource: "test2", Group: "2"})
called := false
testEncoderChain := func(e runtime.Encoder) runtime.Encoder {
called = true
return e
}
f.AddSerializationChains(testEncoderChain, nil, schema.GroupResource{Resource: "test"})
f.SetEtcdLocation(schema.GroupResource{Resource: "*"}, []string{"/server2"})
f.SetEtcdPrefix(schema.GroupResource{Resource: "test"}, "/prefix_for_test")

config, err := f.NewConfig(schema.GroupResource{Resource: "test"})
if err != nil {
t.Fatal(err)
}
if config.Prefix != "/prefix_for_test" || !reflect.DeepEqual(config.ServerList, []string{"/server2"}) {
t.Errorf("unexpected config %#v", config)
}
if !called {
t.Errorf("expected encoder chain to be called")
}
}

func TestUpdateEtcdOverrides(t *testing.T) {
testCases := []struct {
resource schema.GroupResource
Expand Down