Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix service namespace clash #5192

Merged
merged 1 commit into from Mar 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 21 additions & 14 deletions pkg/proxy/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/config"
"github.com/golang/glog"
)
Expand Down Expand Up @@ -81,7 +82,7 @@ type EndpointsConfig struct {
// It immediately runs the created EndpointsConfig.
func NewEndpointsConfig() *EndpointsConfig {
updates := make(chan struct{})
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[string]api.Endpoints)}
store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)}
mux := config.NewMux(store)
bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates)
Expand Down Expand Up @@ -112,34 +113,37 @@ func (c *EndpointsConfig) Config() []api.Endpoints {

type endpointsStore struct {
endpointLock sync.RWMutex
endpoints map[string]map[string]api.Endpoints
endpoints map[string]map[types.NamespacedName]api.Endpoints
updates chan<- struct{}
}

func (s *endpointsStore) Merge(source string, change interface{}) error {
s.endpointLock.Lock()
endpoints := s.endpoints[source]
if endpoints == nil {
endpoints = make(map[string]api.Endpoints)
endpoints = make(map[types.NamespacedName]api.Endpoints)
}
update := change.(EndpointsUpdate)
switch update.Op {
case ADD:
glog.V(4).Infof("Adding new endpoint from source %s : %+v", source, update.Endpoints)
for _, value := range update.Endpoints {
endpoints[value.Name] = value
name := types.NamespacedName{value.Namespace, value.Name}
endpoints[name] = value
}
case REMOVE:
glog.V(4).Infof("Removing an endpoint %+v", update)
for _, value := range update.Endpoints {
delete(endpoints, value.Name)
name := types.NamespacedName{value.Namespace, value.Name}
delete(endpoints, name)
}
case SET:
glog.V(4).Infof("Setting endpoints %+v", update)
// Clear the old map entries by just creating a new map
endpoints = make(map[string]api.Endpoints)
endpoints = make(map[types.NamespacedName]api.Endpoints)
for _, value := range update.Endpoints {
endpoints[value.Name] = value
name := types.NamespacedName{value.Namespace, value.Name}
endpoints[name] = value
}
default:
glog.V(4).Infof("Received invalid update type: %v", update)
Expand Down Expand Up @@ -176,7 +180,7 @@ type ServiceConfig struct {
// It immediately runs the created ServiceConfig.
func NewServiceConfig() *ServiceConfig {
updates := make(chan struct{})
store := &serviceStore{updates: updates, services: make(map[string]map[string]api.Service)}
store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)}
mux := config.NewMux(store)
bcaster := config.NewBroadcaster()
go watchForUpdates(bcaster, store, updates)
Expand Down Expand Up @@ -207,34 +211,37 @@ func (c *ServiceConfig) Config() []api.Service {

type serviceStore struct {
serviceLock sync.RWMutex
services map[string]map[string]api.Service
services map[string]map[types.NamespacedName]api.Service
updates chan<- struct{}
}

func (s *serviceStore) Merge(source string, change interface{}) error {
s.serviceLock.Lock()
services := s.services[source]
if services == nil {
services = make(map[string]api.Service)
services = make(map[types.NamespacedName]api.Service)
}
update := change.(ServiceUpdate)
switch update.Op {
case ADD:
glog.V(4).Infof("Adding new service from source %s : %+v", source, update.Services)
for _, value := range update.Services {
services[value.Name] = value
name := types.NamespacedName{value.Namespace, value.Name}
services[name] = value
}
case REMOVE:
glog.V(4).Infof("Removing a service %+v", update)
for _, value := range update.Services {
delete(services, value.Name)
name := types.NamespacedName{value.Namespace, value.Name}
delete(services, name)
}
case SET:
glog.V(4).Infof("Setting services %+v", update)
// Clear the old map entries by just creating a new map
services = make(map[string]api.Service)
services = make(map[types.NamespacedName]api.Service)
for _, value := range update.Services {
services[value.Name] = value
name := types.NamespacedName{value.Namespace, value.Name}
services[name] = value
}
default:
glog.V(4).Infof("Received invalid update type: %v", update)
Expand Down
32 changes: 16 additions & 16 deletions pkg/proxy/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestNewServiceAddedAndNotified(t *testing.T) {
handler := NewServiceHandlerMock()
handler.Wait(1)
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)

Expand All @@ -147,24 +147,24 @@ func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
channel := config.Channel("one")
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
handler.Wait(1)
channel <- serviceUpdate
handler.ValidateServices(t, serviceUpdate.Services)

serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(1)
channel <- serviceUpdate2
services := []api.Service{serviceUpdate2.Services[0], serviceUpdate.Services[0]}
handler.ValidateServices(t, services)

serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}})
serviceUpdate3 := CreateServiceUpdate(REMOVE, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}})
handler.Wait(1)
channel <- serviceUpdate3
services = []api.Service{serviceUpdate2.Services[0]}
handler.ValidateServices(t, services)

serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ObjectMeta: api.ObjectMeta{Name: "foobar"}, Spec: api.ServiceSpec{Port: 99}})
serviceUpdate4 := CreateServiceUpdate(SET, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foobar"}, Spec: api.ServiceSpec{Port: 99}})
handler.Wait(1)
channel <- serviceUpdate4
services = []api.Service{serviceUpdate4.Services[0]}
Expand All @@ -180,8 +180,8 @@ func TestNewMultipleSourcesServicesAddedAndNotified(t *testing.T) {
}
handler := NewServiceHandlerMock()
config.RegisterHandler(handler)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(2)
channelOne <- serviceUpdate1
channelTwo <- serviceUpdate2
Expand All @@ -197,8 +197,8 @@ func TestNewMultipleSourcesServicesMultipleHandlersAddedAndNotified(t *testing.T
handler2 := NewServiceHandlerMock()
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
serviceUpdate1 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, Spec: api.ServiceSpec{Port: 10}})
serviceUpdate2 := CreateServiceUpdate(ADD, api.Service{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, Spec: api.ServiceSpec{Port: 20}})
handler.Wait(2)
handler2.Wait(2)
channelOne <- serviceUpdate1
Expand All @@ -217,11 +217,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddedAndNotified(t *testing.
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
})
handler.Wait(2)
Expand All @@ -243,11 +243,11 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
config.RegisterHandler(handler)
config.RegisterHandler(handler2)
endpointsUpdate1 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint1"}, {IP: "endpoint2"}},
})
endpointsUpdate2 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
Endpoints: []api.Endpoint{{IP: "endpoint3"}, {IP: "endpoint4"}},
})
handler.Wait(2)
Expand All @@ -261,7 +261,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t

// Add one more
endpointsUpdate3 := CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foobar"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
Endpoints: []api.Endpoint{{IP: "endpoint5"}, {IP: "endpoint6"}},
})
handler.Wait(1)
Expand All @@ -273,7 +273,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t

// Update the "foo" service with new endpoints
endpointsUpdate1 = CreateEndpointsUpdate(ADD, api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint7"}},
})
handler.Wait(1)
Expand All @@ -284,7 +284,7 @@ func TestNewMultipleSourcesEndpointsMultipleHandlersAddRemoveSetAndNotified(t *t
handler2.ValidateEndpoints(t, endpoints)

// Remove "bar" service
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}})
endpointsUpdate2 = CreateEndpointsUpdate(REMOVE, api.Endpoints{ObjectMeta: api.ObjectMeta{Namespace: "testnamespace", Name: "bar"}})
handler.Wait(1)
handler2.Wait(1)
channelTwo <- endpointsUpdate2
Expand Down
7 changes: 4 additions & 3 deletions pkg/proxy/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"net"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
)

// LoadBalancer is an interface for distributing incoming requests to service endpoints.
type LoadBalancer interface {
// NextEndpoint returns the endpoint to handle a request for the given
// service and source address.
NextEndpoint(service string, srcAddr net.Addr) (string, error)
NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service string)
NextEndpoint(service types.NamespacedName, srcAddr net.Addr) (string, error)
NewService(service types.NamespacedName, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service types.NamespacedName)
}