Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional optimizations to the encode/decode paths #26007

Merged
merged 9 commits into from
May 28, 2016
12 changes: 12 additions & 0 deletions pkg/api/install/install.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch/versioned"
)

const importPrefix = "k8s.io/kubernetes/pkg/api"
Expand Down Expand Up @@ -233,6 +234,17 @@ func addVersionsToScheme(externalVersions ...unversioned.GroupVersion) {
case *v1.Endpoints:
return true, v1.Convert_api_Endpoints_To_v1_Endpoints(a, b, s)
}

case *versioned.Event:
switch b := objB.(type) {
case *versioned.InternalEvent:
return true, versioned.Convert_versioned_Event_to_versioned_InternalEvent(a, b, s)
}
case *versioned.InternalEvent:
switch b := objB.(type) {
case *versioned.Event:
return true, versioned.Convert_versioned_InternalEvent_to_versioned_Event(a, b, s)
}
}
return false, nil
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ func GetReference(obj runtime.Object) (*ObjectReference, error) {
kind := gvk.Kind
if len(kind) == 0 {
// TODO: this is wrong
gvk, err := Scheme.ObjectKind(obj)
gvks, _, err := Scheme.ObjectKinds(obj)
if err != nil {
return nil, err
}
kind = gvk.Kind
kind = gvks[0].Kind
}

// if the object referenced is actually persisted, we can also get version from meta
Expand Down
2 changes: 1 addition & 1 deletion pkg/api/ref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestGetReference(t *testing.T) {
// when vendoring kube, if you don't force the set of registered versions (like this hack/test-go.sh does)
// then you run into trouble because the types aren't registered in the scheme by anything. This does the
// register manually to allow unit test execution
if _, err := Scheme.ObjectKind(&Pod{}); err != nil {
if _, _, err := Scheme.ObjectKinds(&Pod{}); err != nil {
AddToScheme(Scheme)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/api/rest/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ func objectMetaAndKind(typer runtime.ObjectTyper, obj runtime.Object) (*api.Obje
if err != nil {
return nil, unversioned.GroupVersionKind{}, errors.NewInternalError(err)
}
kind, err := typer.ObjectKind(obj)
kinds, _, err := typer.ObjectKinds(obj)
if err != nil {
return nil, unversioned.GroupVersionKind{}, errors.NewInternalError(err)
}
return objectMeta, kind, nil
return objectMeta, kinds[0], nil
}

// NamespaceScopedStrategy has a method to tell if the object must be in a namespace.
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/serialization_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (

func init() {
codecsToTest = append(codecsToTest, func(version unversioned.GroupVersion, item runtime.Object) (runtime.Codec, error) {
s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), "application/arbitrary.content.type")
s := protobuf.NewSerializer(api.Scheme, api.Scheme, "application/arbitrary.content.type")
return api.Codecs.CodecForVersions(s, s, testapi.ExternalGroupVersions(), nil), nil
})
}
Expand Down Expand Up @@ -138,7 +138,7 @@ func BenchmarkEncodeProtobufGeneratedMarshal(b *testing.B) {
func BenchmarkDecodeCodecToInternalProtobuf(b *testing.B) {
items := benchmarkItems()
width := len(items)
s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), "application/arbitrary.content.type")
s := protobuf.NewSerializer(api.Scheme, api.Scheme, "application/arbitrary.content.type")
encoder := api.Codecs.EncoderForVersion(s, v1.SchemeGroupVersion)
var encoded [][]byte
for i := range items {
Expand Down
3 changes: 2 additions & 1 deletion pkg/api/testapi/testapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,11 @@ func ExternalGroupVersions() []unversioned.GroupVersion {

// Get codec based on runtime.Object
func GetCodecForObject(obj runtime.Object) (runtime.Codec, error) {
kind, err := api.Scheme.ObjectKind(obj)
kinds, _, err := api.Scheme.ObjectKinds(obj)
if err != nil {
return nil, fmt.Errorf("unexpected encoding error: %v", err)
}
kind := kinds[0]

for _, group := range Groups {
if group.GroupVersion().Group != kind.Group {
Expand Down
2 changes: 2 additions & 0 deletions pkg/api/testing/fuzzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ func FuzzerFor(t *testing.T, version unversioned.GroupVersion, src rand.Source)
randomQuantity := func() resource.Quantity {
var q resource.Quantity
c.Fuzz(&q)
// precalc the string for benchmarking purposes
_ = q.String()
return q
}
q.Limits = make(api.ResourceList)
Expand Down
39 changes: 27 additions & 12 deletions pkg/apiserver/api_installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (a *APIInstaller) getResourceKind(path string, storage rest.Storage) (unver
}

object := storage.New()
fqKinds, err := a.group.Typer.ObjectKinds(object)
fqKinds, _, err := a.group.Typer.ObjectKinds(object)
if err != nil {
return unversioned.GroupVersionKind{}, err
}
Expand Down Expand Up @@ -233,8 +233,11 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
var versionedList interface{}
if isLister {
list := lister.NewList()
listGVK, err := a.group.Typer.ObjectKind(list)
versionedListPtr, err := a.group.Creater.New(a.group.GroupVersion.WithKind(listGVK.Kind))
listGVKs, _, err := a.group.Typer.ObjectKinds(list)
if err != nil {
return nil, err
}
versionedListPtr, err := a.group.Creater.New(a.group.GroupVersion.WithKind(listGVKs[0].Kind))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -272,10 +275,11 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
)
if isGetterWithOptions {
getOptions, getSubpath, _ = getterWithOptions.NewGetOptions()
getOptionsInternalKind, err = a.group.Typer.ObjectKind(getOptions)
getOptionsInternalKinds, _, err := a.group.Typer.ObjectKinds(getOptions)
if err != nil {
return nil, err
}
getOptionsInternalKind = getOptionsInternalKinds[0]
versionedGetOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind(getOptionsInternalKind.Kind))
if err != nil {
return nil, err
Expand All @@ -300,12 +304,16 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if isConnecter {
connectOptions, connectSubpath, _ = connecter.NewConnectOptions()
if connectOptions != nil {
connectOptionsInternalKind, err = a.group.Typer.ObjectKind(connectOptions)
connectOptionsInternalKinds, _, err := a.group.Typer.ObjectKinds(connectOptions)
if err != nil {
return nil, err
}

connectOptionsInternalKind = connectOptionsInternalKinds[0]
versionedConnectOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind(connectOptionsInternalKind.Kind))
if err != nil {
return nil, err
}
}
}

Expand Down Expand Up @@ -390,18 +398,26 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag

resourcePath := namespacedPath
resourceParams := namespaceParams
itemPathPrefix := gpath.Join(a.prefix, scope.ParamName()) + "/"
itemPath := namespacedPath + "/{name}"
itemPathMiddle := "/" + resource + "/"
nameParams := append(namespaceParams, nameParam)
proxyParams := append(nameParams, pathParam)
itemPathSuffix := ""
if hasSubresource {
itemPath = itemPath + "/" + subresource
itemPathSuffix = "/" + subresource
itemPath = itemPath + itemPathSuffix
resourcePath = itemPath
resourceParams = nameParams
}
apiResource.Name = path
apiResource.Namespaced = true
apiResource.Kind = resourceKind
namer := scopeNaming{scope, a.group.Linker, gpath.Join(a.prefix, itemPath), false}

itemPathFn := func(name, namespace string) string {
return itemPathPrefix + namespace + itemPathMiddle + name + itemPathSuffix
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if the resource is non-namespaced (e.g. Node)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's in a different branch.

On Mon, May 23, 2016 at 5:25 AM, Wojciech Tyczynski <
notifications@github.com> wrote:

In pkg/apiserver/api_installer.go
#26007 (comment)
:

        resourcePath = itemPath
        resourceParams = nameParams
    }
    apiResource.Name = path
    apiResource.Namespaced = true
    apiResource.Kind = resourceKind
  •   namer := scopeNaming{scope, a.group.Linker, gpath.Join(a.prefix, itemPath), false}
    
  •   itemPathFn := func(name, namespace string) string {
    
  •       return itemPathPrefix + namespace + itemPathMiddle + name + itemPathSuffix
    

What if the resource is non-namespaced (e.g. Node)?


You are receiving this because you authored the thread.
Reply to this email directly or view it on GitHub
https://github.com/kubernetes/kubernetes/pull/26007/files/25a2a7d25a45edffcbdaefc20c93822fc71637e0..f670cc465211fb5328ee2beae330a6625750471c#r64192201

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah ok. I forgot that we handle those separately.

}
namer := scopeNaming{scope, a.group.Linker, itemPathFn, false}

actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer}, isLister)
actions = appendIf(actions, action{"POST", resourcePath, resourceParams, namer}, isCreater)
Expand Down Expand Up @@ -430,7 +446,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
// For ex: LIST all pods in all namespaces by sending a LIST request at /api/apiVersion/pods.
// TODO: more strongly type whether a resource allows these actions on "all namespaces" (bulk delete)
if !hasSubresource {
namer = scopeNaming{scope, a.group.Linker, gpath.Join(a.prefix, itemPath), true}
namer = scopeNaming{scope, a.group.Linker, itemPathFn, true}
actions = appendIf(actions, action{"LIST", resource, params, namer}, isLister)
actions = appendIf(actions, action{"WATCHLIST", "watch/" + resource, params, namer}, allowWatchList)
}
Expand Down Expand Up @@ -775,7 +791,7 @@ func (n rootScopeNaming) ObjectName(obj runtime.Object) (namespace, name string,
type scopeNaming struct {
scope meta.RESTScope
runtime.SelfLinker
itemPath string
itemPathFn func(name, namespace string) string
allNamespaces bool
}

Expand Down Expand Up @@ -822,9 +838,8 @@ func (n scopeNaming) GenerateLink(req *restful.Request, obj runtime.Object) (pat
if len(name) == 0 {
return "", "", errEmptyName
}
path = strings.Replace(n.itemPath, "{name}", name, 1)
path = strings.Replace(path, "{"+n.scope.ArgumentName()+"}", namespace, 1)
return path, "", nil

return n.itemPathFn(name, namespace), "", nil
}

// GenerateListLink returns the appropriate path and query to locate a list by its canonical path.
Expand Down
4 changes: 3 additions & 1 deletion pkg/apiserver/api_installer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ func TestScopeNamingGenerateLink(t *testing.T) {
s := scopeNaming{
meta.RESTScopeNamespace,
selfLinker,
"/api/v1/namespaces/{namespace}/services/{name}",
func(name, namespace string) string {
return "/api/v1/namespaces/" + namespace + "/services/" + name
},
true,
}
service := &api.Service{
Expand Down
6 changes: 3 additions & 3 deletions pkg/apiserver/resthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"math/rand"
"net/http"
"net/url"
gpath "path"
"strings"
"time"

Expand Down Expand Up @@ -940,10 +939,11 @@ func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object,

// transformDecodeError adds additional information when a decode fails.
func transformDecodeError(typer runtime.ObjectTyper, baseErr error, into runtime.Object, gvk *unversioned.GroupVersionKind, body []byte) error {
objGVK, err := typer.ObjectKind(into)
objGVKs, _, err := typer.ObjectKinds(into)
if err != nil {
return err
}
objGVK := objGVKs[0]
if gvk != nil && len(gvk.Kind) > 0 {
return errors.NewBadRequest(fmt.Sprintf("%s in version %q cannot be handled as a %s: %v", gvk.Kind, gvk.Version, objGVK.Kind, baseErr))
}
Expand All @@ -962,7 +962,7 @@ func setSelfLink(obj runtime.Object, req *restful.Request, namer ScopeNamer) err

newURL := *req.Request.URL
// use only canonical paths
newURL.Path = gpath.Clean(path)
newURL.Path = path
newURL.RawQuery = query
newURL.Fragment = ""

Expand Down
42 changes: 27 additions & 15 deletions pkg/apiserver/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,31 +167,38 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
flusher.Flush()

var unknown runtime.Unknown
internalEvent := &versioned.InternalEvent{}
buf := &bytes.Buffer{}
ch := s.watching.ResultChan()
for {
select {
case <-cn.CloseNotify():
return
case <-timeoutCh:
return
case event, ok := <-s.watching.ResultChan():
case event, ok := <-ch:
if !ok {
// End of results.
return
}

obj := event.Object
s.fixup(obj)
if err := s.embeddedEncoder.EncodeToStream(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
return
}
event.Object = &runtime.Unknown{
Raw: buf.Bytes(),
// ContentType is not required here because we are defaulting to the serializer
// type
}
if err := e.Encode((*versioned.InternalEvent)(&event)); err != nil {

// ContentType is not required here because we are defaulting to the serializer
// type
unknown.Raw = buf.Bytes()
event.Object = &unknown

// the internal event will be versioned by the encoder
*internalEvent = versioned.InternalEvent(event)
if err := e.Encode(internalEvent); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e))
// client disconnect.
return
Expand All @@ -208,14 +215,18 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
defer ws.Close()
done := make(chan struct{})
go wsstream.IgnoreReceives(ws, 0)

var unknown runtime.Unknown
internalEvent := &versioned.InternalEvent{}
buf := &bytes.Buffer{}
streamBuf := &bytes.Buffer{}
ch := s.watching.ResultChan()
for {
select {
case <-done:
s.watching.Stop()
return
case event, ok := <-s.watching.ResultChan():
case event, ok := <-ch:
if !ok {
// End of results.
return
Expand All @@ -227,14 +238,15 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
return
}
event.Object = &runtime.Unknown{
Raw: buf.Bytes(),
// ContentType is not required here because we are defaulting to the serializer
// type
}

// ContentType is not required here because we are defaulting to the serializer
// type
unknown.Raw = buf.Bytes()
event.Object = &unknown

// the internal event will be versioned by the encoder
internalEvent := versioned.InternalEvent(event)
if err := s.encoder.EncodeToStream(&internalEvent, streamBuf); err != nil {
*internalEvent = versioned.InternalEvent(event)
if err := s.encoder.EncodeToStream(internalEvent, streamBuf); err != nil {
// encoding error
utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err))
s.watching.Stop()
Expand Down
1 change: 1 addition & 0 deletions pkg/apiserver/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ func benchmarkItems() []api.Pod {
items := make([]api.Pod, 3)
for i := range items {
apiObjectFuzzer.Fuzz(&items[i])
items[i].Spec.InitContainers, items[i].Status.InitContainerStatuses = nil, nil
}
return items
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/testing/core/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,11 +210,11 @@ func (o objects) Kind(kind unversioned.GroupVersionKind, name string) (runtime.O
}

func (o objects) Add(obj runtime.Object) error {
gvk, err := o.scheme.ObjectKind(obj)
gvks, _, err := o.scheme.ObjectKinds(obj)
if err != nil {
return err
}
kind := gvk.Kind
kind := gvks[0].Kind

switch {
case meta.IsListType(obj):
Expand Down