Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ReplicaSet client. #19746

Merged
merged 1 commit into from
Jan 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
56 changes: 56 additions & 0 deletions pkg/client/cache/listers.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,62 @@ func (s *StoreToDeploymentLister) GetDeploymentsForRC(rc *api.ReplicationControl
return
}

// StoreToReplicaSetLister gives a store List and Exists methods. The store must contain only ReplicaSets.
type StoreToReplicaSetLister struct {
Store
}

// Exists checks if the given ReplicaSet exists in the store.
func (s *StoreToReplicaSetLister) Exists(rs *extensions.ReplicaSet) (bool, error) {
_, exists, err := s.Store.Get(rs)
if err != nil {
return false, err
}
return exists, nil
}

// List lists all ReplicaSets in the store.
// TODO: converge on the interface in pkg/client
func (s *StoreToReplicaSetLister) List() (rss []extensions.ReplicaSet, err error) {
for _, rs := range s.Store.List() {
rss = append(rss, *(rs.(*extensions.ReplicaSet)))
}
return rss, nil
}

// GetPodReplicaSets returns a list of ReplicaSets managing a pod. Returns an error only if no matching ReplicaSets are found.
func (s *StoreToReplicaSetLister) GetPodReplicaSets(pod *api.Pod) (rss []extensions.ReplicaSet, err error) {
var selector labels.Selector
var rs extensions.ReplicaSet

if len(pod.Labels) == 0 {
err = fmt.Errorf("no ReplicaSets found for pod %v because it has no labels", pod.Name)
return
}

for _, m := range s.Store.List() {
rs = *m.(*extensions.ReplicaSet)
if rs.Namespace != pod.Namespace {
continue
}
selector, err = extensions.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
err = fmt.Errorf("failed to convert pod selector to selector: %v", err)
return
}

// If a ReplicaSet with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
continue
}
rss = append(rss, rs)
}
if len(rss) == 0 {
err = fmt.Errorf("could not find ReplicaSet for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
}
return
}

// StoreToDaemonSetLister gives a store List and Exists methods. The store must contain only DaemonSets.
type StoreToDaemonSetLister struct {
Store
Expand Down
112 changes: 112 additions & 0 deletions pkg/client/cache/listers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,118 @@ func TestStoreToReplicationControllerLister(t *testing.T) {
}
}

func TestStoreToReplicaSetLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
lister := StoreToReplicaSetLister{store}
testCases := []struct {
inRSs []*extensions.ReplicaSet
list func() ([]extensions.ReplicaSet, error)
outRSNames sets.String
expectErr bool
}{
// Basic listing with all labels and no selectors
{
inRSs: []*extensions.ReplicaSet{
{ObjectMeta: api.ObjectMeta{Name: "basic"}},
},
list: func() ([]extensions.ReplicaSet, error) {
return lister.List()
},
outRSNames: sets.NewString("basic"),
},
// No pod labels
{
inRSs: []*extensions.ReplicaSet{
{
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
Spec: extensions.ReplicaSetSpec{
Selector: &extensions.LabelSelector{MatchLabels: map[string]string{"foo": "baz"}},
},
},
},
list: func() ([]extensions.ReplicaSet, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "pod1", Namespace: "ns"},
}
return lister.GetPodReplicaSets(pod)
},
outRSNames: sets.NewString(),
expectErr: true,
},
// No ReplicaSet selectors
{
inRSs: []*extensions.ReplicaSet{
{
ObjectMeta: api.ObjectMeta{Name: "basic", Namespace: "ns"},
},
},
list: func() ([]extensions.ReplicaSet, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
Namespace: "ns",
Labels: map[string]string{"foo": "bar"},
},
}
return lister.GetPodReplicaSets(pod)
},
outRSNames: sets.NewString(),
expectErr: true,
},
// Matching labels to selectors and namespace
{
inRSs: []*extensions.ReplicaSet{
{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: extensions.ReplicaSetSpec{
Selector: &extensions.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: "ns"},
Spec: extensions.ReplicaSetSpec{
Selector: &extensions.LabelSelector{MatchLabels: map[string]string{"foo": "bar"}},
},
},
},
list: func() ([]extensions.ReplicaSet, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
Labels: map[string]string{"foo": "bar"},
Namespace: "ns",
},
}
return lister.GetPodReplicaSets(pod)
},
outRSNames: sets.NewString("bar"),
},
}
for _, c := range testCases {
for _, r := range c.inRSs {
store.Add(r)
}

gotRSs, err := c.list()
if err != nil && c.expectErr {
continue
} else if c.expectErr {
t.Error("Expected error, got none")
continue
} else if err != nil {
t.Errorf("Unexpected error %#v", err)
continue
}
gotNames := make([]string, len(gotRSs))
for ix := range gotRSs {
gotNames[ix] = gotRSs[ix].Name
}
if !c.outRSNames.HasAll(gotNames...) || len(gotNames) != len(c.outRSNames) {
t.Errorf("Unexpected got ReplicaSets %+v expected %+v", gotNames, c.outRSNames)
}
}
}

func TestStoreToDaemonSetLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
lister := StoreToDaemonSetLister{store}
Expand Down
23 changes: 23 additions & 0 deletions pkg/client/unversioned/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,29 @@ func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationContro
}
}

// ReplicaSetHasDesiredReplicas returns a condition that will be true if and only if
// the desired replica count for a ReplicaSet's ReplicaSelector equals the Replicas count.
func ReplicaSetHasDesiredReplicas(c ExtensionsInterface, replicaSet *extensions.ReplicaSet) wait.ConditionFunc {

// If we're given a ReplicaSet where the status lags the spec, it either means that the
// ReplicaSet is stale, or that the ReplicaSet manager hasn't noticed the update yet.
// Polling status.Replicas is not safe in the latter case.
desiredGeneration := replicaSet.Generation

return func() (bool, error) {
rs, err := c.ReplicaSets(replicaSet.Namespace).Get(replicaSet.Name)
if err != nil {
return false, err
}
// There's a chance a concurrent update modifies the Spec.Replicas causing this check to
// pass, or, after this check has passed, a modification causes the ReplicaSet manager to
// create more pods. This will not be an issue once we've implemented graceful delete for
// ReplicaSets, but till then concurrent stop operations on the same ReplicaSet might have
// unintended side effects.
return rs.Status.ObservedGeneration >= desiredGeneration && rs.Status.Replicas == rs.Spec.Replicas, nil
}
}

// JobHasDesiredParallelism returns a condition that will be true if the desired parallelism count
// for a job equals the current active counts or is less by an appropriate successful/unsuccessful count.
func JobHasDesiredParallelism(c ExtensionsInterface, job *extensions.Job) wait.ConditionFunc {
Expand Down
5 changes: 5 additions & 0 deletions pkg/client/unversioned/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type ExtensionsInterface interface {
JobsNamespacer
IngressNamespacer
ThirdPartyResourceNamespacer
ReplicaSetsNamespacer
}

// ExtensionsClient is used to interact with experimental Kubernetes features.
Expand Down Expand Up @@ -71,6 +72,10 @@ func (c *ExtensionsClient) ThirdPartyResources(namespace string) ThirdPartyResou
return newThirdPartyResources(c, namespace)
}

func (c *ExtensionsClient) ReplicaSets(namespace string) ReplicaSetInterface {
return newReplicaSets(c, namespace)
}

// NewExtensions creates a new ExtensionsClient for the given config. This client
// provides access to experimental Kubernetes features.
// Features of Extensions group are not supported and may be changed or removed in
Expand Down
100 changes: 100 additions & 0 deletions pkg/client/unversioned/replica_sets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package unversioned

import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/watch"
)

// ReplicaSetsNamespacer has methods to work with ReplicaSet resources in a namespace
type ReplicaSetsNamespacer interface {
ReplicaSets(namespace string) ReplicaSetInterface
}

// ReplicaSetInterface has methods to work with ReplicaSet resources.
type ReplicaSetInterface interface {
List(opts api.ListOptions) (*extensions.ReplicaSetList, error)
Get(name string) (*extensions.ReplicaSet, error)
Create(ctrl *extensions.ReplicaSet) (*extensions.ReplicaSet, error)
Update(ctrl *extensions.ReplicaSet) (*extensions.ReplicaSet, error)
UpdateStatus(ctrl *extensions.ReplicaSet) (*extensions.ReplicaSet, error)
Delete(name string, options *api.DeleteOptions) error
Watch(opts api.ListOptions) (watch.Interface, error)
}

// replicaSets implements ReplicaSetsNamespacer interface
type replicaSets struct {
client *ExtensionsClient
ns string
}

// newReplicaSets returns a ReplicaSetClient
func newReplicaSets(c *ExtensionsClient, namespace string) *replicaSets {
return &replicaSets{c, namespace}
}

// List takes a selector, and returns the list of ReplicaSets that match that selector.
func (c *replicaSets) List(opts api.ListOptions) (result *extensions.ReplicaSetList, err error) {
result = &extensions.ReplicaSetList{}
err = c.client.Get().Namespace(c.ns).Resource("replicasets").VersionedParams(&opts, api.Scheme).Do().Into(result)
return
}

// Get returns information about a particular ReplicaSet.
func (c *replicaSets) Get(name string) (result *extensions.ReplicaSet, err error) {
result = &extensions.ReplicaSet{}
err = c.client.Get().Namespace(c.ns).Resource("replicasets").Name(name).Do().Into(result)
return
}

// Create creates a new ReplicaSet.
func (c *replicaSets) Create(rs *extensions.ReplicaSet) (result *extensions.ReplicaSet, err error) {
result = &extensions.ReplicaSet{}
err = c.client.Post().Namespace(c.ns).Resource("replicasets").Body(rs).Do().Into(result)
return
}

// Update updates an existing ReplicaSet.
func (c *replicaSets) Update(rs *extensions.ReplicaSet) (result *extensions.ReplicaSet, err error) {
result = &extensions.ReplicaSet{}
err = c.client.Put().Namespace(c.ns).Resource("replicasets").Name(rs.Name).Body(rs).Do().Into(result)
return
}

// UpdateStatus updates an existing ReplicaSet status
func (c *replicaSets) UpdateStatus(rs *extensions.ReplicaSet) (result *extensions.ReplicaSet, err error) {
result = &extensions.ReplicaSet{}
err = c.client.Put().Namespace(c.ns).Resource("replicasets").Name(rs.Name).SubResource("status").Body(rs).Do().Into(result)
return
}

// Delete deletes an existing ReplicaSet.
func (c *replicaSets) Delete(name string, options *api.DeleteOptions) (err error) {
return c.client.Delete().Namespace(c.ns).Resource("replicasets").Name(name).Body(options).Do().Error()
}

// Watch returns a watch.Interface that watches the requested ReplicaSets.
func (c *replicaSets) Watch(opts api.ListOptions) (watch.Interface, error) {
return c.client.Get().
Prefix("watch").
Namespace(c.ns).
Resource("replicasets").
VersionedParams(&opts, api.Scheme).
Watch()
}