Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][DO NOT REVIEW] Serialize once #81218

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/runtime/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ func (defaultFramer) NewFrameWriter(w io.Writer) io.Writer { return w }
type WithVersionEncoder struct {
Version GroupVersioner
Encoder
ObjectConvertor
ObjectTyper
}

Expand Down Expand Up @@ -242,6 +243,14 @@ func (e WithVersionEncoder) Encode(obj Object, stream io.Writer) error {
return err
}

// CustomEncoder is an interface that allows intercepting encoding for
// individual object to allow custom encoding.
type CustomEncoder interface {
// InterceptEncode takes encoder that would have been used if the object
// didn't implement CustomEncoder interface.
InterceptEncode(encoder WithVersionEncoder, w io.Writer) error
}

// WithoutVersionDecoder clears the group version kind of a deserialized object.
type WithoutVersionDecoder struct {
Decoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,10 @@ func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, i

// Encode serializes the provided object to the given writer.
func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
if customEncoder, ok := obj.(runtime.CustomEncoder); ok {
return customEncoder.InterceptEncode(runtime.WithVersionEncoder{Encoder: s}, w)
}

if s.options.Yaml {
json, err := caseSensitiveJsonIterator.Marshal(obj)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func (s *Serializer) Decode(originalData []byte, gvk *schema.GroupVersionKind, i

// Encode serializes the provided object to the given writer.
func (s *Serializer) Encode(obj runtime.Object, w io.Writer) error {
if customEncoder, ok := obj.(runtime.CustomEncoder); ok {
return customEncoder.InterceptEncode(runtime.WithVersionEncoder{Encoder: s}, w)
}

prefixSize := uint64(len(s.prefix))

var unk runtime.Unknown
Expand Down Expand Up @@ -419,6 +423,10 @@ func unmarshalToObject(typer runtime.ObjectTyper, creater runtime.ObjectCreater,

// Encode serializes the provided object to the given writer. Overrides is ignored.
func (s *RawSerializer) Encode(obj runtime.Object, w io.Writer) error {
if customEncoder, ok := obj.(runtime.CustomEncoder); ok {
return customEncoder.InterceptEncode(runtime.WithVersionEncoder{Encoder: s}, w)
}

switch t := obj.(type) {
case bufferedReverseMarshaller:
// this path performs a single allocation during write but requires the caller to implement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"k8s.io/klog"
)

// NewDefaultingCodecForScheme is a convenience method for callers that are using a scheme.
Expand Down Expand Up @@ -195,6 +197,28 @@ func (c *codec) Encode(obj runtime.Object, w io.Writer) error {
}
}

if customEncoder, ok := obj.(runtime.CustomEncoder); ok {
klog.Errorf("DDDD: %#v %#v", obj, c.typer)

_, _, err := c.typer.ObjectKinds(obj)
if err != nil {
klog.Errorf("EEEE: WTF? %#v #%v", obj, c.typer)
}
encoder := runtime.WithVersionEncoder{
Version: c.encodeVersion,
Encoder: c.encoder,
ObjectConvertor: c.convertor,
ObjectTyper: c.typer,
}
return customEncoder.InterceptEncode(encoder, w)
}

if reflect.TypeOf(obj).String() == "apiextensions.CustomResourceDefinition" ||
reflect.TypeOf(obj).String() == "*apiextensions.CustomResourceDefinition" {
klog.Errorf("BBBB: %#v", obj)
klog.Errorf("CCCC: %#v", c.typer)
}

gvks, isUnversioned, err := c.typer.ObjectKinds(obj)
if err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"cacher.go",
"caching_object.go",
"time_budget.go",
"util.go",
"watch_cache.go",
Expand All @@ -15,10 +16,12 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
Expand Down
25 changes: 21 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,9 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
watcher.nonblockingAdd(event)
}
} else {
// Cache serializations of the object across all watchers.
event.Object = newObjectWithSerializations(event.Object)

c.blockedWatchers = c.blockedWatchers[:0]
for _, watcher := range c.watchersBuffer {
if !watcher.nonblockingAdd(event) {
Expand Down Expand Up @@ -1194,21 +1197,35 @@ func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event
return nil
}

// FIXME: Avoid those strange ifs below.
var watchEvent watch.Event
switch {
case curObjPasses && !oldObjPasses:
return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
watchEvent.Type = watch.Added
if _, ok := event.Object.(*ObjectWithSerializations); ok {
watchEvent.Object = event.Object
} else {
watchEvent.Object = event.Object.DeepCopyObject()
}
case curObjPasses && oldObjPasses:
return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
watchEvent.Type = watch.Modified
if _, ok := event.Object.(*ObjectWithSerializations); ok {
watchEvent.Object = event.Object
} else {
watchEvent.Object = event.Object.DeepCopyObject()
}
case !curObjPasses && oldObjPasses:
watchEvent.Type = watch.Deleted
// return a delete event with the previous object content, but with the event's resource version
oldObj := event.PrevObject.DeepCopyObject()
if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
}
return &watch.Event{Type: watch.Deleted, Object: oldObj}
// FIXME: For now we don't do any caching here.
watchEvent.Object = oldObj
}

return nil
return &watchEvent
}

// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBoo
}
rv, err := cacher.versioner.ObjectResourceVersion(event.Object)
if err != nil {
t.Errorf("failed to parse resource version from %#v", event.Object)
t.Errorf("failed to parse resource version from %#v: %v", event.Object, err)
}
if event.Type == watch.Bookmark {
if !expectedBookmarks {
Expand Down