diff --git a/cmd/kube-proxy/app/server.go b/cmd/kube-proxy/app/server.go index f4615be72871..3f0c43d94490 100644 --- a/cmd/kube-proxy/app/server.go +++ b/cmd/kube-proxy/app/server.go @@ -220,9 +220,6 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err var proxier proxy.ProxyProvider var servicesHandler proxyconfig.ServiceConfigHandler - // TODO: Migrate all handlers to EndpointsHandler type and - // get rid of this one. - var endpointsHandler proxyconfig.EndpointsConfigHandler var endpointsEventHandler proxyconfig.EndpointsHandler proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{}) @@ -261,7 +258,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err // our config.EndpointsConfigHandler. loadBalancer := winuserspace.NewLoadBalancerRR() // set EndpointsHandler to our loadBalancer - endpointsHandler = loadBalancer + endpointsEventHandler = loadBalancer proxierUserspace, err := winuserspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), @@ -321,12 +318,7 @@ func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, err go serviceConfig.Run(wait.NeverStop) endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), config.ConfigSyncPeriod) - if endpointsHandler != nil { - endpointsConfig.RegisterHandler(endpointsHandler) - } - if endpointsEventHandler != nil { - endpointsConfig.RegisterEventHandler(endpointsEventHandler) - } + endpointsConfig.RegisterEventHandler(endpointsEventHandler) go endpointsConfig.Run(wait.NeverStop) // This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those diff --git a/cmd/kubemark/hollow-node.go b/cmd/kubemark/hollow-node.go index 2b9cfa669eca..5c422d10dba1 100644 --- a/cmd/kubemark/hollow-node.go +++ b/cmd/kubemark/hollow-node.go @@ -143,7 +143,7 @@ func main() { serviceConfig.RegisterHandler(&kubemark.FakeProxyHandler{}) endpointsConfig := proxyconfig.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), configResyncPeriod) - endpointsConfig.RegisterHandler(&kubemark.FakeProxyHandler{}) + endpointsConfig.RegisterEventHandler(&kubemark.FakeProxyHandler{}) eventClient, err := clientgoclientset.NewForConfig(clientConfig) if err != nil { diff --git a/pkg/kubemark/hollow_proxy.go b/pkg/kubemark/hollow_proxy.go index ae738f88e47b..4e5fecb361b0 100644 --- a/pkg/kubemark/hollow_proxy.go +++ b/pkg/kubemark/hollow_proxy.go @@ -40,8 +40,11 @@ type HollowProxy struct { type FakeProxyHandler struct{} -func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service) {} -func (*FakeProxyHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) {} +func (*FakeProxyHandler) OnServiceUpdate(services []*api.Service) {} +func (*FakeProxyHandler) OnEndpointsAdd(endpoints *api.Endpoints) {} +func (*FakeProxyHandler) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) {} +func (*FakeProxyHandler) OnEndpointsDelete(endpoints *api.Endpoints) {} +func (*FakeProxyHandler) OnEndpointsSynced() {} type FakeProxier struct{} diff --git a/pkg/proxy/config/BUILD b/pkg/proxy/config/BUILD index 2adb49bc4211..8c6967c1d2c3 100644 --- a/pkg/proxy/config/BUILD +++ b/pkg/proxy/config/BUILD @@ -40,6 +40,7 @@ go_test( "//pkg/client/clientset_generated/internalclientset/fake:go_default_library", "//pkg/client/informers/informers_generated/internalversion:go_default_library", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apimachinery/pkg/watch", "//vendor:k8s.io/client-go/testing", diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 11600eae8c3d..6d8d0a68f0b9 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -24,6 +24,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/watch" ktesting "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/api" @@ -124,40 +125,34 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - ch := make(chan struct{}) - handler := newEpsHandler(t, nil, func() { ch <- struct{}{} }) + handler := NewEndpointsHandlerMock() sharedInformers := informers.NewSharedInformerFactory(client, time.Minute) endpointsConfig := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute) - endpointsConfig.RegisterHandler(handler) + endpointsConfig.RegisterEventHandler(handler) go sharedInformers.Start(stopCh) go endpointsConfig.Run(stopCh) // Add the first endpoints - handler.expected = []*api.Endpoints{endpoints1v1} fakeWatch.Add(endpoints1v1) - <-ch + handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v1}) // Add another endpoints - handler.expected = []*api.Endpoints{endpoints1v1, endpoints2} fakeWatch.Add(endpoints2) - <-ch + handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v1, endpoints2}) // Modify endpoints1 - handler.expected = []*api.Endpoints{endpoints1v2, endpoints2} fakeWatch.Modify(endpoints1v2) - <-ch + handler.ValidateEndpoints(t, []*api.Endpoints{endpoints1v2, endpoints2}) // Delete endpoints1 - handler.expected = []*api.Endpoints{endpoints2} fakeWatch.Delete(endpoints1v2) - <-ch + handler.ValidateEndpoints(t, []*api.Endpoints{endpoints2}) // Delete endpoints2 - handler.expected = []*api.Endpoints{} fakeWatch.Delete(endpoints2) - <-ch + handler.ValidateEndpoints(t, []*api.Endpoints{}) } type svcHandler struct { @@ -178,22 +173,17 @@ func (s *svcHandler) OnServiceUpdate(services []*api.Service) { } } -type epsHandler struct { - t *testing.T - expected []*api.Endpoints - done func() -} - -func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) *epsHandler { - return &epsHandler{t: t, expected: eps, done: done} -} - -func (e *epsHandler) OnEndpointsUpdate(endpoints []*api.Endpoints) { - defer e.done() - sort.Sort(sortedEndpoints(endpoints)) - if !reflect.DeepEqual(e.expected, endpoints) { - e.t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, e.expected) +func newEpsHandler(t *testing.T, eps []*api.Endpoints, done func()) EndpointsHandler { + ehm := &EndpointsHandlerMock{ + state: make(map[types.NamespacedName]*api.Endpoints), + } + ehm.process = func(endpoints []*api.Endpoints) { + defer done() + if !reflect.DeepEqual(eps, endpoints) { + t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, eps) + } } + return ehm } func TestInitialSync(t *testing.T) { @@ -225,7 +215,7 @@ func TestInitialSync(t *testing.T) { svcHandler := newSvcHandler(t, []*api.Service{svc2, svc1}, wg.Done) svcConfig.RegisterHandler(svcHandler) epsHandler := newEpsHandler(t, []*api.Endpoints{eps2, eps1}, wg.Done) - epsConfig.RegisterHandler(epsHandler) + epsConfig.RegisterEventHandler(epsHandler) stopCh := make(chan struct{}) defer close(stopCh) diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index 73bfae3caf83..979733af9932 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -45,21 +45,6 @@ type ServiceConfigHandler interface { OnServiceUpdate(services []*api.Service) } -// EndpointsConfigHandler is an abstract interface of objects which receive update notifications for the set of endpoints. -type EndpointsConfigHandler interface { - // OnEndpointsUpdate gets called when endpoints configuration is changed for a given - // service on any of the configuration sources. An example is when a new - // service comes up, or when containers come up or down for an existing service. - // - // NOTE: For efficiency, endpoints are being passed by reference, thus, - // OnEndpointsUpdate should NOT modify pointers of a given slice. - // Those endpoints objects are shared with other layers of the system and - // are guaranteed to be immutable with the assumption that are also - // not mutated by those handlers. Make a deep copy if you need to modify - // them in your code. - OnEndpointsUpdate(endpoints []*api.Endpoints) -} - // EndpointsHandler is an abstract interface o objects which receive // notifications about endpoints object changes. type EndpointsHandler interface { @@ -83,11 +68,6 @@ type EndpointsConfig struct { lister listers.EndpointsLister listerSynced cache.InformerSynced eventHandlers []EndpointsHandler - // TODO: Remove handlers by switching them to eventHandlers. - handlers []EndpointsConfigHandler - // updates channel is used to trigger registered handlers. - updates chan struct{} - stop chan struct{} } // NewEndpointsConfig creates a new EndpointsConfig. @@ -95,12 +75,6 @@ func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyn result := &EndpointsConfig{ lister: endpointsInformer.Lister(), listerSynced: endpointsInformer.Informer().HasSynced, - // The updates channel is used to send interrupts to the Endpoints handler. - // It's buffered because we never want to block for as long as there is a - // pending interrupt, but don't want to drop them if the handler is doing - // work. - updates: make(chan struct{}, 1), - stop: make(chan struct{}), } endpointsInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -115,11 +89,6 @@ func NewEndpointsConfig(endpointsInformer coreinformers.EndpointsInformer, resyn return result } -// RegisterHandler registers a handler which is called on every endpoints change. -func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { - c.handlers = append(c.handlers, handler) -} - // RegisterEventHandler registers a handler which is called on every endpoints change. func (c *EndpointsConfig) RegisterEventHandler(handler EndpointsHandler) { c.eventHandlers = append(c.eventHandlers, handler) @@ -132,40 +101,12 @@ func (c *EndpointsConfig) Run(stopCh <-chan struct{}) { return } - // We have synced informers. Now we can start delivering updates - // to the registered handler. - go func() { - for i := range c.eventHandlers { - glog.V(3).Infof("Calling handler.OnEndpointsSynced()") - c.eventHandlers[i].OnEndpointsSynced() - } - for { - select { - case <-c.updates: - endpoints, err := c.lister.List(labels.Everything()) - if err != nil { - glog.Errorf("Error while listing endpoints from cache: %v", err) - // This will cause a retry (if there isn't any other trigger in-flight). - c.dispatchUpdate() - continue - } - if endpoints == nil { - endpoints = []*api.Endpoints{} - } - for i := range c.handlers { - glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") - c.handlers[i].OnEndpointsUpdate(endpoints) - } - case <-c.stop: - return - } - } - }() - // Close updates channel when stopCh is closed. - go func() { - <-stopCh - close(c.stop) - }() + for i := range c.eventHandlers { + glog.V(3).Infof("Calling handler.OnEndpointsSynced()") + c.eventHandlers[i].OnEndpointsSynced() + } + + <-stopCh } func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) { @@ -178,7 +119,6 @@ func (c *EndpointsConfig) handleAddEndpoints(obj interface{}) { glog.V(4).Infof("Calling handler.OnEndpointsAdd") c.eventHandlers[i].OnEndpointsAdd(endpoints) } - c.dispatchUpdate() } func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) { @@ -196,7 +136,6 @@ func (c *EndpointsConfig) handleUpdateEndpoints(oldObj, newObj interface{}) { glog.V(4).Infof("Calling handler.OnEndpointsUpdate") c.eventHandlers[i].OnEndpointsUpdate(oldEndpoints, endpoints) } - c.dispatchUpdate() } func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { @@ -216,18 +155,6 @@ func (c *EndpointsConfig) handleDeleteEndpoints(obj interface{}) { glog.V(4).Infof("Calling handler.OnEndpointsUpdate") c.eventHandlers[i].OnEndpointsDelete(endpoints) } - c.dispatchUpdate() -} - -func (c *EndpointsConfig) dispatchUpdate() { - select { - case c.updates <- struct{}{}: - // Work enqueued successfully - case <-c.stop: - // We're shut down / avoid logging the message below - default: - glog.V(4).Infof("Endpoints handler already has a pending interrupt.") - } } // ServiceConfig tracks a set of service configurations. diff --git a/pkg/proxy/config/config_test.go b/pkg/proxy/config/config_test.go index b5883211fdf4..d4a9a370e074 100644 --- a/pkg/proxy/config/config_test.go +++ b/pkg/proxy/config/config_test.go @@ -19,10 +19,12 @@ package config import ( "reflect" "sort" + "sync" "testing" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" ktesting "k8s.io/client-go/testing" @@ -45,7 +47,6 @@ func (s sortedServices) Less(i, j int) bool { type ServiceHandlerMock struct { updated chan []*api.Service - waits int } func NewServiceHandlerMock() *ServiceHandlerMock { @@ -90,17 +91,66 @@ func (s sortedEndpoints) Less(i, j int) bool { } type EndpointsHandlerMock struct { + lock sync.Mutex + + state map[types.NamespacedName]*api.Endpoints + synced bool updated chan []*api.Endpoints - waits int + process func([]*api.Endpoints) } func NewEndpointsHandlerMock() *EndpointsHandlerMock { - return &EndpointsHandlerMock{updated: make(chan []*api.Endpoints, 5)} + ehm := &EndpointsHandlerMock{ + state: make(map[types.NamespacedName]*api.Endpoints), + updated: make(chan []*api.Endpoints, 5), + } + ehm.process = func(endpoints []*api.Endpoints) { + ehm.updated <- endpoints + } + return ehm +} + +func (h *EndpointsHandlerMock) OnEndpointsAdd(endpoints *api.Endpoints) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + h.state[namespacedName] = endpoints + h.sendEndpoints() +} + +func (h *EndpointsHandlerMock) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + h.state[namespacedName] = endpoints + h.sendEndpoints() +} + +func (h *EndpointsHandlerMock) OnEndpointsDelete(endpoints *api.Endpoints) { + h.lock.Lock() + defer h.lock.Unlock() + namespacedName := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name} + delete(h.state, namespacedName) + h.sendEndpoints() } -func (h *EndpointsHandlerMock) OnEndpointsUpdate(endpoints []*api.Endpoints) { +func (h *EndpointsHandlerMock) OnEndpointsSynced() { + h.lock.Lock() + defer h.lock.Unlock() + h.synced = true + h.sendEndpoints() +} + +func (h *EndpointsHandlerMock) sendEndpoints() { + if !h.synced { + return + } + endpoints := make([]*api.Endpoints, 0, len(h.state)) + for _, eps := range h.state { + endpoints = append(endpoints, eps) + } sort.Sort(sortedEndpoints(endpoints)) - h.updated <- endpoints + h.process(endpoints) } func (h *EndpointsHandlerMock) ValidateEndpoints(t *testing.T, expectedEndpoints []*api.Endpoints) { @@ -230,8 +280,8 @@ func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) { config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute) handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() - config.RegisterHandler(handler) - config.RegisterHandler(handler2) + config.RegisterEventHandler(handler) + config.RegisterEventHandler(handler2) go sharedInformers.Start(stopCh) go config.Run(stopCh) @@ -270,8 +320,8 @@ func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) { config := NewEndpointsConfig(sharedInformers.Core().InternalVersion().Endpoints(), time.Minute) handler := NewEndpointsHandlerMock() handler2 := NewEndpointsHandlerMock() - config.RegisterHandler(handler) - config.RegisterHandler(handler2) + config.RegisterEventHandler(handler) + config.RegisterEventHandler(handler2) go sharedInformers.Start(stopCh) go config.Run(stopCh) diff --git a/pkg/proxy/winuserspace/proxier_test.go b/pkg/proxy/winuserspace/proxier_test.go index 59bb6667b7ae..9334880a1d0c 100644 --- a/pkg/proxy/winuserspace/proxier_test.go +++ b/pkg/proxy/winuserspace/proxier_test.go @@ -216,14 +216,12 @@ func getPortNum(t *testing.T, addr string) int { func TestTCPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -245,14 +243,12 @@ func TestTCPProxy(t *testing.T) { func TestUDPProxy(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -274,14 +270,12 @@ func TestUDPProxy(t *testing.T) { func TestUDPProxyTimeout(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -308,19 +302,20 @@ func TestMultiPortProxy(t *testing.T) { lb := NewLoadBalancerRR() serviceP := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-p"}, Port: "p"} serviceQ := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo-q"}, Port: "q"} - lb.OnEndpointsUpdate([]*api.Endpoints{{ + lb.OnEndpointsAdd(&api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "p", Protocol: "TCP", Port: tcpServerPort}}, }}, - }, { + }) + lb.OnEndpointsAdd(&api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceQ.Name, Namespace: serviceQ.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, Ports: []api.EndpointPort{{Name: "q", Protocol: "UDP", Port: udpServerPort}}, }}, - }}) + }) listenIP := "0.0.0.0" p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) @@ -410,14 +405,12 @@ func stopProxyByName(proxier *Proxier, service ServicePortPortalName) error { func TestTCPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -456,14 +449,12 @@ func TestTCPProxyStop(t *testing.T) { func TestUDPProxyStop(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -496,14 +487,12 @@ func TestUDPProxyStop(t *testing.T) { func TestTCPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -536,14 +525,12 @@ func TestTCPProxyUpdateDelete(t *testing.T) { func TestUDPProxyUpdateDelete(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Namespace: service.Namespace, Name: service.Name}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -582,7 +569,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }}, } - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) listenIP := "0.0.0.0" p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) @@ -610,7 +597,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 0) // need to add endpoint here because it got clean up during service delete - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ @@ -637,7 +624,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, }}, } - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) listenIP := "0.0.0.0" p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) @@ -665,7 +652,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { waitForNumProxyLoops(t, p, 0) // need to add endpoint here because it got clean up during service delete - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) p.OnServiceUpdate([]*api.Service{{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Spec: api.ServiceSpec{ClusterIP: listenIP, Ports: []api.ServicePort{{ @@ -685,14 +672,12 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { func TestTCPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -735,14 +720,12 @@ func TestTCPProxyUpdatePort(t *testing.T) { func TestUDPProxyUpdatePort(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: udpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -782,14 +765,12 @@ func TestUDPProxyUpdatePort(t *testing.T) { func TestProxyUpdatePublicIPs(t *testing.T) { lb := NewLoadBalancerRR() service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "echo"}, Port: "p"} - lb.OnEndpointsUpdate([]*api.Endpoints{ - { - ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, - Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, - Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, - }}, - }, + lb.OnEndpointsAdd(&api.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "127.0.0.1"}}, + Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, + }}, }) listenIP := "0.0.0.0" @@ -843,7 +824,7 @@ func TestProxyUpdatePortal(t *testing.T) { Ports: []api.EndpointPort{{Name: "p", Port: tcpServerPort}}, }}, } - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) listenIP := "0.0.0.0" p, err := createProxier(lb, net.ParseIP(listenIP), netshtest.NewFake(), net.ParseIP("127.0.0.1"), time.Minute, udpIdleTimeoutForTest) @@ -894,7 +875,7 @@ func TestProxyUpdatePortal(t *testing.T) { Protocol: "TCP", }}}, }}) - lb.OnEndpointsUpdate([]*api.Endpoints{endpoint}) + lb.OnEndpointsAdd(endpoint) svcInfo, exists = p.getServiceInfo(servicePortPortalName) if !exists { t.Fatalf("service with ClusterIP set not found in the proxy") diff --git a/pkg/proxy/winuserspace/roundrobin.go b/pkg/proxy/winuserspace/roundrobin.go index c7daa68c820f..ff2026bd8fcf 100644 --- a/pkg/proxy/winuserspace/roundrobin.go +++ b/pkg/proxy/winuserspace/roundrobin.go @@ -233,65 +233,91 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn } } -// OnEndpointsUpdate manages the registered service endpoints. -// Registered endpoints are updated if found in the update set or -// unregistered if missing from the update set. -func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { +// buildPortsToEndpointsMap builds a map of portname -> all ip:ports for that +// portname. Explode Endpoints.Subsets[*] into this structure. +func buildPortsToEndpointsMap(endpoints *api.Endpoints) map[string][]hostPortPair { + portsToEndpoints := map[string][]hostPortPair{} + for i := range endpoints.Subsets { + ss := &endpoints.Subsets[i] + for i := range ss.Ports { + port := &ss.Ports[i] + for i := range ss.Addresses { + addr := &ss.Addresses[i] + portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)}) + // Ignore the protocol field - we'll get that from the Service objects. + } + } + } + return portsToEndpoints +} + +func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *api.Endpoints) { + portsToEndpoints := buildPortsToEndpointsMap(endpoints) + + lb.lock.Lock() + defer lb.lock.Unlock() + + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} + newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + state, exists := lb.services[svcPort] + + if !exists || state == nil || len(newEndpoints) > 0 { + glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) + lb.updateAffinityMap(svcPort, newEndpoints) + // OnEndpointsAdd can be called without NewService being called externally. + // To be safe we will call it here. A new service will only be created + // if one does not already exist. The affinity will be updated + // later, once NewService is called. + state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0) + state.endpoints = slice.ShuffleStrings(newEndpoints) + + // Reset the round-robin index. + state.index = 0 + } + } +} + +func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) { + portsToEndpoints := buildPortsToEndpointsMap(endpoints) + oldPortsToEndpoints := buildPortsToEndpointsMap(oldEndpoints) registeredEndpoints := make(map[proxy.ServicePortName]bool) + lb.lock.Lock() defer lb.lock.Unlock() - // Update endpoints for services. - for i := range allEndpoints { - // svcEndpoints should NOT be modified. - svcEndpoints := allEndpoints[i] - - // We need to build a map of portname -> all ip:ports for that - // portname. Explode Endpoints.Subsets[*] into this structure. - portsToEndpoints := map[string][]hostPortPair{} - for i := range svcEndpoints.Subsets { - ss := &svcEndpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - for i := range ss.Addresses { - addr := &ss.Addresses[i] - portsToEndpoints[port.Name] = append(portsToEndpoints[port.Name], hostPortPair{addr.IP, int(port.Port)}) - // Ignore the protocol field - we'll get that from the Service objects. - } - } + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} + newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) + state, exists := lb.services[svcPort] + + curEndpoints := []string{} + if state != nil { + curEndpoints = state.endpoints } - for portname := range portsToEndpoints { - svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: svcEndpoints.Namespace, Name: svcEndpoints.Name}, Port: portname} - state, exists := lb.services[svcPort] - curEndpoints := []string{} - if state != nil { - curEndpoints = state.endpoints - } - newEndpoints := flattenValidEndpoints(portsToEndpoints[portname]) - - if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { - glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) - lb.updateAffinityMap(svcPort, newEndpoints) - // OnEndpointsUpdate can be called without NewService being called externally. - // To be safe we will call it here. A new service will only be created - // if one does not already exist. The affinity will be updated - // later, once NewService is called. - state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0) - state.endpoints = slice.ShuffleStrings(newEndpoints) - - // Reset the round-robin index. - state.index = 0 - } - registeredEndpoints[svcPort] = true + if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { + glog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) + lb.updateAffinityMap(svcPort, newEndpoints) + // OnEndpointsUpdate can be called without NewService being called externally. + // To be safe we will call it here. A new service will only be created + // if one does not already exist. The affinity will be updated + // later, once NewService is called. + state = lb.newServiceInternal(svcPort, api.ServiceAffinity(""), 0) + state.endpoints = slice.ShuffleStrings(newEndpoints) + + // Reset the round-robin index. + state.index = 0 } + registeredEndpoints[svcPort] = true } - // Remove endpoints missing from the update. - for k := range lb.services { - if _, exists := registeredEndpoints[k]; !exists { - glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", k) + + for portname := range oldPortsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} + if _, exists := registeredEndpoints[svcPort]; !exists { + glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort) // Reset but don't delete. - state := lb.services[k] + state := lb.services[svcPort] state.endpoints = []string{} state.index = 0 state.affinity.affinityMap = map[string]*affinityState{} @@ -299,6 +325,27 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { } } +func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *api.Endpoints) { + portsToEndpoints := buildPortsToEndpointsMap(endpoints) + + lb.lock.Lock() + defer lb.lock.Unlock() + + for portname := range portsToEndpoints { + svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} + glog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort) + // If the service is still around, reset but don't delete. + if state, ok := lb.services[svcPort]; ok { + state.endpoints = []string{} + state.index = 0 + state.affinity.affinityMap = map[string]*affinityState{} + } + } +} + +func (lb *LoadBalancerRR) OnEndpointsSynced() { +} + // Tests whether two slices are equivalent. This sorts both slices in-place. func slicesEquiv(lhs, rhs []string) bool { if len(lhs) != len(rhs) { diff --git a/pkg/proxy/winuserspace/roundrobin_test.go b/pkg/proxy/winuserspace/roundrobin_test.go index 0e4356050a32..b334a3ccc241 100644 --- a/pkg/proxy/winuserspace/roundrobin_test.go +++ b/pkg/proxy/winuserspace/roundrobin_test.go @@ -67,8 +67,6 @@ func TestFilterWorks(t *testing.T) { func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - var endpoints []*api.Endpoints - loadBalancer.OnEndpointsUpdate(endpoints) service := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "foo"}, Port: "does-not-exist"} endpoint, err := loadBalancer.NextEndpoint(service, nil, false) if err == nil { @@ -106,15 +104,14 @@ func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Name: "p", Port: 40}}, }}, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) expectEndpoint(t, loadBalancer, service, "endpoint1:40", nil) @@ -144,15 +141,14 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{{ Addresses: []api.EndpointAddress{{IP: "endpoint"}}, Ports: []api.EndpointPort{{Name: "p", Port: 1}, {Name: "p", Port: 2}, {Name: "p", Port: 3}}, }}, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) shuffledEndpoints := loadBalancer.services[service].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { @@ -172,8 +168,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -186,7 +181,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsMultiplePorts(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) shuffledEndpoints := loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:1", "endpoint3:3") { @@ -215,8 +210,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -233,7 +227,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpointsv1) shuffledEndpoints := loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint1:1", "endpoint2:2", "endpoint3:3") { @@ -255,7 +249,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = &api.Endpoints{ + endpointsv2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -268,7 +262,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2) shuffledEndpoints = loadBalancer.services[serviceP].endpoints if !stringsInSlice(shuffledEndpoints, "endpoint4:4", "endpoint5:5") { @@ -289,8 +283,8 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, serviceQ, shuffledEndpoints[1], nil) // Clear endpoints - endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} - loadBalancer.OnEndpointsUpdate(endpoints) + endpointsv3 := &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: serviceP.Name, Namespace: serviceP.Namespace}, Subsets: nil} + loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3) endpoint, err = loadBalancer.NextEndpoint(serviceP, nil, false) if err == nil || len(endpoint) != 0 { @@ -306,8 +300,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - endpoints := make([]*api.Endpoints, 2) - endpoints[0] = &api.Endpoints{ + endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooServiceP.Name, Namespace: fooServiceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -316,7 +309,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - endpoints[1] = &api.Endpoints{ + endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barServiceP.Name, Namespace: barServiceP.Namespace}, Subsets: []api.EndpointSubset{ { @@ -325,7 +318,8 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints1) + loadBalancer.OnEndpointsAdd(endpoints2) shuffledFooEndpoints := loadBalancer.services[fooServiceP].endpoints expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[0], nil) expectEndpoint(t, loadBalancer, fooServiceP, shuffledFooEndpoints[1], nil) @@ -341,7 +335,7 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { expectEndpoint(t, loadBalancer, barServiceP, shuffledBarEndpoints[1], nil) // Then update the configuration by removing foo - loadBalancer.OnEndpointsUpdate(endpoints[1:]) + loadBalancer.OnEndpointsDelete(endpoints1) endpoint, err = loadBalancer.NextEndpoint(fooServiceP, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -364,8 +358,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { // Call NewService() before OnEndpointsUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, @@ -373,7 +366,7 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledFirst(t *testing.T) { {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}}, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} @@ -420,15 +413,14 @@ func TestStickyLoadBalanceWorksWithNewServiceCalledSecond(t *testing.T) { } // Call OnEndpointsUpdate() before NewService() - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, {Addresses: []api.EndpointAddress{{IP: "endpoint2"}}, Ports: []api.EndpointPort{{Port: 2}}}, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} @@ -482,8 +474,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { } loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -492,7 +483,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpointsv1) shuffledEndpoints := loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) client1Endpoint := shuffledEndpoints[0] @@ -503,7 +494,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[2], client3) client3Endpoint := shuffledEndpoints[2] - endpoints[0] = &api.Endpoints{ + endpointsv2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -512,7 +503,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2) shuffledEndpoints = loadBalancer.services[service].endpoints if client1Endpoint == "endpoint:3" { client1Endpoint = shuffledEndpoints[0] @@ -525,7 +516,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) expectEndpoint(t, loadBalancer, service, client3Endpoint, client3) - endpoints[0] = &api.Endpoints{ + endpointsv3 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -534,7 +525,7 @@ func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3) shuffledEndpoints = loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, client1Endpoint, client1) expectEndpoint(t, loadBalancer, service, client2Endpoint, client2) @@ -556,8 +547,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { } loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpointsv1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -566,7 +556,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpointsv1) shuffledEndpoints := loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) @@ -577,7 +567,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again - endpoints[0] = &api.Endpoints{ + endpointsv2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ { @@ -586,7 +576,7 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsUpdate(endpointsv1, endpointsv2) shuffledEndpoints = loadBalancer.services[service].endpoints expectEndpoint(t, loadBalancer, service, shuffledEndpoints[0], client1) expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) @@ -596,8 +586,8 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { expectEndpoint(t, loadBalancer, service, shuffledEndpoints[1], client2) // Clear endpoints - endpoints[0] = &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} - loadBalancer.OnEndpointsUpdate(endpoints) + endpointsv3 := &api.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: nil} + loadBalancer.OnEndpointsUpdate(endpointsv2, endpointsv3) endpoint, err = loadBalancer.NextEndpoint(service, nil, false) if err == nil || len(endpoint) != 0 { @@ -616,8 +606,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { t.Errorf("Didn't fail with non-existent service") } loadBalancer.NewService(fooService, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 2) - endpoints[0] = &api.Endpoints{ + endpoints1 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: fooService.Name, Namespace: fooService.Namespace}, Subsets: []api.EndpointSubset{ { @@ -628,7 +617,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { } barService := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: "testnamespace", Name: "bar"}, Port: ""} loadBalancer.NewService(barService, api.ServiceAffinityClientIP, 0) - endpoints[1] = &api.Endpoints{ + endpoints2 := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: barService.Name, Namespace: barService.Namespace}, Subsets: []api.EndpointSubset{ { @@ -637,7 +626,8 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { }, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints1) + loadBalancer.OnEndpointsAdd(endpoints2) shuffledFooEndpoints := loadBalancer.services[fooService].endpoints expectEndpoint(t, loadBalancer, fooService, shuffledFooEndpoints[0], client1) @@ -659,7 +649,7 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { expectEndpoint(t, loadBalancer, barService, shuffledBarEndpoints[1], client2) // Then update the configuration by removing foo - loadBalancer.OnEndpointsUpdate(endpoints[1:]) + loadBalancer.OnEndpointsDelete(endpoints1) endpoint, err = loadBalancer.NextEndpoint(fooService, nil, false) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") @@ -685,8 +675,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { // Call NewService() before OnEndpointsUpdate() loadBalancer.NewService(service, api.ServiceAffinityClientIP, 0) - endpoints := make([]*api.Endpoints, 1) - endpoints[0] = &api.Endpoints{ + endpoints := &api.Endpoints{ ObjectMeta: metav1.ObjectMeta{Name: service.Name, Namespace: service.Namespace}, Subsets: []api.EndpointSubset{ {Addresses: []api.EndpointAddress{{IP: "endpoint1"}}, Ports: []api.EndpointPort{{Port: 1}}}, @@ -694,7 +683,7 @@ func TestStickyLoadBalanceWorksWithEndpointFails(t *testing.T) { {Addresses: []api.EndpointAddress{{IP: "endpoint3"}}, Ports: []api.EndpointPort{{Port: 3}}}, }, } - loadBalancer.OnEndpointsUpdate(endpoints) + loadBalancer.OnEndpointsAdd(endpoints) client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}