Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Federation] Move annotations and related parsing code as common code #44525

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 21 additions & 17 deletions federation/apis/federation/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,36 +122,40 @@ type ClusterList struct {
Items []Cluster
}

// Temporary/alpha structures to support custom replica assignments within FederatedReplicaSet.

// A set of preferences that can be added to federated version of ReplicaSet as a json-serialized annotation.
// The preferences allow the user to express in which clusters he wants to put his replicas within the
// mentioned FederatedReplicaSet.
type FederatedReplicaSetPreferences struct {
// If set to true then already scheduled and running replicas may be moved to other clusters to
// in order to bring cluster replicasets towards a desired state. Otherwise, if set to false,
// Temporary/alpha structures to support custom replica assignments within Federated workloads.

// A set of preferences that can be added to federated version of workloads (deployments, replicasets, ..)
// as a json-serialized annotation. The preferences allow the users to express in which clusters they
// want to put their replicas within the mentioned workload objects.
type ReplicaAllocationPreferences struct {
// If set to true then already scheduled and running replicas may be moved to other clusters
// in order to match current state to the specified preferences. Otherwise, if set to false,
// up and running replicas will not be moved.
// +optional
Rebalance bool

// A mapping between cluster names and preferences regarding local ReplicaSet in these clusters.
// "*" (if provided) applies to all clusters if an explicit mapping is not provided. If there is no
// "*" that clusters without explicit preferences should not have any replicas scheduled.
// A mapping between cluster names and preferences regarding a local workload object (dep, rs, .. ) in
// these clusters.
// "*" (if provided) applies to all clusters if an explicit mapping is not provided.
// If omitted, clusters without explicit preferences should not have any replicas scheduled.
// +optional
Clusters map[string]ClusterReplicaSetPreferences
Clusters map[string]ClusterPreferences
}

// Preferences regarding number of replicas assigned to a cluster replicaset within a federated replicaset.
type ClusterReplicaSetPreferences struct {
// Minimum number of replicas that should be assigned to this Local ReplicaSet. 0 by default.
// Preferences regarding number of replicas assigned to a cluster workload object (dep, rs, ..) within
// a federated workload object.
type ClusterPreferences struct {
// Minimum number of replicas that should be assigned to this cluster workload object. 0 by default.
// +optional
MinReplicas int64

// Maximum number of replicas that should be assigned to this Local ReplicaSet. Unbounded if no value provided (default).
// Maximum number of replicas that should be assigned to this cluster workload object.
// Unbounded if no value provided (default).
// +optional
MaxReplicas *int64

// A number expressing the preference to put an additional replica to this LocalReplicaSet. 0 by default.
// A number expressing the preference to put an additional replica to this cluster workload object.
// 0 by default.
Weight int64
}

Expand Down
46 changes: 23 additions & 23 deletions federation/apis/federation/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ func RegisterDeepCopies(scheme *runtime.Scheme) error {
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_Cluster, InType: reflect.TypeOf(&Cluster{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterCondition, InType: reflect.TypeOf(&ClusterCondition{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterList, InType: reflect.TypeOf(&ClusterList{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterReplicaSetPreferences, InType: reflect.TypeOf(&ClusterReplicaSetPreferences{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterPreferences, InType: reflect.TypeOf(&ClusterPreferences{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterServiceIngress, InType: reflect.TypeOf(&ClusterServiceIngress{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterSpec, InType: reflect.TypeOf(&ClusterSpec{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ClusterStatus, InType: reflect.TypeOf(&ClusterStatus{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_FederatedReplicaSetPreferences, InType: reflect.TypeOf(&FederatedReplicaSetPreferences{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_FederatedServiceIngress, InType: reflect.TypeOf(&FederatedServiceIngress{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ReplicaAllocationPreferences, InType: reflect.TypeOf(&ReplicaAllocationPreferences{})},
conversion.GeneratedDeepCopyFunc{Fn: DeepCopy_federation_ServerAddressByClientCIDR, InType: reflect.TypeOf(&ServerAddressByClientCIDR{})},
)
}
Expand Down Expand Up @@ -99,10 +99,10 @@ func DeepCopy_federation_ClusterList(in interface{}, out interface{}, c *convers
}
}

func DeepCopy_federation_ClusterReplicaSetPreferences(in interface{}, out interface{}, c *conversion.Cloner) error {
func DeepCopy_federation_ClusterPreferences(in interface{}, out interface{}, c *conversion.Cloner) error {
{
in := in.(*ClusterReplicaSetPreferences)
out := out.(*ClusterReplicaSetPreferences)
in := in.(*ClusterPreferences)
out := out.(*ClusterPreferences)
*out = *in
if in.MaxReplicas != nil {
in, out := &in.MaxReplicas, &out.MaxReplicas
Expand Down Expand Up @@ -169,38 +169,38 @@ func DeepCopy_federation_ClusterStatus(in interface{}, out interface{}, c *conve
}
}

func DeepCopy_federation_FederatedReplicaSetPreferences(in interface{}, out interface{}, c *conversion.Cloner) error {
func DeepCopy_federation_FederatedServiceIngress(in interface{}, out interface{}, c *conversion.Cloner) error {
{
in := in.(*FederatedReplicaSetPreferences)
out := out.(*FederatedReplicaSetPreferences)
in := in.(*FederatedServiceIngress)
out := out.(*FederatedServiceIngress)
*out = *in
if in.Clusters != nil {
in, out := &in.Clusters, &out.Clusters
*out = make(map[string]ClusterReplicaSetPreferences)
for key, val := range *in {
newVal := new(ClusterReplicaSetPreferences)
if err := DeepCopy_federation_ClusterReplicaSetPreferences(&val, newVal, c); err != nil {
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]ClusterServiceIngress, len(*in))
for i := range *in {
if err := DeepCopy_federation_ClusterServiceIngress(&(*in)[i], &(*out)[i], c); err != nil {
return err
}
(*out)[key] = *newVal
}
}
return nil
}
}

func DeepCopy_federation_FederatedServiceIngress(in interface{}, out interface{}, c *conversion.Cloner) error {
func DeepCopy_federation_ReplicaAllocationPreferences(in interface{}, out interface{}, c *conversion.Cloner) error {
{
in := in.(*FederatedServiceIngress)
out := out.(*FederatedServiceIngress)
in := in.(*ReplicaAllocationPreferences)
out := out.(*ReplicaAllocationPreferences)
*out = *in
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]ClusterServiceIngress, len(*in))
for i := range *in {
if err := DeepCopy_federation_ClusterServiceIngress(&(*in)[i], &(*out)[i], c); err != nil {
if in.Clusters != nil {
in, out := &in.Clusters, &out.Clusters
*out = make(map[string]ClusterPreferences)
for key, val := range *in {
newVal := new(ClusterPreferences)
if err := DeepCopy_federation_ClusterPreferences(&val, newVal, c); err != nil {
return err
}
(*out)[key] = *newVal
}
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion federation/pkg/federation-controller/deployment/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//federation/pkg/federation-controller/util/planner:go_default_library",
"//federation/pkg/federation-controller/util/podanalyzer:go_default_library",
"//federation/pkg/federation-controller/util/replicapreferences:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
Expand Down Expand Up @@ -55,7 +56,6 @@ go_test(
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package deployment

import (
"bytes"
"encoding/json"
"fmt"
"sort"
"time"
Expand All @@ -44,6 +43,7 @@ import (
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences"
"k8s.io/kubernetes/pkg/api"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
Expand All @@ -67,21 +67,6 @@ var (
updateTimeout = 30 * time.Second
)

func parseFederationDeploymentPreference(fd *extensionsv1.Deployment) (*fed.FederatedReplicaSetPreferences, error) {
if fd.Annotations == nil {
return nil, nil
}
fdPrefString, found := fd.Annotations[FedDeploymentPreferencesAnnotation]
if !found {
return nil, nil
}
var fdPref fed.FederatedReplicaSetPreferences
if err := json.Unmarshal([]byte(fdPrefString), &fdPref); err != nil {
return nil, err
}
return &fdPref, nil
}

type DeploymentController struct {
fedClient fedclientset.Interface

Expand Down Expand Up @@ -116,8 +101,8 @@ func NewDeploymentController(federationClient fedclientset.Interface) *Deploymen
clusterDeliverer: fedutil.NewDelayingDeliverer(),
deploymentWorkQueue: workqueue.New(),
deploymentBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
defaultPlanner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{
Clusters: map[string]fed.ClusterReplicaSetPreferences{
defaultPlanner: planner.NewPlanner(&fed.ReplicaAllocationPreferences{
Clusters: map[string]fed.ClusterPreferences{
"*": {Weight: 1},
},
}),
Expand Down Expand Up @@ -372,7 +357,7 @@ func (fdc *DeploymentController) schedule(fd *extensionsv1.Deployment, clusters
// TODO: integrate real scheduler

plannerToBeUsed := fdc.defaultPlanner
fdPref, err := parseFederationDeploymentPreference(fd)
fdPref, err := replicapreferences.GetAllocationPreferences(fd, FedDeploymentPreferencesAnnotation)
if err != nil {
glog.Info("Invalid Deployment specific preference, use default. deployment: %v, err: %v", fd.Name, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"testing"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -43,39 +42,6 @@ const (
pods = "pods"
)

func TestParseFederationDeploymentPreference(t *testing.T) {
successPrefs := []string{
`{"rebalance": true,
"clusters": {
"k8s-1": {"minReplicas": 10, "maxReplicas": 20, "weight": 2},
"*": {"weight": 1}
}}`,
}
failedPrefes := []string{
`{`, // bad json
}

rs := newDeploymentWithReplicas("d-1", 100)
accessor, _ := meta.Accessor(rs)
anno := accessor.GetAnnotations()
if anno == nil {
anno = make(map[string]string)
accessor.SetAnnotations(anno)
}
for _, prefString := range successPrefs {
anno[FedDeploymentPreferencesAnnotation] = prefString
pref, err := parseFederationDeploymentPreference(rs)
assert.NotNil(t, pref)
assert.Nil(t, err)
}
for _, prefString := range failedPrefes {
anno[FedDeploymentPreferencesAnnotation] = prefString
pref, err := parseFederationDeploymentPreference(rs)
assert.Nil(t, pref)
assert.NotNil(t, err)
}
}

func TestDeploymentController(t *testing.T) {
flag.Set("logtostderr", "true")
flag.Set("v", "5")
Expand Down
2 changes: 1 addition & 1 deletion federation/pkg/federation-controller/replicaset/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//federation/pkg/federation-controller/util/eventsink:go_default_library",
"//federation/pkg/federation-controller/util/planner:go_default_library",
"//federation/pkg/federation-controller/util/podanalyzer:go_default_library",
"//federation/pkg/federation-controller/util/replicapreferences:go_default_library",
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/apis/extensions/v1beta1:go_default_library",
Expand Down Expand Up @@ -55,7 +56,6 @@ go_test(
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/testing:go_default_library",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package replicaset

import (
"bytes"
"encoding/json"
"fmt"
"sort"
"time"
Expand All @@ -44,6 +43,7 @@ import (
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/planner"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/podanalyzer"
"k8s.io/kubernetes/federation/pkg/federation-controller/util/replicapreferences"
"k8s.io/kubernetes/pkg/api"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
Expand All @@ -67,21 +67,6 @@ var (
updateTimeout = 30 * time.Second
)

func parseFederationReplicaSetReference(frs *extensionsv1.ReplicaSet) (*fed.FederatedReplicaSetPreferences, error) {
if frs.Annotations == nil {
return nil, nil
}
frsPrefString, found := frs.Annotations[FedReplicaSetPreferencesAnnotation]
if !found {
return nil, nil
}
var frsPref fed.FederatedReplicaSetPreferences
if err := json.Unmarshal([]byte(frsPrefString), &frsPref); err != nil {
return nil, err
}
return &frsPref, nil
}

type ReplicaSetController struct {
fedClient fedclientset.Interface

Expand Down Expand Up @@ -118,8 +103,8 @@ func NewReplicaSetController(federationClient fedclientset.Interface) *ReplicaSe
clusterDeliverer: fedutil.NewDelayingDeliverer(),
replicasetWorkQueue: workqueue.New(),
replicaSetBackoff: flowcontrol.NewBackOff(5*time.Second, time.Minute),
defaultPlanner: planner.NewPlanner(&fed.FederatedReplicaSetPreferences{
Clusters: map[string]fed.ClusterReplicaSetPreferences{
defaultPlanner: planner.NewPlanner(&fed.ReplicaAllocationPreferences{
Clusters: map[string]fed.ClusterPreferences{
"*": {Weight: 1},
},
}),
Expand Down Expand Up @@ -352,7 +337,7 @@ func (frsc *ReplicaSetController) schedule(frs *extensionsv1.ReplicaSet, cluster
// TODO: integrate real scheduler

plnr := frsc.defaultPlanner
frsPref, err := parseFederationReplicaSetReference(frs)
frsPref, err := replicapreferences.GetAllocationPreferences(frs, FedReplicaSetPreferencesAnnotation)
if err != nil {
glog.Info("Invalid ReplicaSet specific preference, use default. rs: %v, err: %v", frs, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"testing"
"time"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
core "k8s.io/client-go/testing"
Expand All @@ -44,39 +43,6 @@ const (
k8s2 = "k8s-2"
)

func TestParseFederationReplicaSetReference(t *testing.T) {
successPrefs := []string{
`{"rebalance": true,
"clusters": {
"k8s-1": {"minReplicas": 10, "maxReplicas": 20, "weight": 2},
"*": {"weight": 1}
}}`,
}
failedPrefes := []string{
`{`, // bad json
}

rs := newReplicaSetWithReplicas("rs-1", 100)
accessor, _ := meta.Accessor(rs)
anno := accessor.GetAnnotations()
if anno == nil {
anno = make(map[string]string)
accessor.SetAnnotations(anno)
}
for _, prefString := range successPrefs {
anno[FedReplicaSetPreferencesAnnotation] = prefString
pref, err := parseFederationReplicaSetReference(rs)
assert.NotNil(t, pref)
assert.Nil(t, err)
}
for _, prefString := range failedPrefes {
anno[FedReplicaSetPreferencesAnnotation] = prefString
pref, err := parseFederationReplicaSetReference(rs)
assert.Nil(t, pref)
assert.NotNil(t, err)
}
}

func TestReplicaSetController(t *testing.T) {
flag.Set("logtostderr", "true")
flag.Set("v", "5")
Expand Down
1 change: 1 addition & 0 deletions federation/pkg/federation-controller/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ filegroup(
"//federation/pkg/federation-controller/util/finalizers:all-srcs",
"//federation/pkg/federation-controller/util/planner:all-srcs",
"//federation/pkg/federation-controller/util/podanalyzer:all-srcs",
"//federation/pkg/federation-controller/util/replicapreferences:all-srcs",
"//federation/pkg/federation-controller/util/test:all-srcs",
],
tags = ["automanaged"],
Expand Down