Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor controller as posthook when configured and enabled #33841

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkg/genericapiserver/tunneler.go
Expand Up @@ -17,8 +17,10 @@ limitations under the License.
package genericapiserver

import (
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"os"
"sync/atomic"
Expand All @@ -45,6 +47,25 @@ type Tunneler interface {
SecondsSinceSSHKeySync() int64
}

// TunnelSyncHealthChecker returns a health func that indicates if a tunneler is healthy.
// It's compatible with healthz.NamedCheck
func TunnelSyncHealthChecker(tunneler Tunneler) func(req *http.Request) error {
return func(req *http.Request) error {
if tunneler == nil {
return nil
}
lag := tunneler.SecondsSinceSync()
if lag > 600 {
return fmt.Errorf("Tunnel sync is taking to long: %d", lag)
}
sshKeyLag := tunneler.SecondsSinceSSHKeySync()
if sshKeyLag > 600 {
return fmt.Errorf("SSHKey sync is taking to long: %d", sshKeyLag)
}
return nil
}
}

type SSHTunneler struct {
// Important: Since these two int64 fields are using sync/atomic, they have to be at the top of the struct due to a bug on 32-bit platforms
// See: https://golang.org/pkg/sync/atomic/ for more information
Expand Down
29 changes: 29 additions & 0 deletions pkg/genericapiserver/tunneler_test.go
Expand Up @@ -18,6 +18,7 @@ package genericapiserver

import (
"fmt"
"net"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -104,3 +105,31 @@ func TestGenerateSSHKey(t *testing.T) {

// TODO: testing error cases where the file can not be removed?
}

type FakeTunneler struct {
SecondsSinceSyncValue int64
SecondsSinceSSHKeySyncValue int64
}

func (t *FakeTunneler) Run(AddressFunc) {}
func (t *FakeTunneler) Stop() {}
func (t *FakeTunneler) Dial(net, addr string) (net.Conn, error) { return nil, nil }
func (t *FakeTunneler) SecondsSinceSync() int64 { return t.SecondsSinceSyncValue }
func (t *FakeTunneler) SecondsSinceSSHKeySync() int64 { return t.SecondsSinceSSHKeySyncValue }

// TestIsTunnelSyncHealthy verifies that the 600 second lag test
// is honored.
func TestIsTunnelSyncHealthy(t *testing.T) {
tunneler := &FakeTunneler{}

// Pass case: 540 second lag
tunneler.SecondsSinceSyncValue = 540
healthFn := TunnelSyncHealthChecker(tunneler)
err := healthFn(nil)
assert.NoError(t, err, "IsTunnelSyncHealthy() should not have returned an error.")

// Fail case: 720 second lag
tunneler.SecondsSinceSyncValue = 720
err = healthFn(nil)
assert.Error(t, err, "IsTunnelSyncHealthy() should have returned an error.")
}
62 changes: 51 additions & 11 deletions pkg/master/controller.go
Expand Up @@ -26,9 +26,11 @@ import (
"k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/rest"
"k8s.io/kubernetes/pkg/registry/core/endpoint"
coreclient "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/unversioned"
"k8s.io/kubernetes/pkg/genericapiserver"
"k8s.io/kubernetes/pkg/registry/core/namespace"
"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
corerest "k8s.io/kubernetes/pkg/registry/core/rest"
"k8s.io/kubernetes/pkg/registry/core/service"
servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller"
Expand Down Expand Up @@ -72,6 +74,42 @@ type Controller struct {
runner *async.Runner
}

// NewBootstrapController returns a controller for watching the core capabilities of the master
func (c *Config) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage) *Controller {
return &Controller{
NamespaceRegistry: legacyRESTStorage.NamespaceRegistry,
ServiceRegistry: legacyRESTStorage.ServiceRegistry,

EndpointReconciler: c.EndpointReconcilerConfig.Reconciler,
EndpointInterval: c.EndpointReconcilerConfig.Interval,

SystemNamespaces: []string{api.NamespaceSystem},
SystemNamespacesInterval: 1 * time.Minute,

ServiceClusterIPRegistry: legacyRESTStorage.ServiceClusterIPAllocator,
ServiceClusterIPRange: c.GenericConfig.ServiceClusterIPRange,
ServiceClusterIPInterval: 3 * time.Minute,

ServiceNodePortRegistry: legacyRESTStorage.ServiceNodePortAllocator,
ServiceNodePortRange: c.GenericConfig.ServiceNodePortRange,
ServiceNodePortInterval: 3 * time.Minute,

PublicIP: c.GenericConfig.PublicAddress,

ServiceIP: c.GenericConfig.ServiceReadWriteIP,
ServicePort: c.GenericConfig.ServiceReadWritePort,
ExtraServicePorts: c.GenericConfig.ExtraServicePorts,
ExtraEndpointPorts: c.GenericConfig.ExtraEndpointPorts,
PublicServicePort: c.GenericConfig.ReadWritePort,
KubernetesServiceNodePort: c.GenericConfig.KubernetesServiceNodePort,
}
}

func (c *Controller) PostStartHook(hookContext genericapiserver.PostStartHookContext) error {
c.Start()
return nil
}

// Start begins the core controller loops that must exist for bootstrapping
// a cluster.
func (c *Controller) Start() {
Expand Down Expand Up @@ -257,18 +295,18 @@ type EndpointReconciler interface {
// masterCountEndpointReconciler reconciles endpoints based on a specified expected number of
// masters. masterCountEndpointReconciler implements EndpointReconciler.
type masterCountEndpointReconciler struct {
masterCount int
endpointRegistry endpoint.Registry
masterCount int
endpointClient coreclient.EndpointsGetter
}

var _ EndpointReconciler = &masterCountEndpointReconciler{}

// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
// specified expected number of masters.
func NewMasterCountEndpointReconciler(masterCount int, endpointRegistry endpoint.Registry) *masterCountEndpointReconciler {
func NewMasterCountEndpointReconciler(masterCount int, endpointClient coreclient.EndpointsGetter) *masterCountEndpointReconciler {
return &masterCountEndpointReconciler{
masterCount: masterCount,
endpointRegistry: endpointRegistry,
masterCount: masterCount,
endpointClient: endpointClient,
}
}

Expand All @@ -285,8 +323,7 @@ func NewMasterCountEndpointReconciler(masterCount int, endpointRegistry endpoint
// to be running (c.masterCount).
// * ReconcileEndpoints is called periodically from all apiservers.
func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []api.EndpointPort, reconcilePorts bool) error {
ctx := api.NewDefaultContext()
e, err := r.endpointRegistry.GetEndpoints(ctx, serviceName)
e, err := r.endpointClient.Endpoints(api.NamespaceDefault).Get(serviceName)
if err != nil {
e = &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Expand All @@ -301,7 +338,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
Addresses: []api.EndpointAddress{{IP: ip.String()}},
Ports: endpointPorts,
}}
return r.endpointRegistry.UpdateEndpoints(ctx, e)
_, err = r.endpointClient.Endpoints(api.NamespaceDefault).Create(e)
return err
}

// First, determine if the endpoint is in the format we expect (one
Expand All @@ -314,7 +352,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
Ports: endpointPorts,
}}
glog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e)
return r.endpointRegistry.UpdateEndpoints(ctx, e)
_, err = r.endpointClient.Endpoints(api.NamespaceDefault).Update(e)
return err
}
if ipCorrect && portsCorrect {
return nil
Expand Down Expand Up @@ -349,7 +388,8 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
e.Subsets[0].Ports = endpointPorts
}
glog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
return r.endpointRegistry.UpdateEndpoints(ctx, e)
_, err = r.endpointClient.Endpoints(api.NamespaceDefault).Update(e)
return err
}

// Determine if the endpoint is in the format ReconcileEndpoints expects.
Expand Down
98 changes: 79 additions & 19 deletions pkg/master/controller_test.go
Expand Up @@ -23,6 +23,8 @@ import (
"testing"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/util/intstr"
)
Expand All @@ -40,14 +42,15 @@ func TestReconcileEndpoints(t *testing.T) {
additionalMasters int
endpoints *api.EndpointsList
expectUpdate *api.Endpoints // nil means none expected
expectCreate *api.Endpoints // nil means none expected
}{
{
testName: "no existing endpoints",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: nil,
expectUpdate: &api.Endpoints{
expectCreate: &api.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Expand Down Expand Up @@ -222,7 +225,7 @@ func TestReconcileEndpoints(t *testing.T) {
}},
}},
},
expectUpdate: &api.Endpoints{
expectCreate: &api.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Expand Down Expand Up @@ -371,24 +374,52 @@ func TestReconcileEndpoints(t *testing.T) {
},
}
for _, test := range reconcile_tests {
registry := &registrytest.EndpointRegistry{
Endpoints: test.endpoints,
fakeClient := fake.NewSimpleClientset()
if test.endpoints != nil {
fakeClient = fake.NewSimpleClientset(test.endpoints)
}
reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, registry)
reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.Core())
err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}

updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() != "update" {
continue
}
updates = append(updates, action.(core.UpdateAction))
}
if test.expectUpdate != nil {
if len(registry.Updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates)
} else if e, a := test.expectUpdate, &registry.Updates[0]; !reflect.DeepEqual(e, a) {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else if e, a := test.expectUpdate, updates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectUpdate == nil && len(registry.Updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}

creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() != "create" {
continue
}
creates = append(creates, action.(core.CreateAction))
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creates: %v", test.testName, creates)
} else if e, a := test.expectCreate, creates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectCreate == nil && len(creates) > 0 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}

}

non_reconcile_tests := []struct {
Expand All @@ -399,6 +430,7 @@ func TestReconcileEndpoints(t *testing.T) {
additionalMasters int
endpoints *api.EndpointsList
expectUpdate *api.Endpoints // nil means none expected
expectCreate *api.Endpoints // nil means none expected
}{
{
testName: "existing endpoints extra service ports missing port no update",
Expand Down Expand Up @@ -450,7 +482,7 @@ func TestReconcileEndpoints(t *testing.T) {
ip: "1.2.3.4",
endpointPorts: []api.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: nil,
expectUpdate: &api.Endpoints{
expectCreate: &api.Endpoints{
ObjectMeta: om("foo"),
Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}},
Expand All @@ -460,24 +492,52 @@ func TestReconcileEndpoints(t *testing.T) {
},
}
for _, test := range non_reconcile_tests {
registry := &registrytest.EndpointRegistry{
Endpoints: test.endpoints,
fakeClient := fake.NewSimpleClientset()
if test.endpoints != nil {
fakeClient = fake.NewSimpleClientset(test.endpoints)
}
reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, registry)
reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.Core())
err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false)
if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
}

updates := []core.UpdateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() != "update" {
continue
}
updates = append(updates, action.(core.UpdateAction))
}
if test.expectUpdate != nil {
if len(registry.Updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, registry.Updates)
} else if e, a := test.expectUpdate, &registry.Updates[0]; !reflect.DeepEqual(e, a) {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else if e, a := test.expectUpdate, updates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectUpdate == nil && len(registry.Updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, registry.Updates)
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}

creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() != "create" {
continue
}
creates = append(creates, action.(core.CreateAction))
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creates: %v", test.testName, creates)
} else if e, a := test.expectCreate, creates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectCreate == nil && len(creates) > 0 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}

}

}
Expand Down