diff --git a/manager/allocator/allocator_test.go b/manager/allocator/allocator_test.go index 082a5c8885..3e0454c097 100644 --- a/manager/allocator/allocator_test.go +++ b/manager/allocator/allocator_test.go @@ -96,11 +96,27 @@ func TestAllocator(t *testing.T) { Mode: api.ResolutionModeVirtualIP, Ports: []*api.PortConfig{ { - Name: "portName", + Name: "some_tcp", Protocol: api.ProtocolTCP, TargetPort: 8000, PublishedPort: 8001, }, + { + Name: "some_udp", + Protocol: api.ProtocolUDP, + TargetPort: 8000, + PublishedPort: 8001, + }, + { + Name: "auto_assigned_tcp", + Protocol: api.ProtocolTCP, + TargetPort: 9000, + }, + { + Name: "auto_assigned_udp", + Protocol: api.ProtocolUDP, + TargetPort: 9000, + }, }, }, }, @@ -247,6 +263,22 @@ func TestAllocator(t *testing.T) { assert.Equal(t, tp2.Networks[0].Network.ID, nln.ID) assert.Nil(t, tp1.Networks[0].Addresses, "Non nil addresses for task on node-local network") assert.Nil(t, tp2.Networks[0].Addresses, "Non nil addresses for task on node-local network") + // Verify service ports were allocated + s.View(func(tx store.ReadTx) { + s1 := store.GetService(tx, "testServiceID1") + if assert.NotNil(t, s1) && assert.NotNil(t, s1.Endpoint) && assert.Len(t, s1.Endpoint.Ports, 4) { + // "some_tcp" and "some_udp" + for _, i := range []int{0, 1} { + assert.EqualExportedValues(t, *s1.Spec.Endpoint.Ports[i], *s1.Endpoint.Ports[i]) + } + // "auto_assigned_tcp" and "auto_assigned_udp" + for _, i := range []int{2, 3} { + assert.Equal(t, s1.Spec.Endpoint.Ports[i].TargetPort, s1.Endpoint.Ports[i].TargetPort) + assert.GreaterOrEqual(t, s1.Endpoint.Ports[i].PublishedPort, uint32(dynamicPortStart)) + assert.LessOrEqual(t, s1.Endpoint.Ports[i].PublishedPort, uint32(dynamicPortEnd)) + } + } + }) // Add new networks/tasks/services after allocator is started. assert.NoError(t, s.Update(func(tx store.Tx) error { @@ -1533,6 +1565,476 @@ func TestNodeAttachmentOnLeadershipChange(t *testing.T) { watchNode(t, nodeWatch, false, isValidNode, node1, []string{"ingress", "net2"}) } +func TestAllocateServiceConflictingUserDefinedPorts(t *testing.T) { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + const svcID = "testID1" + // Try adding some objects to store before allocator is started + assert.NoError(t, s.Update(func(tx store.Tx) error { + // populate ingress network + in := &api.Network{ + ID: "ingress-nw-id", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "default-ingress", + }, + Ingress: true, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, + } + assert.NoError(t, store.CreateNetwork(tx, in)) + + s1 := &api.Service{ + ID: svcID, + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service1", + }, + Endpoint: &api.EndpointSpec{ + Ports: []*api.PortConfig{ + { + Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + { + Name: "some_other_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + }, + }, + }, + } + assert.NoError(t, store.CreateService(tx, s1)) + + return nil + })) + + serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{}) + defer cancel() + + a, err := New(s, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, a) + + go func() { + assert.NoError(t, a.Run(context.Background())) + }() + defer a.Stop() + + // Port spec is invalid; service should not be updated + watchService(t, serviceWatch, true, func(_ assert.TestingT, service *api.Service) bool { + t.Errorf("unexpected service update: %v", service) + return true + }) + + // Update the service to remove the conflicting port + assert.NoError(t, s.Update(func(tx store.Tx) error { + s1 := store.GetService(tx, svcID) + if assert.NotNil(t, s1) { + s1.Spec.Endpoint.Ports[1].TargetPort = 1235 + s1.Spec.Endpoint.Ports[1].PublishedPort = 1235 + assert.NoError(t, store.UpdateService(tx, s1)) + } + return nil + })) + watchService(t, serviceWatch, false, func(t assert.TestingT, service *api.Service) bool { + if assert.Equal(t, svcID, service.ID) && assert.NotNil(t, service.Endpoint) && assert.Len(t, service.Endpoint.Ports, 2) { + return assert.Equal(t, uint32(1235), service.Endpoint.Ports[1].PublishedPort) + } + return false + }) +} + +func TestDeallocateServiceAllocate(t *testing.T) { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + newSvc := func(id string) *api.Service { + return &api.Service{ + ID: id, + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service1", + }, + Endpoint: &api.EndpointSpec{ + Ports: []*api.PortConfig{ + { + Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + }, + }, + }, + } + } + + // Try adding some objects to store before allocator is started + assert.NoError(t, s.Update(func(tx store.Tx) error { + // populate ingress network + in := &api.Network{ + ID: "ingress-nw-id", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "default-ingress", + }, + Ingress: true, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, + } + assert.NoError(t, store.CreateNetwork(tx, in)) + assert.NoError(t, store.CreateService(tx, newSvc("testID1"))) + return nil + })) + + serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{}) + defer cancel() + + a, err := New(s, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, a) + + go func() { + assert.NoError(t, a.Run(context.Background())) + }() + defer a.Stop() + + isTestService := func(id string) func(t assert.TestingT, service *api.Service) bool { + return func(t assert.TestingT, service *api.Service) bool { + return assert.Equal(t, id, service.ID) && + assert.Len(t, service.Endpoint.Ports, 1) && + assert.Equal(t, uint32(1234), service.Endpoint.Ports[0].PublishedPort) && + assert.Len(t, service.Endpoint.VirtualIPs, 1) + } + } + // Confirm service is allocated + watchService(t, serviceWatch, false, isTestService("testID1")) + + // Deallocate the service and allocate a new one with the same port spec + assert.NoError(t, s.Update(func(tx store.Tx) error { + assert.NoError(t, store.DeleteService(tx, "testID1")) + assert.NoError(t, store.CreateService(tx, newSvc("testID2"))) + return nil + })) + // Confirm new service is allocated + watchService(t, serviceWatch, false, isTestService("testID2")) +} + +func TestServiceAddRemovePorts(t *testing.T) { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + const svcID = "testID1" + // Try adding some objects to store before allocator is started + assert.NoError(t, s.Update(func(tx store.Tx) error { + // populate ingress network + in := &api.Network{ + ID: "ingress-nw-id", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "default-ingress", + }, + Ingress: true, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, + } + assert.NoError(t, store.CreateNetwork(tx, in)) + + s1 := &api.Service{ + ID: svcID, + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service1", + }, + Endpoint: &api.EndpointSpec{ + Ports: []*api.PortConfig{ + { + Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + }, + }, + }, + } + assert.NoError(t, store.CreateService(tx, s1)) + + return nil + })) + + serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{}) + defer cancel() + + a, err := New(s, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, a) + + go func() { + assert.NoError(t, a.Run(context.Background())) + }() + defer a.Stop() + + var probedVIP string + probeTestService := func(expectPorts ...uint32) func(t assert.TestingT, service *api.Service) bool { + return func(t assert.TestingT, service *api.Service) bool { + expectedVIPCount := 0 + if len(expectPorts) > 0 { + expectedVIPCount = 1 + } + if len(service.Endpoint.VirtualIPs) > 0 { + probedVIP = service.Endpoint.VirtualIPs[0].Addr + } else { + probedVIP = "" + } + if assert.Equal(t, svcID, service.ID) && assert.Len(t, service.Endpoint.Ports, len(expectPorts)) { + var published []uint32 + for _, port := range service.Endpoint.Ports { + published = append(published, port.PublishedPort) + } + return assert.Equal(t, expectPorts, published) && assert.Len(t, service.Endpoint.VirtualIPs, expectedVIPCount) + } + + return false + } + } + // Confirm service is allocated + watchService(t, serviceWatch, false, probeTestService(1234)) + allocatedVIP := probedVIP + + // Unpublish port + assert.NoError(t, s.Update(func(tx store.Tx) error { + s1 := store.GetService(tx, svcID) + if assert.NotNil(t, s1) { + s1.Spec.Endpoint.Ports = nil + assert.NoError(t, store.UpdateService(tx, s1)) + } + return nil + })) + // Wait for unpublishing to take effect + watchService(t, serviceWatch, false, probeTestService()) + + // Publish port again and ensure VIP is not the same that was deallocated. + // Since IP allocation is serial we should receive the next available IP. + assert.NoError(t, s.Update(func(tx store.Tx) error { + s1 := store.GetService(tx, svcID) + if assert.NotNil(t, s1) { + s1.Spec.Endpoint.Ports = append(s1.Spec.Endpoint.Ports, &api.PortConfig{Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }) + assert.NoError(t, store.UpdateService(tx, s1)) + } + return nil + })) + watchService(t, serviceWatch, false, probeTestService(1234)) + assert.NotEqual(t, allocatedVIP, probedVIP) +} + +func TestServiceUpdatePort(t *testing.T) { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + const svcID = "testID1" + // Try adding some objects to store before allocator is started + assert.NoError(t, s.Update(func(tx store.Tx) error { + // populate ingress network + in := &api.Network{ + ID: "ingress-nw-id", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "default-ingress", + }, + Ingress: true, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, + } + assert.NoError(t, store.CreateNetwork(tx, in)) + + s1 := &api.Service{ + ID: svcID, + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service1", + }, + Endpoint: &api.EndpointSpec{ + Ports: []*api.PortConfig{ + { + Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + { + Name: "some_other_tcp", + TargetPort: 1235, + }, + }, + }, + }, + } + assert.NoError(t, store.CreateService(tx, s1)) + + return nil + })) + + serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{}) + defer cancel() + + a, err := New(s, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, a) + + go func() { + assert.NoError(t, a.Run(context.Background())) + }() + defer a.Stop() + + watchService(t, serviceWatch, false, func(t assert.TestingT, service *api.Service) bool { + return assert.Equal(t, svcID, service.ID) && assert.Len(t, service.Endpoint.Ports, 2) + }) + + assert.NoError(t, s.Update(func(tx store.Tx) error { + s1 := store.GetService(tx, svcID) + if assert.NotNil(t, s1) { + s1.Spec.Endpoint.Ports[1].PublishedPort = 1235 + assert.NoError(t, store.UpdateService(tx, s1)) + } + return nil + })) + watchService(t, serviceWatch, false, func(t assert.TestingT, service *api.Service) bool { + if assert.Equal(t, svcID, service.ID) && assert.Len(t, service.Endpoint.Ports, 2) { + return assert.Equal(t, uint32(1235), service.Endpoint.Ports[1].PublishedPort) + } + return false + }) +} + +func TestServicePortAllocationIsRepeatable(t *testing.T) { + alloc := func() []*api.PortConfig { + s := store.NewMemoryStore(nil) + assert.NotNil(t, s) + defer s.Close() + + const svcID = "testID1" + // Try adding some objects to store before allocator is started + assert.NoError(t, s.Update(func(tx store.Tx) error { + // populate ingress network + in := &api.Network{ + ID: "ingress-nw-id", + Spec: api.NetworkSpec{ + Annotations: api.Annotations{ + Name: "default-ingress", + }, + Ingress: true, + }, + IPAM: &api.IPAMOptions{ + Driver: &api.Driver{}, + Configs: []*api.IPAMConfig{ + { + Subnet: "10.0.0.0/24", + Gateway: "10.0.0.1", + }, + }, + }, + DriverState: &api.Driver{}, + } + assert.NoError(t, store.CreateNetwork(tx, in)) + + s1 := &api.Service{ + ID: svcID, + Spec: api.ServiceSpec{ + Annotations: api.Annotations{ + Name: "service1", + }, + Endpoint: &api.EndpointSpec{ + Ports: []*api.PortConfig{ + { + Name: "some_tcp", + TargetPort: 1234, + PublishedPort: 1234, + }, + { + Name: "some_other_tcp", + TargetPort: 1235, + }, + }, + }, + }, + } + assert.NoError(t, store.CreateService(tx, s1)) + + return nil + })) + + serviceWatch, cancel := state.Watch(s.WatchQueue(), api.EventUpdateService{}, api.EventDeleteService{}) + defer cancel() + + a, err := New(s, nil, nil) + assert.NoError(t, err) + assert.NotNil(t, a) + + go func() { + assert.NoError(t, a.Run(context.Background())) + }() + defer a.Stop() + + var probedPorts []*api.PortConfig + probeTestService := func(t assert.TestingT, service *api.Service) bool { + if assert.Equal(t, svcID, service.ID) && assert.Len(t, service.Endpoint.Ports, 2) { + probedPorts = service.Endpoint.Ports + return true + } + return false + } + watchService(t, serviceWatch, false, probeTestService) + return probedPorts + } + + assert.Equal(t, alloc(), alloc()) +} + func isValidNode(t assert.TestingT, originalNode, updatedNode *api.Node, networks []string) bool { if !assert.Equal(t, originalNode.ID, updatedNode.ID) { diff --git a/manager/allocator/cnmallocator/networkallocator.go b/manager/allocator/cnmallocator/networkallocator.go index 7ca55f8dce..927e4f43e3 100644 --- a/manager/allocator/cnmallocator/networkallocator.go +++ b/manager/allocator/cnmallocator/networkallocator.go @@ -40,9 +40,6 @@ type cnmNetworkAllocator struct { // The driver registry for all internal and external network drivers. networkRegistry drvregistry.Networks - // The port allocator instance for allocating node ports - portAllocator *portAllocator - // Local network state used by cnmNetworkAllocator to do network management. networks map[string]*network @@ -126,12 +123,6 @@ func New(pg plugingetter.PluginGetter, netConfig *NetworkConfig) (networkallocat return nil, fmt.Errorf("failed to initialize IPAM driver plugins: %w", err) } - pa, err := newPortAllocator() - if err != nil { - return nil, err - } - - na.portAllocator = pa return na, nil } @@ -209,11 +200,8 @@ func (na *cnmNetworkAllocator) Deallocate(n *api.Network) error { } // AllocateService allocates all the network resources such as virtual -// IP and ports needed by the service. +// IP needed by the service. func (na *cnmNetworkAllocator) AllocateService(s *api.Service) (err error) { - if err = na.portAllocator.serviceAllocatePorts(s); err != nil { - return err - } defer func() { if err != nil { na.DeallocateService(s) @@ -300,7 +288,7 @@ networkLoop: } // DeallocateService de-allocates all the network resources such as -// virtual IP and ports associated with the service. +// virtual IP associated with the service. func (na *cnmNetworkAllocator) DeallocateService(s *api.Service) error { if s.Endpoint == nil { return nil @@ -316,7 +304,6 @@ func (na *cnmNetworkAllocator) DeallocateService(s *api.Service) error { } s.Endpoint.VirtualIPs = nil - na.portAllocator.serviceDeallocatePorts(s) delete(na.services, s.ID) return nil @@ -373,19 +360,8 @@ func (na *cnmNetworkAllocator) IsTaskAllocated(t *api.Task) bool { return true } -// HostPublishPortsNeedUpdate returns true if the passed service needs -// allocations for its published ports in host (non ingress) mode -func (na *cnmNetworkAllocator) HostPublishPortsNeedUpdate(s *api.Service) bool { - return na.portAllocator.hostPublishPortsNeedUpdate(s) -} - // IsServiceAllocated returns false if the passed service needs to have network resources allocated/updated. func (na *cnmNetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func(*networkallocator.ServiceAllocationOpts)) bool { - var options networkallocator.ServiceAllocationOpts - for _, flag := range flags { - flag(&options) - } - specNetworks := serviceNetworks(s) // If endpoint mode is VIP and allocator does not have the @@ -447,10 +423,6 @@ func (na *cnmNetworkAllocator) IsServiceAllocated(s *api.Service, flags ...func( } } - if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) || - (s.Endpoint != nil && len(s.Endpoint.Ports) != 0) { - return na.portAllocator.isPortsAllocatedOnInit(s, options.OnInit) - } return true } diff --git a/manager/allocator/cnmallocator/networkallocator_test.go b/manager/allocator/cnmallocator/networkallocator_test.go index 8598fc6b1a..0841f090df 100644 --- a/manager/allocator/cnmallocator/networkallocator_test.go +++ b/manager/allocator/cnmallocator/networkallocator_test.go @@ -563,11 +563,7 @@ func TestAllocateService(t *testing.T) { err = na.AllocateService(s) assert.NoError(t, err) - assert.Equal(t, 2, len(s.Endpoint.Ports)) - assert.True(t, s.Endpoint.Ports[0].PublishedPort >= dynamicPortStart && - s.Endpoint.Ports[0].PublishedPort <= dynamicPortEnd) - assert.True(t, s.Endpoint.Ports[1].PublishedPort >= dynamicPortStart && - s.Endpoint.Ports[1].PublishedPort <= dynamicPortEnd) + assert.Len(t, s.Endpoint.Ports, 0) // Network allocator is not responsible for allocating ports. assert.Equal(t, 1, len(s.Endpoint.VirtualIPs)) @@ -579,94 +575,6 @@ func TestAllocateService(t *testing.T) { assert.Equal(t, true, subnet.Contains(ip)) } -func TestAllocateServiceUserDefinedPorts(t *testing.T) { - na := newNetworkAllocator(t) - s := &api.Service{ - ID: "testID1", - Spec: api.ServiceSpec{ - Endpoint: &api.EndpointSpec{ - Ports: []*api.PortConfig{ - { - Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - }, - { - Name: "some_udp", - TargetPort: 1234, - PublishedPort: 1234, - Protocol: api.ProtocolUDP, - }, - }, - }, - }, - } - - err := na.AllocateService(s) - assert.NoError(t, err) - assert.Equal(t, 2, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[1].PublishedPort) -} - -func TestAllocateServiceConflictingUserDefinedPorts(t *testing.T) { - na := newNetworkAllocator(t) - s := &api.Service{ - ID: "testID1", - Spec: api.ServiceSpec{ - Endpoint: &api.EndpointSpec{ - Ports: []*api.PortConfig{ - { - Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - }, - { - Name: "some_other_tcp", - TargetPort: 1234, - PublishedPort: 1234, - }, - }, - }, - }, - } - - err := na.AllocateService(s) - assert.Error(t, err) -} - -func TestDeallocateServiceAllocate(t *testing.T) { - na := newNetworkAllocator(t) - s := &api.Service{ - ID: "testID1", - Spec: api.ServiceSpec{ - Endpoint: &api.EndpointSpec{ - Ports: []*api.PortConfig{ - { - Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - }, - }, - }, - }, - } - - err := na.AllocateService(s) - assert.NoError(t, err) - assert.Equal(t, 1, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - - err = na.DeallocateService(s) - assert.NoError(t, err) - assert.Equal(t, 0, len(s.Endpoint.Ports)) - // Allocate again. - err = na.AllocateService(s) - assert.NoError(t, err) - assert.Equal(t, 1, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) -} - func TestDeallocateServiceAllocateIngressMode(t *testing.T) { na := newNetworkAllocator(t) @@ -705,8 +613,6 @@ func TestDeallocateServiceAllocateIngressMode(t *testing.T) { err = na.AllocateService(s) assert.NoError(t, err) - assert.Len(t, s.Endpoint.Ports, 1) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) assert.Len(t, s.Endpoint.VirtualIPs, 1) err = na.DeallocateService(s) @@ -719,129 +625,7 @@ func TestDeallocateServiceAllocateIngressMode(t *testing.T) { err = na.AllocateService(s) assert.NoError(t, err) - assert.Len(t, s.Endpoint.Ports, 1) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - assert.Len(t, s.Endpoint.VirtualIPs, 1) -} - -func TestServiceAddRemovePortsIngressMode(t *testing.T) { - na := newNetworkAllocator(t) - - n := &api.Network{ - ID: "testNetID1", - Spec: api.NetworkSpec{ - Annotations: api.Annotations{ - Name: "test", - }, - Ingress: true, - }, - } - - err := na.Allocate(n) - assert.NoError(t, err) - - s := &api.Service{ - ID: "testID1", - Spec: api.ServiceSpec{ - Endpoint: &api.EndpointSpec{ - Ports: []*api.PortConfig{ - { - Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - PublishMode: api.PublishModeIngress, - }, - }, - }, - }, - Endpoint: &api.Endpoint{}, - } - - s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs, - &api.Endpoint_VirtualIP{NetworkID: n.ID}) - - err = na.AllocateService(s) - assert.NoError(t, err) - assert.Len(t, s.Endpoint.Ports, 1) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) assert.Len(t, s.Endpoint.VirtualIPs, 1) - allocatedVIP := s.Endpoint.VirtualIPs[0].Addr - - // Unpublish port - s.Spec.Endpoint.Ports = s.Spec.Endpoint.Ports[:0] - err = na.AllocateService(s) - assert.NoError(t, err) - assert.Len(t, s.Endpoint.Ports, 0) - assert.Len(t, s.Endpoint.VirtualIPs, 0) - - // Publish port again and ensure VIP is not the same that was deallocated. - // Since IP allocation is serial we should receive the next available IP. - s.Spec.Endpoint.Ports = append(s.Spec.Endpoint.Ports, &api.PortConfig{Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - PublishMode: api.PublishModeIngress, - }) - s.Endpoint.VirtualIPs = append(s.Endpoint.VirtualIPs, - &api.Endpoint_VirtualIP{NetworkID: n.ID}) - err = na.AllocateService(s) - assert.NoError(t, err) - assert.Len(t, s.Endpoint.Ports, 1) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - assert.Len(t, s.Endpoint.VirtualIPs, 1) - assert.NotEqual(t, allocatedVIP, s.Endpoint.VirtualIPs[0].Addr) -} - -func TestServiceUpdate(t *testing.T) { - na1 := newNetworkAllocator(t) - na2 := newNetworkAllocator(t) - s := &api.Service{ - ID: "testID1", - Spec: api.ServiceSpec{ - Endpoint: &api.EndpointSpec{ - Ports: []*api.PortConfig{ - { - Name: "some_tcp", - TargetPort: 1234, - PublishedPort: 1234, - }, - { - Name: "some_other_tcp", - TargetPort: 1235, - PublishedPort: 0, - }, - }, - }, - }, - } - - err := na1.AllocateService(s) - assert.NoError(t, err) - assert.True(t, na1.IsServiceAllocated(s)) - assert.Equal(t, 2, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - assert.NotEqual(t, 0, s.Endpoint.Ports[1].PublishedPort) - - // Cache the secode node port - allocatedPort := s.Endpoint.Ports[1].PublishedPort - - // Now allocate the same service in another allocator instance - err = na2.AllocateService(s) - assert.NoError(t, err) - assert.True(t, na2.IsServiceAllocated(s)) - assert.Equal(t, 2, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - // Make sure we got the same port - assert.Equal(t, allocatedPort, s.Endpoint.Ports[1].PublishedPort) - - s.Spec.Endpoint.Ports[1].PublishedPort = 1235 - assert.False(t, na1.IsServiceAllocated(s)) - - err = na1.AllocateService(s) - assert.NoError(t, err) - assert.True(t, na1.IsServiceAllocated(s)) - assert.Equal(t, 2, len(s.Endpoint.Ports)) - assert.Equal(t, uint32(1234), s.Endpoint.Ports[0].PublishedPort) - assert.Equal(t, uint32(1235), s.Endpoint.Ports[1].PublishedPort) } func TestServiceNetworkUpdate(t *testing.T) { diff --git a/manager/allocator/network.go b/manager/allocator/network.go index 673da84996..721bf3f080 100644 --- a/manager/allocator/network.go +++ b/manager/allocator/network.go @@ -37,6 +37,9 @@ type networkContext struct { // the actual network allocation. nwkAllocator networkallocator.NetworkAllocator + // The port allocator instance for allocating node ports + portAllocator *portAllocator + // A set of tasks which are ready to be allocated as a batch. This is // distinct from "unallocatedTasks" which are tasks that failed to // allocate on the first try, being held for a future retry. @@ -95,6 +98,7 @@ func (a *Allocator) doNetworkInit(ctx context.Context) (err error) { nc := &networkContext{ nwkAllocator: na, + portAllocator: newPortAllocator(), pendingTasks: make(map[string]*api.Task), unallocatedTasks: make(map[string]*api.Task), unallocatedServices: make(map[string]*api.Service), @@ -233,7 +237,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if nc.nwkAllocator.IsServiceAllocated(s) { + if nc.isServiceAllocated(s) { break } @@ -261,8 +265,8 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { break } - if nc.nwkAllocator.IsServiceAllocated(s) { - if !nc.nwkAllocator.HostPublishPortsNeedUpdate(s) { + if nc.isServiceAllocated(s) { + if !nc.portAllocator.hostPublishPortsNeedUpdate(s) { break } updatePortsInHostPublishMode(s) @@ -284,7 +288,7 @@ func (a *Allocator) doNetworkAlloc(ctx context.Context, ev events.Event) { case api.EventDeleteService: s := v.Service.Copy() - if err := nc.nwkAllocator.DeallocateService(s); err != nil { + if err := nc.deallocateService(s); err != nil { log.G(ctx).WithError(err).Errorf("Failed deallocation during delete of service %s", s.ID) } else { nc.somethingWasDeallocated = true @@ -681,7 +685,7 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly var allocatedServices []*api.Service for _, s := range services { - if nc.nwkAllocator.IsServiceAllocated(s, networkallocator.OnInit) { + if nc.isServiceAllocated(s, networkallocator.OnInit) { continue } if existingAddressesOnly && @@ -713,6 +717,23 @@ func (a *Allocator) allocateServices(ctx context.Context, existingAddressesOnly return nil } +// isServiceAllocated returns false if the passed service needs to have network resources allocated/updated. +func (nc *networkContext) isServiceAllocated(s *api.Service, flags ...func(*networkallocator.ServiceAllocationOpts)) bool { + if !nc.nwkAllocator.IsServiceAllocated(s, flags...) { + return false + } + + var options networkallocator.ServiceAllocationOpts + for _, flag := range flags { + flag(&options) + } + if (s.Spec.Endpoint != nil && len(s.Spec.Endpoint.Ports) != 0) || + (s.Endpoint != nil && len(s.Endpoint.Ports) != 0) { + return nc.portAllocator.isPortsAllocatedOnInit(s, options.OnInit) + } + return true +} + // allocateTasks allocates tasks in the store so far before we started watching. func (a *Allocator) allocateTasks(ctx context.Context, existingAddressesOnly bool) error { var ( @@ -815,7 +836,7 @@ func taskReadyForNetworkVote(t *api.Task, s *api.Service, nc *networkContext) bo // network configured or service endpoints have been // allocated. return (len(t.Networks) == 0 || nc.nwkAllocator.IsTaskAllocated(t)) && - (s == nil || nc.nwkAllocator.IsServiceAllocated(s)) + (s == nil || nc.isServiceAllocated(s)) } func taskUpdateNetworks(t *api.Task, networks []*api.NetworkAttachment) { @@ -1200,13 +1221,13 @@ func (a *Allocator) allocateService(ctx context.Context, s *api.Service, existin // is not there // service has no user-defined endpoints while has already allocated network resources, // need deallocated. - if err := nc.nwkAllocator.DeallocateService(s); err != nil { + if err := nc.deallocateService(s); err != nil { return err } nc.somethingWasDeallocated = true } - if err := nc.nwkAllocator.AllocateService(s); err != nil { + if err := nc.allocateService(s); err != nil { nc.unallocatedServices[s.ID] = s return err } @@ -1229,6 +1250,26 @@ func (a *Allocator) allocateService(ctx context.Context, s *api.Service, existin return nil } +func (nc *networkContext) allocateService(s *api.Service) error { + if err := nc.portAllocator.serviceAllocatePorts(s); err != nil { + return err + } + if err := nc.nwkAllocator.AllocateService(s); err != nil { + nc.portAllocator.serviceDeallocatePorts(s) + return err + } + + return nil +} + +func (nc *networkContext) deallocateService(s *api.Service) error { + if err := nc.nwkAllocator.DeallocateService(s); err != nil { + return err + } + nc.portAllocator.serviceDeallocatePorts(s) + return nil +} + func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Batch, s *api.Service) error { if err := batch.Update(func(tx store.Tx) error { err := store.UpdateService(tx, s) @@ -1241,7 +1282,7 @@ func (a *Allocator) commitAllocatedService(ctx context.Context, batch *store.Bat return errors.Wrapf(err, "failed updating state in store transaction for service %s", s.ID) }); err != nil { - if err := a.netCtx.nwkAllocator.DeallocateService(s); err != nil { + if err := a.netCtx.deallocateService(s); err != nil { log.G(ctx).WithError(err).Errorf("failed rolling back allocation of service %s", s.ID) } @@ -1298,7 +1339,7 @@ func (a *Allocator) allocateTask(ctx context.Context, t *api.Task) (err error) { return } - if !nc.nwkAllocator.IsServiceAllocated(s) { + if !nc.isServiceAllocated(s) { err = fmt.Errorf("service %s to which task %s belongs has pending allocations", s.ID, t.ID) return } @@ -1423,7 +1464,7 @@ func (a *Allocator) procUnallocatedServices(ctx context.Context) { nc := a.netCtx var allocatedServices []*api.Service for _, s := range nc.unallocatedServices { - if !nc.nwkAllocator.IsServiceAllocated(s) { + if !nc.isServiceAllocated(s) { if err := a.allocateService(ctx, s, false); err != nil { log.G(ctx).WithError(err).Debugf("Failed allocation of unallocated service %s", s.ID) continue diff --git a/manager/allocator/networkallocator/networkallocator.go b/manager/allocator/networkallocator/networkallocator.go index bb7085ce00..293c6e875d 100644 --- a/manager/allocator/networkallocator/networkallocator.go +++ b/manager/allocator/networkallocator/networkallocator.go @@ -61,10 +61,6 @@ type NetworkAllocator interface { // virtual IP and ports associated with the service. DeallocateService(s *api.Service) error - // HostPublishPortsNeedUpdate returns true if the passed service needs - // allocations for its published ports in host (non ingress) mode - HostPublishPortsNeedUpdate(s *api.Service) bool - // // Task Allocation // diff --git a/manager/allocator/cnmallocator/portallocator.go b/manager/allocator/portallocator.go similarity index 97% rename from manager/allocator/cnmallocator/portallocator.go rename to manager/allocator/portallocator.go index 818613fe81..4fa16d7a9b 100644 --- a/manager/allocator/cnmallocator/portallocator.go +++ b/manager/allocator/portallocator.go @@ -1,4 +1,4 @@ -package cnmallocator +package allocator import ( "github.com/moby/swarmkit/v2/api" @@ -101,36 +101,31 @@ func (ps allocatedPorts) delState(p *api.PortConfig) *api.PortConfig { return nil } -func newPortAllocator() (*portAllocator, error) { +func newPortAllocator() *portAllocator { portSpaces := make(map[api.PortConfig_Protocol]*portSpace) for _, protocol := range []api.PortConfig_Protocol{api.ProtocolTCP, api.ProtocolUDP, api.ProtocolSCTP} { - ps, err := newPortSpace(protocol) - if err != nil { - return nil, err - } - - portSpaces[protocol] = ps + portSpaces[protocol] = newPortSpace(protocol) } - return &portAllocator{portSpaces: portSpaces}, nil + return &portAllocator{portSpaces: portSpaces} } -func newPortSpace(protocol api.PortConfig_Protocol) (*portSpace, error) { +func newPortSpace(protocol api.PortConfig_Protocol) *portSpace { master, err := idm.New(masterPortStart, masterPortEnd) if err != nil { - return nil, err + panic(err) } dynamic, err := idm.New(dynamicPortStart, dynamicPortEnd) if err != nil { - return nil, err + panic(err) } return &portSpace{ protocol: protocol, masterPortSpace: master, dynamicPortSpace: dynamic, - }, nil + } } // getPortConfigKey returns a map key for doing set operations with diff --git a/manager/allocator/cnmallocator/portallocator_test.go b/manager/allocator/portallocator_test.go similarity index 98% rename from manager/allocator/cnmallocator/portallocator_test.go rename to manager/allocator/portallocator_test.go index b514f40077..e85ffbe79d 100644 --- a/manager/allocator/cnmallocator/portallocator_test.go +++ b/manager/allocator/portallocator_test.go @@ -1,4 +1,4 @@ -package cnmallocator +package allocator import ( "testing" @@ -180,8 +180,7 @@ func TestReconcilePortConfigs(t *testing.T) { } func TestAllocateServicePorts(t *testing.T) { - pa, err := newPortAllocator() - assert.NoError(t, err) + pa := newPortAllocator() // Service has no endpoint in ServiceSpec s := &api.Service{ @@ -200,7 +199,7 @@ func TestAllocateServicePorts(t *testing.T) { }, } - err = pa.serviceAllocatePorts(s) + err := pa.serviceAllocatePorts(s) assert.NoError(t, err) // Service has a published port 10001 in ServiceSpec @@ -265,8 +264,7 @@ func TestAllocateServicePorts(t *testing.T) { } func TestHostPublishPortsNeedUpdate(t *testing.T) { - pa, err := newPortAllocator() - assert.NoError(t, err) + pa := newPortAllocator() type Data struct { name string @@ -494,8 +492,7 @@ func TestHostPublishPortsNeedUpdate(t *testing.T) { } func TestIsPortsAllocated(t *testing.T) { - pa, err := newPortAllocator() - assert.NoError(t, err) + pa := newPortAllocator() type Data struct { name string @@ -886,8 +883,7 @@ func TestIsPortsAllocated(t *testing.T) { } func TestAllocate(t *testing.T) { - pSpace, err := newPortSpace(api.ProtocolTCP) - assert.NoError(t, err) + pSpace := newPortSpace(api.ProtocolTCP) pConfig := &api.PortConfig{ Name: "test1", @@ -897,7 +893,7 @@ func TestAllocate(t *testing.T) { } // first consume 30000 in dynamicPortSpace - err = pSpace.allocate(pConfig) + err := pSpace.allocate(pConfig) assert.NoError(t, err) pConfig = &api.PortConfig{