Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use etcd as backend for minion registry. #1478

Merged
merged 3 commits into from
Oct 7, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,13 @@ func makeMinionRegistry(c *Config) minion.Registry {
}
}
if minionRegistry == nil {
minionRegistry = minion.NewRegistry(c.Minions, c.NodeResources)
minionRegistry = etcd.NewRegistry(c.EtcdHelper, nil)
for _, minionID := range c.Minions {
minionRegistry.CreateMinion(nil, &api.Minion{
TypeMeta: api.TypeMeta{ID: minionID},
NodeResources: c.NodeResources,
})
}
}
if c.HealthCheckMinions {
minionRegistry = minion.NewHealthyRegistry(minionRegistry, &http.Client{})
Expand Down
41 changes: 39 additions & 2 deletions pkg/registry/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ import (
// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into
// kubelet (and vice versa)

// Registry implements PodRegistry, ControllerRegistry and ServiceRegistry
// with backed by etcd.
// Registry implements PodRegistry, ControllerRegistry, ServiceRegistry and MinionRegistry, backed by etcd.
type Registry struct {
tools.EtcdHelper
manifestFactory pod.ManifestFactory
Expand Down Expand Up @@ -382,3 +381,41 @@ func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector,
}
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
}

func makeMinionKey(minionID string) string {
return "/registry/minions/" + minionID
}

func (r *Registry) ListMinions(ctx api.Context) (*api.MinionList, error) {
minions := &api.MinionList{}
err := r.ExtractList("/registry/minions", &minions.Items, &minions.ResourceVersion)
return minions, err
}

func (r *Registry) CreateMinion(ctx api.Context, minion *api.Minion) error {
// TODO: Add some validations.
err := r.CreateObj(makeMinionKey(minion.ID), minion, 0)
return etcderr.InterpretCreateError(err, "minion", minion.ID)
}

func (r *Registry) ContainsMinion(ctx api.Context, minionID string) (bool, error) {
var minion api.Minion
key := makeMinionKey(minionID)
err := r.ExtractObj(key, &minion, false)
if err == nil {
return true, nil
} else if tools.IsEtcdNotFound(err) {
return false, nil
} else {
return false, etcderr.InterpretGetError(err, "minion", minion.ID)
}
}

func (r *Registry) DeleteMinion(ctx api.Context, minionID string) error {
key := makeMinionKey(minionID)
err := r.Delete(key, true)
if err != nil {
return etcderr.InterpretDeleteError(err, "minion", minionID)
}
return nil
}
115 changes: 115 additions & 0 deletions pkg/registry/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,6 +1019,121 @@ func TestEtcdWatchEndpointsBadSelector(t *testing.T) {
}
}

func TestEtcdListMinions(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
key := "/registry/minions"
fakeClient.Data[key] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Minion{
TypeMeta: api.TypeMeta{ID: "foo"},
}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Minion{
TypeMeta: api.TypeMeta{ID: "bar"},
}),
},
},
},
},
E: nil,
}
registry := NewTestEtcdRegistry(fakeClient)
minions, err := registry.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

if len(minions.Items) != 2 || minions.Items[0].ID != "foo" || minions.Items[1].ID != "bar" {
t.Errorf("Unexpected minion list: %#v", minions)
}
}

func TestEtcdCreateMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.CreateMinion(ctx, &api.Minion{
TypeMeta: api.TypeMeta{ID: "foo"},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}

resp, err := fakeClient.Get("/registry/minions/foo", false, false)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

var minion api.Minion
err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &minion)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

if minion.ID != "foo" {
t.Errorf("Unexpected minion: %#v %s", minion, resp.Node.Value)
}
}

func TestEtcdContainsMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Set("/registry/minions/foo", runtime.EncodeOrDie(latest.Codec, &api.Minion{TypeMeta: api.TypeMeta{ID: "foo"}}), 0)
registry := NewTestEtcdRegistry(fakeClient)
contains, err := registry.ContainsMinion(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}

if contains == false {
t.Errorf("Expected true, but got false")
}
}

func TestEtcdContainsMinionNotFound(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Data["/registry/minions/foo"] = tools.EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: tools.EtcdErrorNotFound,
}
registry := NewTestEtcdRegistry(fakeClient)
contains, err := registry.ContainsMinion(ctx, "foo")

if err != nil {
t.Errorf("unexpected error: %v", err)
}

if contains == true {
t.Errorf("Expected false, but got true")
}
}

func TestEtcdDeleteMinion(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient)
err := registry.DeleteMinion(ctx, "foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}

if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
}
key := "/registry/minions/foo"
if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
}

// TODO We need a test for the compare and swap behavior. This basically requires two things:
// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that
// channel, this will enable us to orchestrate the flow of etcd requests in the test.
Expand Down
26 changes: 13 additions & 13 deletions pkg/registry/minion/caching_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type CachingRegistry struct {
}

func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error) {
list, err := delegate.List()
list, err := delegate.ListMinions(nil)
if err != nil {
return nil, err
}
Expand All @@ -57,9 +57,9 @@ func NewCachingRegistry(delegate Registry, ttl time.Duration) (Registry, error)
}, nil
}

func (r *CachingRegistry) Contains(nodeID string) (bool, error) {
func (r *CachingRegistry) ContainsMinion(ctx api.Context, nodeID string) (bool, error) {
if r.expired() {
if err := r.refresh(false); err != nil {
if err := r.refresh(ctx, false); err != nil {
return false, err
}
}
Expand All @@ -74,23 +74,23 @@ func (r *CachingRegistry) Contains(nodeID string) (bool, error) {
return false, nil
}

func (r *CachingRegistry) Delete(minion string) error {
if err := r.delegate.Delete(minion); err != nil {
func (r *CachingRegistry) DeleteMinion(ctx api.Context, nodeID string) error {
if err := r.delegate.DeleteMinion(ctx, nodeID); err != nil {
return err
}
return r.refresh(true)
return r.refresh(ctx, true)
}

func (r *CachingRegistry) Insert(minion string) error {
if err := r.delegate.Insert(minion); err != nil {
func (r *CachingRegistry) CreateMinion(ctx api.Context, minion *api.Minion) error {
if err := r.delegate.CreateMinion(ctx, minion); err != nil {
return err
}
return r.refresh(true)
return r.refresh(ctx, true)
}

func (r *CachingRegistry) List() (*api.MinionList, error) {
func (r *CachingRegistry) ListMinions(ctx api.Context) (*api.MinionList, error) {
if r.expired() {
if err := r.refresh(false); err != nil {
if err := r.refresh(ctx, false); err != nil {
return r.nodes, err
}
}
Expand All @@ -105,12 +105,12 @@ func (r *CachingRegistry) expired() bool {

// refresh updates the current store. It double checks expired under lock with the assumption
// of optimistic concurrency with the other functions.
func (r *CachingRegistry) refresh(force bool) error {
func (r *CachingRegistry) refresh(ctx api.Context, force bool) error {
r.lock.Lock()
defer r.lock.Unlock()
if force || r.expired() {
var err error
r.nodes, err = r.delegate.List()
r.nodes, err = r.delegate.ListMinions(ctx)
time := r.clock.Now()
atomic.SwapInt64(&r.lastUpdate, time.Unix())
return err
Expand Down
35 changes: 21 additions & 14 deletions pkg/registry/minion/caching_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)

Expand All @@ -33,19 +34,20 @@ func (f *fakeClock) Now() time.Time {
}

func TestCachingHit(t *testing.T) {
ctx := api.NewContext()
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
lastUpdate: fakeClock.Now().Unix(),
nodes: expected,
}
list, err := cache.List()
list, err := cache.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand All @@ -55,11 +57,12 @@ func TestCachingHit(t *testing.T) {
}

func TestCachingMiss(t *testing.T) {
ctx := api.NewContext()
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
Expand All @@ -68,7 +71,7 @@ func TestCachingMiss(t *testing.T) {
nodes: expected,
}
fakeClock.now = time.Unix(3, 0)
list, err := cache.List()
list, err := cache.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand All @@ -78,23 +81,26 @@ func TestCachingMiss(t *testing.T) {
}

func TestCachingInsert(t *testing.T) {
ctx := api.NewContext()
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
lastUpdate: fakeClock.Now().Unix(),
nodes: expected,
}
err := cache.Insert("foo")
err := cache.CreateMinion(ctx, &api.Minion{
TypeMeta: api.TypeMeta{ID: "foo"},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
list, err := cache.List()
list, err := cache.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand All @@ -104,23 +110,24 @@ func TestCachingInsert(t *testing.T) {
}

func TestCachingDelete(t *testing.T) {
ctx := api.NewContext()
fakeClock := fakeClock{
now: time.Unix(0, 0),
}
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"})
fakeRegistry := registrytest.NewMinionRegistry([]string{"m1", "m2"}, api.NodeResources{})
expected := registrytest.MakeMinionList([]string{"m1", "m2", "m3"}, api.NodeResources{})
cache := CachingRegistry{
delegate: fakeRegistry,
ttl: 1 * time.Second,
clock: &fakeClock,
lastUpdate: fakeClock.Now().Unix(),
nodes: expected,
}
err := cache.Delete("m2")
err := cache.DeleteMinion(ctx, "m2")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
list, err := cache.List()
list, err := cache.ListMinions(ctx)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down