Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Event Registry/REST types #1700

Merged
merged 6 commits into from
Oct 13, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 8 additions & 6 deletions cmd/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,18 @@ import (
var (
port = flag.Uint("port", 8080, "The port to listen on. Default 8080")
address = util.IP(net.ParseIP("127.0.0.1"))
apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'")
apiPrefix = flag.String("api_prefix", "/api", "The prefix for API requests on the server. Default '/api'.")
storageVersion = flag.String("storage_version", "", "The version to store resources with. Defaults to server preferred")
cloudProvider = flag.String("cloud_provider", "", "The provider for cloud services. Empty string for no provider.")
cloudConfigFile = flag.String("cloud_config", "", "The path to the cloud provider configuration file. Empty string for no configuration file.")
minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs")
minionRegexp = flag.String("minion_regexp", "", "If non empty, and -cloud_provider is specified, a regular expression for matching minion VMs.")
minionPort = flag.Uint("minion_port", 10250, "The port at which kubelet will be listening on the minions.")
healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true")
minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds")
tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the API server via token authentication")
healthCheckMinions = flag.Bool("health_check_minions", true, "If true, health check minions and filter unhealthy ones. Default true.")
minionCacheTTL = flag.Duration("minion_cache_ttl", 30*time.Second, "Duration of time to cache minion information. Default 30 seconds.")
eventTTL = flag.Duration("event_ttl", 48*time.Hour, "Amount of time to retain events. Default 2 days.")
tokenAuthFile = flag.String("token_auth_file", "", "If set, the file that will be used to secure the API server via token authentication.")
etcdServerList util.StringList
etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers")
etcdConfigFile = flag.String("etcd_config", "", "The config file for the etcd client. Mutually exclusive with -etcd_servers.")
machineList util.StringList
corsAllowedOriginList util.StringList
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow privileged containers.")
Expand Down Expand Up @@ -178,6 +179,7 @@ func main() {
HealthCheckMinions: *healthCheckMinions,
Minions: machineList,
MinionCacheTTL: *minionCacheTTL,
EventTTL: *eventTTL,
MinionRegexp: *minionRegexp,
PodInfoGetter: podInfoGetter,
NodeResources: api.NodeResources{
Expand Down
16 changes: 14 additions & 2 deletions pkg/api/testapi/testapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
)

// Version returns the API version to test against as set by the KUBE_API_VERSION env var.
// Version returns the API version to test against, as set by the KUBE_API_VERSION env var.
func Version() string {
version := os.Getenv("KUBE_API_VERSION")
if version == "" {
Expand All @@ -33,10 +33,22 @@ func Version() string {
return version
}

func CodecForVersionOrDie() runtime.Codec {
// Codec returns the codec for the API version to test against, as set by the
// KUBE_API_VERSION env var.
func Codec() runtime.Codec {
interfaces, err := latest.InterfacesFor(Version())
if err != nil {
panic(err)
}
return interfaces.Codec
}

// ResourceVersioner returns the ResourceVersioner for the API version to test against,
// as set by the KUBE_API_VERSION env var.
func ResourceVersioner() runtime.ResourceVersioner {
interfaces, err := latest.InterfacesFor(Version())
if err != nil {
panic(err)
}
return interfaces.ResourceVersioner
}
2 changes: 1 addition & 1 deletion pkg/client/restclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestDoRequest(t *testing.T) {
{Request: testRequest{Method: "GET", Path: "error"}, Response: Response{StatusCode: 500}, Error: true},
{Request: testRequest{Method: "POST", Path: "faildecode"}, Response: Response{StatusCode: 200, RawBody: &invalid}},
{Request: testRequest{Method: "GET", Path: "failread"}, Response: Response{StatusCode: 200, RawBody: &invalid}},
{Client: &Client{&RESTClient{baseURL: uri, Codec: testapi.CodecForVersionOrDie()}}, Request: testRequest{Method: "GET", Path: "nocertificate"}, Error: true},
{Client: &Client{&RESTClient{baseURL: uri, Codec: testapi.Codec()}}, Request: testRequest{Method: "GET", Path: "nocertificate"}, Error: true},
}
for _, c := range testClients {
client := c.Setup()
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/replication_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
}

func TestSyncReplicationControllerCreates(t *testing.T) {
body := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), newPodList(0))
body := runtime.EncodeOrDie(testapi.Codec(), newPodList(0))
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
Expand All @@ -170,7 +170,7 @@ func TestSyncReplicationControllerCreates(t *testing.T) {

func TestCreateReplica(t *testing.T) {
ctx := api.NewDefaultContext()
body := runtime.EncodeOrDie(testapi.CodecForVersionOrDie(), &api.Pod{})
body := runtime.EncodeOrDie(testapi.Codec(), &api.Pod{})
fakeHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
Expand Down
6 changes: 6 additions & 0 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/event"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service"
Expand All @@ -49,6 +51,7 @@ type Config struct {
HealthCheckMinions bool
Minions []string
MinionCacheTTL time.Duration
EventTTL time.Duration
MinionRegexp string
PodInfoGetter client.PodInfoGetter
NodeResources api.NodeResources
Expand All @@ -62,6 +65,7 @@ type Master struct {
endpointRegistry endpoint.Registry
minionRegistry minion.Registry
bindingRegistry binding.Registry
eventRegistry generic.Registry
storage map[string]apiserver.RESTStorage
client *client.Client
}
Expand Down Expand Up @@ -92,6 +96,7 @@ func New(c *Config) *Master {
serviceRegistry: serviceRegistry,
endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil),
bindingRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory),
eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())),
minionRegistry: minionRegistry,
client: c.Client,
}
Expand Down Expand Up @@ -147,6 +152,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
"services": service.NewREST(m.serviceRegistry, cloud, m.minionRegistry),
"endpoints": endpoint.NewREST(m.endpointRegistry),
"minions": minion.NewREST(m.minionRegistry),
"events": event.NewREST(m.eventRegistry),

// TODO: should appear only in scheduler API group.
"bindings": binding.NewREST(m.bindingRegistry),
Expand Down
19 changes: 19 additions & 0 deletions pkg/registry/event/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
Copyright 2014 Google Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package event provides Registry interface and it's REST
// implementation for storing Event api objects.
package event
58 changes: 58 additions & 0 deletions pkg/registry/event/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2014 Google Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package event

import (
"path"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
)

// registry implements custom changes to generic.Etcd.
type registry struct {
*etcdgeneric.Etcd
ttl uint64
}

// Create stores the object with a ttl, so that events don't stay in the system forever.
func (r registry) Create(ctx api.Context, id string, obj runtime.Object) error {
err := r.Etcd.Helper.CreateObj(r.Etcd.KeyFunc(id), obj, r.ttl)
return etcderr.InterpretCreateError(err, r.Etcd.EndpointName, id)
}

// NewEtcdRegistry returns a registry which will store Events in the given
// EtcdHelper. ttl is the time that Events will be retained by the system.
func NewEtcdRegistry(h tools.EtcdHelper, ttl uint64) generic.Registry {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My previous comment got lost. I am concerned about the etcd-ness leaking into each registry module. Shouldn't registries be defined in terms of backing store interfaces, so they can use etcd but not actually KNOW that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is phrased that way-- see generic.Registry, which makes no mention of etcd.

This function is specifically to make an etcd registry, so of course it has to mention etcd.

If we get a new FrooBobber store, then we'd have a file here with a NewFrooBobberRegistry() function.

Would you prefer that I make a pkg/registry/event/etcd directory for this to go in? I can do that if it makes you happy, it just seemed a little excessive.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caveat, I am not super familiar with this area,s till. My point is that the word and concept of etcd should never appear here.

struct registry should be defined as something like

{
    RegsitryBackingStore
    ttl int
}

Where BackingStore is an interface with functions that map to Etcd. Creating a registry should be something like:

backingStore := NewEtcdBackingStore()
....
eventRegistry:      event.NewRegistry(backingStore, uint64(c.EventTTL.Seconds())),

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That confuses me. This file is providing the thing you've called "RegsitryBackingStore". How can I make that more clear?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates an Event-specific backing store. Presumably there would be a Pod-specific backing store and a Service-specific backing store and an Enpoints-specific backing store and ...

Etcd should be mentioned once in the whole init sequence. It is the medium. Then you can define the schemas that sit atop the medium - Event, Pod, Service..

If this isn't clear, maybe it is I who just isn't getting it - come by and we can discuss after lunch?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(offline convo: resolution here is that tools.EtcdHelper should be hidden behind some sort of StorageHelper interface. I'll do this in a follow-up PR at some point.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

On Mon, Oct 13, 2014 at 2:31 PM, Daniel Smith notifications@github.com
wrote:

In pkg/registry/event/registry.go:

+// registry implements custom changes to generic.Etcd.
+type registry struct {

  • *etcdgeneric.Etcd
  • ttl uint64
    +}

+// Create stores the object with a ttl, so that events don't stay in the system forever.
+func (r registry) Create(ctx api.Context, id string, obj runtime.Object) error {

  • err := r.Etcd.Helper.CreateObj(r.Etcd.KeyFunc(id), obj, r.ttl)
  • return etcderr.InterpretCreateError(err, r.Etcd.EndpointName, id)
    +}

+// NewEtcdRegistry returns a registry which will store Events in the given
+// EtcdHelper. ttl is the time that Events will be retained by the system.
+func NewEtcdRegistry(h tools.EtcdHelper, ttl uint64) generic.Registry {

(offline convo: resolution here is that tools.EtcdHelper should be hidden
behind some sort of StorageHelper interface. I'll do this in a follow-up PR
at some point.)

Reply to this email directly or view it on GitHub
https://github.com/GoogleCloudPlatform/kubernetes/pull/1700/files#r18796285
.

return registry{
Etcd: &etcdgeneric.Etcd{
NewFunc: func() runtime.Object { return &api.Event{} },
NewListFunc: func() runtime.Object { return &api.EventList{} },
EndpointName: "events",
KeyRoot: "/registry/events",
KeyFunc: func(id string) string {
return path.Join("/registry/events", id)
},
Helper: h,
},
ttl: ttl,
}
}
105 changes: 105 additions & 0 deletions pkg/registry/event/registry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
Copyright 2014 Google Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package event

import (
"reflect"
"testing"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"

"github.com/coreos/go-etcd/etcd"
)

var testTTL uint64 = 60

func NewTestEventEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) {
f := tools.NewFakeEtcdClient(t)
f.TestIndex = true
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.ResourceVersioner()}}
return f, NewEtcdRegistry(h, testTTL)
}

func TestEventCreate(t *testing.T) {
eventA := &api.Event{
TypeMeta: api.TypeMeta{ID: "foo"},
Reason: "forTesting",
}
eventB := &api.Event{
TypeMeta: api.TypeMeta{ID: "foo"},
Reason: "forTesting",
}

nodeWithEventA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), eventA),
ModifiedIndex: 1,
CreatedIndex: 1,
TTL: int64(testTTL),
},
},
E: nil,
}

emptyNode := tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}

path := "/registry/events/foo"
key := "foo"

table := map[string]struct {
existing tools.EtcdResponseWithError
expect tools.EtcdResponseWithError
toCreate runtime.Object
errOK func(error) bool
}{
"normal": {
existing: emptyNode,
expect: nodeWithEventA,
toCreate: eventA,
errOK: func(err error) bool { return err == nil },
},
"preExisting": {
existing: nodeWithEventA,
expect: nodeWithEventA,
toCreate: eventB,
errOK: errors.IsAlreadyExists,
},
}

for name, item := range table {
fakeClient, registry := NewTestEventEtcdRegistry(t)
fakeClient.Data[path] = item.existing
err := registry.Create(api.NewContext(), key, item.toCreate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}

if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
}
}
}