Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow different services to use the same host-mode port #2177

Merged
merged 1 commit into from
May 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 50 additions & 12 deletions manager/controlapi/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package controlapi
import (
"errors"
"reflect"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -489,18 +488,32 @@ func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) err
return nil
}

pcToString := func(pc *api.PortConfig) string {
port := strconv.FormatUint(uint64(pc.PublishedPort), 10)
return port + "/" + pc.Protocol.String()
type portSpec struct {
protocol api.PortConfig_Protocol
publishedPort uint32
}

reqPorts := make(map[string]bool)
pcToStruct := func(pc *api.PortConfig) portSpec {
return portSpec{
protocol: pc.Protocol,
publishedPort: pc.PublishedPort,
}
}

ingressPorts := make(map[portSpec]struct{})
hostModePorts := make(map[portSpec]struct{})
for _, pc := range spec.Endpoint.Ports {
if pc.PublishedPort > 0 {
reqPorts[pcToString(pc)] = true
if pc.PublishedPort == 0 {
continue
}
switch pc.PublishMode {
case api.PublishModeIngress:
ingressPorts[pcToStruct(pc)] = struct{}{}
case api.PublishModeHost:
hostModePorts[pcToStruct(pc)] = struct{}{}
}
}
if len(reqPorts) == 0 {
if len(ingressPorts) == 0 && len(hostModePorts) == 0 {
return nil
}

Expand All @@ -516,22 +529,47 @@ func (s *Server) checkPortConflicts(spec *api.ServiceSpec, serviceID string) err
return err
}

isPortInUse := func(pc *api.PortConfig, service *api.Service) error {
if pc.PublishedPort == 0 {
return nil
}

switch pc.PublishMode {
case api.PublishModeHost:
if _, ok := ingressPorts[pcToStruct(pc)]; ok {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s) as a host-published port", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as a host-published port

should be as an ingress port.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe it is correct as is. Since we are in the api.PublishModeHost case, this is a newly-added ingress port conflicting with an existing host-published port.

}

// Multiple services with same port in host publish mode can
// coexist - this is handled by the scheduler.
return nil
case api.PublishModeIngress:
_, ingressConflict := ingressPorts[pcToStruct(pc)]
_, hostModeConflict := hostModePorts[pcToStruct(pc)]
if ingressConflict || hostModeConflict {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s) as an ingress port", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as an ingress port

Should remove this.

Copy link
Collaborator Author

@aaronlehmann aaronlehmann May 11, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are confusing the new service (which the maps reflect) with the existing services we iterate over.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right.

}
}

return nil
}

for _, service := range services {
// If service ID is the same (and not "") then this is an update
if serviceID != "" && serviceID == service.ID {
continue
}
if service.Spec.Endpoint != nil {
for _, pc := range service.Spec.Endpoint.Ports {
if reqPorts[pcToString(pc)] {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s)", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
if err := isPortInUse(pc, service); err != nil {
return err
}
}
}
if service.Endpoint != nil {
for _, pc := range service.Endpoint.Ports {
if reqPorts[pcToString(pc)] {
return grpc.Errorf(codes.InvalidArgument, "port '%d' is already in use by service '%s' (%s)", pc.PublishedPort, service.Spec.Annotations.Name, service.ID)
if err := isPortInUse(pc, service); err != nil {
return err
}
}
}
Expand Down
50 changes: 50 additions & 0 deletions manager/controlapi/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,56 @@ func TestCreateService(t *testing.T) {
}}
_, err = ts.Client.CreateService(context.Background(), &api.CreateServiceRequest{Spec: spec2})
assert.NoError(t, err)

// ensure no port conflict when host ports overlap
spec = createSpec("name8", "image", 1)
spec.Endpoint = &api.EndpointSpec{Ports: []*api.PortConfig{
{PublishMode: api.PublishModeHost, PublishedPort: uint32(9101), TargetPort: uint32(9101), Protocol: api.PortConfig_Protocol(api.ProtocolTCP)},
}}
r, err = ts.Client.CreateService(context.Background(), &api.CreateServiceRequest{Spec: spec})
assert.NoError(t, err)
assert.NotEmpty(t, r.Service.ID)

spec2 = createSpec("name9", "image", 1)
spec2.Endpoint = &api.EndpointSpec{Ports: []*api.PortConfig{
{PublishMode: api.PublishModeHost, PublishedPort: uint32(9101), TargetPort: uint32(9101), Protocol: api.PortConfig_Protocol(api.ProtocolTCP)},
}}
_, err = ts.Client.CreateService(context.Background(), &api.CreateServiceRequest{Spec: spec2})
assert.NoError(t, err)

// ensure port conflict when host ports overlaps with ingress port (host port first)
spec = createSpec("name10", "image", 1)
spec.Endpoint = &api.EndpointSpec{Ports: []*api.PortConfig{
{PublishMode: api.PublishModeHost, PublishedPort: uint32(9102), TargetPort: uint32(9102), Protocol: api.PortConfig_Protocol(api.ProtocolTCP)},
}}
r, err = ts.Client.CreateService(context.Background(), &api.CreateServiceRequest{Spec: spec})
assert.NoError(t, err)
assert.NotEmpty(t, r.Service.ID)

spec2 = createSpec("name11", "image", 1)
spec2.Endpoint = &api.EndpointSpec{Ports: []*api.PortConfig{
{PublishMode: api.PublishModeIngress, PublishedPort: uint32(9102), TargetPort: uint32(9102), Protocol: api.PortConfig_Protocol(api.ProtocolTCP)},
}}
_, err = ts.Client.CreateService(context.Background(), &api.CreateServiceRequest{Spec: spec2})
assert.Error(t, err)
assert.Equal(t, codes.InvalidArgument, grpc.Code(err))

// ensure port conflict when host ports overlaps with ingress port (ingress port first)
spec = createSpec("name12", "image", 1)
spec.Endpoint = &api.EndpointSpec{Ports: []*api.PortConfig{
{PublishMode: api.PublishModeIngress, PublishedPort: uint32(9103), TargetPort: uint32(9103), Protocol: api.PortConfig_Protocol(api.ProtocolTCP)},
}}
r, err = ts.Client.CreateService(context.Background(), &api.CreateServiceRequest{Spec: spec})
assert.NoError(t, err)
assert.NotEmpty(t, r.Service.ID)

spec2 = createSpec("name13", "image", 1)
spec2.Endpoint = &api.EndpointSpec{Ports: []*api.PortConfig{
{PublishMode: api.PublishModeHost, PublishedPort: uint32(9103), TargetPort: uint32(9103), Protocol: api.PortConfig_Protocol(api.ProtocolTCP)},
}}
_, err = ts.Client.CreateService(context.Background(), &api.CreateServiceRequest{Spec: spec2})
assert.Error(t, err)
assert.Equal(t, codes.InvalidArgument, grpc.Code(err))
}

func TestSecretValidation(t *testing.T) {
Expand Down
41 changes: 41 additions & 0 deletions manager/scheduler/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,3 +260,44 @@ func (f *PlatformFilter) Explain(nodes int) string {
}
return fmt.Sprintf("unsupported platform on %d nodes", nodes)
}

// HostPortFilter checks that the node has a specific port available.
type HostPortFilter struct {
t *api.Task
}

// SetTask returns true when the filter is enabled for a given task.
func (f *HostPortFilter) SetTask(t *api.Task) bool {
if t.Endpoint != nil {
for _, port := range t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
f.t = t
return true
}
}
}

return false
}

// Check returns true if the task can be scheduled into the given node.
func (f *HostPortFilter) Check(n *NodeInfo) bool {
for _, port := range f.t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
portSpec := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
if _, ok := n.usedHostPorts[portSpec]; ok {
return false
}
}
}

return true
}

// Explain returns an explanation of a failure.
func (f *HostPortFilter) Explain(nodes int) string {
if nodes == 1 {
return "host-mode port already in use on 1 node"
}
return fmt.Sprintf("host-mode port already in use on %d nodes", nodes)
}
26 changes: 26 additions & 0 deletions manager/scheduler/nodeinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,20 @@ import (
"golang.org/x/net/context"
)

// hostPortSpec specifies a used host port.
type hostPortSpec struct {
protocol api.PortConfig_Protocol
publishedPort uint32
}

// NodeInfo contains a node and some additional metadata.
type NodeInfo struct {
*api.Node
Tasks map[string]*api.Task
ActiveTasksCount int
ActiveTasksCountByService map[string]int
AvailableResources api.Resources
usedHostPorts map[hostPortSpec]struct{}

// recentFailures is a map from service ID to the timestamps of the
// most recent failures the node has experienced from replicas of that
Expand All @@ -30,6 +37,7 @@ func newNodeInfo(n *api.Node, tasks map[string]*api.Task, availableResources api
Tasks: make(map[string]*api.Task),
ActiveTasksCountByService: make(map[string]int),
AvailableResources: availableResources,
usedHostPorts: make(map[hostPortSpec]struct{}),
recentFailures: make(map[string][]time.Time),
}

Expand Down Expand Up @@ -57,6 +65,15 @@ func (nodeInfo *NodeInfo) removeTask(t *api.Task) bool {
nodeInfo.AvailableResources.MemoryBytes += reservations.MemoryBytes
nodeInfo.AvailableResources.NanoCPUs += reservations.NanoCPUs

if t.Endpoint != nil {
for _, port := range t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
portSpec := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
delete(nodeInfo.usedHostPorts, portSpec)
}
}
}

return true
}

Expand Down Expand Up @@ -84,6 +101,15 @@ func (nodeInfo *NodeInfo) addTask(t *api.Task) bool {
nodeInfo.AvailableResources.MemoryBytes -= reservations.MemoryBytes
nodeInfo.AvailableResources.NanoCPUs -= reservations.NanoCPUs

if t.Endpoint != nil {
for _, port := range t.Endpoint.Ports {
if port.PublishMode == api.PublishModeHost && port.PublishedPort != 0 {
portSpec := hostPortSpec{protocol: port.Protocol, publishedPort: port.PublishedPort}
nodeInfo.usedHostPorts[portSpec] = struct{}{}
}
}
}

if t.DesiredState <= api.TaskStateRunning {
nodeInfo.ActiveTasksCount++
nodeInfo.ActiveTasksCountByService[t.ServiceID]++
Expand Down
1 change: 1 addition & 0 deletions manager/scheduler/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ var (
&PluginFilter{},
&ConstraintFilter{},
&PlatformFilter{},
&HostPortFilter{},
}
)

Expand Down
Loading