Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Finish migration to edge-based for endpoints in KubeProxy #44318

Merged
merged 2 commits into from Apr 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 2 additions & 10 deletions cmd/kube-proxy/app/server.go
Expand Up @@ -220,9 +220,6 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err

var proxier proxy.ProxyProvider
var servicesHandler proxyconfig.ServiceConfigHandler
// TODO: Migrate all handlers to EndpointsHandler type and
// get rid of this one.
var endpointsHandler proxyconfig.EndpointsConfigHandler
var endpointsEventHandler proxyconfig.EndpointsHandler

proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{})
Expand Down Expand Up @@ -261,7 +258,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
// our config.EndpointsConfigHandler.
loadBalancer := winuserspace.NewLoadBalancerRR()
// set EndpointsHandler to our loadBalancer
endpointsHandler = loadBalancer
endpointsEventHandler = loadBalancer
proxierUserspace, err := winuserspace.NewProxier(
loadBalancer,
net.ParseIP(config.BindAddress),
Expand Down Expand Up @@ -321,12 +318,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err
go serviceConfig.Run(wait.NeverStop)

endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod)
if endpointsHandler != nil {
endpointsConfig.RegisterHandler(endpointsHandler)
}
if endpointsEventHandler != nil {
endpointsConfig.RegisterEventHandler(endpointsEventHandler)
}
endpointsConfig.RegisterEventHandler(endpointsEventHandler)
go endpointsConfig.Run(wait.NeverStop)

// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
Expand Down
2 changes: 1 addition & 1 deletion cmd/kubemark/hollow-node.go
Expand Up @@ -143,7 +143,7 @@ func main() {
serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{})

endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), configResyncPeriod)
endpointsConfig.RegisterHandler(&kubemark.FakeProxyHandler{})
endpointsConfig.RegisterEventHandler(&kubemark.FakeProxyHandler{})

eventClient, err := clientgoclientset.NewForConfig(clientConfig)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions pkg/kubemark/hollow_proxy.go
Expand Up @@ -40,8 +40,11 @@ type HollowProxy struct {

type FakeProxyHandler struct{}

func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service) {}
func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {}
func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service) {}
func (*FakeProxyHandler) OnEndpointsAdd(endpoints *api.Endpoints) {}
func (*FakeProxyHandler) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {}
func (*FakeProxyHandler) OnEndpointsDelete(endpoints *api.Endpoints) {}
func (*FakeProxyHandler) OnEndpointsSynced() {}

type FakeProxier struct{}

Expand Down
1 change: 1 addition & 0 deletions pkg/proxy/config/BUILD
Expand Up @@ -40,6 +40,7 @@ go_test(
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/client/informers/informers_generated/internalversion:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/testing",
Expand Down
48 changes: 19 additions & 29 deletions pkg/proxy/config/api_test.go
Expand Up @@ -24,6 +24,7 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
ktesting "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/api"
Expand Down Expand Up @@ -124,40 +125,34 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

ch := make(chan struct{})
handler := newEpsHandler(t, nil, func() { ch <- struct{}{} })
handler := NewEndpointsHandlerMock()

sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)

endpointsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute)
endpointsConfig.RegisterHandler(handler)
endpointsConfig.RegisterEventHandler(handler)
go sharedInformers.Start(stopCh)
go endpointsConfig.Run(stopCh)

// Add the first endpoints
handler.expected = []*api.Endpoints{endpoints1v1}
fakeWatch.Add(endpoints1v1)
<-ch
handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v1})

// Add another endpoints
handler.expected = []*api.Endpoints{endpoints1v1, endpoints2}
fakeWatch.Add(endpoints2)
<-ch
handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v1, endpoints2})

// Modify endpoints1
handler.expected = []*api.Endpoints{endpoints1v2, endpoints2}
fakeWatch.Modify(endpoints1v2)
<-ch
handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v2, endpoints2})

// Delete endpoints1
handler.expected = []*api.Endpoints{endpoints2}
fakeWatch.Delete(endpoints1v2)
<-ch
handler.ValidateEndpoints(t, []*api.Endpoints{endpoints2})

// Delete endpoints2
handler.expected = []*api.Endpoints{}
fakeWatch.Delete(endpoints2)
<-ch
handler.ValidateEndpoints(t, []*api.Endpoints{})
}

type svcHandler struct {
Expand All @@ -178,22 +173,17 @@ func (s *svcHandler) OnServiceUpdate(services []*api.Service) {
}
}

type epsHandler struct {
t *testing.T
expected []*api.Endpoints
done func()
}

func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) *epsHandler {
return &epsHandler{t: t, expected: eps, done: done}
}

func (e *epsHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {
defer e.done()
sort.Sort(sortedEndpoints(endpoints))
if !reflect.DeepEqual(e.expected, endpoints) {
e.t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, e.expected)
func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler {
ehm := &EndpointsHandlerMock{
state: make(map[types.NamespacedName]*api.Endpoints),
}
ehm.process = func(endpoints []*api.Endpoints) {
defer done()
if !reflect.DeepEqual(eps, endpoints) {
t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, eps)
}
}
return ehm
}

func TestInitialSync(t *testing.T) {
Expand Down Expand Up @@ -225,7 +215,7 @@ func TestInitialSync(t *testing.T) {
svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done)
svcConfig.RegisterHandler(svcHandler)
epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done)
epsConfig.RegisterHandler(epsHandler)
epsConfig.RegisterEventHandler(epsHandler)

stopCh := make(chan struct{})
defer close(stopCh)
Expand Down
85 changes: 6 additions & 79 deletions pkg/proxy/config/config.go
Expand Up @@ -45,21 +45,6 @@ type ServiceConfigHandler interface {
OnServiceUpdate(services []*api.Service)
}

// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints.
type EndpointsConfigHandler interface {
// OnEndpointsUpdate gets called when endpoints configuration is changed for a given
// service on any of the configuration sources. An example is when a new
// service comes up, or when containers come up or down for an existing service.
//
// NOTE: For efficiency, endpoints are being passed by reference, thus,
// OnEndpointsUpdate should NOT modify pointers of a given slice.
// Those endpoints objects are shared with other layers of the system and
// are guaranteed to be immutable with the assumption that are also
// not mutated by those handlers. Make a deep copy if you need to modify
// them in your code.
OnEndpointsUpdate(endpoints []*api.Endpoints)
}

// EndpointsHandler is an abstract interface o objects which receive
// notifications about endpoints object changes.
type EndpointsHandler interface {
Expand All @@ -83,24 +68,13 @@ type EndpointsConfig struct {
lister listers.EndpointsLister
listerSynced cache.InformerSynced
eventHandlers []EndpointsHandler
// TODO: Remove handlers by switching them to eventHandlers.
handlers []EndpointsConfigHandler
// updates channel is used to trigger registered handlers.
updates chan struct{}
stop chan struct{}
}

// NewEndpointsConfig creates a new EndpointsConfig.
func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyncPeriod time.Duration) *EndpointsConfig {
result := &EndpointsConfig{
lister: endpointsInformer.Lister(),
listerSynced: endpointsInformer.Informer().HasSynced,
// The updates channel is used to send interrupts to the Endpoints handler.
// It's buffered because we never want to block for as long as there is a
// pending interrupt, but don't want to drop them if the handler is doing
// work.
updates: make(chan struct{}, 1),
stop: make(chan struct{}),
}

endpointsInformer.Informer().AddEventHandlerWithResyncPeriod(
Expand All @@ -115,11 +89,6 @@ func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyn
return result
}

// RegisterHandler registers a handler which is called on every endpoints change.
func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) {
c.handlers = append(c.handlers, handler)
}

// RegisterEventHandler registers a handler which is called on every endpoints change.
func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) {
c.eventHandlers = append(c.eventHandlers, handler)
Expand All @@ -132,40 +101,12 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) {
return
}

// We have synced informers. Now we can start delivering updates
// to the registered handler.
go func() {
for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnEndpointsSynced()")
c.eventHandlers[i].OnEndpointsSynced()
}
for {
select {
case <-c.updates:
endpoints, err := c.lister.List(labels.Everything())
if err != nil {
glog.Errorf("Error while listing endpoints from cache: %v", err)
// This will cause a retry (if there isn't any other trigger in-flight).
c.dispatchUpdate()
continue
}
if endpoints == nil {
endpoints = []*api.Endpoints{}
}
for i := range c.handlers {
glog.V(3).Infof("Calling handler.OnEndpointsUpdate()")
c.handlers[i].OnEndpointsUpdate(endpoints)
}
case <-c.stop:
return
}
}
}()
// Close updates channel when stopCh is closed.
go func() {
<-stopCh
close(c.stop)
}()
for i := range c.eventHandlers {
glog.V(3).Infof("Calling handler.OnEndpointsSynced()")
c.eventHandlers[i].OnEndpointsSynced()
}

<-stopCh
}

func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
Expand All @@ -178,7 +119,6 @@ func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) {
glog.V(4).Infof("Calling handler.OnEndpointsAdd")
c.eventHandlers[i].OnEndpointsAdd(endpoints)
}
c.dispatchUpdate()
}

func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
Expand All @@ -196,7 +136,6 @@ func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) {
glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints)
}
c.dispatchUpdate()
}

func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
Expand All @@ -216,18 +155,6 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) {
glog.V(4).Infof("Calling handler.OnEndpointsUpdate")
c.eventHandlers[i].OnEndpointsDelete(endpoints)
}
c.dispatchUpdate()
}

func (c *EndpointsConfig) dispatchUpdate() {
select {
case c.updates <- struct{}{}:
// Work enqueued successfully
case <-c.stop:
// We're shut down / avoid logging the message below
default:
glog.V(4).Infof("Endpoints handler already has a pending interrupt.")
}
}

// ServiceConfig tracks a set of service configurations.
Expand Down