From bd7cb034dbc1752dcfc27daae4801edcea6d6c3d Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Wed, 21 Feb 2024 18:19:44 -0500 Subject: [PATCH 1/3] manager/allocator: add port-allocation tests Replicate the port allocation-related tests from the CNM network allocator's test corpus in preparation for lifting port allocation into allocator.Allocator. Signed-off-by: Cory Snider --- manager/allocator/allocator_test.go | 517 +++++++++++++++++++++++++++- 1 file changed, 516 insertions(+), 1 deletion(-) diff --git a/manager/allocator/allocator_test.go b/manager/allocator/allocator_test.go index 082a5c8885..378584fee9 100644 --- a/manager/allocator/allocator_test.go +++ b/manager/allocator/allocator_test.go @@ -21,6 +21,19 @@ func init() { retryInterval = 5 * time.Millisecond } +// Temporary copy of constants from cnmallocator/portallocator.go +// to allow tests to build before portallocator.go is moved into +// this package. +const ( + // Start of the dynamic port range from which node ports will + // be allocated when the user did not specify a port. + dynamicPortStart = 30000 + + // End of the dynamic port range from which node ports will be + // allocated when the user did not specify a port. + dynamicPortEnd = 32767 +) + func TestAllocator(t *testing.T) { s := store.NewMemoryStore(nil) assert.NotNil(t, s) @@ -96,11 +109,27 @@ func TestAllocator(t *testing.T) { Mode: api.ResolutionModeVirtualIP, Ports: []*api.PortConfig{ { - Name: "portName", + Name: "some_tcp", Protocol: api.ProtocolTCP, TargetPort: 8000, PublishedPort: 8001, }, + { + Name: "some_udp", + Protocol: api.ProtocolUDP, + TargetPort: 8000, + PublishedPort: 8001, + }, + { + Name: "auto_assigned_tcp", + Protocol: api.ProtocolTCP, + TargetPort: 9000, + }, + { + Name: "auto_assigned_udp", + Protocol: api.ProtocolUDP, + TargetPort: 9000, + }, }, }, }, @@ -247,6 +276,22 @@ func TestAllocator(t *testing.T) { assert.Equal(t, tp2.Networks[0].Network.ID, nln.ID) assert.Nil(t, tp1.Networks[0].Addresses, "Non nil addresses for task on node-local network") assert.Nil(t, tp2.Networks[0].Addresses, "Non nil addresses for task on node-local network") + // Verify service ports were allocated + s.View(func(tx store.ReadTx) { + s1 := store.GetService(tx, "testServiceID1") + if assert.NotNil(t, s1) && assert.NotNil(t, s1.Endpoint) && assert.Len(t, s1.Endpoint.Ports, 4) { + // "some_tcp" and "some_udp" + for _, i := range []int{0, 1} { + assert.EqualExportedValues(t, *s1.Spec.Endpoint.Ports[i], *s1.Endpoint.Ports[i]) + } + // "auto_assigned_tcp" and "auto_assigned_udp" + for _, i := range []int{2, 3} { + assert.Equal(t, s1.Spec.Endpoint.Ports[i].TargetPort, s1.Endpoint.Ports[i].TargetPort) + assert.GreaterOrEqual(t, s1.Endpoint.Ports[i].PublishedPort, uint32(dynamicPortStart)) + assert.LessOrEqual(t, s1.Endpoint.Ports[i].PublishedPort, uint32(dynamicPortEnd)) + } + } + }) // Add new networks/tasks/services after allocator is started. assert.NoError(t, s.Update(func(tx store.Tx) error { @@ -1533,6 +1578,476 @@ func TestNodeAttachmentOnLeadershipChange(t *testing.T) { watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "net2"}) } +func TestAllocateServiceConflictingUserDefinedPorts(t *testing.T) { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + const svcID = "testID1" + // Try adding some objects to store before allocator is started + assert.NoError(t, s.Update(func(tx store.Tx) error { + // populate ingress network + in := &api.Network{ + ID: "ingress-nw-id", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "default-ingress", + }, + Ingress: true, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, + } + assert.NoError(t, store.CreateNetwork(tx, in)) + + s1 := &api.Service{ + ID: svcID, + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service1", + }, + Endpoint: &api.EndpointSpec{ + Ports: []*api.PortConfig{ + { + Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + { + Name: "some_other_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + }, + }, + }, + } + assert.NoError(t, store.CreateService(tx, s1)) + + return nil + })) + + serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{}) + defer cancel() + + a, err := New(s, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, a) + + go func() { + assert.NoError(t, a.Run(context.Background())) + }() + defer a.Stop() + + // Port spec is invalid; service should not be updated + watchService(t, serviceWatch, true, func(_ assert.TestingT, service *api.Service) bool { + t.Errorf("unexpected service update: %v", service) + return true + }) + + // Update the service to remove the conflicting port + assert.NoError(t, s.Update(func(tx store.Tx) error { + s1 := store.GetService(tx, svcID) + if assert.NotNil(t, s1) { + s1.Spec.Endpoint.Ports[1].TargetPort = 1235 + s1.Spec.Endpoint.Ports[1].PublishedPort = 1235 + assert.NoError(t, store.UpdateService(tx, s1)) + } + return nil + })) + watchService(t, serviceWatch, false, func(t assert.TestingT, service *api.Service) bool { + if assert.Equal(t, svcID, service.ID) && assert.NotNil(t, service.Endpoint) && assert.Len(t, service.Endpoint.Ports, 2) { + return assert.Equal(t, uint32(1235), service.Endpoint.Ports[1].PublishedPort) + } + return false + }) +} + +func TestDeallocateServiceAllocate(t *testing.T) { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + newSvc := func(id string) *api.Service { + return &api.Service{ + ID: id, + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service1", + }, + Endpoint: &api.EndpointSpec{ + Ports: []*api.PortConfig{ + { + Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + }, + }, + }, + } + } + + // Try adding some objects to store before allocator is started + assert.NoError(t, s.Update(func(tx store.Tx) error { + // populate ingress network + in := &api.Network{ + ID: "ingress-nw-id", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "default-ingress", + }, + Ingress: true, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, + } + assert.NoError(t, store.CreateNetwork(tx, in)) + assert.NoError(t, store.CreateService(tx, newSvc("testID1"))) + return nil + })) + + serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{}) + defer cancel() + + a, err := New(s, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, a) + + go func() { + assert.NoError(t, a.Run(context.Background())) + }() + defer a.Stop() + + isTestService := func(id string) func(t assert.TestingT, service *api.Service) bool { + return func(t assert.TestingT, service *api.Service) bool { + return assert.Equal(t, id, service.ID) && + assert.Len(t, service.Endpoint.Ports, 1) && + assert.Equal(t, uint32(1234), service.Endpoint.Ports[0].PublishedPort) && + assert.Len(t, service.Endpoint.VirtualIPs, 1) + } + } + // Confirm service is allocated + watchService(t, serviceWatch, false, isTestService("testID1")) + + // Deallocate the service and allocate a new one with the same port spec + assert.NoError(t, s.Update(func(tx store.Tx) error { + assert.NoError(t, store.DeleteService(tx, "testID1")) + assert.NoError(t, store.CreateService(tx, newSvc("testID2"))) + return nil + })) + // Confirm new service is allocated + watchService(t, serviceWatch, false, isTestService("testID2")) +} + +func TestServiceAddRemovePorts(t *testing.T) { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + const svcID = "testID1" + // Try adding some objects to store before allocator is started + assert.NoError(t, s.Update(func(tx store.Tx) error { + // populate ingress network + in := &api.Network{ + ID: "ingress-nw-id", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "default-ingress", + }, + Ingress: true, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, + } + assert.NoError(t, store.CreateNetwork(tx, in)) + + s1 := &api.Service{ + ID: svcID, + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service1", + }, + Endpoint: &api.EndpointSpec{ + Ports: []*api.PortConfig{ + { + Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + }, + }, + }, + } + assert.NoError(t, store.CreateService(tx, s1)) + + return nil + })) + + serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{}) + defer cancel() + + a, err := New(s, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, a) + + go func() { + assert.NoError(t, a.Run(context.Background())) + }() + defer a.Stop() + + var probedVIP string + probeTestService := func(expectPorts ...uint32) func(t assert.TestingT, service *api.Service) bool { + return func(t assert.TestingT, service *api.Service) bool { + expectedVIPCount := 0 + if len(expectPorts) > 0 { + expectedVIPCount = 1 + } + if len(service.Endpoint.VirtualIPs) > 0 { + probedVIP = service.Endpoint.VirtualIPs[0].Addr + } else { + probedVIP = "" + } + if assert.Equal(t, svcID, service.ID) && assert.Len(t, service.Endpoint.Ports, len(expectPorts)) { + var published []uint32 + for _, port := range service.Endpoint.Ports { + published = append(published, port.PublishedPort) + } + return assert.Equal(t, expectPorts, published) && assert.Len(t, service.Endpoint.VirtualIPs, expectedVIPCount) + } + + return false + } + } + // Confirm service is allocated + watchService(t, serviceWatch, false, probeTestService(1234)) + allocatedVIP := probedVIP + + // Unpublish port + assert.NoError(t, s.Update(func(tx store.Tx) error { + s1 := store.GetService(tx, svcID) + if assert.NotNil(t, s1) { + s1.Spec.Endpoint.Ports = nil + assert.NoError(t, store.UpdateService(tx, s1)) + } + return nil + })) + // Wait for unpublishing to take effect + watchService(t, serviceWatch, false, probeTestService()) + + // Publish port again and ensure VIP is not the same that was deallocated. + // Since IP allocation is serial we should receive the next available IP. + assert.NoError(t, s.Update(func(tx store.Tx) error { + s1 := store.GetService(tx, svcID) + if assert.NotNil(t, s1) { + s1.Spec.Endpoint.Ports = append(s1.Spec.Endpoint.Ports, &api.PortConfig{Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }) + assert.NoError(t, store.UpdateService(tx, s1)) + } + return nil + })) + watchService(t, serviceWatch, false, probeTestService(1234)) + assert.NotEqual(t, allocatedVIP, probedVIP) +} + +func TestServiceUpdatePort(t *testing.T) { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + const svcID = "testID1" + // Try adding some objects to store before allocator is started + assert.NoError(t, s.Update(func(tx store.Tx) error { + // populate ingress network + in := &api.Network{ + ID: "ingress-nw-id", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "default-ingress", + }, + Ingress: true, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, + } + assert.NoError(t, store.CreateNetwork(tx, in)) + + s1 := &api.Service{ + ID: svcID, + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service1", + }, + Endpoint: &api.EndpointSpec{ + Ports: []*api.PortConfig{ + { + Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + { + Name: "some_other_tcp", + TargetPort: 1235, + }, + }, + }, + }, + } + assert.NoError(t, store.CreateService(tx, s1)) + + return nil + })) + + serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{}) + defer cancel() + + a, err := New(s, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, a) + + go func() { + assert.NoError(t, a.Run(context.Background())) + }() + defer a.Stop() + + watchService(t, serviceWatch, false, func(t assert.TestingT, service *api.Service) bool { + return assert.Equal(t, svcID, service.ID) && assert.Len(t, service.Endpoint.Ports, 2) + }) + + assert.NoError(t, s.Update(func(tx store.Tx) error { + s1 := store.GetService(tx, svcID) + if assert.NotNil(t, s1) { + s1.Spec.Endpoint.Ports[1].PublishedPort = 1235 + assert.NoError(t, store.UpdateService(tx, s1)) + } + return nil + })) + watchService(t, serviceWatch, false, func(t assert.TestingT, service *api.Service) bool { + if assert.Equal(t, svcID, service.ID) && assert.Len(t, service.Endpoint.Ports, 2) { + return assert.Equal(t, uint32(1235), service.Endpoint.Ports[1].PublishedPort) + } + return false + }) +} + +func TestServicePortAllocationIsRepeatable(t *testing.T) { + alloc := func() []*api.PortConfig { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + const svcID = "testID1" + // Try adding some objects to store before allocator is started + assert.NoError(t, s.Update(func(tx store.Tx) error { + // populate ingress network + in := &api.Network{ + ID: "ingress-nw-id", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "default-ingress", + }, + Ingress: true, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, + } + assert.NoError(t, store.CreateNetwork(tx, in)) + + s1 := &api.Service{ + ID: svcID, + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service1", + }, + Endpoint: &api.EndpointSpec{ + Ports: []*api.PortConfig{ + { + Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + { + Name: "some_other_tcp", + TargetPort: 1235, + }, + }, + }, + }, + } + assert.NoError(t, store.CreateService(tx, s1)) + + return nil + })) + + serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{}) + defer cancel() + + a, err := New(s, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, a) + + go func() { + assert.NoError(t, a.Run(context.Background())) + }() + defer a.Stop() + + var probedPorts []*api.PortConfig + probeTestService := func(t assert.TestingT, service *api.Service) bool { + if assert.Equal(t, svcID, service.ID) && assert.Len(t, service.Endpoint.Ports, 2) { + probedPorts = service.Endpoint.Ports + return true + } + return false + } + watchService(t, serviceWatch, false, probeTestService) + return probedPorts + } + + assert.Equal(t, alloc(), alloc()) +} + func isValidNode(t assert.TestingT, originalNode, updatedNode *api.Node, networks []string) bool { if !assert.Equal(t, originalNode.ID, updatedNode.ID) { From 80fc22d1b294c0d2f94af5bec054e7ddbdd2ff26 Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Tue, 20 Feb 2024 17:19:15 -0500 Subject: [PATCH 2/3] cnmallocator: make newPortAllocator() infallible The portAllocator constructor function newPortAllocator() takes no arguments; all its initial state is taken from hardcoded constants in the source code. Any failure to construct a port allocator must therefore be the result of a bug in the code, which can only be fixed with a code change. As there is nothing the caller can do at runtime to cause the failure, there is little value in returning the error to the caller. Change the constructor to panic on failure instead. Signed-off-by: Cory Snider --- .../cnmallocator/networkallocator.go | 8 ++------ .../allocator/cnmallocator/portallocator.go | 19 +++++++------------ .../cnmallocator/portallocator_test.go | 16 ++++++---------- 3 files changed, 15 insertions(+), 28 deletions(-) diff --git a/manager/allocator/cnmallocator/networkallocator.go b/manager/allocator/cnmallocator/networkallocator.go index 7ca55f8dce..3c9d0c4617 100644 --- a/manager/allocator/cnmallocator/networkallocator.go +++ b/manager/allocator/cnmallocator/networkallocator.go @@ -108,6 +108,8 @@ func New(pg plugingetter.PluginGetter, netConfig *NetworkConfig) (networkallocat tasks: make(map[string]struct{}), nodes: make(map[string]map[string]struct{}), pg: pg, + + portAllocator: newPortAllocator(), } for ntype, i := range initializers { @@ -126,12 +128,6 @@ func New(pg plugingetter.PluginGetter, netConfig *NetworkConfig) (networkallocat return nil, fmt.Errorf("failed to initialize IPAM driver plugins: %w", err) } - pa, err := newPortAllocator() - if err != nil { - return nil, err - } - - na.portAllocator = pa return na, nil } diff --git a/manager/allocator/cnmallocator/portallocator.go b/manager/allocator/cnmallocator/portallocator.go index 818613fe81..2ebb35b33e 100644 --- a/manager/allocator/cnmallocator/portallocator.go +++ b/manager/allocator/cnmallocator/portallocator.go @@ -101,36 +101,31 @@ func (ps allocatedPorts) delState(p *api.PortConfig) *api.PortConfig { return nil } -func newPortAllocator() (*portAllocator, error) { +func newPortAllocator() *portAllocator { portSpaces := make(map[api.PortConfig_Protocol]*portSpace) for _, protocol := range []api.PortConfig_Protocol{api.ProtocolTCP, api.ProtocolUDP, api.ProtocolSCTP} { - ps, err := newPortSpace(protocol) - if err != nil { - return nil, err - } - - portSpaces[protocol] = ps + portSpaces[protocol] = newPortSpace(protocol) } - return &portAllocator{portSpaces: portSpaces}, nil + return &portAllocator{portSpaces: portSpaces} } -func newPortSpace(protocol api.PortConfig_Protocol) (*portSpace, error) { +func newPortSpace(protocol api.PortConfig_Protocol) *portSpace { master, err := idm.New(masterPortStart, masterPortEnd) if err != nil { - return nil, err + panic(err) } dynamic, err := idm.New(dynamicPortStart, dynamicPortEnd) if err != nil { - return nil, err + panic(err) } return &portSpace{ protocol: protocol, masterPortSpace: master, dynamicPortSpace: dynamic, - }, nil + } } // getPortConfigKey returns a map key for doing set operations with diff --git a/manager/allocator/cnmallocator/portallocator_test.go b/manager/allocator/cnmallocator/portallocator_test.go index b514f40077..0970b37931 100644 --- a/manager/allocator/cnmallocator/portallocator_test.go +++ b/manager/allocator/cnmallocator/portallocator_test.go @@ -180,8 +180,7 @@ func TestReconcilePortConfigs(t *testing.T) { } func TestAllocateServicePorts(t *testing.T) { - pa, err := newPortAllocator() - assert.NoError(t, err) + pa := newPortAllocator() // Service has no endpoint in ServiceSpec s := &api.Service{ @@ -200,7 +199,7 @@ func TestAllocateServicePorts(t *testing.T) { }, } - err = pa.serviceAllocatePorts(s) + err := pa.serviceAllocatePorts(s) assert.NoError(t, err) // Service has a published port 10001 in ServiceSpec @@ -265,8 +264,7 @@ func TestAllocateServicePorts(t *testing.T) { } func TestHostPublishPortsNeedUpdate(t *testing.T) { - pa, err := newPortAllocator() - assert.NoError(t, err) + pa := newPortAllocator() type Data struct { name string @@ -494,8 +492,7 @@ func TestHostPublishPortsNeedUpdate(t *testing.T) { } func TestIsPortsAllocated(t *testing.T) { - pa, err := newPortAllocator() - assert.NoError(t, err) + pa := newPortAllocator() type Data struct { name string @@ -886,8 +883,7 @@ func TestIsPortsAllocated(t *testing.T) { } func TestAllocate(t *testing.T) { - pSpace, err := newPortSpace(api.ProtocolTCP) - assert.NoError(t, err) + pSpace := newPortSpace(api.ProtocolTCP) pConfig := &api.PortConfig{ Name: "test1", @@ -897,7 +893,7 @@ func TestAllocate(t *testing.T) { } // first consume 30000 in dynamicPortSpace - err = pSpace.allocate(pConfig) + err := pSpace.allocate(pConfig) assert.NoError(t, err) pConfig = &api.PortConfig{ From 6531bf81cbf3b7f9f6f14f46cb9c11c9cdeb7ae1 Mon Sep 17 00:00:00 2001 From: Cory Snider Date: Wed, 21 Feb 2024 13:41:39 -0500 Subject: [PATCH 3/3] manager/allocator: lift portAllocator out of CNM The port allocation logic does not depend on the network allocator implementation in any meaningful way. It has no knowledge of the CNM network allocator's state, and it does not need to change if the network allocator changes. Allocating node ports is fundamentally a seaparate concern from allocating network resources for services and tasks. Therefore the low-level network allocator should not be responsible for allocating both. Lift the port allocator into the Allocator's network context as a sibling of the low-level network allocator. Signed-off-by: Cory Snider --- manager/allocator/allocator_test.go | 13 -- .../cnmallocator/networkallocator.go | 28 +-- .../cnmallocator/networkallocator_test.go | 218 +----------------- manager/allocator/network.go | 63 ++++- .../networkallocator/networkallocator.go | 4 - .../{cnmallocator => }/portallocator.go | 2 +- .../{cnmallocator => }/portallocator_test.go | 2 +- 7 files changed, 57 insertions(+), 273 deletions(-) rename manager/allocator/{cnmallocator => }/portallocator.go (99%) rename manager/allocator/{cnmallocator => }/portallocator_test.go (99%) diff --git a/manager/allocator/allocator_test.go b/manager/allocator/allocator_test.go index 378584fee9..3e0454c097 100644 --- a/manager/allocator/allocator_test.go +++ b/manager/allocator/allocator_test.go @@ -21,19 +21,6 @@ func init() { retryInterval = 5 * time.Millisecond } -// Temporary copy of constants from cnmallocator/portallocator.go -// to allow tests to build before portallocator.go is moved into -// this package. -const ( - // Start of the dynamic port range from which node ports will - // be allocated when the user did not specify a port. - dynamicPortStart = 30000 - - // End of the dynamic port range from which node ports will be - // allocated when the user did not specify a port. - dynamicPortEnd = 32767 -) - func TestAllocator(t *testing.T) { s := store.NewMemoryStore(nil) assert.NotNil(t, s) diff --git a/manager/allocator/cnmallocator/networkallocator.go b/manager/allocator/cnmallocator/networkallocator.go index 3c9d0c4617..927e4f43e3 100644 --- a/manager/allocator/cnmallocator/networkallocator.go +++ b/manager/allocator/cnmallocator/networkallocator.go @@ -40,9 +40,6 @@ type cnmNetworkAllocator struct { // The driver registry for all internal and external network drivers. networkRegistry drvregistry.Networks - // The port allocator instance for allocating node ports - portAllocator *portAllocator - // Local network state used by cnmNetworkAllocator to do network management. networks map[string]*network @@ -108,8 +105,6 @@ func New(pg plugingetter.PluginGetter, netConfig *NetworkConfig) (networkallocat tasks: make(map[string]struct{}), nodes: make(map[string]map[string]struct{}), pg: pg, - - portAllocator: newPortAllocator(), } for ntype, i := range initializers { @@ -205,11 +200,8 @@ func (na *cnmNetworkAllocator) Deallocate(n *api.Network) error { } // AllocateService allocates all the network resources such as virtual -// IP and ports needed by the service. +// IP needed by the service. func (na *cnmNetworkAllocator) AllocateService(s *api.Service) (err error) { - if err = na.portAllocator.serviceAllocatePorts(s); err != nil { - return err - } defer func() { if err != nil { na.DeallocateService(s) @@ -296,7 +288,7 @@ networkLoop: } // DeallocateService de-allocates all the network resources such as -// virtual IP and ports associated with the service. +// virtual IP associated with the service. func (na *cnmNetworkAllocator) DeallocateService(s *api.Service) error { if s.Endpoint == nil { return nil @@ -312,7 +304,6 @@ func (na *cnmNetworkAllocator) DeallocateService(s *api.Service) error { } s.Endpoint.VirtualIPs = nil - na.portAllocator.serviceDeallocatePorts(s) delete(na.services, s.ID) return nil @@ -369,19 +360,8 @@ func (na *cnmNetworkAllocator) IsTaskAllocated(t *api.Task) bool { return true } -// HostPublishPortsNeedUpdate returns true if the passed service needs -// allocations for its published ports in host (non ingress) mode -func (na *cnmNetworkAllocator) HostPublishPortsNeedUpdate(s *api.Service) bool { - return na.portAllocator.hostPublishPortsNeedUpdate(s) -} - // IsServiceAllocated returns false if the passed service needs to have network resources allocated/updated. func (na *cnmNetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func(*networkallocator.ServiceAllocationOpts)) bool { - var options networkallocator.ServiceAllocationOpts - for _, flag := range flags { - flag(&options) - } - specNetworks := serviceNetworks(s) // If endpoint mode is VIP and allocator does not have the @@ -443,10 +423,6 @@ func (na *cnmNetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func( } } - if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) || - (s.Endpoint != nil && len(s.Endpoint.Ports) != 0) { - return na.portAllocator.isPortsAllocatedOnInit(s, options.OnInit) - } return true } diff --git a/manager/allocator/cnmallocator/networkallocator_test.go b/manager/allocator/cnmallocator/networkallocator_test.go index 8598fc6b1a..0841f090df 100644 --- a/manager/allocator/cnmallocator/networkallocator_test.go +++ b/manager/allocator/cnmallocator/networkallocator_test.go @@ -563,11 +563,7 @@ func TestAllocateService(t *testing.T) { err = na.AllocateService(s) assert.NoError(t, err) - assert.Equal(t, 2, len(s.Endpoint.Ports)) - assert.True(t, s.Endpoint.Ports[0].PublishedPort >= dynamicPortStart && - s.Endpoint.Ports[0].PublishedPort <= dynamicPortEnd) - assert.True(t, s.Endpoint.Ports[1].PublishedPort >= dynamicPortStart && - s.Endpoint.Ports[1].PublishedPort <= dynamicPortEnd) + assert.Len(t, s.Endpoint.Ports, 0) // Network allocator is not responsible for allocating ports. assert.Equal(t, 1, len(s.Endpoint.VirtualIPs)) @@ -579,94 +575,6 @@ func TestAllocateService(t *testing.T) { assert.Equal(t, true, subnet.Contains(ip)) } -func TestAllocateServiceUserDefinedPorts(t *testing.T) { - na := newNetworkAllocator(t) - s := &api.Service{ - ID: "testID1", - Spec: api.ServiceSpec{ - Endpoint: &api.EndpointSpec{ - Ports: []*api.PortConfig{ - { - Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - }, - { - Name: "some_udp", - TargetPort: 1234, - PublishedPort: 1234, - Protocol: api.ProtocolUDP, - }, - }, - }, - }, - } - - err := na.AllocateService(s) - assert.NoError(t, err) - assert.Equal(t, 2, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[1].PublishedPort) -} - -func TestAllocateServiceConflictingUserDefinedPorts(t *testing.T) { - na := newNetworkAllocator(t) - s := &api.Service{ - ID: "testID1", - Spec: api.ServiceSpec{ - Endpoint: &api.EndpointSpec{ - Ports: []*api.PortConfig{ - { - Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - }, - { - Name: "some_other_tcp", - TargetPort: 1234, - PublishedPort: 1234, - }, - }, - }, - }, - } - - err := na.AllocateService(s) - assert.Error(t, err) -} - -func TestDeallocateServiceAllocate(t *testing.T) { - na := newNetworkAllocator(t) - s := &api.Service{ - ID: "testID1", - Spec: api.ServiceSpec{ - Endpoint: &api.EndpointSpec{ - Ports: []*api.PortConfig{ - { - Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - }, - }, - }, - }, - } - - err := na.AllocateService(s) - assert.NoError(t, err) - assert.Equal(t, 1, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - - err = na.DeallocateService(s) - assert.NoError(t, err) - assert.Equal(t, 0, len(s.Endpoint.Ports)) - // Allocate again. - err = na.AllocateService(s) - assert.NoError(t, err) - assert.Equal(t, 1, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) -} - func TestDeallocateServiceAllocateIngressMode(t *testing.T) { na := newNetworkAllocator(t) @@ -705,8 +613,6 @@ func TestDeallocateServiceAllocateIngressMode(t *testing.T) { err = na.AllocateService(s) assert.NoError(t, err) - assert.Len(t, s.Endpoint.Ports, 1) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) assert.Len(t, s.Endpoint.VirtualIPs, 1) err = na.DeallocateService(s) @@ -719,129 +625,7 @@ func TestDeallocateServiceAllocateIngressMode(t *testing.T) { err = na.AllocateService(s) assert.NoError(t, err) - assert.Len(t, s.Endpoint.Ports, 1) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - assert.Len(t, s.Endpoint.VirtualIPs, 1) -} - -func TestServiceAddRemovePortsIngressMode(t *testing.T) { - na := newNetworkAllocator(t) - - n := &api.Network{ - ID: "testNetID1", - Spec: api.NetworkSpec{ - Annotations: api.Annotations{ - Name: "test", - }, - Ingress: true, - }, - } - - err := na.Allocate(n) - assert.NoError(t, err) - - s := &api.Service{ - ID: "testID1", - Spec: api.ServiceSpec{ - Endpoint: &api.EndpointSpec{ - Ports: []*api.PortConfig{ - { - Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - PublishMode: api.PublishModeIngress, - }, - }, - }, - }, - Endpoint: &api.Endpoint{}, - } - - s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs, - &api.Endpoint_VirtualIP{NetworkID: n.ID}) - - err = na.AllocateService(s) - assert.NoError(t, err) - assert.Len(t, s.Endpoint.Ports, 1) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) assert.Len(t, s.Endpoint.VirtualIPs, 1) - allocatedVIP := s.Endpoint.VirtualIPs[0].Addr - - // Unpublish port - s.Spec.Endpoint.Ports = s.Spec.Endpoint.Ports[:0] - err = na.AllocateService(s) - assert.NoError(t, err) - assert.Len(t, s.Endpoint.Ports, 0) - assert.Len(t, s.Endpoint.VirtualIPs, 0) - - // Publish port again and ensure VIP is not the same that was deallocated. - // Since IP allocation is serial we should receive the next available IP. - s.Spec.Endpoint.Ports = append(s.Spec.Endpoint.Ports, &api.PortConfig{Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - PublishMode: api.PublishModeIngress, - }) - s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs, - &api.Endpoint_VirtualIP{NetworkID: n.ID}) - err = na.AllocateService(s) - assert.NoError(t, err) - assert.Len(t, s.Endpoint.Ports, 1) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - assert.Len(t, s.Endpoint.VirtualIPs, 1) - assert.NotEqual(t, allocatedVIP, s.Endpoint.VirtualIPs[0].Addr) -} - -func TestServiceUpdate(t *testing.T) { - na1 := newNetworkAllocator(t) - na2 := newNetworkAllocator(t) - s := &api.Service{ - ID: "testID1", - Spec: api.ServiceSpec{ - Endpoint: &api.EndpointSpec{ - Ports: []*api.PortConfig{ - { - Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - }, - { - Name: "some_other_tcp", - TargetPort: 1235, - PublishedPort: 0, - }, - }, - }, - }, - } - - err := na1.AllocateService(s) - assert.NoError(t, err) - assert.True(t, na1.IsServiceAllocated(s)) - assert.Equal(t, 2, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - assert.NotEqual(t, 0, s.Endpoint.Ports[1].PublishedPort) - - // Cache the secode node port - allocatedPort := s.Endpoint.Ports[1].PublishedPort - - // Now allocate the same service in another allocator instance - err = na2.AllocateService(s) - assert.NoError(t, err) - assert.True(t, na2.IsServiceAllocated(s)) - assert.Equal(t, 2, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - // Make sure we got the same port - assert.Equal(t, allocatedPort, s.Endpoint.Ports[1].PublishedPort) - - s.Spec.Endpoint.Ports[1].PublishedPort = 1235 - assert.False(t, na1.IsServiceAllocated(s)) - - err = na1.AllocateService(s) - assert.NoError(t, err) - assert.True(t, na1.IsServiceAllocated(s)) - assert.Equal(t, 2, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - assert.Equal(t, uint32(1235), s.Endpoint.Ports[1].PublishedPort) } func TestServiceNetworkUpdate(t *testing.T) { diff --git a/manager/allocator/network.go b/manager/allocator/network.go index 673da84996..721bf3f080 100644 --- a/manager/allocator/network.go +++ b/manager/allocator/network.go @@ -37,6 +37,9 @@ type networkContext struct { // the actual network allocation. nwkAllocator networkallocator.NetworkAllocator + // The port allocator instance for allocating node ports + portAllocator *portAllocator + // A set of tasks which are ready to be allocated as a batch. This is // distinct from "unallocatedTasks" which are tasks that failed to // allocate on the first try, being held for a future retry. @@ -95,6 +98,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { nc := &networkContext{ nwkAllocator: na, + portAllocator: newPortAllocator(), pendingTasks: make(map[string]*api.Task), unallocatedTasks: make(map[string]*api.Task), unallocatedServices: make(map[string]*api.Service), @@ -233,7 +237,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if nc.nwkAllocator.IsServiceAllocated(s) { + if nc.isServiceAllocated(s) { break } @@ -261,8 +265,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if nc.nwkAllocator.IsServiceAllocated(s) { - if !nc.nwkAllocator.HostPublishPortsNeedUpdate(s) { + if nc.isServiceAllocated(s) { + if !nc.portAllocator.hostPublishPortsNeedUpdate(s) { break } updatePortsInHostPublishMode(s) @@ -284,7 +288,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { case api.EventDeleteService: s := v.Service.Copy() - if err := nc.nwkAllocator.DeallocateService(s); err != nil { + if err := nc.deallocateService(s); err != nil { log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID) } else { nc.somethingWasDeallocated = true @@ -681,7 +685,7 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly var allocatedServices []*api.Service for _, s := range services { - if nc.nwkAllocator.IsServiceAllocated(s, networkallocator.OnInit) { + if nc.isServiceAllocated(s, networkallocator.OnInit) { continue } if existingAddressesOnly && @@ -713,6 +717,23 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly return nil } +// isServiceAllocated returns false if the passed service needs to have network resources allocated/updated. +func (nc *networkContext) isServiceAllocated(s *api.Service, flags ...func(*networkallocator.ServiceAllocationOpts)) bool { + if !nc.nwkAllocator.IsServiceAllocated(s, flags...) { + return false + } + + var options networkallocator.ServiceAllocationOpts + for _, flag := range flags { + flag(&options) + } + if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) || + (s.Endpoint != nil && len(s.Endpoint.Ports) != 0) { + return nc.portAllocator.isPortsAllocatedOnInit(s, options.OnInit) + } + return true +} + // allocateTasks allocates tasks in the store so far before we started watching. func (a *Allocator) allocateTasks(ctx context.Context, existingAddressesOnly bool) error { var ( @@ -815,7 +836,7 @@ func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bo // network configured or service endpoints have been // allocated. return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) && - (s == nil || nc.nwkAllocator.IsServiceAllocated(s)) + (s == nil || nc.isServiceAllocated(s)) } func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) { @@ -1200,13 +1221,13 @@ func (a *Allocator) allocateService(ctx context.Context, s *api.Service, existin // is not there // service has no user-defined endpoints while has already allocated network resources, // need deallocated. - if err := nc.nwkAllocator.DeallocateService(s); err != nil { + if err := nc.deallocateService(s); err != nil { return err } nc.somethingWasDeallocated = true } - if err := nc.nwkAllocator.AllocateService(s); err != nil { + if err := nc.allocateService(s); err != nil { nc.unallocatedServices[s.ID] = s return err } @@ -1229,6 +1250,26 @@ func (a *Allocator) allocateService(ctx context.Context, s *api.Service, existin return nil } +func (nc *networkContext) allocateService(s *api.Service) error { + if err := nc.portAllocator.serviceAllocatePorts(s); err != nil { + return err + } + if err := nc.nwkAllocator.AllocateService(s); err != nil { + nc.portAllocator.serviceDeallocatePorts(s) + return err + } + + return nil +} + +func (nc *networkContext) deallocateService(s *api.Service) error { + if err := nc.nwkAllocator.DeallocateService(s); err != nil { + return err + } + nc.portAllocator.serviceDeallocatePorts(s) + return nil +} + func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error { if err := batch.Update(func(tx store.Tx) error { err := store.UpdateService(tx, s) @@ -1241,7 +1282,7 @@ func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Bat return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID) }); err != nil { - if err := a.netCtx.nwkAllocator.DeallocateService(s); err != nil { + if err := a.netCtx.deallocateService(s); err != nil { log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID) } @@ -1298,7 +1339,7 @@ func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) { return } - if !nc.nwkAllocator.IsServiceAllocated(s) { + if !nc.isServiceAllocated(s) { err = fmt.Errorf("service %s to which task %s belongs has pending allocations", s.ID, t.ID) return } @@ -1423,7 +1464,7 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) { nc := a.netCtx var allocatedServices []*api.Service for _, s := range nc.unallocatedServices { - if !nc.nwkAllocator.IsServiceAllocated(s) { + if !nc.isServiceAllocated(s) { if err := a.allocateService(ctx, s, false); err != nil { log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID) continue diff --git a/manager/allocator/networkallocator/networkallocator.go b/manager/allocator/networkallocator/networkallocator.go index bb7085ce00..293c6e875d 100644 --- a/manager/allocator/networkallocator/networkallocator.go +++ b/manager/allocator/networkallocator/networkallocator.go @@ -61,10 +61,6 @@ type NetworkAllocator interface { // virtual IP and ports associated with the service. DeallocateService(s *api.Service) error - // HostPublishPortsNeedUpdate returns true if the passed service needs - // allocations for its published ports in host (non ingress) mode - HostPublishPortsNeedUpdate(s *api.Service) bool - // // Task Allocation // diff --git a/manager/allocator/cnmallocator/portallocator.go b/manager/allocator/portallocator.go similarity index 99% rename from manager/allocator/cnmallocator/portallocator.go rename to manager/allocator/portallocator.go index 2ebb35b33e..4fa16d7a9b 100644 --- a/manager/allocator/cnmallocator/portallocator.go +++ b/manager/allocator/portallocator.go @@ -1,4 +1,4 @@ -package cnmallocator +package allocator import ( "github.com/moby/swarmkit/v2/api" diff --git a/manager/allocator/cnmallocator/portallocator_test.go b/manager/allocator/portallocator_test.go similarity index 99% rename from manager/allocator/cnmallocator/portallocator_test.go rename to manager/allocator/portallocator_test.go index 0970b37931..e85ffbe79d 100644 --- a/manager/allocator/cnmallocator/portallocator_test.go +++ b/manager/allocator/portallocator_test.go @@ -1,4 +1,4 @@ -package cnmallocator +package allocator import ( "testing"